Tuesday, May 18, 2010

Introduction to Rx Part 1 - Key types

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

Microsoft has released a new library for building “reactive” applications. It’s full name is Reactive Extensions for .NET but is generally referred to as just “Rx”. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as Multicast delegates or Events. Multicast delegates (which Events are) however can be cumbersome to use, have a nasty interface and are difficult to compose and can not be queried. Rx looks to solve these problems.
Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable<T>

IObservable<T> is one of the 2 core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft are so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a Stream of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

IObserver<T>

IObserver<T> is the other one of the 2 core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don’t worry if you are not on .NET 4.0 yet as the Rx team have included these 2 interfaces in a separate assembly for .NET 3.5 users. IObserver<T> is meant to be the “functional dual of IEnumerable<T>”. If you want to know what that last statement meant then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield 3 things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>’s 3 methods OnNext(T), OnError(Exception) and OnCompleted().
Interestingly, while you will be exposed to the IObservable<T> interface a lot if you work with Rx, I find I don't often need to concern myself with IObserver<T>. Another interesting thing I have found with Rx is that I never actually implement these interfaces myself, Rx provides all of the implementations I need out of the box. Lets have a look at the simple ones.

Subject<T>

If you were to create your own implementation of IObservable<T> you may find that you need to expose method to publish items to the subscribers, throw errors and notify when the stream is complete. Hmmm they all sound like the methods on the IObserver<T> interface. While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you.  Subject<T> is the most basic of the subjects. Effectively you can expose your Subject<T> behind a method that returns IObservable<T> but internally you can use the OnNext, OnError and OnCompleted methods to control the stream.
In this (awfully basic) example, I create a subject, subscribe to that subject and then publish to the stream.
using System;
using System.Collections.Generic;

namespace RxConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            var subject = new Subject<string>();

            WriteStreamToConsole(subject);

            subject.OnNext("a");
            subject.OnNext("b");
            subject.OnNext("c");
            Console.ReadKey();
        }

        private static void WriteStreamToConsole(IObservable<string> stream)
        {
            stream.Subscribe(Console.WriteLine);
        }
    }
}
Note that the WriteStreamToConsole method takes an IObservable<string> as it only wants access to the subscribe method. Hang on, doesn’t the Subscribe method need an IObserver<string>? Surely Console.WriteLine does not match that interface. Well not it doesn’t but the Rx team supply me with an Extension Method to IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads to the Subscribe extension method that allows you to pass combinations of delegates to be invoke for OnNext, OnCompleted and OnError. This effectively means I don't need to implement IObserver<T>. Cool.
As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> is a basic implementation however. There are 3 siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject<T>

ReplaySubject<T> will remember all publications to it so that any subscriptions that happen after publications have been made, will still get all of the publications. Consider this example where we have moved our first publication to occur before our subscription
static void Main(string[] args)
{
    var subject = new Subject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
The result of this would be that “b” and “c” would be written to the console, but “a” ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.
static void Main(string[] args)
{
    var subject = new ReplaySubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
This can be very handy for eliminating race conditions.

BehaviorSubject<T>

BehaviorSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).
In this example the value “a” is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the value “b” is written to the console, but not “a”.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    subject.OnNext("b");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the values “b”, “c” & “d” are all written to the console, but again not “a”
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    WriteStreamToConsole(subject);
    subject.OnNext("c");
    subject.OnNext("d");
    Console.ReadKey();
}
Finally in this example, no values will be published as the stream has completed. Nothing is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    WriteStreamToConsole(subject);
    
    Console.ReadKey();
}

AsyncSubject<T>

AsyncSubject<T> is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the stream is completed.
In this example no values will be published so no values will be written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
In this example we invoke the OnCompleted method and the value “c” is published and therefore written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    Console.ReadKey();
}
So that is the very basics of Rx. With only that under you belt it may be hard to understand why Rx is a topic of interest. To follow on from this post I will discuss further fundamentals to Rx
  1. Extension methods
  2. Scheduling / Multithreading
  3. LINQ syntax
Once we have covered these it should allow you to really get Rx working for you to produce some tasty Reactive applications. Hopefully after we have covered these background topics we can knock up some Samples where Rx can really help you in your day to day coding.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Related links :
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
Rx Home
Exploring the Major Interfaces in Rx – MSDN
ObservableExtensions class - MSDN
Using Rx Subjects - MSDN
System.Reactive.Subjects Namespace - MSDN
Subject<T> - MSDN
AsyncSubject<T> - MSDN
BehaviorSubject<T> - MSDN
ReplaySubject<T> - MSDN
Subject static class - MSDN
ISubject<TSource, TResult> - MSDN
ISubject<T> - MSDN
Back to the contents page for Reactive Extensions for .NET Introduction
Forward to next post; Part 2 - Static and extension methods
Technorati Tags: ,,

11 comments:

Unknown said...

excellent post, cheers

Lee Campbell said...

Plenty more to come....

NoPanic said...

best intro post about RX

Unknown said...

Great post Lee. Time to update to version 1.04 of the framework though - changes to BehaviorSubject and AsyncSubject have invalidated some of your code examples.

Anonymous said...

I think that it would be easier to unerstand if you used lambda expression rather that converting it to method group, like this.

stream.Subscribe(x => Console.WriteLine(x));

There are many devs that are not aware of this conversion.

Shwaindog said...

excellent introduction into reactive framework rx.

Lee Campbell said...

Had a few in-person comments about

stream.Subscribe(Console.WriteLine);

Maybe would be a bit easier to understand if we took that step by step.
1) There is an Extension method to IObservable that allows an Action to be provided that will be called for each OnNext
2) I could create an action like this
Action onNextAction = (value)=>Console.WriteLine(value);
or
Action onNextAction = Console.WriteLine;
3) Next I can use this action in the extension method like
stream.Subscribe(onNextAction);
4) Why create the action variable, I could do this inline, right?
stream.Subscribe((value)=>Console.WriteLine(value));
5)Gosh doesn't it look like Console.WriteLine itself matches the required signature. Lets get rid of all the lambda stuff and just point directly to the delegate of Console.WriteLine and have the compiler figure out the best overload (probably the one that take a single 'object' argument)
stream.Subscribe(Console.WriteLine);

HTH

Anonymous said...

Thought you might find this interesting and a little familiar -

http://www.jeroenverhulst.be/post/2010/09/22/Reactive-Extensions-Unleashed.aspx

Lee Campbell said...

LOL
Plagiary –ahem- imitation is the sincerest form of flattery? :-p

MarkPearl said...

Great post... thanks Lee, enjoying this Rx stuff!

Unknown said...

Nice post very helpful

dbakings