Saturday, May 29, 2010

RX part 4 – Flow control

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

In the previous post we looked at the lifecycle of a subscription to a stream. A subscription can be terminated by unsubscribing/disposing, and the stream (IObservable<T>) can terminate either naturally via OnCompleted or erroneously via OnError. OnError creates an interesting problem: it publishes the Exception, it does not throw it. This means that you cannot rely on the standard Structured Exception Handling (SEH) process of try/catch with Observables. Well, that is not entirely true. In this example the stream raises an OnError that we can catch with a try/catch block, thanks to the overload of the Subscribe extension method I use: it is given only an OnNext handler, so when an OnError is published it throws the Exception.
try
{
    var subject = new Subject<int>();
    subject.Subscribe(Console.WriteLine);
    //Or explicitly rethrow as per this line.
    //subject.Subscribe(Console.WriteLine, ex => { throw ex.PrepareForRethrow(); });
    subject.OnNext(1);
    subject.OnError(new IOException("test exception"));
}
catch (IOException ioEx)
{
    Console.WriteLine("Caught IO exception: {0}", ioEx.Message);
}
Throwing published OnError exceptions does not allow for a very composable style of coding. It may be useful in cases like the example above, but the power of Rx, as you will see in future posts, is the ability to compose streams together. Once we are running compositions of streams, SEH is not very helpful. Rx provides several methods that give us a composition-friendly way to handle errors and exceptions.
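For comparison with the try/catch example, here is a minimal sketch that handles the OnError publication with a handler passed to Subscribe, so nothing is thrown. It assumes a current System.Reactive package (where Subject<T> lives in System.Reactive.Subjects, which may differ from the build this post was written against). The class and method names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Subjects;

public static class OnErrorHandlerSketch
{
    // Hypothetical helper: returns what was observed so it can be printed or checked.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Supplying an OnError handler means the exception is delivered to us, not thrown.
        subject.Subscribe(
            i => log.Add("OnNext: " + i),
            ex => log.Add("OnError: " + ex.Message));

        subject.OnNext(1);
        subject.OnError(new IOException("test exception"));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

No try/catch is needed here because the exception arrives as just another publication.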

Visualising streams

Before I cover the flow control methods that Rx offers I want to divert quickly and talk about a visual tool we will use to help communicate the concepts relating to streams. Marble diagrams are great for communicating Rx streams, and you may find them useful for describing all but the most basic streams. When using marble diagrams to communicate Rx streams there are only a few things you need to know:
  1. a stream is represented by a horizontal line
  2. time moves to the right (i.e. things on the left happened before things on the right)
  3. we only need 3 symbols to represent an Event
    1. “0” for OnNext
    2. “X” for an OnError
    3. “|” for OnCompleted
This is a sample of a stream that publishes 3 values and then completes
--0--0--0--|
This is a sample of a stream that publishes 4 values then errors.
--0--0--0--0--X
While these examples may seem too simple to warrant a visual representation, the simplicity of marble diagrams is a great help once we are using multiple streams.

Flow control constructs

Sometimes when dealing with an observable, errors may occur that are acceptable, and we should simply try again. Imagine that we want the following effect: the error in stream 1 (S1) is acceptable, so we try again on stream 2 (S2). The last line is the composition of the two streams, which is the result we want to expose (R).
S1--0--0--X
S2-----------0--0--0--0
R --0--0-----0--0--0--0
We could recreate the example above with several methods.
Retry is the simplest method available to us. Retry will re-subscribe to the same IObservable<T> on any failure. In this example we use the simple overload that will always retry on any exception.
public static void RetrySample<T>(IObservable<T> stream)
{
    stream.Retry().Subscribe(t=>Console.WriteLine(t)); //Will always retry the stream
    Console.ReadKey();
}
/*
Given a stream that produces 0, 1, 2 and then errors, the output would be
0
1
2
0
1
2
0
1
2
.....
*/
which would look like this as a marble diagram
S--0--0--0--X--0--0--0--X--0--0--0--
R--0--0--0-----0--0--0-----0--0--0--
Alternatively we can specify the max number of times to retry. In this example we only retry once, therefore the error that gets published on the second subscription will be passed up to the final subscription. Note that to retry once you pass a value of 2. Maybe the method should be called Try?
public static void RetryOnceSample<T>(IObservable<T> stream)
{
    stream.Retry(2)
        .Subscribe(t=>Console.WriteLine(t),
                   ex=>Console.WriteLine("Gave up on 2nd Error"));
    Console.ReadKey();
}
/*Output:
0
1
2
0
1
2
Gave up on 2nd Error
*/
As a marble diagram this would look like
S--0--0--0--X--0--0--0--X
R--0--0--0-----0--0--0--X
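The Retry samples take the failing stream as a parameter, so here is a minimal sketch of one way to build such a stream and run it through Retry(2). It assumes a current System.Reactive package. FailingStream and the other names are hypothetical, made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RetrySketch
{
    // Hypothetical source: publishes 0, 1, 2 and then an OnError on every subscription.
    public static IObservable<int> FailingStream()
    {
        return Observable.Range(0, 3)
            .Concat(Observable.Throw<int>(new InvalidOperationException("fail")));
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        FailingStream()
            .Retry(2) // two attempts in total, i.e. the original try plus one retry
            .Subscribe(
                i => log.Add(i.ToString()),
                ex => log.Add("Gave up: " + ex.Message));
        return log;
    }

    public static void Main()
    {
        // Prints 0, 1, 2, 0, 1, 2 and then "Gave up: fail", one item per line.
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Each re-subscription replays the whole source because this particular source starts from scratch for every subscriber. A source that shares state between subscribers would behave differently.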
OnErrorResumeNext may cause some old VB developers to shudder, but it offers a different route from Retry. While Retry will always try to re-subscribe to the same stream, OnErrorResumeNext takes another IObservable<T> as a parameter to use when the original stream publishes an error. In this example, when stream publishes an error we subscribe to failover.
public static void FailoverSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .OnErrorResumeNext(failover)
        .Subscribe(t=>Console.WriteLine(t), ex => Console.WriteLine(ex));
}
/*
stream  --"1"--"2"--"3"--X
failover--------------------"a"--"b"--"c"--X
result  --"1"--"2"--"3"-----"a"--"b"--"c"--|
*/
An important thing to note here is that when the second stream publishes an error the result stream just completes and ignores the error.
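To see the swallowed error for yourself, here is a minimal sketch in which both the primary and the failover stream end with an OnError. It assumes a current System.Reactive package. The stream contents and names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OnErrorResumeNextSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical streams: each publishes one value and then errors.
        var primary = Observable.Return("1")
            .Concat(Observable.Throw<string>(new Exception("primary failed")));
        var failover = Observable.Return("a")
            .Concat(Observable.Throw<string>(new Exception("failover failed")));

        primary
            .OnErrorResumeNext(failover)
            .Subscribe(
                s => log.Add(s),
                ex => log.Add("OnError: " + ex.Message), // not expected to run
                () => log.Add("Completed"));
        return log;
    }

    public static void Main()
    {
        // Prints 1, a and then Completed; neither error reaches the subscriber.
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Because the subscriber can never tell that anything went wrong, this is best kept for cases where losing the error is genuinely acceptable.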
Catch is probably the most useful of these methods, as the previous two react the same way regardless of the type of exception published. Catch, however, allows you to specify which exceptions it can catch, just like a normal catch block. This example is similar to the last one, but we explicitly state the type of exception that allows us to fail over to the next stream.
public static void CatchSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .Catch((InvalidOperationException ex) => failover)
        .Subscribe(t => Console.WriteLine(t), 
                   ex => Console.WriteLine(ex), 
                   () => Console.WriteLine("Completed"));
}
Catch has other overloads that allow you to pass a params array of IObservable<T> or an IEnumerable<IObservable<T>> instead of just specifying an initial and a failover stream. This means you can effectively have a long list of streams to try when a previous one fails. Of course, if any of them actually completes then the next one will not be used. If the last one publishes an OnError, then that error will be published by the IObservable<T> that the Catch method returned.
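Here is a minimal sketch of both behaviours: the params overload working through a list of streams, and the typed overload letting through an exception it was not asked to catch. It assumes a current System.Reactive package. The streams and names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CatchSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical streams: the first two fail, the third succeeds.
        var first = Observable.Throw<int>(new TimeoutException("first"));
        var second = Observable.Throw<int>(new TimeoutException("second"));
        var third = Observable.Range(1, 3);

        // params overload: moves to the next stream each time one errors.
        Observable.Catch(first, second, third)
            .Subscribe(
                i => log.Add(i.ToString()),
                ex => log.Add("OnError: " + ex.GetType().Name),
                () => log.Add("Completed"));

        // Typed overload: a TimeoutException is not an InvalidOperationException,
        // so the handler is skipped and the error is published as usual.
        first
            .Catch((InvalidOperationException ex) => Observable.Return(-1))
            .Subscribe(
                i => log.Add(i.ToString()),
                ex => log.Add("OnError: " + ex.GetType().Name),
                () => log.Add("Completed"));

        return log;
    }

    public static void Main()
    {
        // Prints 1, 2, 3, Completed and then OnError: TimeoutException.
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```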
Another interesting pair of methods is Materialize and Dematerialize. Materialize will flatten an observable’s three different publication types (OnNext, OnError and OnCompleted) into wrapped publications of type Notification<T>. Notification<T> is an abstract class that exposes four properties and an overloaded Accept method.
public abstract class Notification<T> : IEquatable<Notification<T>>
{
    // Properties
    public abstract Exception Exception { get; }
    public abstract bool HasValue { get; }
    public abstract NotificationKind Kind { get; }
    public abstract T Value { get; }

    // Methods
    public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);
    public abstract void Accept(IObserver<T> observer);
    public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);
    public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

    // ...
}
By inspecting the Kind property you can identify which type of publication you have received. If the Kind is OnError you can access its published Exception via the Exception property. Accessing the Value property will either
  • return the OnNext value,
  • throw the OnError exception,
  • or throw an InvalidOperationException if the Kind is OnCompleted.
The HasValue property just provides the convenience to check if the Kind is OnNext so you can safely get the value without having an exception thrown.
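Inspecting notifications can be sketched as follows: materialize a stream and switch on Kind, touching Value only for OnNext. This assumes a current System.Reactive package, where NotificationKind lives in the System.Reactive namespace. The stream and names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical source: publishes 1, 2 and then an OnError.
        var source = Observable.Range(1, 2)
            .Concat(Observable.Throw<int>(new Exception("fail")));

        source.Materialize().Subscribe(n =>
        {
            switch (n.Kind)
            {
                case NotificationKind.OnNext:
                    log.Add("OnNext " + n.Value); // safe: HasValue is true here
                    break;
                case NotificationKind.OnError:
                    log.Add("OnError " + n.Exception.Message);
                    break;
                case NotificationKind.OnCompleted:
                    log.Add("OnCompleted");
                    break;
            }
        });
        return log;
    }

    public static void Main()
    {
        // Prints OnNext 1, OnNext 2 and then OnError fail.
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The error arrives through the OnNext handler as an ordinary value, which is why no OnError handler is passed to Subscribe here.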
As you can imagine, the Materialize method can be useful for logging the content of a stream. In this example we create an extension method that logs all events to the Console.
public static class ExampleExtensions
{
    /// <summary>
    /// Logs implicit notifications to console.
    /// </summary>
    /// <example>
    /// <code>myStream.Log().Subscribe(....);</code>
    /// </example>
    public static IObservable<T> Log<T>(this IObservable<T> stream)
    {
        return stream.Materialize()
                     .Do(n => Console.WriteLine(n))
                     .Dematerialize();
    }
}
Note that here we use Dematerialize to take our stream of Notification<T> and transform it back to our original stream. You could also use Materialize to create your own more powerful Catch methods, but we will look at applications of Rx later.

Wire tapping a stream

The Do extension method was used in the last example and it would not be fair to continue without explaining what it does. The Do method is used to attach side effects to a stream. In the example above the side effect is that we wrote to the console. This is different to Subscribe: the Do method returns an IObservable<T>, which can be thought of as the same IObservable<T> that was passed to it, whereas the Subscribe method returns an IDisposable. I like to think of the Do method as a wire tap to a stream. ;-)
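A minimal sketch of the wire tap idea: Do hands back an IObservable<T>, and its side effect only runs once something subscribes to that result. It assumes a current System.Reactive package. The names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DoSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = Observable.Range(1, 3);

        // Do returns an IObservable<int>; nothing has been logged at this point.
        IObservable<int> tapped = source.Do(i => log.Add("Do: " + i));
        log.Add("Tapped, not yet subscribed");

        // Subscribing starts the flow: the tap sees each value before the subscriber.
        using (tapped.Subscribe(i => log.Add("Subscribe: " + i)))
        {
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The log starts with "Tapped, not yet subscribed", followed by alternating "Do: n" and "Subscribe: n" lines for 1, 2 and 3.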
It would be unfair to mention Do and leave out Run. The Run method is very similar to the Do method except for a few things:
  1. Run returns void
  2. Run is a blocking call
  3. Run can be called without any parameters. This effectively is a call to block until OnCompleted or OnError is published.
  4. Run has no overload that takes an Action for the OnCompleted publication. It doesn't make sense to do so, as the method will just stop blocking when OnCompleted is published, so you can invoke the action you would otherwise pass to OnCompleted immediately after the Run method returns.
What the Run and Do method overloads have in common is that they both provide an overload that:
  • takes an Action<T> to be performed on each OnNext publication.
  • takes an Action<T> for OnNext and an Action<Exception> for OnError publications
  • takes an IObserver<T> that will be used to handle the publications explicitly
Now that we have had a brief look at Flow control and aggregating streams together, in the next post we will uncover the other aggregation and composition methods that Rx exposes.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 3 - Lifetime management – Completing and Unsubscribing
Forward to next post; Part 5 - Combining multiple IObservable streams