STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!
So far in this series of posts we have managed to avoid any explicit usage of threading or concurrency. Some of the methods we have covered implicitly introduce some level of concurrency to perform their jobs (e.g. Buffer, Delay and Sample all require a separate thread to do their magic). However, most of this has been kindly abstracted away from us. This post will look at the beauty of the Rx API and its ability to effectively remove the need for WaitHandles and any explicit use of Threads, the ThreadPool and the new shiny Task type.
A friend of mine once wisely stated that you should always understand at least one layer below what you are coding. At the time he was referring to networking protocols, but I think it is sage advice for all programming. On the project I am currently working on there are some very savvy developers who are very comfortable working in a multithreaded environment. The project has client and server side threading problems that we have had to tackle. I believe the whole team would agree that it has been amazing how much concurrency Rx will handle for you in a declarative way. The code base is virtually free of WaitHandles, Monitor or lock usage, or any explicit creation of threads. It has evolved into this state over time as we have come to grips with the power of Rx, and the end result is far cleaner code. However, having that experience on the team allowed us to find out the ways we should and shouldn't be using Rx, which would have been just too hard for me to do alone.
Getting back to my friend's comment about understanding the underlying subsystem, this is especially important when dealing with Rx and scheduling. Just because Rx abstracts some of this away, it does not mean that you can't still create problems for yourself if you are not careful. Before I scare you too much, let's look at some of the scheduling features of Rx.
Scheduling
In the Rx world, you can control the scheduling of two things:
- The invocation of the subscription
- The publishing of notifications
These are exposed as two extension methods on IObservable<T>, each of which takes an IScheduler:

    public static class Observable
    {
        public static IObservable<TSource> ObserveOn<TSource>(
            this IObservable<TSource> source, IScheduler scheduler) {...}

        public static IObservable<TSource> SubscribeOn<TSource>(
            this IObservable<TSource> source, IScheduler scheduler) {...}
    }

    public interface IScheduler
    {
        IDisposable Schedule(Action action);
        IDisposable Schedule(Action action, TimeSpan dueTime);
        DateTimeOffset Now { get; }
    }

The IScheduler interface is of less interest to me than the types that implement it. Depending on your platform* (Silverlight 3, Silverlight 4, .NET 3.5, .NET 4.0) the appropriate implementations are exposed to you via a static class, Scheduler. These are the static properties on the Scheduler type that expose the different schedulers.
Scheduler.Dispatcher will ensure that the actions are performed on the Dispatcher, which is obviously useful for Silverlight and WPF applications. You can imagine that the implementation for this would just delegate any calls to IScheduler.Schedule(Action) straight to Dispatcher.BeginInvoke(Action).
Scheduler.NewThread will schedule all actions onto a new thread.
Scheduler.ThreadPool will schedule all actions onto the Thread Pool.
Scheduler.TaskPool (which is only available to Silverlight 4 and .NET 4.0) will schedule actions onto the TaskPool.
Scheduler.Immediate will ensure the action is not scheduled but is executed immediately.
Scheduler.CurrentThread just ensures that the actions are performed on the thread that made the original call. This is different to Immediate, as CurrentThread will queue the action to be performed once the action that is currently running has finished, rather than running it inline. Note the difference in the output of the following code. One method passes Scheduler.Immediate, the other passes Scheduler.CurrentThread.

    private static void ScheduleTasks(IScheduler scheduler)
    {
        Action leafAction = () => Console.WriteLine("leafAction.");
        Action innerAction = () =>
        {
            Console.WriteLine("innerAction start.");
            scheduler.Schedule(leafAction);
            Console.WriteLine("innerAction end.");
        };
        Action outerAction = () =>
        {
            Console.WriteLine("outer start.");
            scheduler.Schedule(innerAction);
            Console.WriteLine("outer end.");
        };
        scheduler.Schedule(outerAction);
    }

    public void CurrentThreadExample()
    {
        ScheduleTasks(Scheduler.CurrentThread);
        Console.ReadLine();
        /*Output:
         * outer start.
         * outer end.
         * innerAction start.
         * innerAction end.
         * leafAction.
         */
    }

    public void ImmediateExample()
    {
        ScheduleTasks(Scheduler.Immediate);
        Console.ReadLine();
        /*Output:
         * outer start.
         * innerAction start.
         * leafAction.
         * innerAction end.
         * outer end.
         */
    }

*Sorry Rx for JavaScript, I have not even opened the box on you and don't know anything about scheduling in JavaScript.
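To make the two orderings above less mysterious, here is a minimal sketch of how an "immediate" and a "queueing" scheduler could differ internally. It is not the Rx implementation: IMiniScheduler, ImmediateSketch and TrampolineSketch are hypothetical names invented for this illustration, and the queueing version is single-threaded only (I would expect a real one to keep this state per thread).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the Schedule(Action) part of IScheduler.
public interface IMiniScheduler
{
    void Schedule(Action action);
}

// Runs the action inline, inside the caller's stack frame.
public sealed class ImmediateSketch : IMiniScheduler
{
    public void Schedule(Action action) { action(); }
}

// Queues nested actions and runs them after the current action returns.
public sealed class TrampolineSketch : IMiniScheduler
{
    private readonly Queue<Action> _queue = new Queue<Action>();
    private bool _running;

    public void Schedule(Action action)
    {
        _queue.Enqueue(action);
        if (_running) return; // nested call: leave it queued for the outer loop

        _running = true;
        try
        {
            // Drain the queue, including anything queued by the actions we run.
            while (_queue.Count > 0) _queue.Dequeue()();
        }
        finally
        {
            _running = false;
        }
    }
}

public static class SchedulerSketchDemo
{
    // Same outer/inner/leaf nesting as ScheduleTasks, recorded into a list.
    public static List<string> Trace(IMiniScheduler scheduler)
    {
        var log = new List<string>();
        scheduler.Schedule(() =>
        {
            log.Add("outer start");
            scheduler.Schedule(() =>
            {
                log.Add("inner start");
                scheduler.Schedule(() => log.Add("leaf"));
                log.Add("inner end");
            });
            log.Add("outer end");
        });
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Trace(new TrampolineSketch())));
        Console.WriteLine(string.Join(", ", Trace(new ImmediateSketch())));
    }
}
```

The queueing sketch reproduces the CurrentThread ordering (each action finishes before the next one starts), while the inline sketch reproduces the Immediate ordering (each nested action runs in the middle of its parent).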
Examples
So those are each of our schedulers; let's see some of them in use. The thing I want to point out here is that the first few times I used these overloads, I was confused about what they actually did. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled. The ObserveOn method is used to describe where you want your notifications to be scheduled. So for example, if you had a WPF application that used Rx to populate an ObservableCollection<T>, then you would almost certainly want to use SubscribeOn with one of the threaded schedulers (NewThread, ThreadPool or maybe TaskPool), and then you would have to use ObserveOn with the Dispatcher scheduler to update your collection.

    public void LoadCustomers()
    {
        _customerService.GetCustomers()
            .SubscribeOn(Scheduler.NewThread)
            .ObserveOn(Scheduler.Dispatcher)
            .Subscribe(Customers.Add);
    }

So all of the schedulers just offer us a nice abstraction over the various ways we can write concurrent code. Besides saving me from having to write the tedious code to get work onto a new thread or the thread pool, it also makes Rx threading easy. Oh Rx, you thought I had forgotten. I didn't think that any of the schedulers except CurrentThread and Immediate warranted further explanation, but I do think it is worth pointing out some of the "fun" threading problems you can face even though the scheduling has been abstracted away from you.
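If, like me, you mix the two methods up, it can help to record which thread each part runs on. The following is a rough sketch only, and it makes two assumptions that are not from this post: it targets a recent System.Reactive package, where the new-thread scheduler is exposed as NewThreadScheduler.Default rather than Scheduler.NewThread, and it uses an EventLoopScheduler as a stand-in for the Dispatcher so that it can run in a console application. The class and method names are made up for the example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SubscribeOnObserveOnSketch
{
    // Returns { calling thread id, subscription thread id, notification thread id }.
    public static int[] Run()
    {
        int subscribeThread = -1;
        int observeThread = -1;

        var source = Observable.Create<int>(o =>
        {
            // This is the "warm up" code that SubscribeOn moves.
            subscribeThread = Thread.CurrentThread.ManagedThreadId;
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        using (var done = new ManualResetEventSlim(false))
        using (var eventLoop = new EventLoopScheduler()) // assumption: stands in for the Dispatcher
        {
            source
                .SubscribeOn(NewThreadScheduler.Default) // where the subscription runs
                .ObserveOn(eventLoop)                    // where notifications are delivered
                .Subscribe(
                    _ => observeThread = Thread.CurrentThread.ManagedThreadId,
                    () => done.Set());

            done.Wait(); // block the console thread only so the sketch can report its result
            return new[] { Thread.CurrentThread.ManagedThreadId, subscribeThread, observeThread };
        }
    }

    public static void Main()
    {
        int[] ids = Run();
        Console.WriteLine("Caller: {0}, subscribed on: {1}, observed on: {2}", ids[0], ids[1], ids[2]);
    }
}
```

The three thread ids should all differ: the caller only wires things up, the body passed to Observable.Create runs where SubscribeOn says, and the OnNext handler runs where ObserveOn says.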
Deadlocks
When writing the current application my team is working on, we found out the hard way that Rx code can most certainly deadlock. When you consider that some calls (like .First()) are blocking, and that we can schedule work to be done in the future, it becomes obvious that race conditions and deadlocks can occur. This example is the simplest deadlock I could think of. It is fairly silly, but it will get the ball rolling.

    var stream = new Subject<int>();
    Console.WriteLine("Next line should deadlock the system.");
    var value = stream.First();
    stream.OnNext(1);
    Console.WriteLine("I can never execute....");

Hopefully we won't ever write code that silly, and if we did, our tests would give us fairly quick feedback that things were wrong. What lets deadlocks slip into the system is when they manifest themselves at integration points. This example may be a little harder to find, but it is only a small step away from the silly first example. Here we block in the constructor of a UI element, which will always be created on the dispatcher. The blocking call is waiting for an event that can only be raised from the dispatcher: deadlock.

    // Fields used below (the PropertyChanged event is declared as in the next example).
    private readonly Subject<string> _subject = new Subject<string>();
    private string _value;

    public Window1()
    {
        InitializeComponent();
        DataContext = this;
        Value = "Default value";

        //Deadlock! We need the dispatcher to continue
        // to allow me to click the button to produce a value.
        Value = _subject.First();

        //This will give same result but will not be blocking(deadlocking).
        _subject.Take(1).Subscribe(value => Value = value);
    }

    private void MyButton_Click(object sender, RoutedEventArgs e)
    {
        _subject.OnNext("New Value");
    }

    public string Value
    {
        get { return _value; }
        set
        {
            _value = value;
            var handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs("Value"));
        }
    }

In this next example we start seeing things that can become more sinister. It has a Button whose click handler will try to get the first value from an observable exposed via an interface.

    public partial class Window1 : INotifyPropertyChanged
    {
        private readonly IMyService _service = new MyService(); //Imagine DI here.
        private int _value2;

        public Window1()
        {
            InitializeComponent();
            DataContext = this;
        }

        public int Value2
        {
            get { return _value2; }
            set
            {
                _value2 = value;
                var handler = PropertyChanged;
                if (handler != null) handler(this, new PropertyChangedEventArgs("Value2"));
            }
        }

        #region INotifyPropertyChanged Members
        public event PropertyChangedEventHandler PropertyChanged;
        #endregion

        private void MyButton2_Click(object sender, RoutedEventArgs e)
        {
            Value2 = _service.GetTemperature().First();
        }
    }

There is only a small problem here, in that we block on the Dispatcher thread (.First() is a blocking call); however, this manifests itself as a deadlock if the service code is written incorrectly.

    class MyService : IMyService
    {
        public IObservable<int> GetTemperature()
        {
            return Observable.Create<int>(
                o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    return () => { };
                })
                .SubscribeOnDispatcher();
        }
    }

This odd implementation with explicit scheduling will cause the three OnNext calls to be scheduled to run on the dispatcher once the .First() call has finished, but .First() is itself waiting for an OnNext to be called: deadlock.
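The shape of this deadlock does not depend on WPF or Rx, so it can be reproduced in a console application. The sketch below is an illustration under my own assumptions, not code from the project: SingleThreadQueue is a hypothetical stand-in for the dispatcher (one thread draining a queue of actions), and the blocking wait uses a timeout so that the demo reports the problem rather than hanging forever.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for a UI dispatcher: one thread running queued actions in order.
public sealed class SingleThreadQueue : IDisposable
{
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public SingleThreadQueue()
    {
        _thread = new Thread(() =>
        {
            foreach (var action in _work.GetConsumingEnumerable()) action();
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    public void Post(Action action) { _work.Add(action); }

    public void Dispose()
    {
        _work.CompleteAdding(); // let the thread finish what is already queued
        _thread.Join();
    }
}

public static class DeadlockSketch
{
    // Blocks on the queue thread, like calling .First() in a click handler.
    // Returns true only if the value arrived while we were blocked.
    public static bool BlockOnQueueThread(TimeSpan timeout)
    {
        bool arrivedInTime = false;
        using (var valueArrived = new ManualResetEventSlim(false))
        using (var finished = new ManualResetEventSlim(false))
        using (var queue = new SingleThreadQueue())
        {
            queue.Post(() =>
            {
                // The producer is queued behind the action that is running right now...
                queue.Post(() => valueArrived.Set());
                // ...so this waits for work that cannot start until we return.
                arrivedInTime = valueArrived.Wait(timeout);
                finished.Set();
            });
            finished.Wait();
        }
        return arrivedInTime;
    }

    // Registers interest and returns, like Take(1).Subscribe(...).
    public static bool ReturnFromQueueThread(TimeSpan timeout)
    {
        using (var valueArrived = new ManualResetEventSlim(false))
        using (var queue = new SingleThreadQueue())
        {
            // Returning immediately frees the queue thread to run the producer.
            queue.Post(() => queue.Post(() => valueArrived.Set()));
            return valueArrived.Wait(timeout); // the caller waits only to report the result
        }
    }

    public static void Main()
    {
        Console.WriteLine(BlockOnQueueThread(TimeSpan.FromMilliseconds(200)));
        Console.WriteLine(ReturnFromQueueThread(TimeSpan.FromSeconds(5)));
    }
}
```

BlockOnQueueThread can never see the value in time, however long the timeout, because the producer sits in the queue behind the very action that is waiting for it. That is the same relationship as .First() on the dispatcher waiting for OnNext calls that were scheduled onto the dispatcher.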
So far this post has been a bit doom and gloom about scheduling and the problems you could face, but that is not the intent. I just wanted to make it obvious that Rx is not going to solve the age-old concurrency problems, but it will make it easier to get things right if you follow this simple rule.
- Only the final subscriber should be setting the scheduling.
- Avoid using .First() (Ed: that is for you Olivier. We will call this rule 1b)
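As a rough sketch of what these two rules might look like in code, here is the temperature service again with the scheduling call removed, plus two callers that each decide for themselves. It assumes a recent System.Reactive package (hence NewThreadScheduler.Default and Disposable.Empty rather than the older forms used earlier in this post), and the class and method names are invented for the example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class FinalSubscriberSketch
{
    // The service produces values but makes no scheduling decision of its own.
    public static IObservable<int> GetTemperature()
    {
        return Observable.Create<int>(o =>
        {
            o.OnNext(27);
            o.OnNext(26);
            o.OnNext(24);
            o.OnCompleted();
            return Disposable.Empty;
        });
    }

    // Caller A: wants the first value without blocking (rule 1b: no .First()).
    public static int FirstWithoutBlocking()
    {
        int first = 0;
        // With no scheduler involved, the values are pushed during Subscribe.
        GetTemperature().Take(1).Subscribe(v => first = v);
        return first;
    }

    // Caller B: the final subscriber chooses to do the work on another thread (rule 1).
    public static bool ProducedOffCallingThread()
    {
        int producerThread = -1;
        using (var done = new ManualResetEventSlim(false))
        {
            GetTemperature()
                .SubscribeOn(NewThreadScheduler.Default)
                .Subscribe(
                    _ => producerThread = Thread.CurrentThread.ManagedThreadId,
                    () => done.Set());
            done.Wait(); // only so this console sketch can report its result
        }
        return producerThread != Thread.CurrentThread.ManagedThreadId;
    }

    public static void Main()
    {
        Console.WriteLine(FirstWithoutBlocking());
        Console.WriteLine(ProducedOffCallingThread());
    }
}
```

Because the service no longer forces its work onto a particular scheduler, neither caller can be deadlocked by a decision it did not make. In a WPF application, the second caller would typically also add ObserveOn with the Dispatcher scheduler before Subscribe.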
So my challenge to the readers is to add to the comments:
- Any other scheduling rules (2 seems quite small, and I was only going to have 1)
- Post some nasty Rx race condition code
- What rules do you have for subscribing on a background thread? Which scheduler should I use and when, i.e. NewThread, ThreadPool or TaskPool? And so I come full circle to understanding one layer below the one you are working at.
- This channel9 video has more interesting stuff including testing with schedulers http://channel9.msdn.com/shows/Going+Deep/Wes-Dyer-and-Jeffrey-Van-Gogh-Inside-Rx-Virtual-Time/
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 5 - Combining multiple IObservable streams
Forward to next post; Part 7 - Hot and Cold observables