Monday, March 14, 2011

Rx Part 9–Join, Window, Buffer and Group Join

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy it!

While this series of posts is labelled as an introduction, this post takes us past the 100-level style posts. This post tackles the interesting problem of combining multiple streams of data. We have looked at ways of combining streams in earlier posts with SelectMany, Merge, Zip, Concat etc. The operators that we look at in this post are different for several reasons:
  1. They are new as of 2011.
  2. Their matching is based on events coinciding with each other within some given window of time.
  3. They offer some pretty powerful stuff that would otherwise be complex to code, and just nasty if you went sans-Rx.

Buffer

Buffer is not a new operator to us; however, it can now be conceptually grouped with the group-join operators. These operators all do something with a stream and a window of time. Each operator opens a window when the source stream produces a value. How the window is closed, and which values are exposed, is the main difference between the operators. Let us first go back to our old friend BufferWithCount, which we saw in the second post in this series.
BufferWithCount will create a window when the first value is produced. It will then put that value into an internal cache. The window stays open until the count of values has been reached, with each of those values cached. Once the count has been reached, the window closes and the cache is OnNext’ed as an IList<T>. When the next value is produced from the source, the cache is cleared and we start again. This means that BufferWithCount takes an IObservable<T> and returns an IObservable<IList<T>>.
Example Buffer with count of 3
source|-0-1-2-3-4-5-6-7-8-9|
result|-----0-----3-----6-9|
            1     4     7
            2     5     8 
In this marble diagram, I have represented the list of values returned at a point in time as a column of data; that is, the values 0, 1 and 2 are all returned in the first buffer.
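To try this against a current Rx.NET build, here is a small sketch. It assumes the System.Reactive NuGet package, where later releases expose BufferWithCount as an overload of Buffer; the class and method names are hypothetical, chosen for this example. Observable.Range produces its values synchronously, so ToList().Wait() simply gathers every buffer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferByCountDemo
{
    // Buffers the values 0..9 in groups of three and returns every buffer.
    public static IList<IList<int>> Run()
    {
        return Observable.Range(0, 10) // emits 0..9 synchronously, then completes
            .Buffer(3)                 // IObservable<IList<int>>, lists of up to 3 values
            .ToList()                  // collect all the buffers into a single list
            .Wait();                   // block until the sequence completes
    }
}
```

Calling BufferByCountDemo.Run() should give the four buffers from the diagram: [0, 1, 2], [3, 4, 5], [6, 7, 8] and the trailing [9], which is flushed when the source completes.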
With that understood, it is not much of a leap to BufferWithTime. Instead of passing a count, we pass a TimeSpan. The closing of the window (and therefore of the buffer’s cache) is now dictated by time instead of by the count of values produced. This is ever so slightly more complicated, as we have now introduced scheduling: to produce the IList<T> at the correct point in time, we need a scheduler assigned to perform the timing. Because that scheduler can be swapped for a virtual-time one, this also makes testing this stuff a lot easier.
Example Buffer with time of 5 units
source|-0-1-2-3-4-5-6-7-8-9-|
result|----0----2----5----7-|
           1    3    6    8
                4         9
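A sketch of the time-based version against current Rx.NET, assuming the System.Reactive package, where BufferWithTime became an overload of Buffer taking a TimeSpan (the class name is hypothetical). Because this uses real time, which buffer a given value lands in can vary from run to run; only the overall order of the values is stable. The Buffer overload that additionally takes an IScheduler is the one a test would use to substitute virtual time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferByTimeDemo
{
    // Ten values, one every 100ms, buffered into 250ms slices of real time.
    public static IList<IList<long>> Run()
    {
        return Observable.Interval(TimeSpan.FromMilliseconds(100)) // 0, 1, 2, ... on a timer
            .Take(10)                                             // stop after ten values
            .Buffer(TimeSpan.FromMilliseconds(250))               // close a buffer every 250ms
            .ToList()
            .Wait();                                              // roughly one second of wall-clock time
    }
}
```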

Window

The Window operators are very similar to the Buffer operators; they really only differ by their return type. Where Buffer takes an IObservable<T> and returns an IObservable<IList<T>>, the Window operators return an IObservable<IObservable<T>>. It is also worth noting that the Buffer operators will not yield their buffers until the window closes.
Example of Window with a count of 3
source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-2|
window1        3-4-5|
window2              6-7-8|
window3                    9|
Example of Window with time of 5 units
source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-|
window1      2-3-4|
window2           -5-6-|
window3                7-8-9|
So the obvious difference here is that with the Window operators you get hold of the values from the source as soon as they are produced, whereas with the Buffer operators you must wait until the window closes before the values are accessible.
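Here is a sketch of Window with a count of 3, again assuming the System.Reactive package, where WindowWithCount is exposed as an overload of Window; the class name is hypothetical. Each window is an IObservable<int> in its own right, so the sketch tags every value with the index of the window it arrived in, which lets us check the grouping against the diagram.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class WindowByCountDemo
{
    // Splits 0..9 into windows of three and tags each value with its window's index.
    public static IList<(int Window, int Value)> Run()
    {
        return Observable.Range(0, 10)
            .Window(3)                              // IObservable<IObservable<int>>
            .Select((window, index) =>              // index counts the windows: 0, 1, 2, 3
                window.Select(value => (Window: index, Value: value)))
            .Merge()                                // flatten; each value flows through as it is produced
            .ToList()
            .Wait();
    }
}
```

The result should hold (0, 0), (0, 1), (0, 2), (1, 3) and so on up to (3, 9), matching window0 to window3 in the count diagram.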

Switch is the Anti Window

I think it is worth noting, at least from an academic point of view, that the Window operators produce an IObservable<IObservable<T>>, and that the Switch operator takes an IObservable<IObservable<T>> and returns an IObservable<T>. As the Window operators ensure that the windows (child streams) do not overlap, we can use the Switch operator to turn a windowed stream back into its original stream.
//is the same as Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)
var switchedWindow = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)
    .WindowWithTime(TimeSpan.FromMilliseconds(500))
    .Switch();
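The same round trip can be checked synchronously with a count-based window. This sketch assumes the System.Reactive package and uses a hypothetical class name; since the windows never overlap and each window is handed out before its values arrive, both Merge and Switch should recover the original sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class UnWindowDemo
{
    public static (IList<int> Merged, IList<int> Switched) Run()
    {
        // Range is cold, so each query below gets its own fresh run of 0..9.
        IObservable<IObservable<int>> windowed = Observable.Range(0, 10).Window(3);

        IList<int> merged = windowed.Merge().ToList().Wait();    // subscribes to every window
        IList<int> switched = windowed.Switch().ToList().Wait(); // only ever listens to the latest window
        return (merged, switched);
    }
}
```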

Join

Join is not a new method in the Rx library, but the overload we are interested in today is new. From what I can see of the two original overloads, which take an array or an IEnumerable of Plan<T>, their usage can be replicated with Merge and Select. They are a bit of a mystery to me.
The overload we are interested in is:
public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, TRight, TResult> resultSelector
)
Now this is a fairly hairy signature to try and understand in one go, so let’s take it one parameter at a time.
IObservable<TLeft> left is the source stream that defines when a window starts. This is just like the Buffer and Window operators, except that every value published from this source opens a new window. In Buffer and Window, some values just fell into an existing window.
I like to think of IObservable<TRight> right as the window value stream. While the left stream controls opening the windows, the right stream will try to pair up with a value from the left stream.
Let us imagine that our left stream produces a value, which creates a new window. If the right stream produces a value while that window is open, then the resultSelector function is called with the two values. This is the crux of Join: pairing two values from the streams that occur within the same window. This leads us to our next question: when does the window close? The answer to this question is both the power and the complexity of the Join operator.
When left produces a value, a window is opened. That value is also passed to the leftDurationSelector function, which returns an IObservable<TLeftDuration>. When that IObservable OnNext’s or completes, the window for that value is closed. Note that the type of TLeftDuration is irrelevant. This initially left me with the feeling that IObservable<TLeftDuration> was all a bit overkill, as you effectively just need some sort of event to say “Closed!”. However, by allowing you to use IObservable, you can do some clever stuff, as we will see later.
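To make the duration idea concrete, here is a sketch that closes each left window with Observable.Timer, driven by a HistoricalScheduler so that the clock only moves when we advance it. It assumes the System.Reactive package; the subjects stand in for real streams, and the class name is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinDurationDemo
{
    public static List<(int Left, char Right)> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock: moves only when advanced
        var left = new Subject<int>();
        var right = new Subject<char>();
        var pairs = new List<(int Left, char Right)>();

        var query = left.Join(
            right,
            _ => Observable.Timer(TimeSpan.FromSeconds(2), scheduler), // each left window lasts 2 virtual seconds
            _ => Observable.Empty<Unit>(),                             // each right window closes immediately
            (l, r) => (Left: l, Right: r));

        using (query.Subscribe(pairs.Add))
        {
            left.OnNext(1);                               // t = 0s: the window for 1 opens
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            right.OnNext('a');                            // t = 1s: inside 1's window
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2)); // t = 3s: the timer fired at 2s
            right.OnNext('b');                            // no open left window, so no pair
        }
        return pairs;
    }
}
```

Only (1, 'a') should be produced: 'b' arrives at three virtual seconds, after the timer returned by the leftDurationSelector has already closed the window for 1.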
So let us first imagine a scenario where the left stream produces values twice as fast as the right stream. Imagine also that we never close the left windows, which we can do by always returning Observable.Never<Unit>() from the leftDurationSelector, and that each right window closes immediately, by returning Observable.Empty<Unit>() from the rightDurationSelector. This results in the following pairs being produced:
left  |-0-1-2-3-4-5|
right |---A---B---C|

result|---0---0---0
          A   B   C

          1   1   1
          A   B   C

              2   2
              B   C

              3   3
              B   C

                  4
                  C

                  5
                  C
As you can see, the left values are cached and replayed each time the right produces a value.
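The diagram can be replayed by pushing values through two subjects in the same order. This sketch assumes the System.Reactive package, and the class name is hypothetical; pairs are formatted as strings such as "0A" to keep the check short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinNeverExpireDemo
{
    public static List<string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<char>();
        var pairs = new List<string>();

        var query = left.Join(
            right,
            _ => Observable.Never<Unit>(),   // left windows never close
            _ => Observable.Empty<Unit>(),   // right windows close immediately
            (l, r) => $"{l}{r}");

        using (query.Subscribe(pairs.Add))
        {
            // Two left values for every right value, as in the marble diagram.
            left.OnNext(0); left.OnNext(1); right.OnNext('A');
            left.OnNext(2); left.OnNext(3); right.OnNext('B');
            left.OnNext(4); left.OnNext(5); right.OnNext('C');
        }
        return pairs;
    }
}
```

Twelve pairs should come out: two at A, four at B and six at C. There is no "2A", because 2 had not been produced when A arrived.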
Now it seems fairly obvious that if I immediately closed the left windows by returning Observable.Empty<Unit>() or perhaps Observable.Return(0), the windows would close as soon as they opened, so no pairs would ever be produced. However, what could I do to make sure that these windows did not overlap, so that once a second value was produced I would no longer see the first value? Well, returning the left stream from the leftDurationSelector could do it. But wait: when we return the left stream from the leftDurationSelector, it would try to create another subscription, and that may introduce side effects. The quick answer to that is to Publish and RefCount the left stream. If we do that, the results look more like this:
left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C
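A sketch of this non-overlapping version, under the same assumptions (System.Reactive package, hypothetical class name). Rather than Publish().RefCount(), it uses the Publish overload that takes a selector, which shares a single subscription to left between the Join and every duration subscription made inside the selector.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinNonOverlappingDemo
{
    public static List<string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<char>();
        var pairs = new List<string>();

        // Inside the selector, `l` is the shared (published) left stream.
        var query = left.Publish(l => l.Join(
            right,
            _ => l,                          // a left window closes when the next left value arrives
            _ => Observable.Empty<Unit>(),   // right windows close immediately
            (lv, rv) => $"{lv}{rv}"));

        using (query.Subscribe(pairs.Add))
        {
            left.OnNext(0); left.OnNext(1); right.OnNext('A');
            left.OnNext(2); left.OnNext(3); right.OnNext('B');
            left.OnNext(4); left.OnNext(5); right.OnNext('C');
        }
        return pairs;
    }
}
```

The pairs should be "1A", "3B" and "5C", matching the diagram above.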
This made me think that I could use Join to produce my own version of CombineLatest, which we saw in the 5th post in the series. If I had the values from left expire when the next value from left was OnNext’ed, then I would be well on my way. However, I need the same thing to happen for the right. Luckily the Join operator also provides us with a rightDurationSelector that works just like the leftDurationSelector. This is simple to implement: all I need to do is return a reference to the same left stream when a left value is produced, and then do the same for the right. The code looks like this:
public static IObservable<TResult> MyCombineLatest<TLeft, TRight, TResult>
(
    IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, TRight, TResult> resultSelector)
{
    var refcountedLeft = left.Publish().RefCount();
    var refcountedRight = right.Publish().RefCount();
    return Observable.Join(
            refcountedLeft,
            refcountedRight,
            value => refcountedLeft,
            value => refcountedRight,
            resultSelector);
}
While the code above is not production quality (it would need to have some gates in place to mitigate race conditions), it shows us the power that we could get with Join; we can actually use it to create other operators!

GroupJoin

When the Join operator pairs up values that coincide within a window, it always passes just the left value and the right value to the resultSelector. The GroupJoin operator takes this one step further by passing the left value immediately to the resultSelector, along with an IObservable of the right values that occur within the window. Its signature is very similar to Join's, but note the difference in the resultSelector Func:
public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, IObservable<TRight>, TResult> resultSelector
)
If we go back to our first Join example, where we had
  • the left producing values twice as fast as the right,
  • the left never expiring, and
  • the right immediately expiring,
this is what the result may look like:
left              |-0-1-2-3-4-5|
right             |---A---B---C|

0th window values   --A---B---C|
1st window values     A---B---C|
2nd window values       --B---C|
3rd window values         B---C|
4th window values           --C|
5th window values             C|
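A sketch of this first GroupJoin scenario, assuming the System.Reactive package (the class name is hypothetical). Each result carries the left value and its window of right values; the sketch subscribes to every window as soon as it is produced and appends whatever arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupJoinDemo
{
    public static Dictionary<int, string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<char>();
        var windows = new Dictionary<int, string>();

        var query = left.GroupJoin(
            right,
            _ => Observable.Never<Unit>(),   // left windows never close
            _ => Observable.Empty<Unit>(),   // right windows close immediately
            (l, rights) => (Left: l, Rights: rights));

        using (query.Subscribe(w =>
        {
            windows[w.Left] = "";
            // Record every right value that lands in this left value's window.
            w.Rights.Subscribe(r => windows[w.Left] += r);
        }))
        {
            left.OnNext(0); left.OnNext(1); right.OnNext('A');
            left.OnNext(2); left.OnNext(3); right.OnNext('B');
            left.OnNext(4); left.OnNext(5); right.OnNext('C');
        }
        return windows;
    }
}
```

Windows 0 and 1 should collect "ABC", windows 2 and 3 "BC", and windows 4 and 5 "C". The inner subscriptions are never disposed here, which is fine for a throwaway demo but not for production code.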
Now, if we switch it around so that the left expires immediately and the right never expires, the result may look like this:
left              |-0-1-2-3-4-5|
right             |---A---B---C|

0th window values   |
1st window values     A|
2nd window values       A|
3rd window values         AB|
4th window values           AB|
5th window values             ABC|
This starts to make things interesting. Sharp readers may have noticed that with GroupJoin you could effectively re-create your own Join by doing something like this:
public IObservable<TResult> MyJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    IObservable<TLeft> left,
    IObservable<TRight> right,
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return Observable.GroupJoin
        (
            left, 
            right,
            leftDurationSelector,
            rightDurationSelector,
            (leftValue, rightValues)=>rightValues.Select(rightValue=>resultSelector(leftValue, rightValue))
        )
        .Merge();
}
I was even able to knock up my own version of WindowWithTime with the code below:
public IObservable<IObservable<T>> MyWindowWithTime<T>(IObservable<T> source, TimeSpan windowPeriod)
{
    return Observable.CreateWithDisposable<IObservable<T>>(o =>
        {
            var windower = new Subject<long>();
            var intervals = Observable.Concat(
                    Observable.Return(0L),
                    Observable.Interval(windowPeriod)
                )
                .Publish()
                .RefCount();

            var subscription = Observable.GroupJoin
                (
                    windower,
                    source.Do(_ => { }, windower.OnCompleted),
                    _ => windower,
                    _ => Observable.Empty<Unit>(),
                    (left, sourceValues) => sourceValues
                )
                .Subscribe(o);
            var intervalSubscription = intervals.Subscribe(windower);
            return new CompositeDisposable(subscription, intervalSubscription);
        });
}
Yes, it is not so pretty, but it is an academic exercise to showcase GroupJoin. Those who have read Bart De Smet’s excellent MiniLinq post (and follow-up video) can see that GroupJoin could almost be added to the 3 basic operators Cata, Ana and Bind.
GroupJoin and the other window operators can make otherwise fiddly and difficult tasks a cinch to put together. For example, those in the Finance game can now pretty much use GroupJoin to create their own VWAP and TWAP extension methods. Nice!
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Rx Part 8 - Testing Rx

6 comments:

Anonymous said...

Hi Lee,

Excellent post!

RE: Switch is the Anti Window

I see what you're saying however Switch is actually derived from SelectMany (Bind).

Also, Switch is not the only combinator that does this:

IObservable<T> Merge<T>(this IObservable<IObservable<T>> source)
IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)

Lee Campbell said...

LOL, I actually meant to write Merge; however, as the windows don't overlap, all three are eligible (Switch, Merge, Concat).

Merge is more appropriate for the GroupJoin stuff later in the post. Maybe I need to update it to say Merge is the anti-window. Maybe anti-window is not what I am trying to express - it sounds like the evil brother. How about Un-window?

Unknown said...

Hi Lee,

Nice post! We could do with some brown bagging though ;-)

I had a quick look at how to create overlapping windows (maybe best to call them moving windows to distinguish from non-overlapping windows?), for moving averages etc. i.e.

source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-2|
window1    1-2-3|
window2      2-3-4|
window3        3-4-5|

The WindowWithTime and WindowWithCount operators create a new window whenever the previous window closes, but the Window operator lets you do this:

static void Main(string[] args)
{
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(200))
        .Select(i => i * 100).Take(12);

    // Each opening value starts a window that closes 600ms later.
    Func<long, IObservable<bool>> windowClosings =
        i => Observable.Interval(TimeSpan.FromMilliseconds(600))
            .Select(_ => true);

    var movingWindowWithTime = obs.Window(obs, windowClosings);
    movingWindowWithTime.Select(s => s.ToList()).Subscribe(
        i => i.Subscribe(j =>
        {
            foreach (var value in j) Console.Write("{0} ", value);
        }, () => Console.WriteLine()));
    Thread.Sleep(2000);
}

Cheers,

Gary

Lee Campbell said...

Nice one Gary. I have the VWAP/TWAP (Moving overlapping window) code on my computer at work so I will show you tomorrow.
Other readers that don't sit 3 desks away from me, you can see it in a couple of days when the next post comes out :)

Unknown said...

I know it's an old post, but I have no idea how you achieved this result:
left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C

I'm very interested in ignoring left values when there is no corresponding right value.
Could you post the code for that?

Lee Campbell said...

Hi Jozef,

you can reproduce it with something like this


var testScheduler = new TestScheduler();
var left = testScheduler.CreateColdObservable(
    ReactiveTest.OnNext(020, 0),
    ReactiveTest.OnNext(040, 1),
    ReactiveTest.OnNext(060, 2),
    ReactiveTest.OnNext(080, 3),
    ReactiveTest.OnNext(100, 4),
    ReactiveTest.OnNext(120, 5),
    ReactiveTest.OnCompleted<int>(121));

var right = testScheduler.CreateColdObservable(
    ReactiveTest.OnNext(041, 'A'),
    ReactiveTest.OnNext(081, 'B'),
    ReactiveTest.OnNext(121, 'C'),
    ReactiveTest.OnCompleted<char>(121));

var query = left.Publish(l => l.Join(right,
    _ => l,
    _ => Observable.Empty<Unit>(),
    (lhs, rhs) => Tuple.Create(lhs, rhs)));

var observer = testScheduler.CreateObserver<Tuple<int, char>>();
query.Subscribe(observer);

testScheduler.Start();

var values = observer.Messages.Where(m => m.Value.HasValue).Select(m => m.Value.Value);
string.Join(",", values).Dump(); // Dump() is LINQPad's output extension


P.S. I have moved the blog over to LeeCampbell.com. I just have not set up the redirects yet.