Tuesday, November 2, 2010

Rx design guidelines

The Rx team have released a PDF specifying their Design Guidelines for coding with the Reactive Extensions. The original post is at http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx and the PDF itself is at http://go.microsoft.com/fwlink/?LinkID=205219 . Great stuff. Next: the FxCop rules for static analysis?

More guidance at IntroToRx.com in the Usage Guidelines appendix

Tuesday, October 26, 2010

Rx Part 8 - Testing Rx

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

Having reviewed the scheduling available to us in Rx, and now that we are aware of Hot and Cold observables, we have almost enough skills in our tool belt to start using Rx in anger. However, many developers wouldn’t dream of starting to code without first being able to write tests that prove their code satisfies its requirements and that give them a safety net against regression. Rx poses some interesting problems for our Test Driven community:
  • Scheduling, and therefore threading, is generally avoided in test scenarios, as it can introduce race conditions which may lead to non-deterministic tests.
  • Tests should run as fast as possible.
  • Rx is a new technology/library, so naturally as we master it we will refactor our code. We want to use tests to ensure our refactoring has not altered the behaviour of our code base.
So we want to test our code, but we don't want to introduce false negatives, false positives or non-deterministic tests. Also, if we look at the Rx library, there are plenty of methods that involve scheduling, so it is hard to ignore. This LINQ query shows that at least 32 distinct methods on the Observable type accept an IScheduler as a parameter:
var query = from method in typeof (Observable).GetMethods()
        from parameter in method.GetParameters()
        where typeof (IScheduler).IsAssignableFrom(parameter.ParameterType)
        group method by method.Name into m
        orderby m.Key
        select m.Key;
            
query.Run(Console.WriteLine);
/* 
BufferWithTime, Catch, Concat, Delay, Empty, Generate, GenerateWithTime, Interval
Merge, ObserveOn, OnErrorResumeNext, Prune, Publish, Range, Repeat, Replay
Retry, Return, Sample, Start, StartWith, Subscribe, SubscribeOn, Take, Throttle
Throw, TimeInterval, Timeout, Timer, Timestamp, ToAsync, ToObservable
*/
Some of these, such as ObserveOn and SubscribeOn, we will already be familiar with. Others optionally take an IScheduler in one of their overloads. TDD/test-first coders will want to opt for the overload that takes the IScheduler, so that we have some control over scheduling in our tests.
In this example we create a stream that publishes values every second for 5 seconds.
var interval = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(5);
If we were to write a test that ensured we received 5 values, each 1 second apart, it would take 5 seconds to run. That would be no good; I want hundreds if not thousands of tests to run in 5 seconds. A more common time-related example that would need tests is setting a timeout.
var stream = Observable.Never<int>();

var exceptionThrown = false;
stream.Timeout(TimeSpan.FromMinutes(1))
    .Run(
        i => Console.WriteLine("This will never run."),
        ex => exceptionThrown = true);
Assert.IsTrue(exceptionThrown);
This test would take 1 minute to run. Worse, if we wrote the test first and only added the timeout to the code afterwards (Red-Green-Refactor), the initial failing run would never complete, because Observable.Never never publishes anything. Hmmmm…..

TestScheduler

To our rescue comes the TestScheduler. This is a recent addition to the Rx library that builds on the concept of a virtual scheduler to allow us to emulate and control time. Cool.
A virtual scheduler can be thought of as a queue of actions to be executed, each marked with the point in time at which it should be executed. When two actions are scheduled for the same “time”, the second is queued after the first but marked with the same point in “time”. When we use the TestScheduler we can either “drain the queue” by calling Run(), which executes all scheduled actions, or we can run only the actions scheduled up to a given point in time with RunTo().
In this example we use the simple overload to schedule an action on the queue to be run immediately (that is, at the current virtual time). We then execute everything scheduled up to tick 1, which here is just that first action.
var scheduler = new TestScheduler();
var wasExecuted = false;
scheduler.Schedule(() => wasExecuted = true);
Assert.IsFalse(wasExecuted);
scheduler.RunTo(1);         //execute 1 tick of queued actions
Assert.IsTrue(wasExecuted);
The TestScheduler type is actually an implementation of the abstract VirtualScheduler<TAbsolute, TRelative>, with TestScheduler specifying long for both TAbsolute and TRelative. The net result is that the TestScheduler interface looks something like this:
public class TestScheduler
{
    public TestScheduler();

    // Methods
    public long FromTimeSpan(TimeSpan timeSpan);
    public long Increment(long absolute, long relative);
    public void Run();
    public void RunTo(long time);
    public IDisposable Schedule(Action action);
    public IDisposable Schedule(Action action, TimeSpan dueTime);
    public IDisposable Schedule(Action action, long dueTime);
    public void Sleep(long ticks);
    public DateTimeOffset ToDateTimeOffset(long absolute);

    // Properties
    public DateTimeOffset Now { get; }
    public long Ticks { get; }
}
We should already be familiar with what the Schedule methods do; the other methods of interest for this post are Run() and RunTo(long). Run() executes all the actions that have been scheduled. RunTo(long) executes all the actions that have been scheduled up to and including the time specified by the long value, which in this case represents ticks. A quick look into the implementations of Schedule, RunTo and Run gives us the insight we need to really understand virtual schedulers.
public IDisposable Schedule(Action action)
{
    return this.Schedule(action, TimeSpan.Zero);
}
public IDisposable Schedule(Action action, TimeSpan dueTime)
{
    return this.Schedule(action, this.FromTimeSpan(dueTime));
}
public IDisposable Schedule(Action action, TRelative dueTime)
{
    BooleanDisposable disposable = new BooleanDisposable();
    TAbsolute local = this.Increment(this.Ticks, dueTime);
    Action action2 = delegate {
        if (!disposable.IsDisposed)
        {
            action();
        }
    };
    ScheduledItem<TAbsolute> item = new ScheduledItem<TAbsolute>(action2, local);
    this.queue.Enqueue(item);
    return disposable;
}

public void RunTo(TAbsolute time)
{
    while ((this.queue.Count > 0) && (this.queue.Peek().DueTime.CompareTo(time) <= 0))
    {
        ScheduledItem<TAbsolute> item = this.queue.Dequeue();
        this.Ticks = item.DueTime;
        item.Action();
    }
}

public void Run()
{
    while (this.queue.Count > 0)
    {
        ScheduledItem<TAbsolute> item = this.queue.Dequeue();
        this.Ticks = item.DueTime;
        item.Action();
    }
}
Obviously this is the internal implementation and is subject to change; however, as the documentation for the VirtualScheduler and TestScheduler is currently quite weak, I think this use of reflection is helpful. This example should clear up what happens when items are scheduled for the same time.
var scheduler = new TestScheduler();
long dueTime = 4L;
scheduler.Schedule(() => Console.WriteLine("1"), dueTime);
scheduler.Schedule(() => Console.WriteLine("2"), dueTime);
scheduler.Schedule(() => Console.WriteLine("3"), dueTime+1);
scheduler.Schedule(() => Console.WriteLine("4"), dueTime+1);
Console.WriteLine("RunTo(dueTime)");
scheduler.RunTo(dueTime); 
Console.WriteLine("Run()");
scheduler.Run();
/* Output:
RunTo(dueTime)
1
2
Run()
3
4
*/
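To see the queue semantics without any Rx dependency, here is a minimal, single-threaded virtual-time scheduler. It is a sketch loosely modelled on the reflected code above, not the real TestScheduler: MiniVirtualScheduler and its members are hypothetical names, and virtual time is assumed to be a plain long tick count. Actions are ordered by due time, ties keep their insertion order, and the clock jumps straight to each action's due time instead of waiting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a virtual-time scheduler; NOT the Rx TestScheduler.
// Single-threaded: no locking is attempted.
public sealed class MiniVirtualScheduler
{
    private sealed class ScheduledAction : IDisposable
    {
        public long DueTime;
        public long Sequence;      // insertion order, used to break ties
        public Action Action;
        public bool IsCancelled;
        public void Dispose() { IsCancelled = true; }
    }

    private readonly List<ScheduledAction> _queue = new List<ScheduledAction>();
    private long _nextSequence;

    // Current virtual time, in ticks.
    public long Ticks { get; private set; }

    // Queue 'action' to run 'dueTime' ticks after the current virtual time.
    // Disposing the result cancels the action if it has not run yet.
    public IDisposable Schedule(Action action, long dueTime = 0)
    {
        var item = new ScheduledAction
        {
            DueTime = Ticks + dueTime,
            Sequence = _nextSequence++,
            Action = action
        };
        _queue.Add(item);
        return item;
    }

    // Run every queued action due at or before 'time', earliest first.
    public void RunTo(long time)
    {
        ScheduledAction next;
        while ((next = PeekEarliest()) != null && next.DueTime <= time)
        {
            _queue.Remove(next);
            Ticks = next.DueTime;                 // jump the clock; no real waiting
            if (!next.IsCancelled) next.Action();
        }
    }

    // Drain the queue, including anything scheduled while draining.
    public void Run() { RunTo(long.MaxValue); }

    private ScheduledAction PeekEarliest()
    {
        ScheduledAction earliest = null;
        foreach (var item in _queue)
        {
            if (earliest == null
                || item.DueTime < earliest.DueTime
                || (item.DueTime == earliest.DueTime && item.Sequence < earliest.Sequence))
            {
                earliest = item;
            }
        }
        return earliest;
    }
}
```

Scheduling "1" and "2" at tick 4 and "3" and "4" at tick 5, then calling RunTo(4), runs only "1" and "2" in the order they were queued; a following Run() runs the rest, matching the output above. The linear scan keeps the sketch short; a larger queue would want a proper priority queue.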

Testing Rx code

Now that we have learnt a little about the TestScheduler, let's look at how we could use it to get our 2 initial code snippets (Interval and Timeout) to execute as fast as possible while still maintaining the semantics of time. In this example we generate our 5 values one second apart, but pass our TestScheduler to the Interval method.
[TestMethod]
public void Testing_with_test_scheduler()
{
    var scheduler = new TestScheduler();
    var interval = Observable
        .Interval(TimeSpan.FromSeconds(1), scheduler)
        .Take(5);

    bool isComplete = false;
    interval.Subscribe(Console.WriteLine, () => isComplete = true);

    scheduler.Run();

    Assert.IsTrue(isComplete); //Executes in less than 0.01s "on my machine"
}
While this is mildly interesting, what I think is more important is how we would test a real piece of code. Imagine, if you will, a Presenter that subscribes to a stream of prices. As prices are published, it adds them to a ViewModel. Assuming this is a WPF or Silverlight implementation, we take the liberty of enforcing that the subscription is done on the ThreadPool and the observing is executed on the Dispatcher.
public class MyPresenter
{
...
    public void Show(string symbol)
    {
        _myService.PriceStream(symbol)
            .SubscribeOn(Scheduler.ThreadPool)
            .ObserveOn(Scheduler.Dispatcher)
            .Subscribe(price => _viewModel.Prices.Add(price));
    }
...
}
While this snippet of code may do what we want it to do, it will be hard to test as it is accessing the schedulers via static properties. To help my testing, I have created my own interface that exposes the same IScheduler implementations that the Scheduler type does.
public interface ISchedulerService
{
    IScheduler CurrentThread { get; }
    IScheduler Dispatcher { get; }
    IScheduler Immediate { get; }
    IScheduler NewThread { get; }
    IScheduler ThreadPool { get; }
    //IScheduler TaskPool { get; }
}
public sealed class SchedulerService : ISchedulerService
{
    public IScheduler CurrentThread { get { return Scheduler.CurrentThread; } }

    public IScheduler Dispatcher { get { return Scheduler.Dispatcher; } }

    public IScheduler Immediate { get { return Scheduler.Immediate; } }

    public IScheduler NewThread { get { return Scheduler.NewThread; } }

    public IScheduler ThreadPool { get { return Scheduler.ThreadPool; } }

    //public IScheduler TaskPool { get { return Scheduler.TaskPool; } }
}
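This now allows me to inject an ISchedulerService, which gives me a seam for testing. I can now write some tests for my Presenter.
private Mock<IMyService> _myServiceMock;
private Mock<IViewModel> _viewModelMock;
private TestSchedulers _schedulerService;

[TestInitialize]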
public void SetUp()
{
    _myServiceMock = new Mock<IMyService>();
    _viewModelMock = new Mock<IViewModel>();
    _schedulerService = new TestSchedulers();

    var prices = new ObservableCollection<decimal>();
    _viewModelMock.SetupGet(vm => vm.Prices).Returns(prices);
    _viewModelMock.SetupProperty(vm => vm.IsConnected);
}

[TestMethod]
public void Should_pass_symbol_to_MyService_PriceStream()
{
    var expected = "SomeSymbol";
    var priceStream = new Subject<decimal>();
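    // Verifiable() is required: Verify() with no arguments only checks setups marked as verifiable.
    _myServiceMock.Setup(svc => svc.PriceStream(It.Is<string>(symbol => symbol == expected)))
                  .Returns(priceStream)
                  .Verifiable();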

    var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
    sut.Show(expected);

    _myServiceMock.Verify();
}

[TestMethod]
public void Should_add_to_VM_Prices_when_MyService_publishes_price()
{
    decimal expected = 1.23m;
    var priceStream = new Subject<decimal>();
    _myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);

    var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
    sut.Show("SomeSymbol");
    _schedulerService.ThreadPool.Schedule(() => priceStream.OnNext(expected));  //Schedule the OnNext
    _schedulerService.ThreadPool.RunTo(1);  //Execute the OnNext action
    _schedulerService.Dispatcher.RunTo(1);  //Execute the OnNext Handler (ie adding to the Prices collection)

    Assert.AreEqual(1, _viewModelMock.Object.Prices.Count);
    Assert.AreEqual(expected, _viewModelMock.Object.Prices.First());
}
The first of these two tests is a simple expectation that the string value passed to my Show(string) method is passed on to the underlying service; it is not really relevant to Rx. The second test shows the usage of my test-specific implementation of ISchedulerService, which exposes each of its scheduler properties as a TestScheduler. Injecting these TestSchedulers allows me to control exactly when scheduled actions are executed.
For those of you new to Moq, some of the syntax may be a little confusing. Where you see a Setup(..) or SetupGet(..) method call, there is just a little bit of Expression magic that tells my mocks to return the correct thing when called; SetupProperty(..) makes the mock remember whatever value is assigned to that property. The .Object property on each mock is the dynamically generated implementation of the interface. I hope you find this next test the most interesting. Here we really get the full value out of our TestScheduler, by testing timeouts.
[TestMethod]
public void Should_timeout_if_no_prices_for_10_seconds()
{
    var timeoutPeriod = TimeSpan.FromSeconds(10);
    var priceStream = Observable.Never<decimal>();
    _myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);

    var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
    sut.Show("SomeSymbol");

    _schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks - 1);
    Assert.IsTrue(_viewModelMock.Object.IsConnected);

    _schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks);
    Assert.IsFalse(_viewModelMock.Object.IsConnected);
}
The key points to note are:
  1. We return an Observable.Never for our price stream so that no prices are ever pushed and no OnCompleted is published either.
  2. The _schedulerService is a test fake that returns TestSchedulers for all of its schedulers.
  3. We run the ThreadPool TestScheduler up until 1 tick before our timeout period and ensure that we have not timed out.
  4. We run the ThreadPool TestScheduler up to our timeout period and then ensure that we have timed out.
  5. The test runs in well under a second even though we are testing for a 10 second timeout!
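The RunTo arguments in this test are plain tick counts. Because a TimeSpan tick is 100 nanoseconds, the boundary the two assertions check can be worked out directly. The snippet below assumes, as the test itself does, that the TestScheduler's clock is measured in TimeSpan ticks; TimeoutBoundary is a hypothetical helper that only models the "has the timeout fired yet?" question.

```csharp
using System;

// Hypothetical helper for illustration; it models the boundary the test checks,
// assuming virtual time is counted in TimeSpan ticks (1 tick = 100 ns).
public static class TimeoutBoundary
{
    // 10 seconds = 10 * TimeSpan.TicksPerSecond = 10 * 10,000,000 = 100,000,000 ticks.
    public static readonly long TimeoutTicks = TimeSpan.FromSeconds(10).Ticks;

    // A timeout scheduled at virtual time 0 fires once the clock reaches its
    // due time (RunTo executes items whose DueTime <= the requested time).
    public static bool HasTimedOut(long timeoutTicks, long clockTicks)
    {
        return clockTicks >= timeoutTicks;
    }
}
```

So RunTo(99999999) must leave IsConnected true and RunTo(100000000) must flip it to false, and the whole 10 seconds of virtual time elapses without any real waiting.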
Generally I only want to have one assertion in each of my tests, but for this example I think it elegantly tests that the Timeout is exactly 10 seconds, not longer or shorter. If I were to remove the first assertion, my test would only prove that the timeout was no greater than 10 seconds; it could just as well be set to, say, 3 seconds. The implementation for this simple presenter is as follows:
public class MyPresenter
{
    private readonly IMyService _myService;
    private readonly IViewModel _viewModel;
    private readonly ISchedulerService _schedulerService;

    public MyPresenter(IMyService myService, IViewModel viewModel, ISchedulerService schedulerService)
    {
        _myService = myService;
        _schedulerService = schedulerService;
        _viewModel = viewModel;
    }

    public void Show(string symbol)
    {
        _myService.PriceStream(symbol)
                    .SubscribeOn(_schedulerService.ThreadPool)
                    .ObserveOn(_schedulerService.Dispatcher)
                    .Timeout(TimeSpan.FromSeconds(10), _schedulerService.ThreadPool)
                    .Subscribe(OnPriceUpdate, ex =>
                        {
                            if (ex is TimeoutException)
                                _viewModel.IsConnected = false;
                        });
        _viewModel.IsConnected = true;
    }

    private void OnPriceUpdate(decimal price)
    {
        _viewModel.Prices.Add(price);
    }
}
I hope this post on testing helps you bring the skills you have been developing in Rx to a level where you may be comfortable considering them for production use.
For your reference, here are the two test implementations of ISchedulerService that I find useful. One simply schedules everything on the Immediate scheduler and the other uses TestSchedulers for fine-grained control.
public sealed class TestSchedulers : ISchedulerService
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();

    #region Implementation of ISchedulerService
    IScheduler ISchedulerService.CurrentThread { get { return _currentThread; } }

    IScheduler ISchedulerService.Dispatcher { get { return _dispatcher; } }

    IScheduler ISchedulerService.Immediate { get { return _immediate; } }

    IScheduler ISchedulerService.NewThread { get { return _newThread; } }

    IScheduler ISchedulerService.ThreadPool { get { return _threadPool; } }
    #endregion

    public TestScheduler CurrentThread { get { return _currentThread; } }

    public TestScheduler Dispatcher { get { return _dispatcher; } }

    public TestScheduler Immediate { get { return _immediate; } }

    public TestScheduler NewThread { get { return _newThread; } }

    public TestScheduler ThreadPool { get { return _threadPool; } }
}

public sealed class ImmediateSchedulers : ISchedulerService
{
    public IScheduler CurrentThread { get { return Scheduler.Immediate; } }

    public IScheduler Dispatcher { get { return Scheduler.Immediate; } }

    public IScheduler Immediate { get { return Scheduler.Immediate; } }

    public IScheduler NewThread { get { return Scheduler.Immediate; } }

    public IScheduler ThreadPool { get { return Scheduler.Immediate; } }
}
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post: Rx Part 7 – Hot and Cold Observables
Forward to the next post: Part 9 – Join, Window, Buffer and Group Join

Thursday, August 19, 2010

Rx Part 7 – Hot and Cold Observables


In this post we will look at how to describe and handle 2 styles of observable streams:
  1. Streams that are passive and start publishing on request,
  2. Streams that are active and publish regardless of subscriptions.
In this sense passive streams are called Cold and active streams are described as Hot. You can draw some parallels between implementations of the IObservable<T> interface and implementations of the IEnumerable<T> interface with regard to Hot and Cold. With IEnumerable<T> you could have an “on demand” collection via the yield return syntax, or you could have eager evaluation by populating, for example, a List<T> and returning that (as per the example below):
Do(LazyEvaluation());
Do(EagerEvaluation());


private void Do(IEnumerable<int> list)
{
  foreach (var i in list)
  {
    Console.WriteLine("Read out first value of {0}", i);
    break;
  }
}

public IEnumerable<int> LazyEvaluation()
{
  Console.WriteLine("About to return 1");
  yield return 1;
  Console.WriteLine("About to return 2");//Never called in this example
  yield return 2;
}

public IEnumerable<int> EagerEvaluation()
{
  var result = new List<int>();
  Console.WriteLine("About to return 1");
  result.Add(1);
  Console.WriteLine("About to return 2");//executed but not used.
  result.Add(2);
  return result;
}
Implementations of IObservable<T> can exhibit similar variations in style.
Examples of Hot observables that could publish regardless of whether there are any subscribers would be:
  • Mouse movements
  • Timer events
  • broadcasts like ESB channels or UDP network packets.
  • price ticks from a trading exchange
Some examples of Cold observables would be:
  • subscription to a queue
  • when Rx is used for an asynchronous request
  • on demand streams
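To make the distinction concrete, here is a minimal Rx-free sketch that uses only the BCL's IObservable<T>/IObserver<T> interfaces. ColdCounter, HotBroadcaster, RecordingObserver and ActionDisposable are hypothetical names for this illustration, not Rx types, and nothing here is thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; none of these are Rx classes.

// Cold: nothing is produced until someone subscribes, and every
// subscription gets its own run of the producer from the start.
public sealed class ColdCounter : IObservable<int>
{
    private readonly int _count;
    public int ProducerRuns { get; private set; }

    public ColdCounter(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        ProducerRuns++;
        for (var i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new ActionDisposable(() => { });
    }
}

// Hot: values are pushed whether or not anyone is listening, and a
// subscriber only sees values pushed after it subscribed.
public sealed class HotBroadcaster : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public void Push(int value)
    {
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }
}

// Records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

// Runs the given action on the first Dispose() only.
public sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) { _action = action; }
    public void Dispose()
    {
        var action = _action;
        _action = null;
        if (action != null) action();
    }
}
```

Subscribing twice to ColdCounter runs its producer twice, whereas HotBroadcaster simply drops any value pushed before a subscriber arrives.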
In this post we will look at 3 scenarios in which cold, hot and both cold & hot are implemented.

Cold Observables

In this first example we have a requirement to fetch a list of products from a service. In our implementation we choose to return an IObservable<string>; as we get the results we publish them, and once we have the full list we publish an OnCompleted. This is a pretty simple example.
private static IObservable<string> GetProducts()
{
  return Observable.CreateWithDisposable<string>(
    o =>
    {
      using(var conn = new SqlConnection(@"Data Source=.\SQLSERVER;Initial Catalog=AdventureWorksLT2008;Integrated Security=SSPI;"))
      using (var cmd = new SqlCommand("Select Name FROM SalesLT.ProductModel", conn))
      {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader(CommandBehavior.CloseConnection);
        while (reader.Read())
        {
          o.OnNext(reader.GetString(0));
        }
        o.OnCompleted();
        return Disposable.Create(()=>Console.WriteLine("--Disposed--"));
      }
    });
}
This style of API would allow for a non-blocking call to fetch the list of products and would inform the consumer when the list was complete. This is fairly common stuff, but note that every time this observable is subscribed to, the database will be accessed again.
In the example above I use the Disposable.Create factory method. This factory method just creates an implementation of IDisposable that executes a given action when disposed. This is perfect for doing a Console.WriteLine once the subscription has been disposed.
In the example below, we have a consumer of the code above that explicitly wants only up to 3 values (the full set has 128 values). This code illustrates that the Take(3) expression restricts what the consumer receives, but the GetProducts() method still publishes all of the values: its subscribe delegate reads every row before it returns its disposable, so there is nothing for Take(3) to dispose of in the meantime.
public void ColdSample()
{
  var productStream = GetProducts().Take(3);
  productStream.Subscribe(Console.WriteLine);
  Console.ReadLine();
}
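The difference between a producer that can be stopped early and one that cannot can be shown without Rx. In this hypothetical sketch (EarlyStopDemo and its methods are illustrative names, not Rx APIs), the consumer keeps only the first 3 values, loosely mimicking Take(3), and then sets a cancellation flag; only the producer that checks that flag between values actually stops.

```csharp
using System;

// Hypothetical illustration only; these are not Rx types or methods.
public static class EarlyStopDemo
{
    // Shaped like GetProducts(): pushes every value before anything could be
    // disposed. Returns how many values were pushed.
    public static int ProduceAll(int total, Action<int> onNext)
    {
        var pushed = 0;
        for (var i = 0; i < total; i++)
        {
            onNext(i);
            pushed++;
        }
        return pushed;
    }

    // Cooperative cancellation: checks the flag before producing each value.
    public static int ProduceUntilCancelled(int total, Action<int> onNext, Func<bool> isCancelled)
    {
        var pushed = 0;
        for (var i = 0; i < total && !isCancelled(); i++)
        {
            onNext(i);
            pushed++;
        }
        return pushed;
    }
}
```

With 128 values, ProduceAll still pushes all 128 even though the consumer keeps only 3, whereas ProduceUntilCancelled pushes exactly 3.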

Hot Observables

Trying to come up with an example for Hot Observables has been a real pain. I started off with examples that had some sort of context (streaming stock prices or weather information), but the context seemed to distract from the real workings of the code. So I think it is best to step through this slowly with a contrived demo and build it up to a piece of code you might actually want to use.
Let us start with subscribing to an Interval. In the example below we subscribe twice to the same observable, created by the Interval factory method. The delay between the two subscriptions should demonstrate that, while they are subscribed to the same observable instance, they do not share the same logical stream of data.
public void SimpleColdSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period);
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  Console.ReadKey();
  /* Output:
   first subscription : 0
   first subscription : 1
   second subscription : 0
   first subscription : 2
   second subscription : 1
   first subscription : 3
   second subscription : 2   
   */
}
Publish and Connect
If I want to be able to share the actual stream of data and not just the instance of the observable, I can use the Publish() extension method. This returns an IConnectableObservable<T>, which extends IObservable<T> by adding a single Connect() method. By calling Publish() and then Connect(), we get this sharing behaviour.
public void SimpleConnectSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Connect();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  Console.ReadKey();
  /* Output:
   first subscription : 0
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   */
}
In the example above the observable variable is an IConnectableObservable<T>, and calling Connect() subscribes it to the underlying source (the Observable.Interval). In this case the first subscription is made quickly enough to receive the first published value, but the second subscription is made late and misses it. We could move the call to Connect() until after both subscriptions have been made, so that even with the Thread.Sleep we won't subscribe to the underlying source until both subscriptions are in place. This would be done as follows:
public void SimpleConnectSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  observable.Connect();
  Console.ReadKey();
  /* Output:
   first subscription : 0
   second subscription : 0
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   */
}
You can probably imagine how this could be quite useful where an application had the need to share streams of data. In a trading application if you wanted to consume a price stream for a certain asset in more than one place, you would want to reuse that stream and not have to make another subscription to the server providing that data. Publish() and Connect() are perfect solutions for this.
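The sharing that Publish() and Connect() provide can be sketched in a few lines with the BCL's IObservable<T>/IObserver<T> interfaces. This is a simplified, single-threaded illustration under my own assumptions, not how Rx implements IConnectableObservable<T>; PublishSketch, ManualSource, RecordingObserver and ActionDisposable are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of Publish()/Connect() semantics; not the Rx implementation.
// Subscribing only registers an observer; Connect() makes ONE subscription to
// the source and fans every notification out to all registered observers.
public sealed class PublishSketch<T> : IObservable<T>, IObserver<T>
{
    private readonly IObservable<T> _source;
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public PublishSketch(IObservable<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }

    // Disposing the returned connection unsubscribes from the source.
    public IDisposable Connect() { return _source.Subscribe(this); }

    public void OnNext(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers.ToArray()) o.OnCompleted(); }
}

// A manually driven source that exposes how many subscriptions are active.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    public int ActiveSubscriptions { get { return _observers.Count; } }

    public void Emit(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }
}

// Records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

// Runs the given action on the first Dispose() only.
public sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) { _action = action; }
    public void Dispose()
    {
        var action = _action;
        _action = null;
        if (action != null) action();
    }
}
```

Two observers subscribed to PublishSketch cause only one subscription to the underlying source, and disposing the connection returned by Connect() stops delivery to both of them.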
Disposal of connections and subscriptions
What becomes interesting is how disposal is performed. What was not covered above is that the Connect() method returns an IDisposable. By disposing of the “connection” you can turn the stream on and off (call Connect() to turn it on, then dispose of the connection to turn it off). In this example we see that the stream can be connected and disconnected multiple times.
public void ConnectAndDisposeSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
  var exit = false;
  while (!exit)
  {
    Console.WriteLine("Press enter to connect, esc to exit.");
    var key = Console.ReadKey(true);
    if(key.Key== ConsoleKey.Enter)
    {
      var connection = observable.Connect();  //--Connects here--
      Console.WriteLine("Press any key to dispose of connection.");
      Console.ReadKey();
      connection.Dispose();                   //--Disconnects here--
    }
    if(key.Key==ConsoleKey.Escape)
    {
      exit = true;
    }
  }
  /* Output:
   Press enter to connect, esc to exit.
   Press any key to dispose of connection.
   subscription : 0
   subscription : 1
   subscription : 2
   Press enter to connect, esc to exit.
   Press any key to dispose of connection.
   subscription : 0
   subscription : 1
   subscription : 2
   Press enter to connect, esc to exit.   
   */
}
Let us finally consider automatic disposal of a connection. It would be commonplace for a single stream to be shared between subscriptions, as per the price stream example mentioned above. It would, however, also be commonplace for the developer to want the stream to run hot only while there are subscriptions to it. It therefore seems obvious that there should be not only a mechanism for automatically connecting (once a subscription has been made), but also a mechanism for disconnecting from the stream (once there are no more subscriptions). First let us look at what happens to a stream when we connect with no subscribers, and then later unsubscribe:
public void OrphanedStreamExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Publish();
  observable.Connect();
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Publishing 0
   Publishing 1
   Press any key to unsubscribe.
   Publishing 2
   subscription : 2
   Publishing 3
   subscription : 3
   Press any key to exit.
   Publishing 4
   Publishing 5
   */
}
A few things to note here:
  1. I use the Do extension method to create side effects on the stream (ie writing to the console). This allows us to see when the stream is actually connected.
  2. We connect first and then subscribe, which means we can be publishing without any subscriptions.
  3. We dispose of our subscription but don’t dispose of the connection which means the stream will still be running. This means we will be publishing even though there are no subscriptions.
RefCount
Taking the last example, if we comment out the line that makes the connection and add a call to the RefCount extension method when creating our observable, we have magically implemented all of our requirements. RefCount takes an IConnectableObservable<T>, turns it back into an IObservable<T>, and automatically implements the connect and disconnect behaviour we are looking for.
public void RefCountExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Publish()
    .RefCount();
  //observable.Connect(); Use RefCount instead now
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Press any key to unsubscribe.
   Publishing 0
   subscription : 0
   Publishing 1
   subscription : 1
   Publishing 2
   subscription : 2
   Press any key to exit.
   */
}
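RefCount's connect-on-first, disconnect-on-last behaviour comes down to a subscriber counter. The following single-threaded sketch is an illustration under stated assumptions rather than the Rx implementation: RefCountSketch, Broadcaster, RecordingObserver and ActionDisposable are hypothetical, and the connect delegate stands in for calling Connect() on a published observable.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of RefCount semantics; not the Rx implementation.
// 'shared' is the multicast side handed to subscribers; 'connect' starts the
// single underlying subscription and returns the connection.
public sealed class RefCountSketch<T> : IObservable<T>
{
    private readonly IObservable<T> _shared;
    private readonly Func<IDisposable> _connect;
    private IDisposable _connection;
    private int _subscriberCount;

    public RefCountSketch(IObservable<T> shared, Func<IDisposable> connect)
    {
        _shared = shared;
        _connect = connect;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = _shared.Subscribe(observer);   // register before connecting
        if (++_subscriberCount == 1)
            _connection = _connect();                     // first subscriber connects
        return new ActionDisposable(() =>
        {
            subscription.Dispose();
            if (--_subscriberCount == 0)
            {
                _connection.Dispose();                    // last subscriber disconnects
                _connection = null;
            }
        });
    }
}

// Minimal multicast source used to drive the sketch.
public sealed class Broadcaster<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    public void Emit(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }
}

// Records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

// Runs the given action on the first Dispose() only.
public sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) { _action = action; }
    public void Dispose()
    {
        var action = _action;
        _action = null;
        if (action != null) action();
    }
}
```

The run-once ActionDisposable matters here: disposing the same subscription twice must not decrement the count twice and disconnect other subscribers.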

Other Connectable Observables

While this is a post about Hot and Cold Observables, I think it is worth mentioning the other ways IConnectableObservable<T> can pop up.
Prune
The Prune method is effectively a non-blocking .Last() call. You can think of it as an AsyncSubject<T> wrapping your target observable, so that you get the equivalent semantics of returning only the last value of the observable, and only once it completes.
public void PruneExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Take(5)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Prune();
  observable.Connect();
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Publishing 0
   Publishing 1
   Press any key to unsubscribe.
   Publishing 2
   Publishing 3
   Publishing 4
   subscription : 4
   Press any key to exit.
   */
}
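The AsyncSubject<T> semantics described above can be sketched in plain C#. LastValueSubject<T> below is a hypothetical type, not an Rx class: it only remembers the latest value and publishes it once, when the source completes, including to subscribers that arrive after completion.

```csharp
using System;
using System.Collections.Generic;

// Sketch of AsyncSubject-style "last value on completion" semantics. Names are hypothetical.
public static class PruneSketch
{
    public sealed class LastValueSubject<T> : IObserver<T>, IObservable<T>
    {
        private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
        private bool _hasValue, _isStopped;
        private T _lastValue;
        private Exception _error;

        // Values are only remembered, never forwarded straight away.
        public void OnNext(T value)
        {
            if (_isStopped) return;
            _lastValue = value;
            _hasValue = true;
        }

        public void OnError(Exception error) => Stop(error);
        public void OnCompleted() => Stop(null);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (_isStopped)
            {
                Deliver(observer);               // late subscribers still get the result
                return new Disposer(() => { });
            }
            _observers.Add(observer);
            return new Disposer(() => _observers.Remove(observer));
        }

        private void Stop(Exception error)
        {
            if (_isStopped) return;
            _isStopped = true;
            _error = error;
            foreach (var observer in _observers.ToArray()) Deliver(observer);
            _observers.Clear();
        }

        // Publishes the error, or the last value (if there was one) followed by completion.
        private void Deliver(IObserver<T> observer)
        {
            if (_error != null) { observer.OnError(_error); return; }
            if (_hasValue) observer.OnNext(_lastValue);
            observer.OnCompleted();
        }
    }

    // Runs its action on the first call to Dispose only.
    public sealed class Disposer : IDisposable
    {
        private Action _action;
        public Disposer(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();
        }
    }

    // Observer that records values and whether it completed.
    public sealed class Recorder<T> : IObserver<T>
    {
        public List<T> Values { get; } = new List<T>();
        public bool Completed { get; private set; }
        public void OnNext(T value) => Values.Add(value);
        public void OnError(Exception error) { }
        public void OnCompleted() => Completed = true;
    }
}
```

Feeding it 0 to 4 and then OnCompleted gives both the early and the late subscriber the single value 4, like the Prune output above.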
Replay
The Replay extension method allows you to take an existing observable and give it “replay” semantics, as per ReplaySubject<T>. As a reminder, ReplaySubject<T> caches all values so that any late subscribers also get all of the values. In this example two subscriptions are made on time, and then a third subscription can be made after the stream completes. Even though the third subscription is made after the OnCompleted, it still gets all of the values.
public void ReplayOnHotExample()
{
  var period = TimeSpan.FromSeconds(1);
  var hot = Observable.Interval(period)
    .Take(3)
    .Publish();
  hot.Connect();
  Thread.Sleep(period); //Run hot and ensure a value is lost.
  var observable = hot.Replay();
  observable.Connect();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));

  Console.ReadKey();
  observable.Subscribe(i => Console.WriteLine("third subscription : {0}", i));
  Console.ReadKey();

  /* Output:
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   third subscription : 1
   third subscription : 2
   */
}
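Similarly, the caching that gives Replay its semantics can be sketched with a hypothetical ReplayCache<T> (plain C#, not the Rx ReplaySubject<T>). Every value is recorded, each new subscriber is first sent the history, and a subscriber that arrives after completion still receives all values followed by OnCompleted. The unbounded history is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

// Sketch of ReplaySubject-style caching. All names are hypothetical.
public static class ReplaySketch
{
    public sealed class ReplayCache<T> : IObserver<T>, IObservable<T>
    {
        private readonly List<T> _history = new List<T>();
        private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
        private bool _isStopped;
        private Exception _error;

        public void OnNext(T value)
        {
            if (_isStopped) return;
            _history.Add(value);                                   // cache for later subscribers
            foreach (var observer in _observers.ToArray()) observer.OnNext(value);
        }

        public void OnError(Exception error) => Stop(error);
        public void OnCompleted() => Stop(null);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            foreach (var value in _history) observer.OnNext(value);   // replay first
            if (_isStopped)
            {
                Terminate(observer);
                return new Disposer(() => { });
            }
            _observers.Add(observer);
            return new Disposer(() => _observers.Remove(observer));
        }

        private void Stop(Exception error)
        {
            if (_isStopped) return;
            _isStopped = true;
            _error = error;
            foreach (var observer in _observers.ToArray()) Terminate(observer);
            _observers.Clear();
        }

        private void Terminate(IObserver<T> observer)
        {
            if (_error != null) observer.OnError(_error);
            else observer.OnCompleted();
        }
    }

    // Runs its action on the first call to Dispose only.
    public sealed class Disposer : IDisposable
    {
        private Action _action;
        public Disposer(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();
        }
    }

    // Observer that records values and whether it completed.
    public sealed class Recorder<T> : IObserver<T>
    {
        public List<T> Values { get; } = new List<T>();
        public bool Completed { get; private set; }
        public void OnNext(T value) => Values.Add(value);
        public void OnError(Exception error) { }
        public void OnCompleted() => Completed = true;
    }
}
```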

I hope that gives some insight into Hot and Cold observables. I think this is one of the more mysterious parts of Rx for many newcomers, so I hope it helps clear up some of the cool things you can do with Rx. I am sure many of you will look forward to finding ways to use this in your next application.
For more information on IConnectableObservable<T> and Hot/Cold streams check out these resources:
channel 9 video on Hot and Cold observables
Hot and Cold by Bnaya Eshet
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 6 - Scheduling and threading
Forward to next post; Part 8 – Testing Rx

Sunday, June 27, 2010

Rx Part 6 – Scheduling and Threading


So far in this series we have managed to avoid any explicit use of threading or concurrency. Some of the methods we have covered implicitly introduce some level of concurrency to do their jobs (e.g. Buffer, Delay and Sample all require a separate thread to do their magic). However, most of this has been kindly abstracted away from us. This post looks at the beauty of the Rx API and its ability to effectively remove the need for WaitHandles and for any explicit use of Threads, the ThreadPool and the shiny new Task type.
A friend of mine once wisely stated that you should always understand at least one layer below what you are coding. At the time he was referring to networking protocols, but I think it is sage advice for all programming. On my current project there are some very savvy developers who are very comfortable working in a multithreaded environment. The project has client-side and server-side threading problems that we have had to tackle. I believe the whole team would agree that it has been amazing how much concurrency Rx will handle for you in a declarative way. The code base is virtually free of WaitHandles, Monitor or lock usage, and any explicit creation of threads. It has evolved into this state over time as we have come to grips with the power of Rx, and the end result is far cleaner code. However, having that experience on the team allowed us to discover the ways we should and shouldn’t be using Rx, which would have been just too hard for me to do alone.
Getting back to my friend’s comment about understanding the underlying subsystem, this is especially important when dealing with Rx and scheduling. Just because Rx abstracts some of this away, it does not mean that you can’t still create problems for yourself if you are not careful. Before I scare you too much, let’s look at some of the scheduling features of Rx.

Scheduling

In the Rx world, you can control the scheduling of two things:
  1. The invocation of the subscription
  2. The publishing of notifications
As you could probably guess, these are exposed via two extension methods on IObservable<T> called SubscribeOn and ObserveOn. Both methods have an overload that takes an IScheduler and returns an IObservable<T>, so you can chain methods together.
public static class Observable
{
  public static IObservable<TSource> ObserveOn<TSource>(
      this IObservable<TSource> source, IScheduler scheduler) 
  {...}
  public static IObservable<TSource> SubscribeOn<TSource>(
      this IObservable<TSource> source, IScheduler scheduler) 
  {...}
}

public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}
The IScheduler interface is of less interest to me than the types that implement it. Depending on your platform* (Silverlight 3, Silverlight 4, .NET 3.5, .NET 4.0) you will be given the appropriate implementations via the static Scheduler class. These are the static properties on the Scheduler type that expose the different schedulers:
Scheduler.Dispatcher will ensure that the actions are performed on the Dispatcher, which is obviously useful for Silverlight and WPF applications. You can imagine that the implementation for this would just delegate any calls to IScheduler.Schedule(Action) straight to Dispatcher.BeginInvoke(Action).
Scheduler.NewThread will schedule all actions onto a new thread.
Scheduler.ThreadPool will schedule all actions onto the Thread Pool.
Scheduler.TaskPool (which is only available to Silverlight 4 and .NET 4.0) will schedule actions onto the TaskPool.
Scheduler.Immediate will ensure the action is not scheduled but is executed immediately.
Scheduler.CurrentThread just ensures that the actions are performed on the thread that made the original call. This is different from Immediate: CurrentThread queues the action, so an action scheduled from inside another action only runs once the outer action has finished, whereas Immediate runs it straight away. Note the difference in the output of the following code. One method passes Scheduler.Immediate, the other passes Scheduler.CurrentThread.
private static void ScheduleTasks(IScheduler scheduler)
{
    Action leafAction = () => Console.WriteLine("leafAction.");
    Action innerAction = () =>
                             {
                                 Console.WriteLine("innerAction start.");
                                 scheduler.Schedule(leafAction);
                                 Console.WriteLine("innerAction end.");
                             };
    Action outerAction = () =>
                             {
                                 Console.WriteLine("outer start.");
                                 scheduler.Schedule(innerAction);
                                 Console.WriteLine("outer end.");
                             };
    scheduler.Schedule(outerAction);
}

public void CurrentThreadExample()
{
    ScheduleTasks(Scheduler.CurrentThread);
    Console.ReadLine();
    /*Output:
     * outer start.
     * outer end.
     * innerAction start.
     * innerAction end.
     * leafAction.
     */
}

public void ImmediateExample()
{
    ScheduleTasks(Scheduler.Immediate);
    Console.ReadLine();
    /*Output:
     * outer start.
     * innerAction start.
     * leafAction.
     * innerAction end.
     * outer end.
     */
}
*Sorry Rx for JavaScript, I have not even opened the box on you and don’t know anything about scheduling in JavaScript.
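One way to picture the difference between CurrentThread and Immediate is a trampoline: a scheduler that queues work and drains the queue on the calling thread. The sketch below uses a hypothetical ISimpleScheduler interface with two toy implementations; it is an illustration under those assumptions, not Rx's code (a real CurrentThread scheduler has to keep a separate queue for each thread, whereas this sketch assumes a single thread). ScheduleTasks mirrors the method above but records to a list instead of writing to the console.

```csharp
using System;
using System.Collections.Generic;

// Toy schedulers illustrating Immediate vs CurrentThread ordering. Names are hypothetical.
public static class SchedulerSketch
{
    public interface ISimpleScheduler { void Schedule(Action action); }

    // Runs each action straight away, so nested work runs inside its parent.
    public sealed class ImmediateScheduler : ISimpleScheduler
    {
        public void Schedule(Action action) => action();
    }

    // Queues work and drains the queue on the calling thread (a "trampoline"),
    // so work scheduled from inside an action runs after that action finishes.
    public sealed class TrampolineScheduler : ISimpleScheduler
    {
        private readonly Queue<Action> _queue = new Queue<Action>();
        private bool _draining;

        public void Schedule(Action action)
        {
            _queue.Enqueue(action);
            if (_draining) return;            // an outer call is already draining the queue
            _draining = true;
            try
            {
                while (_queue.Count > 0) _queue.Dequeue()();
            }
            finally
            {
                _draining = false;
            }
        }
    }

    // Same shape as ScheduleTasks above, logging to a list.
    public static List<string> ScheduleTasks(ISimpleScheduler scheduler)
    {
        var log = new List<string>();
        Action leafAction = () => log.Add("leafAction.");
        Action innerAction = () =>
        {
            log.Add("innerAction start.");
            scheduler.Schedule(leafAction);
            log.Add("innerAction end.");
        };
        Action outerAction = () =>
        {
            log.Add("outer start.");
            scheduler.Schedule(innerAction);
            log.Add("outer end.");
        };
        scheduler.Schedule(outerAction);
        return log;
    }
}
```

Running ScheduleTasks with the trampoline reproduces the CurrentThread output shown earlier, and with the immediate scheduler it reproduces the Immediate output.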
Examples
So those are our schedulers; let’s see some of them in use. The thing I want to point out here is that the first few times I used these overloads I confused what they actually did. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled. The ObserveOn method is used to describe where you want your notifications to be scheduled. So, for example, if you had a WPF application that used Rx to populate an ObservableCollection<T>, you would almost certainly want to use SubscribeOn with one of the threaded schedulers (NewThread, ThreadPool or maybe TaskPool), and then you would have to use ObserveOn with the Dispatcher scheduler to update your collection.
public void LoadCustomers()
{
    _customerService.GetCustomers()
        .SubscribeOn(Scheduler.NewThread)
        .ObserveOn(Scheduler.Dispatcher)
        .Subscribe(Customers.Add);
}
So all of the schedulers just offer us a nice abstraction over the various ways we can write concurrent code. Besides saving me from writing the tedious code to get work onto a new thread or the thread pool, they also make threading in Rx easy. Oh Rx, you thought I had forgotten. I didn’t think that any of the schedulers except CurrentThread and Immediate warranted further explanation, but I do think it is worth pointing out some of the “fun” threading problems you can still face even though the scheduling has been abstracted away from you.

Deadlocks

While writing the application my team is currently working on, we found out the hard way that Rx code can most certainly deadlock. When you consider that some calls (like .First()) are blocking, and that we can schedule work to be done in the future, it becomes obvious that race conditions and deadlocks can occur. This example is the simplest deadlock I could think of. It is fairly silly, but it will get the ball rolling.
var stream = new Subject<int>();
Console.WriteLine("Next line should deadlock the system.");
var value = stream.First();
stream.OnNext(1);
Console.WriteLine("I can never execute....");
Hopefully we won’t ever write code that silly, and if we did, our tests would give us fairly quick feedback that things were wrong. What lets deadlocks slip into a system is when they manifest themselves at integration points. The next example may be a little harder to spot, but it is only a small step away from the silly first example. Here we block in the constructor of a UI element, which will always be created on the dispatcher. The blocking call is waiting for a value that can only be produced from the dispatcher (by clicking the button): deadlock.
//Field declarations assumed here; they were not shown in the original sample.
private readonly Subject<string> _subject = new Subject<string>();
private string _value;

public Window1()
{
    InitializeComponent();
    DataContext = this;
    Value = "Default value";

    //Deadlock! We need the dispatcher to continue
    // to allow me to click the button to produce a value.
    Value = _subject.First(); 

    //This will give same result but will not be blocking(deadlocking).
    _subject.Take(1).Subscribe(value => Value = value);
}

private void MyButton_Click(object sender, RoutedEventArgs e)
{
    _subject.OnNext("New Value");
}

public string Value
{
    get { return _value; }
    set
    {
        _value = value;
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs("Value"));
    }
}
In the next example things start to become more sinister. A Button's click handler tries to get the first value from an observable exposed via a service interface.
public partial class Window1 : INotifyPropertyChanged
{
    private readonly IMyService _service = new MyService(); //Imagine DI here.
    private int _value2;

    public Window1()
    {
        InitializeComponent();
        DataContext = this;
    }

    public int Value2
    {
        get { return _value2; }
        set
        {
            _value2 = value;
            var handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs("Value2"));
        }
    }

    #region INotifyPropertyChanged Members

    public event PropertyChangedEventHandler PropertyChanged;

    #endregion

    private void MyButton2_Click(object sender, RoutedEventArgs e)
    {
        Value2 = _service.GetTemperature().First();
    }
}
The only problem here is a small one: we block the Dispatcher thread (.First() is a blocking call). However, this turns into a deadlock if the service code is written incorrectly, like this:
class MyService : IMyService
{
    public IObservable<int> GetTemperature()
    {
        return Observable.Create<int>(
            o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    return () => { };
                })
            .SubscribeOnDispatcher();
    }
}
This odd implementation, with its explicit scheduling, queues the subscription (and therefore the three OnNext calls) onto the Dispatcher, so they can only run once the .First() call has finished. But .First() is itself waiting for an OnNext to be called: deadlock.
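The shape of this deadlock can be reproduced without Rx or WPF. In the sketch below a Queue<Action> drained on a single thread stands in for the Dispatcher (an assumption made purely for illustration; DeadlockSketch and its methods are hypothetical). The blocking version waits for a value whose producer is queued behind it, so the wait can never succeed; the timeout only exists so that the sketch returns. The callback version, in the spirit of Take(1).Subscribe(...), lets the loop carry on and receives the value.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Single-threaded "dispatcher" sketch of the blocking .First() deadlock. Names are hypothetical.
public static class DeadlockSketch
{
    // Runs queued work items one at a time, in order, on the calling thread.
    private static void RunLoop(Queue<Action> loop)
    {
        while (loop.Count > 0) loop.Dequeue()();
    }

    // Like calling .First() in a click handler: the handler blocks for a value, but
    // the producer is queued behind it on the same loop. A real blocking .First()
    // has no timeout and would wait forever.
    public static bool BlockingWaitReceivesValue(TimeSpan timeout)
    {
        var loop = new Queue<Action>();
        using var valueArrived = new ManualResetEventSlim(false);
        var received = false;
        loop.Enqueue(() =>
        {
            loop.Enqueue(() => valueArrived.Set());   // producer scheduled "on the dispatcher"
            received = valueArrived.Wait(timeout);    // blocks the only thread that could run it
        });
        RunLoop(loop);
        return received;
    }

    // Like Take(1).Subscribe(...): the handler registers a callback and returns,
    // so the loop is free to run the producer.
    public static bool CallbackReceivesValue()
    {
        var loop = new Queue<Action>();
        var received = false;
        loop.Enqueue(() =>
        {
            Action<int> onValue = _ => received = true;
            loop.Enqueue(() => onValue(27));
        });
        RunLoop(loop);
        return received;
    }
}
```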
So far this post has been a bit doom and gloom about scheduling and the problems you could face; that is not the intent. I just wanted to make it obvious that Rx is not going to solve the age-old concurrency problems, but it will make it easier to get things right if you follow these simple rules:
  1. Only the final subscriber should be setting the scheduling.
  2. Avoid using .First() (Ed: that one is for you, Olivier.) We will call this rule 1b.
Where the last example came unstuck is that the service was dictating the scheduling paradigm when it really had no business doing so. Before we had a clear idea of where we should be doing the scheduling in my current project, we had all sorts of layers adding “helpful” scheduling code. What it ended up creating was a threading nightmare. When we removed all the scheduling code and located it in a single layer (at least in the Silverlight client), most of our concurrency problems went away. I recommend you do the same. At least in WPF/Silverlight applications, the pattern should be simple: “Subscribe on a Background thread; Observe on the Dispatcher”.
So my challenge to the readers is to add to the comments:
  1. Any other scheduling rules (2 seems quite small, and I was only going to have 1)
  2. Post some nasty Rx race condition code
  3. What rules do you have for subscribing on the background thread? Which scheduler should I use, and when (i.e. NewThread, ThreadPool or TaskPool)? This brings me full circle to understanding one layer below the one at which you are working.
Further reading/watching:
  1. This Channel 9 video has more interesting stuff, including testing with schedulers: http://channel9.msdn.com/shows/Going+Deep/Wes-Dyer-and-Jeffrey-Van-Gogh-Inside-Rx-Virtual-Time/
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 5 - Combining multiple IObservable streams
Forward to next post; Part 7 - Hot and Cold observables

Saturday, June 19, 2010

RX Part 5 – Combining multiple IObservable<T> streams


In the last post we covered some of the flow control features of Rx and how to conceptualise them with Marble diagrams. This post will continue to build on those concepts by looking at different ways of working with multiple streams.
The Concat extension method is probably the simplest of these methods. If you have read the previous post on flow control, you will find that most of those error-handling constructs are more complex than this one. Concat simply publishes the values from the first stream and then, once the first stream completes, publishes the values from the second stream.
//Generate values 0,1,2
var stream1 = Observable.Generate(0, i => i < 3, i => i, i => i + 1);
//Generate values 100,101,102,103,104
var stream2 = Observable.Generate(100, i => i < 105, i => i, i => i + 1);

stream1
    .Concat(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * 
 * stream1 --0--0--0--|
 * stream2 -----------0--0--0--0--|
 * 
 * result  --0--0--0--0--0--0--0--|
 */
If either stream publishes an OnError, the result stream will OnError too. This means that if stream1 produced an OnError, stream2 would never be used. If you want stream2 to be used regardless of whether stream1 produced an OnError, the OnErrorResumeNext extension method would be your best option.
A quick video on Concat, Catch and OnErrorResumeNext on Channel 9.
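Concat's sequencing can be sketched in plain C# on top of System's IObservable<T>. Concat, FromValues and the Lambda* helper types below are hypothetical names for this illustration, not Rx's implementation. The second stream is only subscribed to when the first calls OnCompleted, and an error from the first stream is forwarded straight on, so the second stream is never used.

```csharp
using System;

// Sketch of Concat's "second stream only after the first completes" rule. Names are hypothetical.
public static class ConcatSketch
{
    public static IObservable<T> Concat<T>(IObservable<T> first, IObservable<T> second) =>
        new LambdaObservable<T>(observer =>
        {
            IDisposable firstSubscription = null, secondSubscription = null;
            firstSubscription = first.Subscribe(new LambdaObserver<T>(
                observer.OnNext,
                observer.OnError,                                      // error ends the result
                () => secondSubscription = second.Subscribe(observer)));
            return new LambdaDisposable(() =>
            {
                firstSubscription?.Dispose();
                secondSubscription?.Dispose();
            });
        });

    // Cold, synchronous source: each subscriber gets all values, then OnCompleted.
    public static IObservable<T> FromValues<T>(params T[] values) =>
        new LambdaObservable<T>(observer =>
        {
            foreach (var value in values) observer.OnNext(value);
            observer.OnCompleted();
            return new LambdaDisposable(() => { });
        });

    public sealed class LambdaObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> _subscribe;
        public LambdaObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
        public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
    }

    public sealed class LambdaObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
            (_onNext, _onError, _onCompleted) = (onNext, onError, onCompleted);
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) => _onError(error);
        public void OnCompleted() => _onCompleted();
    }

    public sealed class LambdaDisposable : IDisposable
    {
        private Action _action;
        public LambdaDisposable(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();   // runs at most once
        }
    }
}
```

Concatenating FromValues(0, 1, 2) with FromValues(100, 101, 102, 103, 104) publishes 0, 1, 2, 100, 101, 102, 103, 104 and then completes once.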
The Amb method was a new concept to me. I believe it comes from functional programming and is an abbreviation of Ambiguous. Effectively, this extension method produces values from whichever stream produces a value first, and completely ignores the other stream. In the examples below I have two streams that both produce values. In the first example stream1 wins the race and the result stream contains stream1’s values. In the second example I delay stream1 so that stream2 wins the race and the result stream contains the values from stream2.
//Generate values 0,1,2
var stream1 = Observable.Range(0,3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Range(100,5);

stream1
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 *  if stream 1 produces a value first...
 * stream1 --0--0--0--|
 * stream2 ---0--0--0--0--0--|
 * 
 * result  --0--0--0--|     //All from stream1
 */

stream1.Delay(TimeSpan.FromMilliseconds(100))
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ---0--0--0--|
 * stream2 --0--0--0--0--0--|
 * 
 * result  --0--0--0--0--0--|     //All from stream2
 */
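Here is a minimal sketch of the Amb race in plain C#, assuming a single thread (a real implementation must be thread-safe). Amb, PushSource and the Lambda* helpers are hypothetical names, not Rx types. The first source to publish anything (a value, an error or completion) wins, and the other subscription is disposed.

```csharp
using System;
using System.Collections.Generic;

// Sketch of Amb: the first source to publish wins. Names are hypothetical; not thread-safe.
public static class AmbSketch
{
    public static IObservable<T> Amb<T>(IObservable<T> left, IObservable<T> right) =>
        new LambdaObservable<T>(observer =>
        {
            var winner = 0;                          // 0 = undecided, 1 = left, 2 = right
            IDisposable leftSubscription = null, rightSubscription = null;

            // The first side to publish claims the race; the loser is unsubscribed.
            bool Claim(int side)
            {
                if (winner == 0)
                {
                    winner = side;
                    (side == 1 ? rightSubscription : leftSubscription)?.Dispose();
                }
                return winner == side;
            }

            IObserver<T> Gate(int side) => new LambdaObserver<T>(
                value => { if (Claim(side)) observer.OnNext(value); },
                error => { if (Claim(side)) observer.OnError(error); },
                () => { if (Claim(side)) observer.OnCompleted(); });

            leftSubscription = left.Subscribe(Gate(1));
            if (winner != 1) rightSubscription = right.Subscribe(Gate(2));   // skip if left already won
            return new LambdaDisposable(() =>
            {
                leftSubscription?.Dispose();
                rightSubscription?.Dispose();
            });
        });

    // Hot source that you push values into by hand.
    public sealed class PushSource<T> : IObservable<T>
    {
        private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
        public int ObserverCount => _observers.Count;

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return new LambdaDisposable(() => _observers.Remove(observer));
        }

        public void Push(T value)
        {
            foreach (var observer in _observers.ToArray()) observer.OnNext(value);
        }
    }

    public sealed class LambdaObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> _subscribe;
        public LambdaObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
        public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
    }

    public sealed class LambdaObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
            (_onNext, _onError, _onCompleted) = (onNext, onError, onCompleted);
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) => _onError(error);
        public void OnCompleted() => _onCompleted();
    }

    public sealed class LambdaDisposable : IDisposable
    {
        private Action _action;
        public LambdaDisposable(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();   // runs at most once
        }
    }
}
```

If stream2 pushes 100 first, stream1 is unsubscribed immediately, so a later push of 0 on stream1 is never seen and the result contains only stream2's values.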
The Merge extension method does a primitive combination of multiple streams of the same type T. The result is also an IObservable<T>, and values are published to the result stream as they occur in the source streams. The result stream completes when all of the source streams complete, or errors as soon as any source stream publishes an OnError.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => i + 100);
stream1
    .Merge(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----0----0|
 * stream2 --0--0--0--0--0|
 * 
 * result  --0-00--00-0--00-|  
 * Output:
 * 100
 * 0
 * 101
 * 102
 * 1
 * 103
 * 104      //Note this is a race condition. 2 could be
 * 2        //  published before 104.
 */
Merge also provides other overloads that allow you to pass more than two source observables via an IEnumerable or a params array. The overload that takes a params array is great for when we know at compile time how many streams we want to merge, and the IEnumerable overload is better for when we don’t know at compile time how many streams we need to merge.
//Create a third stream
var stream3 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10).Select(i => i + 200);

//Number of streams known at compile time.
Observable.Merge(stream1, stream2, stream3)
    .Subscribe(Console.WriteLine);
Console.ReadLine();

//We can dynamically create a list at run time with this overload.
var streams = new List<IObservable<long>>();
streams.Add(stream1);
streams.Add(stream2);
streams.Add(stream3);
Observable.Merge(streams).Subscribe(Console.WriteLine);
Console.ReadLine();
A quick video on Merge on Channel 9.
SelectMany, like its counterpart extension method on IEnumerable<T>, can create the Cartesian product of two streams. When the selector returns the same second stream for every value, as in the example below, you get every item in the second stream for every item in the first. A primitive way to think of it is a nested for loop that fills a 2D array. If you want more information on SelectMany I will leave it to you to search for it, as it is fairly well documented in the IEnumerable world.
//Generate values 0,1,2
var stream1 = Enumerable.Range(0, 3).ToObservable();
//Generate values 100,101,102,103,104
var stream2 = Enumerable.Range(100, 5).ToObservable();
stream1
    .SelectMany(i => stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Output.
 * { Left = 0, Right = 100 }
 * { Left = 0, Right = 101 }
 * { Left = 1, Right = 100 }
 * { Left = 0, Right = 102 }
 * { Left = 1, Right = 101 }
 * { Left = 2, Right = 100 }
 * { Left = 0, Right = 103 }
 * { Left = 1, Right = 102 }
 * { Left = 2, Right = 101 }
 * { Left = 0, Right = 104 }
 * { Left = 1, Right = 103 }
 * { Left = 2, Right = 102 }
 * { Left = 1, Right = 104 }
 * { Left = 2, Right = 103 }
 * { Left = 2, Right = 104 }
 */
A quick video on SelectMany on Channel 9.
Zip is another interesting way to merge streams. Just like a zipper on clothing or a bag, the Zip method brings together two sets of values as pairs, two by two. Things to note about the Zip function: the result stream completes when the first of the source streams completes, it errors if either of the source streams errors, and it only publishes once it has a pair. So if one of the source streams publishes values faster than the other, the rate of publishing is dictated by the slower of the two streams.
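For comparison, here is the same Cartesian product in LINQ to Objects, using the Enumerable.SelectMany overload that takes a collection selector and a result selector. Here the pairs come out in strict nested-loop order, (0, 100), (0, 101) and so on; the interleaved order in the Rx output above depends on how the inner streams are scheduled, so I would not rely on it. SelectManySketch and CartesianPairs are hypothetical names.

```csharp
using System.Linq;

// LINQ to Objects equivalent of the SelectMany example above.
public static class SelectManySketch
{
    // Pairs every value from the first range with every value from the second.
    public static (int Left, int Right)[] CartesianPairs() =>
        Enumerable.Range(0, 3)
            .SelectMany(lhs => Enumerable.Range(100, 5), (lhs, rhs) => (Left: lhs, Right: rhs))
            .ToArray();
}
```

The result holds 15 pairs, from (0, 100) up to (2, 104), the same set of pairs as the Rx output.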
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .Zip(stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|   s2 values represented as chars
 * 
 * result  ----0----1----2|
 *             a    b    c
 */
Here are two short videos on Zip (first, second) on Channel 9. Note the second video is actually incorrect; can you spot why?
CombineLatest is worth comparing to the Zip method. Both methods use a function that takes a value from each stream to produce the result value. The difference is that CombineLatest caches the last value of each stream, and when either stream produces a new value, that new value and the last value from the other stream are sent to the result function. This example uses the same inputs as the previous Zip example, but note that many more values are produced. This leaves CombineLatest somewhere between Zip and SelectMany :-)
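The pairing rule itself is easy to see with LINQ to Objects, whose Enumerable.Zip overload with a result selector pairs items by position in the same way: the first number meets the first letter, and the result stops when the shorter sequence runs out. Timing, of course, is something only the Rx version has. ZipSketch and Pairs are hypothetical names.

```csharp
using System.Linq;

// LINQ to Objects illustration of Zip's positional pairing.
public static class ZipSketch
{
    // Pairs values by position; stops when the shorter input runs out.
    public static string[] Pairs()
    {
        var numbers = Enumerable.Range(0, 3);                                             // 0, 1, 2
        var letters = Enumerable.Range(0, 6).Select(i => char.ConvertFromUtf32(i + 97)); // a..f
        return numbers.Zip(letters, (lhs, rhs) => lhs + rhs).ToArray();
    }
}
```

Pairs() returns "0a", "1b" and "2c"; the letters d, e and f are never used, just as in the marble diagram above.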
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .CombineLatest(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine, () => Console.WriteLine("Complete"));
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|     stream 2 values represented as chars
 * 
 * result  ----00--01-1--22-2|     the result as pairs.
 *             ab  cc d  de f    
 */
A quick video on CombineLatest on Channel 9.
ForkJoin, like the last few extension methods, also requires a function to produce the result, but it only uses the last value from each stream. Things to note with ForkJoin: like the previous methods, if either stream errors so will the result stream, and if either stream is empty (i.e. completes with no values) then the result stream will also be empty. This example uses the same values as the previous samples and only produces a pair from the last value of each stream once they have both completed.
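The CombineLatest rule can be checked by replaying a merged timeline by hand. The hypothetical Combine method below is plain C#, not Rx: it takes events tagged with the side that published them, caches the latest value from each side, and emits a pair for every event once both sides have a value. It uses null to mean "no value yet", so it assumes the values themselves are never null.

```csharp
using System.Collections.Generic;

// Replays CombineLatest's caching rule over a merged, ordered timeline. Names are hypothetical.
public static class CombineLatestSketch
{
    public static List<string> Combine(IEnumerable<(bool IsLeft, string Value)> timeline)
    {
        var results = new List<string>();
        string latestLeft = null, latestRight = null;
        foreach (var (isLeft, value) in timeline)
        {
            // Cache the newest value for whichever side just published.
            if (isLeft) latestLeft = value; else latestRight = value;

            // Once both sides have a value, every publication produces a pair.
            if (latestLeft != null && latestRight != null)
                results.Add(latestLeft + latestRight);
        }
        return results;
    }
}
```

Fed the events of the marble diagram above in the order a, 0, b, c, 1, d, 2, e, f (assuming stream1's 2 arrives just before stream2's e, since the diagram shows them at the same moment), it produces 0a, 0b, 0c, 1c, 1d, 2d, 2e, 2f, matching the pairs in the diagram.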
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .ForkJoin(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e|   s2 values represented as chars
 * 
 * result  --------------2|   the result as pairs. 
 *                       e     
 */
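ForkJoin's rule can be sketched with LINQ to Objects on sequences that have already finished, which is the situation ForkJoin waits for. ForkJoinSketch.ForkJoin is a hypothetical helper, not the Rx method: it pairs the last value of each sequence, and returns nothing if either sequence is empty.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch of ForkJoin's "last value of each, once both are finished" rule. Names are hypothetical.
public static class ForkJoinSketch
{
    public static IEnumerable<TResult> ForkJoin<TLeft, TRight, TResult>(
        IEnumerable<TLeft> left, IEnumerable<TRight> right, Func<TLeft, TRight, TResult> selector)
    {
        var leftValues = left.ToList();
        var rightValues = right.ToList();

        // An empty input means there is no last value to pair, so the result is empty.
        if (leftValues.Count == 0 || rightValues.Count == 0)
            return Enumerable.Empty<TResult>();

        return new[] { selector(leftValues[leftValues.Count - 1], rightValues[rightValues.Count - 1]) };
    }
}
```

With 0, 1, 2 on the left and a to e on the right it yields the single pair 2e, as in the marble diagram above.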
One thing to note with most of the extension methods discussed is that they generally have a matching static method that takes a params array or an IEnumerable, as discussed in the Merge section.
So having looked at these ways to bring multiple observables together we have implicitly brought some concurrency and threading to our code. This allows us to nicely lead into the next post in the series which will be on Scheduling and Threading with Rx.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 4 - Flow control
Forward to next post; Part 6 - Scheduling and threading

Saturday, May 29, 2010

RX part 4 – Flow control


In the previous post we looked at the lifecycle of a subscription to a stream. A subscription can be terminated by unsubscribing/disposing, and the stream (IObservable<T>) can terminate either naturally via an OnCompleted or erroneously with an OnError. OnError creates an interesting problem: it publishes the Exception rather than throwing it. This means that you cannot use the standard Structured Exception Handling (SEH) approach of try/catch with observables. Well, that is not entirely true. In this example the stream raises an OnError that we can catch with a try/catch block, thanks to the overload of the Subscribe extension method that I use: when it receives an OnError, it throws the Exception.
try
{
    var subject = new Subject<int>();
    subject.Subscribe(Console.WriteLine);
    //Or explicitly rethrow as per this line.
    //subject.Subscribe(Console.WriteLine, ex => { throw ex.PrepareForRethrow(); });
    subject.OnNext(1);
    subject.OnError(new IOException("test exception"));
}
catch (IOException ioEx)
{
    Console.WriteLine("Caught IO exception: {0}", ioEx.Message);
}
Throwing published OnError exceptions does not allow for a very composable style of coding. It may be useful in cases like the example above, but the power of Rx, as you will see in future posts, is the ability to compose streams together. Once we are running compositions of streams, SEH is not very helpful. Rx provides several methods that offer a composition-friendly way to handle errors and exceptions.

Visualising streams

Before I cover the flow control methods that Rx offers, I want to take a quick diversion to talk about a visual tool we will use to help communicate concepts relating to streams. Marble diagrams are great for communicating Rx streams, and you may find them useful for describing any stream except the most basic. When using marble diagrams to communicate Rx streams there are only a few things you need to know:
  1. a stream is represented by a horizontal line
  2. time moves to the right (i.e. things on the left happened before things on the right)
  3. we only need 3 symbols to represent an Event
    1. “0” for OnNext
    2. “X” for an OnError
    3. “|” for OnCompleted
This is a sample of a stream that publishes 3 values and then completes
--0--0--0--|
This is a sample of a stream that publishes 4 values then errors.
--0--0--0--0--X
While these examples may seem too simple to warrant a visual representation, the simplicity of marble diagrams really pays off once we are using multiple streams.

Flow control constructs

Sometimes when dealing with an observable, errors may occur that are acceptable, and we should try again. Imagine that we want the following effect: the error in stream 1 (S1) is acceptable, so we try again on stream 2 (S2). The last line is the composition of the two streams, which is the result we want to expose (R):
S1--0--0--X
S2-----------0--0--0--0
R --0--0-----0--0--0--0
We could recreate the result above with several methods.
Retry is the simplest method available to us. Retry will re-subscribe to the IObservable<T> on any failure. In this example we use the simple overload that always retries on any exception.
public static void RetrySample<T>(IObservable<T> stream)
{
    stream.Retry().Subscribe(t=>Console.WriteLine(t)); //Will always retry the stream
    Console.ReadKey();
}
/*
Given stream that will produce 0,1,2 then error; the output would be
0
1
2
0
1
2
0
1
2
.....
*/
which would look like this as a marble diagram
S--0--0--0--x--0--0--0--x--0--0--0--
R--0--0--0-----0--0--0-----0--0--0--
Alternatively we can specify the max number of times to retry. In this example we only retry once, therefore the error that gets published on the second subscription will be passed up to the final subscription. Note that to retry once you pass a value of 2. Maybe the method should be called Try?
public static void RetryOnceSample<T>(IObservable<T> stream)
{
    stream.Retry(2)
        .Subscribe(t => Console.WriteLine(t),
                   ex => Console.WriteLine("Gave up on 2nd Error"));
    Console.ReadKey();
}
/*Output:
0
1
2
0
1
2
Gave up on 2nd Error
*/
As a marble diagram this would look like
S--0--0--0--x--0--0--0--x
R--0--0--0-----0--0--0--x
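The counting in Retry(2) is easier to see in a sketch. This is plain C# with hypothetical names (Retry, ValuesThenError and the Lambda* helpers), not Rx's implementation. The count is the total number of subscriptions, so 2 means one try plus one retry. It recurses on synchronous sources, so it is only meant for a small, fixed count like this one.

```csharp
using System;
using System.Collections.Generic;

// Sketch of Retry(count): resubscribe on error until `count` subscriptions have been made.
public static class RetrySketch
{
    public static IObservable<T> Retry<T>(IObservable<T> source, int maxSubscriptions) =>
        new LambdaObservable<T>(observer =>
        {
            var subscriptions = new List<IDisposable>();
            var attempts = 0;

            void SubscribeToSource()
            {
                attempts++;
                subscriptions.Add(source.Subscribe(new LambdaObserver<T>(
                    observer.OnNext,
                    error =>
                    {
                        if (attempts < maxSubscriptions) SubscribeToSource();   // try again
                        else observer.OnError(error);                          // give up
                    },
                    observer.OnCompleted)));
            }

            SubscribeToSource();
            return new LambdaDisposable(() => subscriptions.ForEach(s => s.Dispose()));
        });

    // Cold source: every subscription publishes the given values and then the error.
    public static IObservable<T> ValuesThenError<T>(Exception error, params T[] values) =>
        new LambdaObservable<T>(observer =>
        {
            foreach (var value in values) observer.OnNext(value);
            observer.OnError(error);
            return new LambdaDisposable(() => { });
        });

    public sealed class LambdaObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> _subscribe;
        public LambdaObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
        public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
    }

    public sealed class LambdaObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
            (_onNext, _onError, _onCompleted) = (onNext, onError, onCompleted);
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) => _onError(error);
        public void OnCompleted() => _onCompleted();
    }

    public sealed class LambdaDisposable : IDisposable
    {
        private Action _action;
        public LambdaDisposable(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();   // runs at most once
        }
    }
}
```

Given a source that publishes 0, 1, 2 and then errors, Retry(source, 2) publishes 0, 1, 2, 0, 1, 2 and then passes on the second error, matching the marble diagram above.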
OnErrorResumeNext may cause some old VB developers to shudder, but it offers a different route from Retry. While Retry always re-subscribes to the same stream, OnErrorResumeNext takes another IObservable<T> as a parameter to use when the original stream publishes an error. In this example, when stream publishes an error we subscribe to failover.
public static void FailoverSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .OnErrorResumeNext(failover)
        .Subscribe(t=>Console.WriteLine(t), ex => Console.WriteLine(ex));
}
/*
stream  --"1"--"2"--"3"--X
failover--------------------"a"--"b"--"c"--X
result  --"1"--"2"--"3"-----"a"--"b"--"c"--|
*/
An important thing to note here is that when the second stream publishes an error the result stream just completes and ignores the error.
Catch is probably the most useful of these methods, as the previous two react the same way regardless of the type of exception published. Catch, however, allows you to specify which exceptions it handles, just like a normal catch block. This example is similar to the last one, but we explicitly state the exception type that allows us to fail over to the next stream.
public static void CatchSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .Catch((InvalidOperationException ex) => failover)
        .Subscribe(t => Console.WriteLine(t), 
                   ex => Console.WriteLine(ex), 
                   () => Console.WriteLine("Completed"));
}
Catch has other overloads that allow you to pass a params array of IObservable<T> or an IEnumerable<IObservable<T>> instead of just specifying an initial and a failover stream. This means you can effectively have a long list of streams to try when a previous one fails. Of course, if any of them completes successfully, the next one will not be used. If the last one publishes an OnError, that error will be published to the IObservable<T> that the Catch method returned.
Another interesting pair of methods is Materialize and Dematerialize. Materialize flattens an observable’s three different publication types (OnNext, OnError and OnCompleted) into publications of a wrapper type, Notification<T>. Notification<T> is an abstract class that exposes four properties and an overloaded Accept method:
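A type-filtered Catch can be sketched in the same plain C# style. Catch, Emit and the Lambda* helpers below are hypothetical, not Rx: an error of the requested exception type switches to the stream returned by the handler, while any other error is passed on unchanged. As in the Rx example above, the exception type can be inferred from an explicitly typed lambda such as (InvalidOperationException ex) => failover.

```csharp
using System;
using System.Collections.Generic;

// Sketch of Catch<T, TException>: fail over only for the given exception type. Names are hypothetical.
public static class CatchSketch
{
    public static IObservable<T> Catch<T, TException>(
        IObservable<T> source, Func<TException, IObservable<T>> handler)
        where TException : Exception =>
        new LambdaObservable<T>(observer =>
        {
            var subscriptions = new List<IDisposable>();
            subscriptions.Add(source.Subscribe(new LambdaObserver<T>(
                observer.OnNext,
                error =>
                {
                    if (error is TException matched)
                        subscriptions.Add(handler(matched).Subscribe(observer));   // fail over
                    else
                        observer.OnError(error);                                  // not ours
                },
                observer.OnCompleted)));
            return new LambdaDisposable(() => subscriptions.ForEach(s => s.Dispose()));
        });

    // Cold source: publishes the values, then the error if one is given, else OnCompleted.
    public static IObservable<T> Emit<T>(Exception error, params T[] values) =>
        new LambdaObservable<T>(observer =>
        {
            foreach (var value in values) observer.OnNext(value);
            if (error != null) observer.OnError(error);
            else observer.OnCompleted();
            return new LambdaDisposable(() => { });
        });

    public sealed class LambdaObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> _subscribe;
        public LambdaObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
        public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
    }

    public sealed class LambdaObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
            (_onNext, _onError, _onCompleted) = (onNext, onError, onCompleted);
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) => _onError(error);
        public void OnCompleted() => _onCompleted();
    }

    public sealed class LambdaDisposable : IDisposable
    {
        private Action _action;
        public LambdaDisposable(Action action) => _action = action;
        public void Dispose()
        {
            var action = _action;
            _action = null;
            action?.Invoke();   // runs at most once
        }
    }
}
```

A stream of "1", "2", "3" that fails with an InvalidOperationException continues with the failover's "a", "b", "c", while the same stream failing with an IOException simply passes that error on.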
public abstract class Notification<T> : IEquatable<Notification<T>>
{
    // Properties
    public abstract Exception Exception { get; }
    public abstract bool HasValue { get; }
    public abstract NotificationKind Kind { get; }
    public abstract T Value { get; }

    // Methods
    public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);
    public abstract void Accept(IObserver<T> observer);
    public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);
    public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

    // ...
}
By inspecting the Kind property you can identify which type of publication you have received. If the Kind is OnError you can access the published exception via the Exception property. Accessing the Value property will:
  • return the value if the Kind is OnNext,
  • throw the published exception if the Kind is OnError,
  • throw an InvalidOperationException if the Kind is OnCompleted.
The HasValue property is simply a convenience check for whether the Kind is OnNext, so you can read Value safely without an exception being thrown.
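The Value rules above can be captured in a few lines. Note<T> and NoteKind below are hypothetical, simplified stand-ins for Notification<T> and NotificationKind, written only to illustrate how Kind, HasValue and Value relate; they omit the Accept overloads and equality members of the real class.

```csharp
using System;

enum NoteKind { OnNext, OnError, OnCompleted }

// Hypothetical, simplified stand-in for Notification<T>; it only models the
// Kind / Exception / HasValue / Value rules described above.
sealed class Note<T>
{
    private readonly T _value;
    public NoteKind Kind { get; }
    public Exception Exception { get; }

    private Note(NoteKind kind, T value, Exception exception)
    {
        Kind = kind; _value = value; Exception = exception;
    }

    public static Note<T> ForNext(T value) => new Note<T>(NoteKind.OnNext, value, null);
    public static Note<T> ForError(Exception error) => new Note<T>(NoteKind.OnError, default(T), error);
    public static Note<T> ForCompleted() => new Note<T>(NoteKind.OnCompleted, default(T), null);

    // Convenience check: only an OnNext publication carries a value.
    public bool HasValue => Kind == NoteKind.OnNext;

    public T Value
    {
        get
        {
            switch (Kind)
            {
                case NoteKind.OnNext:
                    return _value;
                case NoteKind.OnError:
                    throw Exception; // the published error is thrown to the caller
                default:
                    throw new InvalidOperationException("An OnCompleted notification has no value.");
            }
        }
    }
}
```

Checking HasValue before reading Value is what lets a consumer avoid both kinds of exception.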
As you can imagine, the Materialize method can be useful for logging the content of a stream. In this example we create an extension method that logs every publication to the console.
public static class ExampleExtensions
{
    /// <summary>
    /// Logs implicit notifications to console.
    /// </summary>
    /// <example>
    /// <code>myStream.Log().Subscribe(....);</code>
    /// </example>
    public static IObservable<T> Log<T>(this IObservable<T> stream)
    {
        return stream.Materialize()
        .Do(n => Console.WriteLine(n))
        .Dematerialize();
    }
}
Note that here we use Dematerialize to transform our stream of Notification<T> back into the original stream. You could also use Materialize to create your own, more powerful Catch methods, but we will look at applications of Rx later.
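The Materialize, Do, Dematerialize round trip inside Log can also be sketched without Rx. To stay short, this hypothetical version wraps the observer side rather than returning an IObservable<T>: Materialize turns the three observer calls into Notice<T> values, the logging line plays the part of Do, and Notice<T>.Accept plays the part of Dematerialize by replaying each notification onto the real observer. Notice<T>, NoticeKind, LambdaObserver and MaterializeSketch are all illustration-only names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

enum NoticeKind { OnNext, OnError, OnCompleted }

// Hypothetical stand-in for Notification<T>: one publication held as a value.
sealed class Notice<T>
{
    public NoticeKind Kind { get; }
    public T Value { get; }
    public Exception Error { get; }

    public Notice(NoticeKind kind, T value = default(T), Exception error = null)
    {
        Kind = kind; Value = value; Error = error;
    }

    // The "Dematerialize" direction: replay this publication onto an observer.
    public void Accept(IObserver<T> observer)
    {
        switch (Kind)
        {
            case NoticeKind.OnNext: observer.OnNext(Value); break;
            case NoticeKind.OnError: observer.OnError(Error); break;
            default: observer.OnCompleted(); break;
        }
    }

    public override string ToString() =>
        Kind == NoticeKind.OnNext ? "OnNext(" + Value + ")"
        : Kind == NoticeKind.OnError ? "OnError(" + Error.Message + ")"
        : "OnCompleted()";
}

// Hypothetical helper: an IObserver<T> built from three delegates.
sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> _next;
    private readonly Action<Exception> _error;
    private readonly Action _done;
    public LambdaObserver(Action<T> next, Action<Exception> error, Action done)
    {
        _next = next; _error = error; _done = done;
    }
    public void OnNext(T value) => _next(value);
    public void OnError(Exception error) => _error(error);
    public void OnCompleted() => _done();
}

static class MaterializeSketch
{
    // The "Materialize" direction: an observer that turns each of the three
    // calls into a Notice<T> and hands it to `sink`.
    public static IObserver<T> Materialize<T>(Action<Notice<T>> sink) =>
        new LambdaObserver<T>(
            value => sink(new Notice<T>(NoticeKind.OnNext, value)),
            error => sink(new Notice<T>(NoticeKind.OnError, error: error)),
            () => sink(new Notice<T>(NoticeKind.OnCompleted)));

    // In the spirit of the Log extension: materialize, write each notification
    // (the Do step), then dematerialize it onto the downstream observer.
    public static IObserver<T> Log<T>(IObserver<T> downstream, TextWriter log) =>
        Materialize<T>(notice =>
        {
            log.WriteLine(notice);
            notice.Accept(downstream);
        });
}

static class LogDemo
{
    public static string Run()
    {
        var log = new StringWriter();
        var received = new List<string>();
        IObserver<int> observer = MaterializeSketch.Log<int>(
            new LambdaObserver<int>(v => received.Add(v.ToString()), e => received.Add("X"), () => received.Add("|")),
            log);
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        return log.ToString().Replace(Environment.NewLine, ";") + " => " + string.Join(",", received);
    }
}
```

LogDemo.Run() returns "OnNext(1);OnNext(2);OnCompleted(); => 1,2,|": every publication was logged, and the downstream observer still received exactly the original stream.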

Wire tapping a stream

The Do extension method was used in the last example, and it would not be fair to continue without explaining what it does. The Do method is used to add side effects to a stream; in the example above, the side effect is writing to the console. This differs from Subscribe because Do returns an IObservable<T>, which can be thought of as the same IObservable<T> that was passed to it, whereas Subscribe returns an IDisposable. Do does not subscribe by itself: its side effect only runs once something subscribes to the IObservable<T> it returns. I like to think of the Do method as a wire tap on a stream. ;-)
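A minimal sketch of the wire tap idea, assuming hypothetical DoObservable, LambdaObserver, NoopDisposable and ListSource types rather than the Rx implementation. It shows the two properties described above: the result is another IObservable<T> carrying the same values, and the side effect does not run until something subscribes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObserver<T> built from three delegates.
sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> _next;
    private readonly Action<Exception> _error;
    private readonly Action _done;
    public LambdaObserver(Action<T> next, Action<Exception> error, Action done)
    {
        _next = next; _error = error; _done = done;
    }
    public void OnNext(T value) => _next(value);
    public void OnError(Exception error) => _error(error);
    public void OnCompleted() => _done();
}

// Hypothetical helper: a subscription handle with nothing to clean up.
sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Hypothetical cold source: publishes the values, then OnError(error),
// or OnCompleted if error is null.
sealed class ListSource<T> : IObservable<T>
{
    private readonly Exception _error;
    private readonly T[] _values;
    public ListSource(Exception error, params T[] values) { _error = error; _values = values; }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in _values) observer.OnNext(value);
        if (_error != null) observer.OnError(_error);
        else observer.OnCompleted();
        return new NoopDisposable();
    }
}

// Sketch of Do: a new IObservable<T> that runs a side effect for each value
// and then forwards it unchanged. Nothing happens until someone subscribes.
sealed class DoObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Action<T> _tap;
    public DoObservable(IObservable<T> source, Action<T> tap) { _source = source; _tap = tap; }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new LambdaObserver<T>(
            value => { _tap(value); observer.OnNext(value); },
            observer.OnError,
            observer.OnCompleted));
}

static class DoDemo
{
    public static string Run()
    {
        var tapped = new List<int>();
        var received = new List<int>();
        IObservable<int> source = new ListSource<int>(null, 1, 2, 3);
        IObservable<int> wireTapped = new DoObservable<int>(source, tapped.Add);
        int tappedBeforeSubscribe = tapped.Count; // Do alone has not subscribed yet
        wireTapped.Subscribe(new LambdaObserver<int>(received.Add, e => { }, () => { }));
        return tappedBeforeSubscribe + " | " + string.Join(",", tapped) + " | " + string.Join(",", received);
    }
}
```

DoDemo.Run() returns "0 | 1,2,3 | 1,2,3": nothing was tapped before the subscription, and the tap saw exactly what the subscriber received.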
It would be unfair to mention Do and leave out Run. The Run method is very similar to the Do method except for four things:
  1. Run returns void.
  2. Run is a blocking call.
  3. Run can be called without any parameters. This is effectively a call to block until OnCompleted or OnError is published.
  4. Run has no overload that takes an Action for the OnCompleted publication. There is no need for one: the method simply stops blocking when OnCompleted is published, so you can invoke the action you would otherwise pass for OnCompleted immediately after Run returns.
What the Run and Do methods have in common is that they both provide an overload that:
  • takes an Action<T> to be performed on each OnNext publication.
  • takes an Action<T> for OnNext and an Action<Exception> for OnError publications
  • takes an IObserver<T> that will be used to handle the publications explicitly
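To illustrate the blocking behaviour, here is a hypothetical RunSketch.Run helper with the OnNext/OnError shape listed above. It is a sketch under stated assumptions, not Rx's Run: it subscribes, then waits on a TaskCompletionSource until OnCompleted or OnError arrives. BackgroundCounter is a made-up source that publishes from a background task, so there is genuinely something to wait for.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: an IObserver<T> built from three delegates.
sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> _next;
    private readonly Action<Exception> _error;
    private readonly Action _done;
    public LambdaObserver(Action<T> next, Action<Exception> error, Action done)
    {
        _next = next; _error = error; _done = done;
    }
    public void OnNext(T value) => _next(value);
    public void OnError(Exception error) => _error(error);
    public void OnCompleted() => _done();
}

// Hypothetical helper: a subscription handle with nothing to clean up.
sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Hypothetical asynchronous source: publishes 0..count-1 from a background
// task, then completes.
sealed class BackgroundCounter : IObservable<int>
{
    private readonly int _count;
    public BackgroundCounter(int count) { _count = count; }
    public IDisposable Subscribe(IObserver<int> observer)
    {
        Task.Run(() =>
        {
            for (int i = 0; i < _count; i++)
            {
                Thread.Sleep(10);
                observer.OnNext(i);
            }
            observer.OnCompleted();
        });
        return new NoopDisposable();
    }
}

static class RunSketch
{
    // Returns void and blocks the calling thread until OnCompleted or OnError,
    // calling onNext for each value along the way.
    public static void Run<T>(IObservable<T> source, Action<T> onNext, Action<Exception> onError)
    {
        var finished = new TaskCompletionSource<bool>();
        using (source.Subscribe(new LambdaObserver<T>(
            onNext,
            error => { onError(error); finished.TrySetResult(true); },
            () => finished.TrySetResult(true))))
        {
            finished.Task.Wait(); // the blocking part
        }
    }
}

static class RunDemo
{
    public static string Run()
    {
        var values = new List<int>();
        RunSketch.Run<int>(new BackgroundCounter(3), values.Add, error => { });
        // Anything placed here runs after OnCompleted, so no OnCompleted action is needed.
        return string.Join(",", values);
    }
}
```

Because RunSketch.Run only returns after OnCompleted, RunDemo.Run() can read the list straight away and gets "0,1,2".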
Now that we have had a brief look at Flow control and aggregating streams together, in the next post we will uncover the other aggregation and composition methods that Rx exposes.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 3 - Lifetime management – Completing and Unsubscribing
Forward to next post; Part 5 - Combining multiple IObservable streams

Thursday, May 27, 2010

Rx Part 3 – Lifetime management – Completing and Unsubscribing


So far we have discovered the basics of the Reactive Framework, which allows us to create, subscribe to and perform some basic aggregations, buffering and time shifting over implementations of IObservable<T>. We have yet to look at how we could unsubscribe from a subscription. If you were to look for an Unsubscribe method in the Rx public API you would not find one. Instead of supplying an Unsubscribe method, Rx returns an IDisposable whenever a subscription is made. This disposable can be thought of as the subscription itself, so disposing it disposes the subscription and effectively unsubscribes. Note that calling Dispose on the result of a Subscribe call will not affect the underlying IObservable<T>, just that instance of the subscription to the IObservable<T>. This allows us to call Subscribe many times on a single IObservable<T>, letting subscriptions come and go without affecting each other. In this example we initially have two subscriptions, then we dispose of one subscription early, which still allows the other to continue to receive publications from the underlying IObservable<T>.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");

Console.ReadKey();
/*Outputs:
1st subscription received 0
2nd subscription received 0
1st subscription received 1
2nd subscription received 1
1st subscription received 2
2nd subscription received 2
1st subscription received 3
2nd subscription received 3
2nd subscription received 4
Disposed of 1st subscription
2nd subscription received 5
2nd subscription received 6
2nd subscription received 7

etc....
*/
In the example above, it may look as though both subscriptions are fed by a single OnNext call from the interval observable; however, each subscription is independent and gets its own sequence of values, much as a subscription to an observable built with Observable.Create<T> would. In this sample we pause before making our second subscription. Note how the output differs from the example above: the second subscription starts at 0 rather than picking up from the value the first subscription has reached.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));

var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/*
Output:
1st subscription received 0
1st subscription received 1
1st subscription received 2
1st subscription received 3
1st subscription received 4
2nd subscription received 0
1st subscription received 5
2nd subscription received 1
1st subscription received 6
1st subscription received 7
2nd subscription received 2
1st subscription received 8
2nd subscription received 3
Disposed of 1st subscription
2nd subscription received 4
2nd subscription received 5
2nd subscription received 6
etc...

*/
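The independence of subscriptions, and the effect of disposing one, can be reproduced deterministically by replacing the timer with a manual clock. ManualInterval and RecordingObserver are hypothetical types for this sketch; Observable.Interval itself uses a scheduler and a real timer, so this only models the behaviour seen in the outputs above, not its implementation. It is also not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, deterministic stand-in for Observable.Interval: instead of a
// timer, the caller advances time with Tick(). Each subscription keeps its
// own counter starting at 0.
sealed class ManualInterval : IObservable<long>
{
    private readonly List<Subscription> _active = new List<Subscription>();

    public IDisposable Subscribe(IObserver<long> observer)
    {
        var subscription = new Subscription(this, observer);
        _active.Add(subscription);
        return subscription;
    }

    // Publishes the next value to every live subscription, each from its own counter.
    public void Tick()
    {
        foreach (var subscription in _active.ToArray())
            subscription.Observer.OnNext(subscription.Next++);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ManualInterval _owner;
        public readonly IObserver<long> Observer;
        public long Next;
        public Subscription(ManualInterval owner, IObserver<long> observer) { _owner = owner; Observer = observer; }
        // Disposing removes only this subscription; the source and others carry on.
        public void Dispose() => _owner._active.Remove(this);
    }
}

// Hypothetical observer that records what it receives, tagged with a name.
sealed class RecordingObserver : IObserver<long>
{
    private readonly string _name;
    private readonly List<string> _log;
    public RecordingObserver(string name, List<string> log) { _name = name; _log = log; }
    public void OnNext(long value) => _log.Add(_name + ":" + value);
    public void OnError(Exception error) => _log.Add(_name + ":error");
    public void OnCompleted() => _log.Add(_name + ":completed");
}

static class IntervalDemo
{
    public static string Run()
    {
        var log = new List<string>();
        var interval = new ManualInterval();
        IDisposable first = interval.Subscribe(new RecordingObserver("1st", log));
        interval.Tick(); interval.Tick(); interval.Tick();
        IDisposable second = interval.Subscribe(new RecordingObserver("2nd", log));
        interval.Tick(); interval.Tick();
        first.Dispose();
        interval.Tick();
        second.Dispose();
        return string.Join(",", log);
    }
}
```

IntervalDemo.Run() returns "1st:0,1st:1,1st:2,1st:3,2nd:0,1st:4,2nd:1,2nd:2": the second subscription starts its own count at 0, and after the first is disposed only the second keeps receiving values.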
The benefits of using the IDisposable type, instead of creating a new ISubscription/IUnsubscription interface or adding an Unsubscribe method to the IObservable<T> interface, are that you get all of these things for free:
  • The type already exists
  • People understand the type
  • IDisposable has standard usages and patterns
  • Language support via the using keyword
  • Static analysis tools like FxCop can help you with its usage.
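A small sketch of the language-support point, assuming a hypothetical CountingSource that just counts live subscriptions (not an Rx type). Because a subscription is an ordinary IDisposable, a using block scopes it and unsubscribes when the block exits, even if an exception is thrown inside it.

```csharp
using System;

// Hypothetical source that only counts how many subscriptions are live.
sealed class CountingSource : IObservable<int>
{
    public int LiveSubscriptions { get; private set; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        LiveSubscriptions++;
        return new Unsubscriber(this);
    }

    // The subscription itself: disposing it is how you unsubscribe.
    private sealed class Unsubscriber : IDisposable
    {
        private CountingSource _owner;
        public Unsubscriber(CountingSource owner) { _owner = owner; }
        public void Dispose()
        {
            if (_owner == null) return; // repeated Dispose calls are harmless
            _owner.LiveSubscriptions--;
            _owner = null;
        }
    }
}

// Hypothetical observer that ignores everything it receives.
sealed class NullObserver : IObserver<int>
{
    public void OnNext(int value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class UsingDemo
{
    public static string Run()
    {
        var source = new CountingSource();
        int inside;
        using (source.Subscribe(new NullObserver()))
        {
            inside = source.LiveSubscriptions; // subscribed while the block runs
        }
        int after = source.LiveSubscriptions;  // the using block has unsubscribed
        return inside + "," + after;
    }
}
```

UsingDemo.Run() returns "1,0".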

OnError and OnCompleted()

Both OnError and OnCompleted signify the completion of a stream. If your stream publishes an OnError or OnCompleted, it must be the last publication; with the Rx Subject types, any further calls to OnNext are ignored. In this example we try to publish an OnNext call after an OnCompleted, and the OnNext is ignored.
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);
Of course you could implement your own IObservable<T> that did allow publishing after an OnCompleted or an OnError, but it would not follow the precedent set by the existing Subject types and would be a non-standard implementation. I think it is safe to say that such inconsistent behaviour would cause unpredictable behaviour in the applications that consume your code.
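A minimal sketch of how a subject can enforce this rule. TinySubject<T> and LambdaObserver are hypothetical types, not Rx's Subject<T>; the sketch keeps a stopped flag and ignores every publication after the first OnError or OnCompleted. Unlike Rx's Subject<T>, it does nothing for observers that subscribe after completion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObserver<T> built from three delegates.
sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> _next;
    private readonly Action<Exception> _error;
    private readonly Action _done;
    public LambdaObserver(Action<T> next, Action<Exception> error, Action done)
    {
        _next = next; _error = error; _done = done;
    }
    public void OnNext(T value) => _next(value);
    public void OnError(Exception error) => _error(error);
    public void OnCompleted() => _done();
}

// Hypothetical minimal subject: after OnCompleted or OnError, further
// publications are ignored.
sealed class TinySubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _stopped;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Remover(_observers, observer);
    }

    public void OnNext(T value)
    {
        if (_stopped) return; // ignored after completion or error
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var o in _observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var o in _observers.ToArray()) o.OnCompleted();
    }

    private sealed class Remover : IDisposable
    {
        private readonly List<IObserver<T>> _list;
        private readonly IObserver<T> _item;
        public Remover(List<IObserver<T>> list, IObserver<T> item) { _list = list; _item = item; }
        public void Dispose() => _list.Remove(_item);
    }
}

static class SubjectDemo
{
    // Mirrors the Subject<int> example: complete, then try to publish 2.
    public static string Run()
    {
        var log = new List<string>();
        var subject = new TinySubject<int>();
        subject.Subscribe(new LambdaObserver<int>(
            v => log.Add(v.ToString()), e => log.Add("error"), () => log.Add("Completed")));
        subject.OnCompleted();
        subject.OnNext(2); // ignored
        return string.Join(",", log);
    }
}
```

SubjectDemo.Run() returns "Completed", matching the behaviour described for the Rx Subject types.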
An interesting thing to consider is that when a stream completes or errors, you should still dispose of your subscription. This can make for messy code, but we will discuss best practices in a later post.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 2 - Static and extension methods
Forward to next post; Part 4 - Flow control