Saturday, June 19, 2010

RX Part 5 – Combining multiple IObservable<T> streams

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

In the last post we covered some of the flow control features of Rx and how to conceptualise them with Marble diagrams. This post will continue to build on those concepts by looking at different ways of working with multiple streams.
The Concat extension method is probably the simplest of the combining methods. If you have read the previous post on flow control, most of the error handling constructs covered there are more complex than this method. It simply publishes the values from the second stream once the first stream completes.
//Generate values 0,1,2
//Argument order: initial state, condition, iterate, result selector
var stream1 = Observable.Generate(0, i => i < 3, i => i + 1, i => i);
//Generate values 100,101,102,103,104
var stream2 = Observable.Generate(100, i => i < 105, i => i + 1, i => i);

stream1
    .Concat(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * 
 * stream1 --0--0--0--|
 * stream2 -----------0--0--0--0--|
 * 
 * result  --0--0--0--0--0--0--0--|
 */
If either stream publishes an OnError then the result stream will OnError too. This means that if stream1 produces an OnError then stream2 is never used. If you want stream2 to be used regardless of whether stream1 produces an OnError, then the OnErrorResumeNext extension method is your best option.
Quick video on Concat, Catch and OnErrorResumeNext on Channel9.
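To make the difference concrete, here is a minimal sketch that assumes a current System.Reactive package rather than the 2010 build this post was written against. The class name ConcatErrorSketch and the helpers Failing and Record are made up for illustration; only Concat, OnErrorResumeNext, Range and Throw are library calls.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ConcatErrorSketch
{
    // Hypothetical helper: publishes 0 and 1, then fails with an OnError.
    private static IObservable<int> Failing() =>
        Observable.Range(0, 2)
            .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

    // Hypothetical helper: records every notification from the source as text.
    private static List<string> Record(IObservable<int> source)
    {
        var log = new List<string>();
        source.Subscribe(
            value => log.Add(value.ToString()),
            error => log.Add("Error: " + error.Message),
            () => log.Add("Completed"));
        return log;
    }

    // Concat: the error from the first stream ends the result,
    // so the second stream is never subscribed to.
    public static List<string> WithConcat() =>
        Record(Failing().Concat(Observable.Range(100, 3)));

    // OnErrorResumeNext: the error is swallowed and the second stream is used.
    public static List<string> WithOnErrorResumeNext() =>
        Record(Failing().OnErrorResumeNext(Observable.Range(100, 3)));
}
```

Under those assumptions WithConcat() records 0, 1 and then the error, while WithOnErrorResumeNext() records 0, 1, 100, 101, 102 and then a completion.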
The Amb method was a new concept to me. I believe this comes from functional programming and is an abbreviation of Ambiguous. Effectively this extension method will produce values from whichever stream produces values first and will completely ignore the other stream. In the examples below I have two streams that both produce values. In the first example stream1 wins the race, so the result stream contains stream1's values. In the second example I delay stream1 so that stream2 wins the race and the result stream contains the values from stream2.
//Generate values 0,1,2
var stream1 = Observable.Range(0,3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Range(100,5);

stream1
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 *  if stream 1 produces a value first...
 * stream1 --0--0--0--|
 * stream2 ---0--0--0--0--0--|
 * 
 * result  --0--0--0--|     //All from stream1
 */

stream1.Delay(TimeSpan.FromMilliseconds(100))
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ---0--0--0--|
 * stream2 --0--0--0--0--0--|
 * 
 * result  --0--0--0--0--0--|     //All from stream2
 */
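Because the examples above depend on timing, a deterministic variation can be easier to reason about. The sketch below is an assumption-laden illustration against a current System.Reactive package: it swaps the Range streams for Subject<int> instances so that the order of events is explicit. The class name AmbSketch is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AmbSketch
{
    public static List<int> Run()
    {
        var results = new List<int>();
        var stream1 = new Subject<int>();
        var stream2 = new Subject<int>();

        using (stream1.Amb(stream2).Subscribe(value => results.Add(value)))
        {
            stream2.OnNext(100);   // stream2 produces first, so it wins the race
            stream1.OnNext(0);     // ignored: stream1 lost and is no longer observed
            stream2.OnNext(101);
            stream2.OnCompleted();
        }

        return results;            // 100, 101
    }
}
```

Swapping the first two OnNext calls would make stream1 the winner instead.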
The Merge extension method does a primitive combination of multiple streams that share the same type T. The result is also an IObservable<T>, with values published to the result stream as they occur in the source streams. The result stream completes when all of the source streams complete, or when any stream publishes an OnError.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => i + 100);
stream1
    .Merge(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----0----0|
 * stream2 --0--0--0--0--0|
 * 
 * result  --0-00--00-0--00-|  
 * Output:
 * 100
 * 0
 * 101
 * 102
 * 1
 * 103
 * 104      //Note this is a race condition. 2 could be
 * 2        //  published before 104.
 */
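The two ways a merged stream can end are easier to see without timers. This is a rough sketch, assuming a current System.Reactive package, that uses Subject<int> in place of Interval so each step is explicit. MergeTerminationSketch and its Record helper are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeTerminationSketch
{
    // Hypothetical helper: appends every notification to the log as text.
    private static IDisposable Record(IObservable<int> source, List<string> log) =>
        source.Subscribe(
            value => log.Add(value.ToString()),
            error => log.Add("Error: " + error.Message),
            () => log.Add("Completed"));

    public static List<string> CompletionCase()
    {
        var log = new List<string>();
        var stream1 = new Subject<int>();
        var stream2 = new Subject<int>();
        using (Record(stream1.Merge(stream2), log))
        {
            stream1.OnNext(1);
            stream1.OnCompleted();   // result stays open: stream2 is still running
            log.Add("stream1 done");
            stream2.OnNext(2);
            stream2.OnCompleted();   // last source has finished, so the result completes
        }
        return log;                  // 1, stream1 done, 2, Completed
    }

    public static List<string> ErrorCase()
    {
        var log = new List<string>();
        var stream1 = new Subject<int>();
        var stream2 = new Subject<int>();
        using (Record(stream1.Merge(stream2), log))
        {
            stream1.OnNext(1);
            stream2.OnNext(2);
            stream1.OnError(new InvalidOperationException("boom"));
            stream2.OnNext(3);       // never reaches the observer
        }
        return log;                  // 1, 2, Error: boom
    }
}
```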
Merge also provides other overloads that allow you to pass more than two source observables via an IEnumerable or a params array. The overload that takes a params array is great when we know at compile time how many streams we want to merge, and the IEnumerable overload is better when we don't know at compile time how many streams we need to merge.
//Create a third stream
var stream3 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10).Select(i => i + 200);

//Number of streams known at compile time.
Observable.Merge(stream1, stream2, stream3)
    .Subscribe(Console.WriteLine);
Console.ReadLine();

//We can dynamically create a list at run time with this overload.
var streams = new List<IObservable<long>>();
streams.Add(stream1);
streams.Add(stream2);
streams.Add(stream3);
Observable.Merge(streams).Subscribe(Console.WriteLine);
Console.ReadLine();
A quick video on Merge on Channel9.
SelectMany, like its counterpart extension method for IEnumerable<T>, will create the Cartesian product of the two streams. So for every item in one stream, it will give you every item in the other stream. A primitive way to think of it is as a nested for loop that creates a 2D array. If you want more information on SelectMany, I will leave it to you to do a Google search, as it is fairly well documented in the IEnumerable world.
//Generate values 0,1,2
var stream1 = Enumerable.Range(0, 3).ToObservable();
//Generate values 100,101,102,103,104
var stream2 = Enumerable.Range(100, 5).ToObservable();
stream1
    .SelectMany(i => stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Output.
 * { Left = 0, Right = 100 }
 * { Left = 0, Right = 101 }
 * { Left = 1, Right = 100 }
 * { Left = 0, Right = 102 }
 * { Left = 1, Right = 101 }
 * { Left = 2, Right = 100 }
 * { Left = 0, Right = 103 }
 * { Left = 1, Right = 102 }
 * { Left = 2, Right = 101 }
 * { Left = 0, Right = 104 }
 * { Left = 1, Right = 103 }
 * { Left = 2, Right = 102 }
 * { Left = 1, Right = 104 }
 * { Left = 2, Right = 103 }
 * { Left = 2, Right = 104 }
 */
A quick video on SelectMany on Channel9.
Zip is another interesting merge feature. Just like a zipper on clothing or a bag, the Zip method brings together two sets of values as pairs, two-by-two. Things to note about Zip are that the result stream will complete when the first of the streams completes, it will error if either of the streams errors, and it will only publish once it has a pair. So if one of the source streams publishes values faster than the other, the rate of publishing will be dictated by the slower of the two streams.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .Zip(stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|     stream 2 values represented as chars
 * 
 * result  ----0----1----2|
 *             a    b    c
 */
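The pairing and completion rules can also be checked without timers. The following is a small sketch, assuming a current System.Reactive package, that zips two Range streams of different lengths. ZipSketch is a hypothetical name and the string formatting is only there to make the pairs easy to read.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ZipSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var stream1 = Observable.Range(0, 3);     // 0,1,2
        var stream2 = Observable.Range(100, 5);   // 100,101,102,103,104

        stream1
            .Zip(stream2, (lhs, rhs) => lhs + "-" + rhs)
            .Subscribe(
                pair => log.Add(pair),
                () => log.Add("Completed"));

        // 103 and 104 never find a partner, so they are never published.
        return log;   // 0-100, 1-101, 2-102, Completed
    }
}
```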
Here are two short videos on Zip (first, second) on Channel9. Note that the second video is actually incorrect; can you spot why?
CombineLatest is worth comparing to the Zip method. Both methods use a function that takes a value from each stream to produce the result value. The difference is that CombineLatest caches the last value of each stream, and when either stream produces a new value, that new value and the last value from the other stream are passed to the result function. This example uses the same inputs as the previous Zip example, but note that many more values are produced. This leaves CombineLatest somewhere between Zip and SelectMany :-)
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .CombineLatest(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine, () => Console.WriteLine("Complete"));
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|     stream 2 values represented as chars
 * 
 * result  ----00--01-1--22-2|     the result as pairs.
 *             ab  cc d  de f    
 */
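The caching behaviour can be stepped through by hand. This sketch assumes a current System.Reactive package and replaces the Interval streams with subjects so the sequence of events is fixed; CombineLatestSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var stream1 = new Subject<int>();
        var stream2 = new Subject<string>();

        using (stream1
            .CombineLatest(stream2, (s1, s2) => s1 + s2)
            .Subscribe(pair => log.Add(pair), () => log.Add("Completed")))
        {
            stream1.OnNext(0);      // nothing published: stream2 has no value yet
            stream2.OnNext("a");    // 0a
            stream2.OnNext("b");    // 0b (cached 0 is reused)
            stream1.OnNext(1);      // 1b (cached b is reused)
            stream1.OnCompleted();  // result stays open while stream2 is running
            stream2.OnNext("c");    // 1c
            stream2.OnCompleted();  // both sources done, so the result completes
        }

        return log;                 // 0a, 0b, 1b, 1c, Completed
    }
}
```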
Quick video on CombineLatest on Channel9
ForkJoin, like the last few extension methods, also requires a function to produce the result, but it will only return the last values from each stream. Things to note with ForkJoin are that, like the previous methods, if either stream errors then so will the result stream, and if either stream is empty (i.e. completes with no values) then the result stream will also be empty. This example uses the same values as the previous samples and will only produce a pair from the last values of each stream once they both complete.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .ForkJoin(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e|        stream 2 values represented as chars
 * 
 * result  --------------2|   the result as pairs. 
 *                       e     
 */
One thing to note is that most of the extension methods discussed have a matching static method that takes a params array or an IEnumerable, as discussed in the Merge section.
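As a rough illustration of those static forms, the sketch below assumes a current System.Reactive package, where Observable.Concat accepts a params array and Observable.Zip accepts an IEnumerable of streams and publishes each set of values as an IList<T>. The exact overloads available in other builds may differ, and StaticOverloadSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class StaticOverloadSketch
{
    // params array form: number of streams known at compile time.
    public static List<string> ConcatAll()
    {
        var log = new List<string>();
        Observable
            .Concat(Observable.Range(0, 2), Observable.Range(10, 2), Observable.Range(20, 2))
            .Subscribe(value => log.Add(value.ToString()), () => log.Add("Completed"));
        return log;   // 0, 1, 10, 11, 20, 21, Completed
    }

    // IEnumerable form: the list of streams can be built at run time.
    public static List<string> ZipAll()
    {
        var log = new List<string>();
        var streams = new List<IObservable<int>>
        {
            Observable.Range(0, 2),
            Observable.Range(10, 2),
            Observable.Range(20, 2)
        };
        Observable
            .Zip(streams)   // each result is an IList<int> with one value per stream
            .Subscribe(values => log.Add(string.Join(",", values)), () => log.Add("Completed"));
        return log;   // "0,10,20", "1,11,21", Completed
    }
}
```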
So having looked at these ways to bring multiple observables together we have implicitly brought some concurrency and threading to our code. This allows us to nicely lead into the next post in the series which will be on Scheduling and Threading with Rx.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 4 - Flow control
Forward to next post; Part 6 - Scheduling and threading

4 comments:

Thorsten Lorenz said...

I believe there is an error in the code of the first part of this post or the API changed.
stream1 = Observable.Generate(0, i => i < 3, i => i, i => i + 1);
will never terminate and print '1' indefinitely, since its state remains unchanged.

stream1 = Observable.Generate(0, i => i < 3, i => i + 1, i => i);
on the other hand works and prints out 0 1 2 as expected.

The same applies to stream2.

Andrea said...

great post indeed

Lee Campbell said...

@Thorsten, the API has indeed changed since this post was written. I will try to update the post to reflect the latest API. I think I may need to start producing unit tests for my posts so I know when the crew at DevLabs break my stuff.

:)

Vitalij Kudresov said...

Great Post! I always use it as a reference when deciding which operation to choose.