Sunday, June 27, 2010

Rx Part 6 – Scheduling and Threading

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

So far in this series we have managed to avoid any explicit use of threading or concurrency. Some of the methods we have covered implicitly introduce some level of concurrency to do their jobs (e.g. Buffer, Delay and Sample all require a separate thread to do their magic). However, most of this has been kindly abstracted away from us. This post will look at the beauty of the Rx API and its ability to effectively remove the need for WaitHandles and for any explicit use of Threads, the ThreadPool and the shiny new Task type.
A friend of mine once wisely stated that you should always understand at least one layer below what you are coding. At the time he was referring to networking protocols, but I think it is sage advice for all programming. On my current project there are some very savvy developers who are very comfortable working in a multithreaded environment. The project has client-side and server-side threading problems that we have had to tackle. I believe the whole team would agree that it has been amazing how much concurrency Rx will handle for you in a declarative way. The code base is virtually free of WaitHandles, Monitor or lock usage, and any explicit creation of threads. It has evolved into this state over time as we have come to grips with the power of Rx, and the end result is far cleaner code. However, having that experience on the team allowed us to discover ways we should and shouldn’t be using Rx, which would have been just too hard for me to do alone.
Getting back to my friend’s comment about understanding the underlying subsystem, this is especially important when dealing with Rx and scheduling. Just because Rx abstracts some of this away, it does not mean that you can’t still create problems for yourself if you are not careful. Before I scare you too much, let’s look at some of the scheduling features of Rx.

Scheduling

In the Rx world, you can control the scheduling of two things:
  1. The invocation of the subscription
  2. The publishing of notifications
As you could probably guess, these are exposed via two extension methods on IObservable<T> called SubscribeOn and ObserveOn. Both methods have an overload that takes an IScheduler and returns an IObservable<T>, so you can chain methods together.
public static class Observable
{
    public static IObservable<TSource> ObserveOn<TSource>(
        this IObservable<TSource> source, IScheduler scheduler)
    {...}

    public static IObservable<TSource> SubscribeOn<TSource>(
        this IObservable<TSource> source, IScheduler scheduler)
    {...}
}

public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}
The IScheduler interface is of less interest to me than the types that implement it. Depending on your platform* (Silverlight 3, Silverlight 4, .NET 3.5, .NET 4.0), the static Scheduler class will expose the appropriate implementations. These are the static properties on the Scheduler type that expose the different schedulers:
Scheduler.Dispatcher will ensure that the actions are performed on the Dispatcher, which is obviously useful for Silverlight and WPF applications. You can imagine that the implementation would just delegate any call to IScheduler.Schedule(Action) straight to Dispatcher.BeginInvoke(Action).
Scheduler.NewThread will schedule all actions onto a new thread.
Scheduler.ThreadPool will schedule all actions onto the Thread Pool.
Scheduler.TaskPool (which is only available to Silverlight 4 and .NET 4.0) will schedule actions onto the TaskPool.
Scheduler.Immediate will ensure the action is not scheduled but is executed immediately.
Scheduler.CurrentThread just ensures that the actions are performed on the thread that made the original call. This is different to Immediate, as CurrentThread will queue the action to be performed. Note the difference in the output of the following code. One method passes Scheduler.Immediate, the other passes Scheduler.CurrentThread.
private static void ScheduleTasks(IScheduler scheduler)
{
    Action leafAction = () => Console.WriteLine("leafAction.");
    Action innerAction = () =>
                             {
                                 Console.WriteLine("innerAction start.");
                                 scheduler.Schedule(leafAction);
                                 Console.WriteLine("innerAction end.");
                             };
    Action outerAction = () =>
                             {
                                 Console.WriteLine("outer start.");
                                 scheduler.Schedule(innerAction);
                                 Console.WriteLine("outer end.");
                             };
    scheduler.Schedule(outerAction);
}

public void CurrentThreadExample()
{
    ScheduleTasks(Scheduler.CurrentThread);
    Console.ReadLine();
    /*Output:
     * outer start.
     * outer end.
     * innerAction start.
     * innerAction end.
     * leafAction.
     */
}

public void ImmediateExample()
{
    ScheduleTasks(Scheduler.Immediate);
    Console.ReadLine();
    /*Output:
     * outer start.
     * innerAction start.
     * leafAction.
     * innerAction end.
     * outer end.
     */
}
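To see why the two outputs differ, here is a minimal sketch of both strategies in plain C#, with no Rx involved. IMiniScheduler, MiniImmediateScheduler and MiniTrampolineScheduler are hypothetical names, not Rx types. The trampoline is an assumption about how CurrentThread behaves that reproduces the output above: a per-thread queue that only the outermost Schedule call drains.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal scheduler abstraction (not Rx's IScheduler).
public interface IMiniScheduler
{
    void Schedule(Action action);
}

// Runs each action straight away, so nested Schedule calls recurse like ordinary method calls.
public sealed class MiniImmediateScheduler : IMiniScheduler
{
    public void Schedule(Action action) => action();
}

// Queues actions on a per-thread queue. Only the outermost Schedule call drains it,
// so work scheduled from inside a running action waits until that action returns.
public sealed class MiniTrampolineScheduler : IMiniScheduler
{
    [ThreadStatic] private static Queue<Action> _queue;

    public void Schedule(Action action)
    {
        if (_queue != null)
        {
            _queue.Enqueue(action);   // already draining on this thread: defer
            return;
        }

        _queue = new Queue<Action>();
        _queue.Enqueue(action);
        try
        {
            while (_queue.Count > 0)
                _queue.Dequeue()();   // run queued actions in FIFO order
        }
        finally
        {
            _queue = null;
        }
    }
}

public static class SchedulerSketch
{
    // Same shape as ScheduleTasks above, but records each line instead of printing it.
    public static List<string> ScheduleTasks(IMiniScheduler scheduler)
    {
        var log = new List<string>();
        Action leafAction = () => log.Add("leafAction.");
        Action innerAction = () =>
        {
            log.Add("innerAction start.");
            scheduler.Schedule(leafAction);
            log.Add("innerAction end.");
        };
        Action outerAction = () =>
        {
            log.Add("outer start.");
            scheduler.Schedule(innerAction);
            log.Add("outer end.");
        };
        scheduler.Schedule(outerAction);
        return log;
    }
}
```

Running ScheduleTasks with each mini scheduler reproduces the two outputs listed above. The only difference between them is the queue; nothing here uses another thread.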
*Sorry Rx for JavaScript, I have not even opened the box on you and don’t know anything about scheduling in JavaScript.
Examples
So those are our schedulers; let’s see some of them in use. The thing I want to point out here is that the first few times I used these overloads I confused what they actually did. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled. The ObserveOn method describes where you want your notifications to be delivered. So, for example, if you had a WPF application that used Rx to populate an ObservableCollection<T>, then you would almost certainly want to use SubscribeOn with one of the threaded schedulers (NewThread, ThreadPool or maybe TaskPool), and then you would have to use ObserveOn with the Dispatcher scheduler to update your collection.
public void LoadCustomers()
{
    _customerService.GetCustomers()
        .SubscribeOn(Scheduler.NewThread)
        .ObserveOn(Scheduler.Dispatcher)
        .Subscribe(Customers.Add);
}
So all of the schedulers just offer a nice abstraction over the various ways we can write concurrent code. Besides saving me from writing the tedious code to get work onto a new thread or the thread pool, they also make Rx threading easy. Oh Rx, you thought I had forgotten. I didn’t think that any of the schedulers except CurrentThread and Immediate warranted further explanation, but I do think it is worth pointing out some of the “fun” threading problems you can face even though the scheduling has been abstracted away from you.

Deadlocks

While writing the application my team is currently working on, we found out the hard way that Rx code can most certainly deadlock. When you consider that some calls (like .First()) are blocking, and that we can schedule work to be done in the future, it becomes obvious that race conditions and deadlocks can occur. This example is the simplest deadlock I could think of. It is fairly silly, but it will get the ball rolling.
var stream = new Subject<int>();
Console.WriteLine("Next line should deadlock the system.");
var value = stream.First();
stream.OnNext(1);
Console.WriteLine("I can never execute....");
Hopefully we won’t ever write code that silly, and if we did, our tests would give us fairly quick feedback that things were wrong. What lets deadlocks slip into the system is when they manifest themselves at integration points. This example may be a little harder to find, but it is only a small step away from the silly first example. Here we block in the constructor of a UI element, which will always be created on the Dispatcher thread. The blocking call is waiting for an event that can only be raised from the Dispatcher: deadlock.
private readonly Subject<string> _subject = new Subject<string>();
private string _value;

public Window1()
{
    InitializeComponent();
    DataContext = this;
    Value = "Default value";

    //Deadlock! We need the dispatcher to continue
    // to allow me to click the button to produce a value.
    Value = _subject.First(); 

    //This will give same result but will not be blocking(deadlocking).
    _subject.Take(1).Subscribe(value => Value = value);
}

private void MyButton_Click(object sender, RoutedEventArgs e)
{
    _subject.OnNext("New Value");
}

public string Value
{
    get { return _value; }
    set
    {
        _value = value;
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs("Value"));
    }
}
In this example things start to become more sinister. The click handler of a Button tries to get the first value from an observable exposed via an interface.
public partial class Window1 : INotifyPropertyChanged
{
    private readonly IMyService _service = new MyService(); //Imagine DI here.
    private int _value2;

    public Window1()
    {
        InitializeComponent();
        DataContext = this;
    }

    public int Value2
    {
        get { return _value2; }
        set
        {
            _value2 = value;
            var handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs("Value2"));
        }
    }

    #region INotifyPropertyChanged Members

    public event PropertyChangedEventHandler PropertyChanged;

    #endregion

    private void MyButton2_Click(object sender, RoutedEventArgs e)
    {
        Value2 = _service.GetTemperature().First();
    }
}
The only small problem here is that we block on the Dispatcher thread (.First() is a blocking call). However, this becomes a deadlock if the service code is written incorrectly.
public interface IMyService
{
    IObservable<int> GetTemperature();
}

class MyService : IMyService
{
    public IObservable<int> GetTemperature()
    {
        return Observable.Create<int>(
            o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    return () => { };
                })
            .SubscribeOnDispatcher();
    }
}
This odd implementation with explicit scheduling queues the subscription, and therefore the three OnNext calls, onto the Dispatcher, where it cannot run until the .First() call has finished. But .First() is waiting for an OnNext to be called: deadlock.
So far this post has been a bit doom and gloom about scheduling and the problems you could face; that is not the intent. I just wanted to make it obvious that Rx is not going to solve the age-old concurrency problems, but it will make it easier to get things right if you follow these simple rules:
  1. Only the final subscriber should be setting the scheduling.
  2. Avoid using .First() (Ed: that is for you Olivier). We will call this rule 1b.
Where the last example came unstuck is that the service was dictating the scheduling paradigm when really it had no business doing so. Before we had a clear idea of where we should be doing the scheduling in my current project, we had all sorts of layers adding “helpful” scheduling code. What it ended up creating was a threading nightmare. When we removed all scheduling code and then located it in a single layer (at least in the Silverlight client), most of our concurrency problems went away. I recommend you do the same. At least in WPF/Silverlight applications, the pattern should be simple: “Subscribe on a background thread; Observe on the Dispatcher”.
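The shape of this deadlock does not depend on WPF or Rx. The sketch below models the Dispatcher as a hypothetical MiniDispatcher: a queue of work items run one at a time on a single thread. A real Dispatcher does much more than this. A work item that queues the producer onto the same dispatcher and then blocks waiting for it can never succeed, because the producer cannot start until the blocked item returns. A timeout is used only so the sketch terminates; a real .First() would simply hang. The second method shows the non-blocking alternative, in the spirit of the Take(1).Subscribe(...) line earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a UI Dispatcher: work items run in order on one thread.
public sealed class MiniDispatcher
{
    private readonly Queue<Action> _work = new Queue<Action>();

    public void BeginInvoke(Action action) => _work.Enqueue(action);

    // Runs queued work on the calling thread until nothing is left.
    public void RunUntilEmpty()
    {
        while (_work.Count > 0)
            _work.Dequeue()();
    }
}

public static class DispatcherDeadlockSketch
{
    // Blocks inside a dispatcher work item, like calling .First() on the Dispatcher thread.
    // Returns false when the wait times out, i.e. where the real code would deadlock.
    public static bool BlockOnDispatcher(TimeSpan timeout)
    {
        var dispatcher = new MiniDispatcher();
        var sawValue = false;

        using (var valuePublished = new ManualResetEventSlim(false))
        {
            dispatcher.BeginInvoke(() =>
            {
                // The "service" schedules its OnNext onto the same dispatcher...
                dispatcher.BeginInvoke(() => valuePublished.Set());
                // ...and the caller blocks waiting for it; the queued item cannot run yet.
                sawValue = valuePublished.Wait(timeout);
            });

            dispatcher.RunUntilEmpty();
        }
        return sawValue;
    }

    // Non-blocking alternative: register a callback and return, so the queued work can run.
    public static int? SubscribeOnDispatcher()
    {
        var dispatcher = new MiniDispatcher();
        int? received = null;

        dispatcher.BeginInvoke(() =>
        {
            Action<int> onNext = value => received = value;
            dispatcher.BeginInvoke(() => onNext(27));
        });

        dispatcher.RunUntilEmpty();
        return received;
    }
}
```

BlockOnDispatcher returns false because the wait times out. SubscribeOnDispatcher returns 27, because registering a callback lets the dispatcher move on to the queued work.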
So my challenge to the readers is to add to the comments:
  1. Any other scheduling rules (2 seems quite small, and I was only going to have 1)
  2. Post some nasty Rx race condition code
  3. What rules do you have for subscribing on a background thread? Which scheduler should I use and when, i.e. NewThread, ThreadPool or TaskPool? And with that I come full circle to understanding one layer below the one you are working in.
Further reading/watching:
  1. This channel9 video has more interesting stuff including testing with schedulers http://channel9.msdn.com/shows/Going+Deep/Wes-Dyer-and-Jeffrey-Van-Gogh-Inside-Rx-Virtual-Time/
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 5 - Combining multiple IObservable streams
Forward to next post; Part 7 - Hot and Cold observables

Saturday, June 19, 2010

RX Part 5 – Combining multiple IObservable<T> streams

In the last post we covered some of the flow control features of Rx and how to conceptualise them with Marble diagrams. This post will continue to build on those concepts by looking at different ways of working with multiple streams.
The Concat extension method is probably the simplest of these methods; if you have read the previous flow control post, you will have seen that most of the error handling constructs are more complex than this one. Concat simply publishes the values from the first stream and then, once the first stream completes, publishes the values from the second stream.
//Generate values 0,1,2
var stream1 = Observable.Generate(0, i => i < 3, i => i, i => i + 1);
//Generate values 100,101,102,103,104
var stream2 = Observable.Generate(100, i => i < 105, i => i, i => i + 1);

stream1
    .Concat(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * 
 * stream1 --0--0--0--|
 * stream2 -----------0--0--0--0--0--|
 * 
 * result  --0--0--0--0--0--0--0--0--|
 */
If either stream were to publish an OnError, the result stream would OnError too. This means that if stream1 produced an OnError, stream2 would never be used. If you wanted stream2 to be used regardless of whether stream1 produced an OnError, the OnErrorResumeNext extension method would be your best option.
Quick video on Concat, Catch and OnErrorResumeNext on Channel9.
The Amb method was a new concept to me. I believe it comes from functional programming and is an abbreviation of Ambiguous. Effectively, this extension method will publish values from whichever stream reacts first (normally by producing a value) and will completely ignore the other stream. In the examples below I have two streams that both produce values. In the first example stream1 wins the race, so the result stream contains stream1’s values. In the second example I delay stream1 from producing values, so stream2 wins the race and the result stream contains the values from stream2.
//Generate values 0,1,2
var stream1 = Observable.Range(0,3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Range(100,5);

stream1
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 *  if stream 1 produces a value first...
 * stream1 --0--0--0--|
 * stream2 ---0--0--0--0--0--|
 * 
 * result  --0--0--0--|     //All from stream1
 */

stream1.Delay(TimeSpan.FromMilliseconds(100))
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ---0--0--0--|
 * stream2 --0--0--0--0--0--|
 * 
 * result  --0--0--0--0--0--|     //All from stream2
 */
The Merge extension method does a primitive combination of multiple streams that share the same element type T. The result will also be an IObservable<T>, with values published to the result stream as they occur in the source streams. The result stream will complete when all of the source streams complete, or error as soon as any source stream publishes an OnError.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => i + 100);
stream1
    .Merge(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----0----0|
 * stream2 --0--0--0--0--0|
 * 
 * result  --0-00--00-0--00-|  
 * Output:
 * 100
 * 0
 * 101
 * 102
 * 1
 * 103
 * 104      //Note this is a race condition. 2 could be
 * 2        //  published before 104.
 */
Merge also provides other overloads that allow you to pass more than two source observables via an IEnumerable or a params array. The overload that takes a params array is great for when we know at compile time how many streams we want to merge, and the IEnumerable overload is better for when we don’t.
//Create a third stream
var stream3 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10).Select(i => i + 200);

//Number of streams known at compile time.
Observable.Merge(stream1, stream2, stream3)
    .Subscribe(Console.WriteLine);
Console.ReadLine();

//We can dynamically create a list at run time with this overload.
var streams = new List<IObservable<long>>();
streams.Add(stream1);
streams.Add(stream2);
streams.Add(stream3);
Observable.Merge(streams).Subscribe(Console.WriteLine);
Console.ReadLine();
A quick video on Merge on Channel9.
SelectMany, like its counterpart extension method for IEnumerable<T>, will create the Cartesian product of the two streams. So for every item in one stream, it will give you every item in the other stream. A primitive way to think of it is as a nested for loop that creates a 2D array. If you want more info on SelectMany, I will leave it to you to do a Google search, as this is fairly well documented in the IEnumerable world.
//Generate values 0,1,2
var stream1 = Enumerable.Range(0, 3).ToObservable();
//Generate values 100,101,102,103,104
var stream2 = Enumerable.Range(100, 5).ToObservable();
stream1
    .SelectMany(i => stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Output.
 * { Left = 0, Right = 100 }
 * { Left = 0, Right = 101 }
 * { Left = 1, Right = 100 }
 * { Left = 0, Right = 102 }
 * { Left = 1, Right = 101 }
 * { Left = 2, Right = 100 }
 * { Left = 0, Right = 103 }
 * { Left = 1, Right = 102 }
 * { Left = 2, Right = 101 }
 * { Left = 0, Right = 104 }
 * { Left = 1, Right = 103 }
 * { Left = 2, Right = 102 }
 * { Left = 1, Right = 104 }
 * { Left = 2, Right = 103 }
 * { Left = 2, Right = 104 }
 */
A quick Video on SelectMany on channel9
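SelectMany means the same thing for IEnumerable<T>, so the "nested for loop" view can be checked with plain LINQ to Objects. The sketch below builds the same 15 pairs both ways; the class and method names are illustrative only. The enumerable version yields the pairs in strict nested-loop order, whereas the Rx output above interleaves them. The set of pairs is the same; only the order differs.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class SelectManySketch
{
    // LINQ's SelectMany: every left value is paired with every right value.
    public static List<string> WithSelectMany()
    {
        var left = Enumerable.Range(0, 3);      // 0, 1, 2
        var right = Enumerable.Range(100, 5);   // 100..104
        return left
            .SelectMany(l => right, (l, r) => $"{l}:{r}")
            .ToList();
    }

    // The same Cartesian product written as the nested loop it stands for.
    public static List<string> WithNestedLoops()
    {
        var result = new List<string>();
        for (var l = 0; l < 3; l++)
            for (var r = 100; r < 105; r++)
                result.Add($"{l}:{r}");
        return result;
    }
}
```

Both methods return the same list, starting with "0:100" and ending with "2:104".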
Zip is another interesting merge feature. Just like a zipper on clothing or a bag, the Zip method will bring together two sets of values as pairs; two-by-two. Things to note about the Zip function are that the result stream will complete when the first of the streams completes, it will error if either of the streams errors, and it will only publish once it has a pair. So if one of the source streams publishes values faster than the other stream, the rate of publishing will be dictated by the slower of the two streams.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .Zip(stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|   s2 values represented as chars
 * 
 * result  ----0----1----2|
 *             a    b    c
 */
Here are two short videos on Zip (first, second) on Channel9. Note the second video is actually incorrect; can you spot why?
CombineLatest is worth comparing to the Zip method. Both methods use a function that takes a value from each stream to produce the result value. The difference is that CombineLatest will cache the last value of each stream, and when either stream produces a new value, that new value and the last value from the other stream will be passed to the result function. This example uses the same inputs as the previous Zip example, but note that many more values are produced. This leaves CombineLatest somewhere between Zip and SelectMany :-)
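The pairing rule is the one Enumerable.Zip uses in LINQ to Objects, so it can be sketched without any timing. The n-th item of one sequence is paired with the n-th item of the other, and the shorter sequence decides how many pairs are produced. This sketch deliberately ignores time, which is exactly where the Rx version differs: each Rx pair is published when the later of its two members arrives.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ZipSketch
{
    // Pairs items by position and stops as soon as either sequence runs out.
    public static List<string> ZipNumbersAndLetters()
    {
        var numbers = new[] { 0L, 1L, 2L };                       // like stream1
        var letters = new[] { "a", "b", "c", "d", "e", "f" };     // like stream2
        return numbers
            .Zip(letters, (lhs, rhs) => $"{lhs}{rhs}")
            .ToList();
    }
}
```

The result is "0a", "1b", "2c"; the letters d, e and f are never paired, just as stream2's extra values are dropped in the marble diagram above.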
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .CombineLatest(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine, () => Console.WriteLine("Complete"));
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|     stream 2 values represented as chars
 * 
 * result  ----00--01-1--22-2|     the result as pairs.
 *             ab  cc d  de f    
 */
Quick video on CombineLatest on Channel9
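Here is a small simulation of the caching rule in plain C#. It is a sketch, not Rx's implementation. It assumes that the notifications from both streams have already been merged into arrival order. With the 250ms and 150ms intervals above, that order is a, 0, b, c, 1, d, 2, e, f, with the tie at 750ms taken as 2 before e. The simulation replays the rule: remember the latest value from each side and, once both sides have a value, emit a pair on every new arrival.

```csharp
using System.Collections.Generic;

public static class CombineLatestSketch
{
    // Each arrival is tagged with the stream it came from: true = left (stream1), false = right (stream2).
    public static List<string> Combine(IEnumerable<(bool FromLeft, string Value)> arrivals)
    {
        string latestLeft = null;    // cached latest value of stream1
        string latestRight = null;   // cached latest value of stream2
        var results = new List<string>();

        foreach (var (fromLeft, value) in arrivals)
        {
            if (fromLeft) latestLeft = value;
            else latestRight = value;

            // Nothing is emitted until both streams have produced at least one value.
            if (latestLeft != null && latestRight != null)
                results.Add(latestLeft + latestRight);
        }
        return results;
    }
}
```

Feeding it that arrival order produces 0a, 0b, 0c, 1c, 1d, 2d, 2e, 2f, which matches the pairs in the marble diagram above.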
ForkJoin, like the last few extension methods, also requires a function to produce the result, but it will only use the last value from each stream. Things to note with ForkJoin are that, like the previous methods, if either stream errors so will the result stream, and if either stream is empty (i.e. completes with no values) the result stream will also be empty. This example uses the same values as the previous samples and will only produce a pair from the last values of each stream once they both complete.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .ForkJoin(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e|   s2 values represented as chars
 * 
 * result  --------------2|   the result as pairs. 
 *                       e     
 */
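If you ignore timing, the ForkJoin rule described above reduces to "wait for both streams to finish, then combine their last values". The sketch below expresses that rule over already-completed sequences in plain C#. ForkJoinLast is a hypothetical helper, not an Rx method, and it does not model errors.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ForkJoinSketch
{
    // Produces a single result from the last value of each sequence,
    // or nothing at all if either sequence was empty.
    public static List<TResult> ForkJoinLast<T1, T2, TResult>(
        IEnumerable<T1> first, IEnumerable<T2> second, Func<T1, T2, TResult> selector)
    {
        var left = first.ToList();    // "wait" for the first stream to complete
        var right = second.ToList();  // "wait" for the second stream to complete
        if (left.Count == 0 || right.Count == 0)
            return new List<TResult>();
        return new List<TResult> { selector(left[left.Count - 1], right[right.Count - 1]) };
    }
}
```

With 0, 1, 2 and a to e as inputs this yields the single pair "2e", as in the diagram above. If either input is empty, it yields nothing.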
One thing to note about most of the extension methods discussed is that they generally have a matching static method that takes a params array or IEnumerable, as discussed in the Merge section.
So having looked at these ways to bring multiple observables together we have implicitly brought some concurrency and threading to our code. This allows us to nicely lead into the next post in the series which will be on Scheduling and Threading with Rx.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 4 - Flow control
Forward to next post; Part 6 - Scheduling and threading

Saturday, May 29, 2010

RX part 4 – Flow control

In the previous post we looked at the lifecycle of a subscription to a stream. A subscription can be terminated by unsubscribing/disposing, and the stream (IObservable<T>) can terminate either naturally via an OnCompleted or erroneously with an OnError. OnError creates an interesting problem: it publishes the Exception, it does not throw it. This means that you cannot use the standard Structured Exception Handling (SEH) process of try/catch with observables. Well, that is not entirely true. In this example the stream raises an OnError that we can catch with a try/catch block, thanks to the Subscribe extension method overload I use: it takes an OnError and then throws the Exception.
try
{
    var subject = new Subject<int>();
    subject.Subscribe(Console.WriteLine);
    //Or explicitly rethrow as per this line.
    //subject.Subscribe(Console.WriteLine, ex => { throw ex.PrepareForRethrow(); });
    subject.OnNext(1);
    subject.OnError(new IOException("test exception"));
}
catch (IOException ioEx)
{
    Console.WriteLine("Caught IO exception: {0}", ioEx.Message);
}
Throwing published OnError exceptions does not allow for a very composable style of coding. It may be useful in cases like the example above, but the power of Rx as you will see in future posts is the ability to create compositions of streams together. Once we are running compositions of streams then SEH is not very helpful or useful. Rx provides several methods to provide a composition friendly way to handle errors and exceptions.

Visualising streams

Before I cover the flow control methods that Rx offers, I want to divert quickly and talk about a visual tool we will use to help communicate the concepts relating to streams. Marble diagrams are great for communicating Rx streams, and you may find them useful for describing any stream except the completely basic. When using marble diagrams to communicate Rx streams, there are only a few things you need to know:
  1. a stream is represented by a horizontal line
  2. time moves to the right (i.e. things on the left happened before things on the right)
  3. we only need 3 symbols to represent an Event
    1. “0” for OnNext
    2. “X” for an OnError
    3. “|” for OnCompleted
This is a sample of a stream that publishes 3 values and then completes
--0--0--0--|
This is a sample of a stream that publishes 4 values then errors.
--0--0--0--0--X
While these examples may seem too simple to warrant a visual representation, the simplicity of marble diagrams is great once we are using multiple streams.

Flow control constructs

Sometimes when dealing with an observable, it is conceivable that errors may occur that are acceptable, and we should try again. Imagine that we want the following effect: the error in stream 1 (S1) is acceptable, so we try again on stream 2 (S2). The last line is the composition of the two streams, which is the result we want to expose (R).
S1--0--0--X
S2-----------0--0--0--0
R --0--0-----0--0--0--0
We could recreate the example above with several methods.
Retry is the simplest method available to us. Retry will try to re-subscribe to the IObservable<T> on any failure. In this example we just use the simple overload that will always retry on any exception.
public static void RetrySample<T>(IObservable<T> stream)
{
    stream.Retry().Subscribe(t=>Console.WriteLine(t)); //Will always retry the stream
    Console.ReadKey();
}
/*
Given stream that will produce 0,1,2 then error; the output would be
0
1
2
0
1
2
0
1
2
.....
*/
which would look like this as a marble diagram
S--0--0--0--x--0--0--0--x--0--0--0--
R--0--0--0-----0--0--0-----0--0--0--
Alternatively we can specify the max number of times to retry. In this example we only retry once, therefore the error that gets published on the second subscription will be passed up to the final subscription. Note that to retry once you pass a value of 2. Maybe the method should be called Try?
public static void RetryOnceSample<T>(IObservable<T> stream)
{
    stream.Retry(2)
        .Subscribe(t=>Console.WriteLine(t), 
   ex=>Console.WriteLine("Gave up on 2nd Error")); 
    Console.ReadKey();
}
/*Output:
0
1
2
0
1
2
Gave up on 2nd Error
*/
As a marble diagram this would look like
S--0--0--0--x--0--0--0--x
R--0--0--0-----0--0--0--x
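The re-subscribe behaviour, including the "pass 2 to retry once" counting, can be sketched with a pull-based analogy in plain C#. In this sketch a cold stream is a factory that returns a fresh IEnumerable<T> each time, an OnError is an exception thrown during enumeration, and re-subscribing means enumerating again. RetrySketch and its members are hypothetical names. The code illustrates Retry's semantics; it is not how Rx implements Retry.

```csharp
using System;
using System.Collections.Generic;

public static class RetrySketch
{
    // A "cold" stream: every enumeration starts fresh, yields 0, 1, 2 and then fails.
    public static IEnumerable<int> ZeroOneTwoThenFail()
    {
        yield return 0;
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("stream failed");
    }

    // Enumerates the source again after a failure, up to maxAttempts enumerations in total,
    // so maxAttempts = 2 means "try once, retry once", like Retry(2) above.
    public static IEnumerable<T> Retry<T>(Func<IEnumerable<T>> subscribe, int maxAttempts)
    {
        for (var attempt = 1; ; attempt++)
        {
            using (var e = subscribe().GetEnumerator())
            {
                while (true)
                {
                    T current;
                    try
                    {
                        if (!e.MoveNext()) yield break;   // completed normally: we are done
                        current = e.Current;
                    }
                    catch (Exception) when (attempt < maxAttempts)
                    {
                        break;                            // failed with attempts left: start over
                    }
                    yield return current;
                }
            }
        }
    }
}
```

Enumerating Retry(() => RetrySketch.ZeroOneTwoThenFail(), 2) produces 0, 1, 2, 0, 1, 2 and then lets the second exception escape, matching the RetryOnceSample output above.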
OnErrorResumeNext may cause some old VB developers to shudder, but it offers a different route from Retry. While Retry will always try to re-subscribe to the same stream, OnErrorResumeNext takes another IObservable<T> as a parameter to use when the original stream publishes an error. In this example, when stream publishes an error we subscribe to failover.
public static void FailoverSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .OnErrorResumeNext(failover)
        .Subscribe(t=>Console.WriteLine(t), ex => Console.WriteLine(ex));
}
/*
stream  --"1"--"2"--"3"--X
failover--------------------"a"--"b"--"c"--X
result  --"1"--"2"--"3"-----"a"--"b"--"c"--|
*/
An important thing to note here is that when the second stream publishes an error the result stream just completes and ignores the error.
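Here OnErrorResumeNext is sketched in plain C#, modelling each stream as an IEnumerable<T>; an exception thrown while enumerating stands in for OnError. ResumeSketch and ValuesThenFail are hypothetical names. The sketch moves to the next source whether the current one completes or fails, and an error from the last source just ends the sequence quietly, as described above.

```csharp
using System;
using System.Collections.Generic;

public static class ResumeSketch
{
    // A cold stream that yields the given values and then fails.
    public static IEnumerable<string> ValuesThenFail(params string[] values)
    {
        foreach (var value in values)
            yield return value;
        throw new InvalidOperationException("stream failed");
    }

    // Enumerates each source in turn. Completion or an exception ends that source and
    // moves on; an exception from the last source simply ends the whole sequence.
    public static IEnumerable<T> OnErrorResumeNext<T>(params IEnumerable<T>[] sources)
    {
        foreach (var source in sources)
        {
            using (var e = source.GetEnumerator())
            {
                while (true)
                {
                    T current;
                    try
                    {
                        if (!e.MoveNext()) break;   // completed: move to the next source
                        current = e.Current;
                    }
                    catch (Exception)
                    {
                        break;                      // errored: ignore it, move to the next source
                    }
                    yield return current;
                }
            }
        }
    }
}
```

Combining ValuesThenFail("1", "2", "3") with ValuesThenFail("a", "b", "c") yields "1", "2", "3", "a", "b", "c" and then ends without throwing, like the result line in the diagram above.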
Catch is probably the most useful method to use as the previous 2 methods will react the same regardless of the type of exception published. Catch however allows you to specify which exceptions it can catch just like a normal catch block. This example is similar to the last example, but we will explicitly state the exception we should catch that would allow us to failover to the next stream.
public static void CatchSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .Catch((InvalidOperationException ex) => failover)
        .Subscribe(t => Console.WriteLine(t), 
                   ex => Console.WriteLine(ex), 
                   () => Console.WriteLine("Completed"));
}
Catch has other overloads that allow you to pass a params array of IObservable<T> or an IEnumerable<IObservable<T>> instead of just specifying an initial and a failover stream. This means you can effectively have a large list of streams to try when a previous one fails. Of course, if any of them actually completes, the next one will not be used. If the last one publishes an OnError, that error will then be published to the IObservable<T> that the Catch method returned.
Another pair of interesting methods is Materialize and Dematerialize. Materialize will flatten an observable’s three different publication types (OnNext, OnError & OnCompleted) into wrapped publications of a Notification<T> type. Notification<T> is an abstract class that exposes four properties and an overloaded Accept method.
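The typed, single-failover form used in CatchSample can be sketched in plain C# by modelling a stream as an IEnumerable<T> whose enumeration may throw. CatchSketch and its members are hypothetical names, not Rx APIs. Only the exception type named by the handler switches to the failover sequence; any other exception propagates, just like a normal catch block whose type does not match.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CatchSketch
{
    // A cold stream that yields the given values and then fails with the given exception.
    public static IEnumerable<int> ValuesThenFail(Exception error, params int[] values)
    {
        foreach (var value in values)
            yield return value;
        throw error;
    }

    // Enumerates source; if it fails with TException, continues with the sequence the handler picks.
    // Exceptions of any other type are not caught here and propagate to the caller.
    public static IEnumerable<T> Catch<T, TException>(
        IEnumerable<T> source, Func<TException, IEnumerable<T>> handler)
        where TException : Exception
    {
        var failover = Enumerable.Empty<T>();
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                T current;
                try
                {
                    if (!e.MoveNext()) yield break;   // completed: the failover is never used
                    current = e.Current;
                }
                catch (TException ex)
                {
                    failover = handler(ex);           // matching error: switch streams
                    break;
                }
                yield return current;
            }
        }
        foreach (var item in failover)
            yield return item;
    }
}
```

A source that yields 1, 2 and then throws InvalidOperationException continues with the handler's sequence. The same source throwing ArgumentException yields 1, 2 and then rethrows, because the handler only covers InvalidOperationException.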
public abstract class Notification<T> : IEquatable<Notification<T>>
{
    // Properties
    public abstract Exception Exception { get; }
    public abstract bool HasValue { get; }
    public abstract NotificationKind Kind { get; }
    public abstract T Value { get; }

    // Methods
    public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);
    public abstract void Accept(IObserver<T> observer);
    public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);
    public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

    // ...
}
By inspecting the Kind property you can identify which type of publication you have received. If the Kind is OnError, you can access its published Exception via the Exception property. Accessing the Value property will either
  • return the OnNext value,
  • throw the OnError exception,
  • or throw an InvalidOperationException if the Kind is OnCompleted.
The HasValue property just provides the convenience to check if the Kind is OnNext so you can safely get the value without having an exception thrown.
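The flattening that Materialize performs can be sketched in plain C# by modelling a stream as an IEnumerable<T> whose enumeration may throw, with the exception standing in for OnError. NoteKind and Note<T> are hypothetical, trimmed-down stand-ins for NotificationKind and Notification<T>. Unlike the real Value property, this sketch's Value simply holds default(T) for OnError and OnCompleted instead of throwing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-ins for Rx's NotificationKind and Notification<T>.
public enum NoteKind { OnNext, OnError, OnCompleted }

public sealed class Note<T>
{
    public Note(NoteKind kind, T value, Exception error)
    {
        Kind = kind;
        Value = value;
        Error = error;
    }

    public NoteKind Kind { get; }
    public T Value { get; }
    public Exception Error { get; }
    public bool HasValue => Kind == NoteKind.OnNext;

    public override string ToString() =>
        Kind == NoteKind.OnNext ? $"OnNext({Value})"
        : Kind == NoteKind.OnError ? $"OnError({Error.Message})"
        : "OnCompleted()";
}

public static class MaterializeSketch
{
    // Turns values, a thrown exception, or normal completion into one flat sequence of Notes.
    public static IEnumerable<Note<T>> Materialize<T>(IEnumerable<T> source)
    {
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                Note<T> note;
                try
                {
                    note = e.MoveNext()
                        ? new Note<T>(NoteKind.OnNext, e.Current, null)
                        : new Note<T>(NoteKind.OnCompleted, default(T), null);
                }
                catch (Exception ex)
                {
                    note = new Note<T>(NoteKind.OnError, default(T), ex);
                }

                yield return note;
                if (!note.HasValue) yield break;   // OnError and OnCompleted are terminal
            }
        }
    }

    // A stream that publishes 1 and 2 and then fails.
    public static IEnumerable<int> OneTwoThenFail()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("boom");
    }
}
```

Materializing OneTwoThenFail() gives OnNext(1), OnNext(2), OnError(boom), and nothing throws. Materializing a completing sequence such as { 7 } gives OnNext(7), OnCompleted().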
As you can imagine, the Materialize method can be useful for logging the content of a stream. In this example we create an extension method that logs all notifications to the Console.
public static class ExampleExtensions
{
    /// <summary>
    /// Logs implicit notifications to console.
    /// </summary>
    /// <example>
    /// <code>myStream.Log().Subscribe(....);</code>
    /// </example>
    public static IObservable<T> Log<T>(this IObservable<T> stream)
    {
        return stream.Materialize()
        .Do(n => Console.WriteLine(n))
        .Dematerialize();
    }
}
Note that here we use Dematerialize to take our stream of Notification<T> and transform it back to our original stream. You could also use Materialize to create your own more powerful Catch methods, but we will look at applications of Rx later.

Wire tapping a stream

The Do extension method was used in the last example and it would not be fair to continue with out explaining what it does. Do method is used to provide side effects upon a stream. In the example above the side-effect is that we wrote to the console. This is different to a Subscribe because the Do method returns you an IObservable<T> which can be thought of as the same IObservable<T> that was passed to it, however the Subscribe method returns you an IDisposable. I like to think of the Do method as a wire tap to a stream. ;-)
It would be unfair to mention Do and leave out Run. The Run method is very similar to the Do method except for a few things:
  1. Run returns void
  2. Run is a blocking call
  3. Run can be called without any parameters. This effectively is a call to block until OnCompleted or OnError is published.
  4. Run has no overload that takes an Action for the OnCompleted publication. It doesn't make sense to offer one, as the method will just stop blocking when OnCompleted is published, so you can invoke the action you would otherwise pass for OnCompleted immediately after the Run method returns.
What the Run and Do methods have in common is that they both provide overloads that:
  • takes an Action<T> to be performed on each OnNext publication.
  • takes an Action<T> for OnNext and an Action<Exception> for OnError publications
  • takes an IObserver<T> that will be used to handle the publications explicitly
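To make the wire-tap idea concrete, here is a minimal, hypothetical sketch of a Do-like operator written against the BCL's IObservable<T>/IObserver<T> only. TapObservable<T>, ItemsObservable<T> and ListObserver<T> are my own illustrative names, not Rx types, and real Rx operators also deal with threading and error handling that this sketch ignores. The side effect is wrapped around each OnNext, and the value is then forwarded unchanged.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical "wire tap": a new IObservable<T> that runs a side effect for
// each value and then forwards the value, unchanged, downstream.
public sealed class TapObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Action<T> _sideEffect;

    public TapObservable(IObservable<T> source, Action<T> sideEffect)
    {
        _source = source;
        _sideEffect = sideEffect;
    }

    // Nothing happens until someone subscribes; then we subscribe upstream.
    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new TapObserver(observer, _sideEffect));

    private sealed class TapObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly Action<T> _sideEffect;

        public TapObserver(IObserver<T> downstream, Action<T> sideEffect)
        {
            _downstream = downstream;
            _sideEffect = sideEffect;
        }

        public void OnNext(T value)
        {
            _sideEffect(value);         // the tap
            _downstream.OnNext(value);  // pass-through
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

// Demo source: synchronously pushes its items, then completes.
public sealed class ItemsObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ItemsObservable(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Demo observer: records values and completion.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```

Unlike Subscribe, constructing the TapObservable<T> does nothing on its own: the side effect only runs once something subscribes to the returned stream.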
Now that we have had a brief look at Flow control and aggregating streams together, in the next post we will uncover the other aggregation and composition methods that Rx exposes.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 3 - Lifetime management – Completing and Unsubscribing
Forward to next post; Part 5 - Combining multiple IObservable streams

Thursday, May 27, 2010

Rx Part 3 – Lifetime management – Completing and Unsubscribing

So far we have discovered the basics of the Reactive Framework, which allows us to create, subscribe to, and perform some basic aggregations, buffering and time shifting over implementations of IObservable<T>. We have yet to look at how we could unsubscribe from a subscription. If you were to look for an Unsubscribe method in the Rx public API you would not find one. Instead of supplying an Unsubscribe method, Rx returns an IDisposable whenever a subscription is made. This disposable can be thought of as the subscription itself, so disposing it disposes the subscription and effectively unsubscribes. Note that calling Dispose on the result of a Subscribe call will not affect the underlying IObservable<T>, just that particular subscription to the IObservable<T>. This allows us to call Subscribe many times on a single IObservable<T>, with subscriptions coming and going without affecting each other. In this example we initially have two subscriptions, then we dispose of one subscription early, which still allows the other to continue to receive publications from the underlying IObservable<T>.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");

Console.ReadKey();
/*Outputs:
1st subscription received 0
2nd subscription received 0
1st subscription received 1
2nd subscription received 1
1st subscription received 2
2nd subscription received 2
1st subscription received 3
2nd subscription received 3
2nd subscription received 4
Disposed of 1st subscription
2nd subscription received 5
2nd subscription received 6
2nd subscription received 7

etc....
*/
In the above example it may look as though the values are produced once by the interval Observable and shared by both subscriptions via a single OnNext call. In fact the subscriptions are independent: each Subscribe call gets its own sequence, much as a subscription to an Observable.Create<T> based observable would. In this sample we pause before making our second subscription. Note that the output differs from the example above, with the second subscription starting again from 0.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));

var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/*
Output:
1st subscription received 0
1st subscription received 1
1st subscription received 2
1st subscription received 3
1st subscription received 4
2nd subscription received 0
1st subscription received 5
2nd subscription received 1
1st subscription received 6
1st subscription received 7
2nd subscription received 2
1st subscription received 8
2nd subscription received 3
Disposed of 1st subscription
2nd subscription received 4
2nd subscription received 5
2nd subscription received 6
etc...

*/
The benefits of using the IDisposable type, instead of creating a new ISubscription/IUnsubscription interface or amending the IObservable<T> interface to have an Unsubscribe method, are that you get all of these things for free:
  • The type already exists
  • People understand the type
  • IDisposable has standard usages and patterns
  • Language support via the using keyword
  • Static analysis tools like FxCop can help you with its usage
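Here is a minimal sketch, assuming nothing beyond the BCL, of why returning IDisposable works so well: each Subscribe call hands back its own token, so disposing one subscription leaves the source and any other subscriptions alone, and the C# using statement can scope a subscription. Broadcaster<T> and ValueRecorder<T> are hypothetical, single-threaded types for illustration, not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical broadcaster: each Subscribe returns an IDisposable that removes
// only that observer, leaving the source and other subscriptions untouched.
public sealed class Broadcaster<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(this, observer);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so an observer can unsubscribe during publication.
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private Broadcaster<T> _owner;
        private readonly IObserver<T> _observer;

        public Subscription(Broadcaster<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            // Idempotent: disposing twice is harmless, as IDisposable guidance expects.
            _owner?._observers.Remove(_observer);
            _owner = null;
        }
    }
}

// Simple observer that records the values it receives.
public sealed class ValueRecorder<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

For example, subscribe one recorder normally and another inside a using block: once the block ends, further publications reach only the first recorder.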

OnError and OnCompleted()

Both OnError and OnCompleted signify the completion of a stream. If your stream publishes an OnError or OnCompleted, it will be the last publication, and any further OnNext calls will not reach subscribers. In this example we try to publish an OnNext call after an OnCompleted, and the OnNext is ignored.
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);
Of course you could implement your own IObservable<T> that did allow publishing after an OnCompleted or an OnError, however it would not follow the precedent set by the current Subject types and would be a non-standard implementation. I think it is safe to say that such inconsistency would cause unpredictable behaviour in the applications that consumed your code.
An interesting thing to consider is that when a stream completes or errors, you should still dispose of your subscription. This can make for messy code, but we will discuss best practices in a later post.
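The rule that OnCompleted or OnError must be the last publication can be sketched with a tiny hand-rolled subject. This is a hypothetical, single-threaded illustration using only the BCL interfaces (TerminatingSubject<T> and EventLogObserver<T> are made-up names). Unlike Rx's Subject<T>, it does not notify observers that subscribe after termination.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the "OnError/OnCompleted is the last publication" rule.
public sealed class TerminatingSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _stopped;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }

    public void OnNext(T value)
    {
        if (_stopped) return;  // silently ignored after termination
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;  // only the first terminal call counts
        _stopped = true;
        foreach (var o in _observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var o in _observers.ToArray()) o.OnCompleted();
    }

    private sealed class ActionDisposable : IDisposable
    {
        private Action _action;
        public ActionDisposable(Action action) => _action = action;
        public void Dispose() { _action?.Invoke(); _action = null; }
    }
}

// Records every call it receives as a short string.
public sealed class EventLogObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new List<string>();
    public void OnNext(T value) => Events.Add("next:" + value);
    public void OnError(Exception error) => Events.Add("error:" + error.Message);
    public void OnCompleted() => Events.Add("completed");
}
```

Publishing 1, then OnCompleted, then 2 and an error leaves the observer with just "next:1" and "completed".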
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 2 - Static and extension methods
Forward to next post; Part 4 - Flow control

Sunday, May 23, 2010

Rx part 2 – Static and Extension methods

In the previous post we looked at the core types of the Rx library and how we could get up and running with “Observables”. The key types we discussed were the IObservable<T> interface, IObserver<T>, Subject<T> and its siblings ReplaySubject<T>, BehaviorSubject<T> and AsyncSubject<T>. In this post we are going to explore some of the static methods on the Observable type and some extension methods to the IObservable<T> interface.
We have already seen one extension method in the last post, which was the overload of Subscribe that allowed us to pass just an Action<T> to be performed when OnNext was invoked, instead of passing an IObserver<T>. The other overloads of Subscribe are also very useful:
public static class ObservableExtensions
{
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
} 
Each of these overloads allows you to pass various combinations of delegates that you want executed for each publication event an IObservable<T> could raise. You will use these overloads a lot but there are plenty of other static methods that are very helpful on the static Observable class.

Creation and Generation of Observables

The Observable class has several methods for conveniently creating common types of Observables. The simplest are shown below, each with an example of how to use it and what you would have to write to emulate the method.
Observable.Empty<T>(). This returns an IObservable<T> that just publishes an OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnCompleted();
//Or
var empty = Observable.Empty<string>();
Observable.Return<T>(T). This method will return an Observable that publishes the value of T supplied and then publish OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnNext("Value");
subject.OnCompleted();
//Or
var obReturn = Observable.Return("Value");
Observable.Never<T>(). This method will return an IObservable<T> but will not publish any events.
var subject = new Subject<string>();
//Or
var never = Observable.Never<string>();
Observable.Throw<T>(Exception). This method just publishes the exception passed.
var subject = new ReplaySubject<string>();
subject.OnError(new Exception());
//Or
var throws = Observable.Throw<string>(new Exception());
Observable.Create<T>(Func<IObserver<T>,IDisposable>) is a little different to the above creation methods. The method signature itself is a bit nasty, but once you get to know him, he is not that bad. Essentially this method allows you to specify a delegate that will be executed any time a subscription is made. The IObserver<T> that made the subscription will be passed to your delegate so that you can call the OnNext/OnError/OnCompleted methods as you need. Your delegate is a Func that returns an IDisposable. This IDisposable will have its Dispose method called when the subscriber disposes of their subscription. We will cover unsubscribing/disposing in a future post.
The big benefit of using the Create method is that your method will be a non-blocking call, which is the whole point of the Rx framework. In this example we show how we might first return an Observable via a standard blocking call, and then we show the correct way to return an Observable without blocking. The difference is when the work happens: BlockingMethod does all of its work (including the Sleep) before it returns, whereas NonBlocking returns immediately and its delegate only runs when something subscribes.
private IObservable<string> BlockingMethod()
{
  var subject = new ReplaySubject<string>();
  subject.OnNext("a");
  subject.OnNext("b");
  subject.OnCompleted();
  Thread.Sleep(1000);
  return subject;
}
private IObservable<string> NonBlocking()
{
  return Observable.Create<string>(
      observer =>
      {
        observer.OnNext("a");
        observer.OnNext("b");
        observer.OnCompleted();
        Thread.Sleep(1000);
        return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        //or can return an Action like
        //return () => Console.WriteLine("Observer has unsubscribed");
      });
}
I would imagine that you will use the Observable.Create<T> method a lot once you start using Rx. An obvious application of this method is if you need to perform a slow request to the network (web service or SQL call). There is also an Observable.Create<T>(Func<IObserver<T>,Action>) overload where, instead of returning an IDisposable whose Dispose method gets called, your delegate returns an Action that is simply invoked when the subscriber disposes their subscription.
Observable.Range(int, int) returns a range of integers. The first integer is the initial value and the second is the number of values to yield. This example will write the values “10” through to “24” and then “Completed”.
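Conceptually, Observable.Create<T> just wraps your delegate in an IObservable<T> whose Subscribe method calls it. The sketch below shows that idea using only BCL types. CreatedObservable<T>, ActionDisposable and CollectingObserver<T> are hypothetical names, and the real Rx implementation adds safeguards (for example around exceptions and calls after completion) that are omitted here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the idea behind Observable.Create<T>.
public sealed class CreatedObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public CreatedObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;

    // The delegate runs once per Subscribe call, not when the observable is built.
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Wraps an Action as an IDisposable, running it at most once.
public sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) => _action = action;
    public void Dispose() { _action?.Invoke(); _action = null; }
}

// Demo observer: records values and completion.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```

The key point is that nothing runs when the observable is created; the delegate runs once per subscriber, and each subscriber gets its own IDisposable.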
var range = Observable.Range(10, 15);
range.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
Observable.Interval(TimeSpan) will publish incrementing values, starting from 0, once every period that you supply. This example publishes a value every 250 milliseconds.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
interval.Subscribe(Console.WriteLine);
In this example I use the Observable.Interval method to fake ticking prices
var rnd = new Random();
var lastPrice = 100.0;
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
    .Select(i =>
      {
          var variation =
              rnd.NextDouble() - 0.5;
          lastPrice += variation;
          return lastPrice;
      });

interval.Subscribe(Console.WriteLine);
Observable.Start method allows you to turn a long running Func<T> or Action into an Observable. If you use the overload that takes an Action then the returned Observable will be of type IObservable<Unit>. Unit is analogous to void and is a Functional Programming construct. In this case it is just used to publish an acknowledgement that the Action is complete, however this is rather inconsequential as OnCompleted will be published straight after Unit anyway. If the overload you use is a Func<T> then when the Func returns its value it will be published and then OnCompleted. Below is an example of using both overloads.
static void StartAction()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(unit => Console.WriteLine("Unit published"), () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
}

ToObservable<T>(this IEnumerable<T>) is an extension method for IEnumerable<T> that will translate any IEnumerable<T> to an IObservable<T>. It is a simple and convenient method for switching from one paradigm to another.
var enumT =  new List<string>();
enumT.Add("a");
enumT.Add("b");
enumT.Add("c");
var fromEnum = enumT.ToObservable();

fromEnum.Subscribe(Console.WriteLine);
ToEnumerable<T>(this IObservable<T>) effectively is the opposite of the above method. It will take an IObservable<T> and yield its results as an IEnumerable. Again useful for switching paradigms.
Observable.Generate provides a more elaborate way to construct Observables than Range. In this example I keep it simple and have the Generate method construct me a sequence starting from 5 and incrementing by 3 while the value is less than 15. I also have the values published as strings to show that the type of the State and Result can be different.
var generated = Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString());
generated.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
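Generate's four arguments map directly onto a for loop: initial state, condition, iterate and result selector. The sketch below is a pull-based IEnumerable<T> analogue I wrote to illustrate that mapping, not Rx's implementation, and GenerateSketch is a hypothetical name. With the arguments used above it yields "5", "8", "11" and "14".

```csharp
using System;
using System.Collections.Generic;

public static class GenerateSketch
{
    // Pull-based analogue of Observable.Generate(initialState, condition, iterate, resultSelector).
    public static IEnumerable<TResult> Generate<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        // Exactly a for loop: init; test; step. Each state is projected before being yielded.
        for (var state = initialState; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }
}
```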

Checking for existence

The methods below all provide some sort of check to confirm the existence of a published value.
Any<T>(this IObservable<T>) is an extension method that will either check that the IObservable will publish any value at all, or, if you use the overload that takes a Func<T,bool>, check whether any values satisfy your predicate. The interesting thing about Any is that it is non-blocking and so returns an IObservable<bool>. This will only yield one value.
var range = Observable.Range(10, 15);
range.Any().Subscribe(Console.WriteLine);
range.Any(i => i > 100).Subscribe(Console.WriteLine);
Observable.Empty<int>().Any().Subscribe(Console.WriteLine);
Contains<T>(this IObservable<T>, T) is an extension method like Any but will only accept a value of T not a predicate of T. Like Any the result is an IObservable<bool>.
var range = Observable.Range(10, 15);
range.Contains(15).Subscribe(Console.WriteLine);
Observable.Empty<int>().Contains(5).Subscribe(Console.WriteLine);

Filtering and Aggregating

These next few extension methods provide some sort of filter to the underlying stream. These are very similar to the extension methods available to IEnumerable<T>
First<T>(this IObservable<T>) simply returns the first value of an Observable or throws InvalidOperationException if the stream completes without yielding a value. This is a blocking call that won't return until a value or OnCompleted is published.
var range = Observable.Range(10, 15);
Console.WriteLine(range.First());
//InvalidOperationException("Sequence contains no elements.")
Console.WriteLine(Observable.Empty<int>().First());
There are further extension methods that probably don't warrant further explanation if I assume you are familiar with the existing IEnumerable<T> extension methods. These include
  • FirstOrDefault
  • Last
  • LastOrDefault
  • Single
  • Count
  • Min
  • Max
  • Sum
  • Where
  • GroupBy
While I think that most of these are self-explanatory, what may be worth reinforcing is that some of the scalar methods (those that return bool or T rather than an IObservable) are blocking calls. This can be very handy for transforming your non-blocking Rx code into sequential blocking code.
Aggregate & Scan extension method overloads allow you to create your own aggregations. Both take similar arguments for their overloads. One of Scan’s overloads takes a Func<T,T,T>, which is the simpler overload: given the accumulator and the current value, you must return the next accumulator. The other overload can take another type for the accumulator and allows you to specify the seed value for the accumulator.
Aggregate has the same overloads. The difference is that Scan will OnNext the result of every Accumulation, Aggregate will only OnNext the last value. Therefore Aggregate is like an AsyncSubject<T> version of Scan.
In this example Scan will publish the running sum of the input sequence, and Aggregate will only publish the final total.
public void Aggregate_can_be_used_to_create_your_own_aggregates()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150))
                          .Take(10);  //Must complete for Aggregate to publish a value.
  var aggregate = interval.Aggregate(0L, (acc, i) => acc + i);
  WriteStreamToConsole(aggregate, "aggregate");
}

public void Scan_allows_custom_rolling_aggregation()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
  var scan = interval.Scan(0L, (acc, i) => acc + i);
  WriteStreamToConsole(scan, "scan");
}
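The difference between Scan and Aggregate is easiest to see side by side on a finite sequence. This is a pull-based sketch over IEnumerable<T> (AccumulateSketch is a hypothetical name, not an Rx type): Scan records every intermediate accumulator, while Aggregate only has an answer once the source has ended. For the values 0 to 9 with a seed of 0, Scan produces 0, 1, 3, 6, 10, 15, 21, 28, 36, 45 and Aggregate produces just 45.

```csharp
using System;
using System.Collections.Generic;

public static class AccumulateSketch
{
    // Like Scan: every intermediate accumulator value is emitted.
    public static List<TAcc> Scan<T, TAcc>(IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
    {
        var results = new List<TAcc>();
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            results.Add(acc);
        }
        return results;
    }

    // Like Aggregate: only the final accumulator, available once the source ends.
    public static TAcc Aggregate<T, TAcc>(IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source) acc = accumulator(acc, item);
        return acc;
    }
}
```

This is also why the Aggregate example above needs Take(10): without completion there is no "final" value to publish.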
Take<T>(this IObservable<T>, int) will return only the first n publications, where n is the integer value provided. Take(1) is like First() except that it returns an IObservable, so it is not blocking. This example yields just the values 10 and 11.
var range = Observable.Range(10, 15);
range.Take(2).Subscribe(Console.WriteLine);
Skip<T>(this IObservable<T>, int) will ignore the first number of publications specified by the integer value provided. So while Take(2) above returned 10 and 11, Skip(2) will ignore 10 and 11 and return the rest of the stream.
var range = Observable.Range(10, 15);
range.Skip(2).Subscribe(Console.WriteLine);
DistinctUntilChanged<T>(this IObservable<T>) formerly HoldUntilChanged, will ignore any value that is the same as the previous value. In this example the values a, b and c are only written to the console once each.
var subject = new Subject<string>();
subject.DistinctUntilChanged().Subscribe(Console.WriteLine);
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("c");
subject.OnNext("c");
subject.OnNext("c");
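DistinctUntilChanged only compares each value with the one immediately before it, which is what distinguishes it from Distinct. Here is a pull-based sketch over IEnumerable<T> to illustrate (DistinctSketch is a hypothetical name; Rx applies the same rule to pushed values). With the input a, b, a it yields all three values, because the second "a" differs from its predecessor "b".

```csharp
using System.Collections.Generic;

public static class DistinctSketch
{
    // Yields a value only when it differs from the value immediately before it.
    public static IEnumerable<T> DistinctUntilChanged<T>(IEnumerable<T> source)
    {
        var comparer = EqualityComparer<T>.Default;
        var hasPrevious = false;
        T previous = default(T);
        foreach (var item in source)
        {
            if (!hasPrevious || !comparer.Equals(previous, item))
            {
                yield return item;
            }
            previous = item;
            hasPrevious = true;
        }
    }
}
```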

Buffering and time shifting

In some scenarios your application or user may not be able to keep up with the amount of data that a stream is providing. Alternatively, your user or application may just not need the amount of data that is being published. Rx offers some great extension methods for controlling the rate at which values are published to your stream.
Buffer allows you to buffer a range of values and then re-publish them as a list once the buffer is full. You can buffer a specified number of elements or buffer all the values per timespan. Buffer also offers more advanced overloads.
static void Main(string[] args)
{
    var range = Observable.Range(10, 15);
    range.Buffer(4).Subscribe(
        enumerable =>
           {
               Console.WriteLine("--Buffered values");
               enumerable.ToList().ForEach(Console.WriteLine);
           }, () => Console.WriteLine("Completed"));

    var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
    interval.Buffer(TimeSpan.FromSeconds(1)).Subscribe(
        enumerable =>
        {
            Console.WriteLine("--Buffered values");
            enumerable.ToList().ForEach(Console.WriteLine);
        }, () => Console.WriteLine("Completed"));

    Console.ReadKey();
}
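The count-based overload can be sketched as a simple chunking loop. This pull-based version over IEnumerable<T> is hypothetical (BufferSketch is my own name), and it assumes, as I understand Rx does on completion, that a final partial buffer is still emitted. For Range(10, 15) with a count of 4 that gives 10 to 13, 14 to 17, 18 to 21 and finally 22, 23, 24.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferSketch
{
    // Groups items into lists of `count`; a final, shorter list is emitted if items remain.
    public static IEnumerable<IList<T>> Buffer<T>(IEnumerable<T> source, int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));
        var buffer = new List<T>(count);
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == count)
            {
                yield return buffer;
                buffer = new List<T>(count);  // never reuse a list already handed out
            }
        }
        if (buffer.Count > 0) yield return buffer;
    }
}
```

For example, `BufferSketch.Buffer(Enumerable.Range(10, 15), 4)` produces four lists, the last holding three items.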
Delay is an extension method with overloads that will time-shift the entire Observable either by the TimeSpan specified or until the DateTime specified. This means that if you get a stream with 5 values in the first second and then 10 values in the 3rd second, using Delay with a timespan of 2 seconds will yield results in the 3rd and 5th seconds. If you use the DateTime overload, the values will be yielded within the first second after the DateTime and then in the 3rd second after the DateTime. This example delays one subscription by 2 seconds and another until one minute from now.
var inOneMinute = DateTime.Now.AddMinutes(1);
var range = Observable.Range(10, 15);
range.Delay(TimeSpan.FromSeconds(2)).Subscribe(Console.WriteLine);
range.Delay(inOneMinute).Subscribe(Console.WriteLine);
Sample<T>(TimeSpan) will take the most recent value published in each specified TimeSpan and publish it. Quite simple, quite nice.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
interval.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
Throttle is similar to the Sample and Buffer methods in that it is a time-based filter. It is more like Sample in that it returns a single value (not a list of values like Buffer). Unlike Sample, the window that values are sampled over is not fixed; it slides every time the underlying stream publishes a value. Each time a value is published, a timer for the given period starts, replacing any timer already running. If the period elapses without another value arriving, that most recent value is published; if another value arrives first, the earlier value is dropped and the timer restarts. This means that if the underlying stream always produces values more frequently than the throttle period, Throttle will never publish any results.
The most obvious usage of this to me is holding back on sending a "Google suggest" request to the server as a user is still typing. We could throttle the keystrokes so that we only send up the search string when the user pauses for given period.
In this example all values are ignored as the publisher is producing values quicker than the throttle allows.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
interval.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
This example varies the rate at which it produces values (emulating sporadic keystrokes), so we will get some values written to the console where the pause is longer than the throttle period.
var sporadic = Observable.Create<int>(
    o =>
     {
         for (int i = 0; i < 100; i++)
         {
             o.OnNext(i);
             // The pause after values 0-4, 10-14, ... is 100ms (shorter than the throttle period);
             // after values 5-9, 15-19, ... it is 300ms (longer than the throttle period).
             Thread.Sleep(i % 10 < 5 ? 100 : 300);
         }
         return () => { };
     });
sporadic.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
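Because Throttle is all about timing, a deterministic sketch is easier to reason about than one using real timers. The hypothetical ThrottleSketch below works on a list of (time, value) pairs rather than a live stream, and keeps a value only if no further value arrives within the throttle period. Two assumptions to flag: a gap exactly equal to the period counts as a pause here, and the final value is emitted when the input ends.

```csharp
using System;
using System.Collections.Generic;

public static class ThrottleSketch
{
    // events: (time in ms, value) pairs in publication order.
    // A value survives only if the next value arrives at least periodMs later.
    // Assumptions: a gap exactly equal to periodMs counts as a pause,
    // and the last value is emitted when the input ends.
    public static List<T> Throttle<T>(IReadOnlyList<(long TimeMs, T Value)> events, long periodMs)
    {
        var result = new List<T>();
        for (int i = 0; i < events.Count; i++)
        {
            bool isLast = i == events.Count - 1;
            if (isLast || events[i + 1].TimeMs - events[i].TimeMs >= periodMs)
            {
                result.Add(events[i].Value);
            }
        }
        return result;
    }
}
```

With values a, b, c at 0, 100 and 200ms, then d at 500ms and e at 800ms, and a 200ms period, only c, d and e survive: a and b are each followed within 100ms.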
In later posts, Buffer is covered in more detail and we expand on more complex time based operators. Also when you introduce concurrency (which these time based operators do) you need to start to understand the Scheduling semantics of Rx which are also covered in later posts.
Most of these methods are simple and pretty easy to understand. I found my problem with these was the vast number of them. It also appears that the API is less prone to changes in recent releases. If you want to get to know these methods a bit better, check out these links, but most importantly, use them. Have a play in LINQPad or just a console app.
Useful links:
Rx Home
Exploring the Major Interfaces in Rx – MSDN
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
ObservableExtensions class - MSDN
Observable class - MSDN
Observer class - MSDN
Qbservable class - MSDN
Action<T> Delegate - MSDN
Func<T1, T2, TResult> - MSDN
The Rx Wiki site 101 Samples
Enumerate This
LinqPad
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 1 - Introduction to Rx
Forward to next post; Part 3 - Lifetime management – Completing and Unsubscribing

Tuesday, May 18, 2010

Introduction to Rx Part 1 - Key types

Microsoft has released a new library for building “reactive” applications. Its full name is Reactive Extensions for .NET, but it is generally referred to as just “Rx”. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern, such as multicast delegates or events. Multicast delegates (which events are), however, can be cumbersome to use, have a nasty interface, are difficult to compose and cannot be queried. Rx looks to solve these problems.
Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable<T>

IObservable<T> is one of the 2 core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft are so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a Stream of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

IObserver<T>

IObserver<T> is the other one of the 2 core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don’t worry if you are not on .NET 4.0 yet, as the Rx team have included these 2 interfaces in a separate assembly for .NET 3.5 users. IObservable<T> is meant to be the “functional dual of IEnumerable<T>”, with IObserver<T> as the dual of IEnumerator<T>. If you want to know what that statement means, then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield 3 things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>’s 3 methods OnNext(T), OnError(Exception) and OnCompleted().
Interestingly, while you will be exposed to the IObservable<T> interface a lot if you work with Rx, I find I don't often need to concern myself with IObserver<T>. Another interesting thing I have found with Rx is that I never actually implement these interfaces myself; Rx provides all of the implementations I need out of the box. Let's have a look at the simple ones.

Subject<T>

If you were to create your own implementation of IObservable<T> you may find that you need to expose methods to publish items to the subscribers, throw errors and notify when the stream is complete. Hmmm, they all sound like the methods on the IObserver<T> interface. While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you. Subject<T> is the most basic of the subjects. Effectively you can expose your Subject<T> behind a method that returns IObservable<T>, but internally you can use the OnNext, OnError and OnCompleted methods to control the stream.
In this (awfully basic) example, I create a subject, subscribe to that subject and then publish to the stream.
using System;
using System.Collections.Generic; // Subject<T> lived here in early Rx builds; current System.Reactive uses System.Reactive.Subjects

namespace RxConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            var subject = new Subject<string>();

            WriteStreamToConsole(subject);

            subject.OnNext("a");
            subject.OnNext("b");
            subject.OnNext("c");
            Console.ReadKey();
        }

        private static void WriteStreamToConsole(IObservable<string> stream)
        {
            stream.Subscribe(Console.WriteLine);
        }
    }
}
Note that the WriteStreamToConsole method takes an IObservable<string> as it only wants access to the Subscribe method. Hang on, doesn’t the Subscribe method need an IObserver<string>? Surely Console.WriteLine does not match that interface. Well, no, it doesn’t, but the Rx team supply an extension method on IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads of the Subscribe extension method that allow you to pass combinations of delegates to be invoked for OnNext, OnCompleted and OnError. This effectively means I don't need to implement IObserver<T>. Cool.
As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> is a basic implementation however. There are 3 siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject<T>

ReplaySubject<T> will remember all publications to it, so that any subscriptions that happen after publications have been made will still get all of the publications. Consider this example, where we have moved our first publication to occur before our subscription:
static void Main(string[] args)
{
    var subject = new Subject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
The result of this would be that “b” and “c” would be written to the console, but “a” ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.
static void Main(string[] args)
{
    var subject = new ReplaySubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
This can be very handy for eliminating race conditions.
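The replay behaviour is easy to sketch by hand: keep a history of every OnNext and play it back to each new subscriber before adding it to the live list. MiniReplaySubject<T> and StringCollector are hypothetical, single-threaded illustrations using only the BCL interfaces; error and completion handling are left out for brevity, and Rx's ReplaySubject<T> does considerably more (for example, bounded buffers).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-threaded sketch of replay semantics (not Rx's ReplaySubject<T>).
// Only OnNext is modelled; completion and errors are omitted for brevity.
public sealed class MiniReplaySubject<T> : IObservable<T>
{
    private readonly List<T> _history = new List<T>();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public void OnNext(T value)
    {
        _history.Add(value);  // remember every publication
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var past in _history) observer.OnNext(past);  // replay history first
        _observers.Add(observer);                               // then receive live values
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _remove;
        public Unsubscriber(Action remove) => _remove = remove;
        public void Dispose() { _remove?.Invoke(); _remove = null; }
    }
}

public sealed class StringCollector : IObserver<string>
{
    public List<string> Values { get; } = new List<string>();
    public void OnNext(string value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Publishing "a", subscribing, then publishing "b" and "c" gives the subscriber all three values, matching the ReplaySubject<T> example above; a subscriber added at the very end also sees all three.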

BehaviorSubject<T>

BehaviorSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).
In this example the value “a” is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the value “b” is written to the console, but not “a”.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    subject.OnNext("b");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the values “b”, “c” & “d” are all written to the console, but again not “a”
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    WriteStreamToConsole(subject);
    subject.OnNext("c");
    subject.OnNext("d");
    Console.ReadKey();
}
Finally, in this example no values are published because the stream has already completed, so nothing is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    WriteStreamToConsole(subject);
    
    Console.ReadKey();
}

AsyncSubject<T>

AsyncSubject<T> is similar to the Replay and Behavior subjects; however, it only stores the last value, and only publishes it when the stream completes.
In this example no values will be published so no values will be written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
In this example we invoke the OnCompleted method and the value “c” is published and therefore written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    Console.ReadKey();
}
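To see the four behaviours side by side, the sketch below runs the same sequence against each subject type and collects what a late subscriber receives, instead of writing to the console with WriteStreamToConsole. It assumes the current System.Reactive NuGet package (subjects in System.Reactive.Subjects, all implementing ISubject<T>); the class, method and the "default" seed value are illustrative choices, not part of the original samples.

```csharp
using System;                       // Console, plus the Subscribe(Action<T>) extension method
using System.Collections.Generic;
using System.Reactive.Subjects;     // Subject<T>, ReplaySubject<T>, BehaviorSubject<T>, AsyncSubject<T>

public static class SubjectComparison
{
    // Publishes "a" and "b", subscribes late, publishes "c", then optionally completes.
    // Returns the values the late subscriber actually received.
    public static List<string> Run(ISubject<string> subject, bool complete)
    {
        var received = new List<string>();
        subject.OnNext("a");
        subject.OnNext("b");
        subject.Subscribe(value => received.Add(value));
        subject.OnNext("c");
        if (complete)
        {
            subject.OnCompleted();
        }
        return received;
    }

    public static void Main()
    {
        Print("Subject", Run(new Subject<string>(), complete: true));
        Print("ReplaySubject", Run(new ReplaySubject<string>(), complete: true));
        Print("BehaviorSubject", Run(new BehaviorSubject<string>("default"), complete: true));
        Print("AsyncSubject (not completed)", Run(new AsyncSubject<string>(), complete: false));
        Print("AsyncSubject (completed)", Run(new AsyncSubject<string>(), complete: true));
    }

    private static void Print(string name, List<string> values)
    {
        Console.WriteLine(name + ": [" + string.Join(", ", values) + "]");
    }
}
```

Running Main should print [c] for Subject<T>, [a, b, c] for ReplaySubject<T>, [b, c] for BehaviorSubject<T> (the seed value is overwritten before anyone subscribes), [] for an AsyncSubject<T> that never completes and [c] once it does.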
So that covers the very basics of Rx. With only that under your belt it may be hard to see why Rx is a topic of interest. To follow on from this post, I will discuss further fundamentals of Rx:
  1. Extension methods
  2. Scheduling / Multithreading
  3. LINQ syntax
Once we have covered these, you should be able to really get Rx working for you to produce some tasty Reactive applications. Hopefully, after we have covered these background topics, we can knock up some samples where Rx can really help you in your day-to-day coding.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Related links :
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
Rx Home
Exploring the Major Interfaces in Rx – MSDN
ObservableExtensions class - MSDN
Using Rx Subjects - MSDN
System.Reactive.Subjects Namespace - MSDN
Subject<T> - MSDN
AsyncSubject<T> - MSDN
BehaviorSubject<T> - MSDN
ReplaySubject<T> - MSDN
Subject static class - MSDN
ISubject<TSource, TResult> - MSDN
ISubject<T> - MSDN
Back to the contents page for Reactive Extensions for .NET Introduction
Forward to next post: Part 2 - Static and extension methods

Wednesday, May 12, 2010

MergedDictionaries performance problems in WPF

I don’t normally like to blatantly plagiarise other people’s comments, but this seems to be a little-known bug that deserves to be shared.

A colleague of mine sent the following email to our internal tech list:

I strongly urge everyone working with WPF to use this, or at least benchmark it in your own applications, if you use ResourceDictionary.MergedDictionaries. I consider this to be a huge problem in WPF. I’m not sure if it exists in Silverlight, but I would assume it does.

I was just debugging a very long render delay in some WPF code and I came across this little tidbit:

http://www.wpftutorial.net/MergedDictionaryPerformance.html

The quote of interest is: “Each time a control references a ResourceDictionary XAML creates a new instance of it. So if you have a custom control library with 30 controls in it and each control references a common dictionary you create 30 identical resource dictionaries!”

Normally that isn’t a huge problem, but when you consider the way I personally organize resources in Prism projects (and have suggested others organize theirs), it becomes a **serious** problem. For example, let’s say we have this project structure:

/MyProject.Resources
       /Resources
                -Buttons.xaml
                -DataGrid.xaml
                -Global.xaml
                -Brushes.xaml
                -WindowChrome.xaml
                -Icons.xaml
 
/MyProject.Module1
      /Resources
                -Module1Resources.xaml  (References all Dictionaries in /MyProject.Resources/Resources/*)
      /Views
                -View1.xaml
                -View2.xaml
      
/MyProject.Module2
      /Resources
                -Module2Resources.xaml   (References all Dictionaries in /MyProject.Resources/Resources/*)
      /Views
                -View1.xaml
                -View2.xaml
      
/MyProject.Shell
      /Resources
                -ShellResources.xaml   
      /Views
                -MainShell.xaml

If your views reference the module-level ResourceDictionary (which helps maintainability and modularity), then every time you create an instance of View1.xaml, for example, all the ResourceDictionaries in /MyProject.Resources/Resources/* have to be parsed again. This isn’t really a memory concern, but it is a huge performance concern: there can potentially be thousands of lines of XAML to parse, and the time really does add up.

I recently switched all of the MergedDictionary references:

<ResourceDictionary>
    <ResourceDictionary.MergedDictionaries>
        <ResourceDictionary Source="/SomeDictionary.xaml"/>
    </ResourceDictionary.MergedDictionaries>
</ResourceDictionary>

To use the attached SharedResourceDictionary, which shadows the Source property and keeps a global cache of all parsed ResourceDictionaries:

<ResourceDictionary>
    <ResourceDictionary.MergedDictionaries>
        <SharedResourceDictionary Source="/SomeDictionary.xaml"/>
    </ResourceDictionary.MergedDictionaries>
</ResourceDictionary>

And I saw a performance increase of almost two orders of magnitude … From almost 6000ms to 200ms. I’ve attached this code; I used the basic sample implementation in the link above so this is considered public information for client purposes.

Cheers,

Charlie

Thanks to Charlie Robbins (Lab49) for expanding on Christian’s blog post and for letting me reprint his email.
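The attached code isn’t reproduced in this post. As a rough illustration of the idea described in the email (shadow the Source property and cache each parsed dictionary so it is only loaded once), here is a minimal sketch modelled loosely on the sample from the wpftutorial.net link. It is not Charlie’s attachment; the cache layout and the choice of keying on the Uri as written are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Windows;   // ResourceDictionary (WPF, PresentationFramework)

// A ResourceDictionary that loads each Source URI only once and then
// shares the already-parsed instance with every later reference.
public class SharedResourceDictionary : ResourceDictionary
{
    // Application-wide cache of dictionaries that have already been parsed, keyed by URI.
    // Not thread-safe: assumes XAML is only loaded on the UI thread.
    private static readonly Dictionary<Uri, ResourceDictionary> SharedDictionaries =
        new Dictionary<Uri, ResourceDictionary>();

    private Uri _sourceUri;

    // "new" hides ResourceDictionary.Source, so XAML that sets Source on this type
    // goes through this setter instead of the base one.
    public new Uri Source
    {
        get { return _sourceUri; }
        set
        {
            _sourceUri = value;

            ResourceDictionary cached;
            if (SharedDictionaries.TryGetValue(value, out cached))
            {
                // Already parsed elsewhere: merge the cached instance rather than re-parsing the XAML.
                MergedDictionaries.Add(cached);
            }
            else
            {
                // First reference: let WPF parse the XAML, then remember this instance.
                base.Source = value;
                SharedDictionaries.Add(value, this);
            }
        }
    }
}
```

In XAML the element needs an xmlns prefix mapped to the CLR namespace that contains the class (for example a hypothetical local: prefix) unless it has been mapped into the default namespace. Two caveats of this sketch: the cache is keyed on the Uri exactly as written, so the same relative path used from different assemblies would collide, and cached dictionaries are never released for the lifetime of the application.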