Thursday, August 19, 2010

Rx Part 7 – Hot and Cold Observables

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy it!

In this post we will look at how to describe and handle two styles of observable stream:
  1. Streams that are passive and start publishing on request.
  2. Streams that are active and publish regardless of subscriptions.
Passive streams are called Cold and active streams are described as Hot. You can draw some similarities between implementations of the IObservable<T> interface and implementations of the IEnumerable<T> interface with regard to Hot and Cold. With IEnumerable<T> you could have an “on demand” collection via the yield return syntax, or you could have eager evaluation by populating a List<T>, for example, and returning that, as in the example below:
Do(LazyEvaluation());
Do(EagerEvaluation());


private void Do(IEnumerable<int> list)
{
  foreach (var i in list)
  {
    Console.WriteLine("Read out first value of {0}", i);
    break;
  }
}

public IEnumerable<int> LazyEvaluation()
{
  Console.WriteLine("About to return 1");
  yield return 1;
  Console.WriteLine("About to return 2");//Never executed in this example
  yield return 2;
}

public IEnumerable<int> EagerEvaluation()
{
  var result = new List<int>();
  Console.WriteLine("About to return 1");
  result.Add(1);
  Console.WriteLine("About to return 2");//executed but not used.
  result.Add(2);
  return result;
}
Implementations of IObservable<T> can exhibit similar variations in style.
Examples of Hot observables that could publish regardless of whether there are any subscribers would be:
  • Mouse movements
  • Timer events
  • Broadcasts such as ESB channels or UDP network packets
  • Price ticks from a trading exchange
Some examples of Cold observables would be:
  • Subscription to a queue
  • Asynchronous requests made with Rx
  • On-demand streams
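To make the distinction concrete, here is a minimal, hand-rolled sketch written against the BCL's own IObservable<T>/IObserver<T> interfaces, so it needs no Rx reference. ColdRange, HotSource<T>, ActionObserver<T> and ActionDisposable are hypothetical names of my own, not Rx types. ColdRange only does work inside Subscribe, so each subscriber gets its own complete run; HotSource<T> publishes whenever its producer calls Push, whether or not anyone is listening, so a late subscriber simply misses earlier values.

```csharp
using System;
using System.Collections.Generic;

// Minimal observer that forwards OnNext to a delegate (no Rx needed).
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Runs an action (at most once) when disposed.
public sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) { _action = action; }
    public void Dispose()
    {
        var action = _action;
        _action = null;
        if (action != null) action();
    }
}

// Cold: the work happens inside Subscribe, so every subscriber triggers its own complete run.
public sealed class ColdRange : IObservable<int>
{
    private readonly int _count;
    public ColdRange(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new ActionDisposable(() => { });
    }
}

// Hot: the producer calls Push whenever it likes; only currently attached observers see the value.
public sealed class HotSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public void Push(T value)
    {
        // Copy first so an observer can unsubscribe while we are iterating.
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new ActionDisposable(() => _observers.Remove(observer));
    }
}
```

For example, subscribing twice to new ColdRange(3) gives each observer 0, 1, 2, whereas a HotSource<int> that is pushed 1, then subscribed to, then pushed 2, only delivers 2 to that subscriber.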
In this post we will look at three scenarios, in which cold, hot, and combined cold and hot observables are implemented.

Cold Observables

In this first example we have a requirement to fetch a list of products from a service. In our implementation we choose to return an IObservable<string>; as we get the results we publish them until we have the full list, and then we publish OnCompleted. This is a pretty simple example.
private static IObservable<string> GetProducts()
{
  return Observable.CreateWithDisposable<string>(
    o =>
    {
      using(var conn = new SqlConnection(@"Data Source=.\SQLSERVER;Initial Catalog=AdventureWorksLT2008;Integrated Security=SSPI;"))
      using (var cmd = new SqlCommand("Select Name FROM SalesLT.ProductModel", conn))
      {
        conn.Open();
        using (SqlDataReader reader = cmd.ExecuteReader(CommandBehavior.CloseConnection))
        {
          while (reader.Read())
          {
            o.OnNext(reader.GetString(0));
          }
        }
        o.OnCompleted();
        return Disposable.Create(()=>Console.WriteLine("--Disposed--"));
      }
    });
}
This style of API allows for a non-blocking call to fetch the list of products (provided the subscription is run on another scheduler, for example via SubscribeOn) and informs the consumer when the list is complete. This is fairly common stuff, but note that every time the observable is subscribed to, the database will be accessed again.
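The repeated database access follows from how CreateWithDisposable works: it stores your delegate and invokes it once for every call to Subscribe (later Rx releases expose the same shape as Observable.Create). The sketch below imitates that with a hypothetical DelegateObservable<T> on the BCL interfaces only, and replaces the SQL query with a made-up in-memory list plus a QueryCount counter standing in for a round trip to the database.

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for a delegate-based Create: the delegate runs once per Subscribe call.
public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Records values and completion so we can inspect what a subscriber received.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class FakeProducts
{
    // Hypothetical counter: how many times the "query" has run.
    public static int QueryCount;

    public static IObservable<string> GetProducts()
    {
        // Building the observable does no work; the delegate below runs on each subscription.
        return new DelegateObservable<string>(o =>
        {
            QueryCount++; // one "database round trip" per subscription
            foreach (var name in new[] { "Bikes", "Helmets", "Gloves" }) o.OnNext(name); // made-up data
            o.OnCompleted();
            return new NoopDisposable();
        });
    }
}
```

Calling GetProducts() on its own leaves QueryCount at 0; two subscriptions bring it to 2.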
In the example above I use the Disposable.Create factory method. This factory method just creates an implementation of IDisposable that executes a given action when disposed. This is perfect for doing a Console.WriteLine once the subscription has been disposed.
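Disposable.Create is small enough to sketch. The hypothetical MyDisposable.Create below wraps an Action in an IDisposable; as a design choice of this sketch (not a claim about Rx's internals), Dispose swaps the action out with Interlocked.Exchange so that repeated or concurrent calls run it at most once.

```csharp
using System;
using System.Threading;

public static class MyDisposable
{
    // Hypothetical equivalent of Disposable.Create: wraps an action in an IDisposable.
    public static IDisposable Create(Action dispose)
    {
        return new AnonymousDisposable(dispose);
    }

    private sealed class AnonymousDisposable : IDisposable
    {
        private Action _dispose;

        public AnonymousDisposable(Action dispose)
        {
            if (dispose == null) throw new ArgumentNullException(nameof(dispose));
            _dispose = dispose;
        }

        public void Dispose()
        {
            // Atomically take the action and leave null behind, so it can only run once.
            var action = Interlocked.Exchange(ref _dispose, null);
            if (action != null) action();
        }
    }
}
```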
In the example below, we have a consumer of the code above that explicitly only wants up to 3 values (the full set has 128 values). This code illustrates that the Take(3) expression restricts what the consumer receives, but the GetProducts() method still publishes all of the values, because its read loop runs to completion inside the subscribe delegate before that delegate returns.
public void ColdSample()
{
  var productStream = GetProducts().Take(3);
  productStream.Subscribe(Console.WriteLine);
  Console.ReadLine();
}
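To see why Take(3) cannot stop the producer here, consider a hand-rolled version of the same situation. SyncProducer, TakeObserver<T> and CollectingObserver<T> are hypothetical types on the BCL interfaces (TakeObserver<T> is a simplified stand-in for Rx's Take and assumes a positive count). Because the producer publishes everything inside Subscribe and only then returns its IDisposable, the consumer stops after 3 values while the producer still emits all 10.

```csharp
using System;
using System.Collections.Generic;

// Publishes everything synchronously inside Subscribe, like the GetProducts() read loop.
public sealed class SyncProducer : IObservable<int>
{
    private readonly int _count;
    public int Published { get; private set; }
    public SyncProducer(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 0; i < _count; i++)
        {
            Published++;
            observer.OnNext(i);
        }
        observer.OnCompleted();
        // The subscription handle only exists once the loop is over, so nobody could use it to stop the loop.
        return new EmptyDisposable();
    }
}

public sealed class EmptyDisposable : IDisposable
{
    public void Dispose() { }
}

// Simplified stand-in for Take(count): forwards the first values, completes, then ignores the rest.
// Assumes count > 0.
public sealed class TakeObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _downstream;
    private int _remaining;
    private bool _done;

    public TakeObserver(IObserver<T> downstream, int count)
    {
        _downstream = downstream;
        _remaining = count;
    }

    public void OnNext(T value)
    {
        if (_done) return; // values after the limit are silently dropped
        _downstream.OnNext(value);
        if (--_remaining == 0)
        {
            _done = true;
            _downstream.OnCompleted();
        }
    }

    public void OnError(Exception error)
    {
        if (_done) return;
        _done = true;
        _downstream.OnError(error);
    }

    public void OnCompleted()
    {
        if (_done) return;
        _done = true;
        _downstream.OnCompleted();
    }
}

public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public int Completions;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completions++; }
}
```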

Hot Observables

Trying to come up with an example for Hot Observables has been a real pain. I started off with examples that had some sort of context (streaming stock prices or weather information), but this all seemed to detract from the real workings of the code. So I think it is best to step through this slowly with a contrived demo and build it up into a piece of code you might actually want to use.
Let us start by subscribing to an Interval. In the example below we subscribe twice to the same observable instance, created via the Observable.Interval factory method. The delay between the two subscriptions demonstrates that, while both are subscribed to the same observable instance, they do not share the same logical stream of data: Observable.Interval is cold, so each subscription gets its own timer and its own sequence starting from 0.
public void SimpleColdSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period);
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  Console.ReadKey();
  /* Output:
   first subscription : 0
   first subscription : 1
   second subscription : 0
   first subscription : 2
   second subscription : 1
   first subscription : 3
   second subscription : 2   
   */
}
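Replacing real time with a fake clock makes the output above deterministic and shows where the separate sequences come from. ManualClock, ColdInterval and Recorder are hypothetical, illustrative types (not Rx's Observable.Interval or its schedulers); each call to Advance() plays the role of one period elapsing, and each subscription captures its own counter starting at 0.

```csharp
using System;
using System.Collections.Generic;

// A fake clock: Advance() fires one "tick" for every registered callback.
public sealed class ManualClock
{
    private readonly List<Action> _callbacks = new List<Action>();

    public IDisposable Register(Action callback)
    {
        _callbacks.Add(callback);
        return new Unregister(() => _callbacks.Remove(callback));
    }

    public void Advance()
    {
        foreach (var callback in _callbacks.ToArray()) callback();
    }

    private sealed class Unregister : IDisposable
    {
        private Action _action;
        public Unregister(Action action) { _action = action; }
        public void Dispose() { var a = _action; _action = null; if (a != null) a(); }
    }
}

// Cold interval: every subscription registers its own callback with its own counter.
public sealed class ColdInterval : IObservable<long>
{
    private readonly ManualClock _clock;
    public ColdInterval(ManualClock clock) { _clock = clock; }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        long next = 0; // captured per subscription, so subscribers never share it
        return _clock.Register(() => observer.OnNext(next++));
    }
}

public sealed class Recorder : IObserver<long>
{
    public readonly List<long> Values = new List<long>();
    public void OnNext(long value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Subscribing the second Recorder after one Advance() mirrors the Thread.Sleep(period) in SimpleColdSample: after two more Advance() calls the first observer has 0, 1, 2 and the second has 0, 1.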
Publish and Connect
If I want to share the actual stream of data and not just the instance of the observable, I can use the Publish() extension method. This returns an IConnectableObservable<T>, which extends IObservable<T> by adding a single Connect() method. By calling Publish() and then Connect(), all subscribers share one underlying subscription.
public void SimpleConnectSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Connect();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  Console.ReadKey();
  /* Output:
   first subscription : 0
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   */
}
In the example above the observable variable is an IConnectableObservable<T>, and calling Connect() subscribes it to the underlying observable (the Observable.Interval). In this case we are quick enough to subscribe before the first item is published, but only with the first subscription. The second subscription is made late and misses the first publication. We could move the invocation of the Connect() method until after all of the subscriptions have been made, so that even with the Thread.Sleep we won't subscribe to the underlying observable until both subscriptions are in place. This would be done as follows:
public void SubscribeThenConnectSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
  observable.Connect();
  Console.ReadKey();
  /* Output:
   first subscription : 0
   second subscription : 0
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   */
}
You can probably imagine how useful this could be where an application needs to share streams of data. In a trading application, if you wanted to consume a price stream for a certain asset in more than one place, you would want to reuse that stream and not have to make another subscription to the server providing that data. Publish() and Connect() are perfect solutions for this.
Disposal of connections and subscriptions
What does become interesting is how disposal is performed. What was not covered above is that the Connect() method returns an IDisposable. By disposing of the “connection” you can turn the stream on and off (call Connect() to turn it on, then dispose of the connection to turn it off). In this example we see that the stream can be connected and disconnected multiple times. Note in the output that each reconnection starts the Interval again from 0, because the underlying observable is subscribed to afresh.
public void ConnectAndDisposeSample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period).Publish();
  observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
  var exit = false;
  while (!exit)
  {
    Console.WriteLine("Press enter to connect, esc to exit.");
    var key = Console.ReadKey(true);
    if(key.Key== ConsoleKey.Enter)
    {
      var connection = observable.Connect();  //--Connects here--
      Console.WriteLine("Press any key to dispose of connection.");
      Console.ReadKey();
      connection.Dispose();                   //--Disconnects here--
    }
    if(key.Key==ConsoleKey.Escape)
    {
      exit = true;
    }
  }
  /* Output:
   Press enter to connect, esc to exit.
   Press any key to dispose of connection.
   subscription : 0
   subscription : 1
   subscription : 2
   Press enter to connect, esc to exit.
   Press any key to dispose of connection.
   subscription : 0
   subscription : 1
   subscription : 2
   Press enter to connect, esc to exit.   
   */
}
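The mechanics behind Publish(), Connect() and disposing the connection can be sketched with one shared list of observers that subscribes to the source only when Connect() is called. PublishedObservable<T>, TickingSource, CallbackDisposable and Log<T> below are hypothetical, single-threaded stand-ins (Rx's real Publish() multicasts through a subject and also deals with threading and errors). TickingSource is a cold counter advanced by calling Tick(), so reconnecting starts it again from 0, much like the Interval restarting in the output above.

```csharp
using System;
using System.Collections.Generic;

public sealed class CallbackDisposable : IDisposable
{
    private Action _action;
    public CallbackDisposable(Action action) { _action = action; }
    public void Dispose() { var a = _action; _action = null; if (a != null) a(); }
}

// Cold source: each subscription gets its own counter; Tick() advances every live subscription.
public sealed class TickingSource : IObservable<int>
{
    private readonly List<Action> _ticks = new List<Action>();
    public int ActiveSubscriptions { get { return _ticks.Count; } }
    public void Tick() { foreach (var tick in _ticks.ToArray()) tick(); }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var next = 0;
        Action tick = () => observer.OnNext(next++);
        _ticks.Add(tick);
        return new CallbackDisposable(() => _ticks.Remove(tick));
    }
}

// Publish()-like wrapper: Subscribe only joins the shared list; each Connect() makes one upstream subscription.
public sealed class PublishedObservable<T> : IObservable<T>, IObserver<T>
{
    private readonly IObservable<T> _source;
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public PublishedObservable(IObservable<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer); // does NOT touch the source
        return new CallbackDisposable(() => _observers.Remove(observer));
    }

    // Dispose the returned connection to detach from the source again.
    public IDisposable Connect() { return _source.Subscribe(this); }

    // Acting as the shared "subject": forward each notification to every current observer.
    public void OnNext(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers.ToArray()) o.OnCompleted(); }
}

public sealed class Log<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With one observer subscribed before Connect() and another after the first Tick(), the early observer sees 0, 1 and the late one sees only 1, because they share a single counter. This sketch does not guard against calling Connect() twice while already connected.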
Finally, let us consider automatic disposal of a connection. It would be commonplace for a single stream to be shared between subscriptions, as per the price stream example mentioned above. However, it would also be commonplace for the developer to want the stream to run hot only while there are subscriptions to it. It therefore makes sense to have not only a mechanism for automatically connecting (once a subscription has been made), but also a mechanism for disconnecting from a stream (once there are no more subscriptions). First let us look at what happens to a stream when we connect with no subscribers, and then later unsubscribe:
public void OrphanedStreamExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Publish();
  observable.Connect();
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Publishing 0
   Publishing 1
   Press any key to unsubscribe.
   Publishing 2
   subscription : 2
   Publishing 3
   subscription : 3
   Press any key to exit.
   Publishing 4
   Publishing 5
   */
}
A few things to note here:
  1. I use the Do extension method to create side effects on the stream (i.e. writing to the console). This allows us to see when the stream is actually connected.
  2. We connect first and then subscribe, which means we can be publishing without any subscriptions.
  3. We dispose of our subscription but not of the connection, which means the stream is still running: we keep publishing even though there are no subscriptions.
RefCount
Taking the last example, if we comment out the line that calls Connect() and add one more extension method, RefCount(), to the creation of our observable, we have magically implemented all of our requirements. RefCount takes an IConnectableObservable<T>, turns it back into an IObservable<T>, and automatically implements the connect and disconnect behaviour we are looking for: it connects when the first subscription is made and disconnects when the last subscription is disposed.
public void RefCountExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Publish()
    .RefCount();
  //observable.Connect(); Use RefCount instead now
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Press any key to unsubscribe.
   Publishing 0
   subscription : 0
   Publishing 1
   subscription : 1
   Publishing 2
   subscription : 2
   Press any key to exit.
   */
}
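RefCount is essentially reference counting wrapped around Connect(): the first subscription connects, and disposing the last one disposes the connection. The following single-threaded sketch shows just that bookkeeping. IConnectable<T>, FakeConnectable<T>, RefCounted<T>, OnDispose and Capture<T> are hypothetical names defined here so the block stands alone (IConnectable<T> plays the role of Rx's IConnectableObservable<T>); FakeConnectable<T> only passes values through while it is connected, which makes it easy to see when the stream is running.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for IConnectableObservable<T>, defined here so the sketch needs no Rx reference.
public interface IConnectable<T> : IObservable<T>
{
    IDisposable Connect();
}

// Runs an action once, on the first Dispose call.
public sealed class OnDispose : IDisposable
{
    private Action _action;
    public OnDispose(Action action) { _action = action; }
    public void Dispose() { var a = _action; _action = null; if (a != null) a(); }
}

// Fake connectable stream: values passed to Emit() only flow while it is connected.
public sealed class FakeConnectable<T> : IConnectable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    public bool IsConnected { get; private set; }
    public int ConnectCount { get; private set; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new OnDispose(() => _observers.Remove(observer));
    }

    public IDisposable Connect()
    {
        IsConnected = true;
        ConnectCount++;
        return new OnDispose(() => IsConnected = false);
    }

    public void Emit(T value)
    {
        if (!IsConnected) return; // nothing is produced while disconnected
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }
}

// RefCount-style wrapper: connect on the first subscription, disconnect after the last one is disposed.
public sealed class RefCounted<T> : IObservable<T>
{
    private readonly IConnectable<T> _source;
    private IDisposable _connection;
    private int _count;

    public RefCounted(IConnectable<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = _source.Subscribe(observer);
        if (++_count == 1) _connection = _source.Connect();
        // OnDispose runs this at most once, so double-disposing cannot decrement twice.
        return new OnDispose(() =>
        {
            subscription.Dispose();
            if (--_count == 0)
            {
                _connection.Dispose();
                _connection = null;
            }
        });
    }
}

public sealed class Capture<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With two subscribers, disposing the first leaves the source connected; disposing the second disconnects it, and the source was connected exactly once throughout.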

Other Connectable Observables

While this is a post about Hot and Cold Observables, I think it is worth mentioning the other ways IConnectableObservable<T> can pop up.
Prune
The Prune method is effectively a non-blocking .Last() call. You can consider it similar to an AsyncSubject<T> wrapping your target Observable, so that you get the equivalent semantics of only returning the last value of an observable, and only once it completes.
public void PruneExample()
{
  var period = TimeSpan.FromSeconds(1);
  var observable = Observable.Interval(period)
    .Take(5)
    .Do(l => Console.WriteLine("Publishing {0}", l)) //produce Side effect to show it is running.
    .Prune();
  observable.Connect();
  Console.WriteLine("Press any key to subscribe");
  Console.ReadKey();
  var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));

  Console.WriteLine("Press any key to unsubscribe.");
  Console.ReadKey();
  subscription.Dispose();

  Console.WriteLine("Press any key to exit.");
  Console.ReadKey();
  /* Output:
   Press any key to subscribe
   Publishing 0
   Publishing 1
   Press any key to unsubscribe.
   Publishing 2
   Publishing 3
   Publishing 4
   subscription : 4
   Press any key to exit.
   */
}
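The AsyncSubject<T>-like semantics described above boil down to three rules: remember only the latest value, publish nothing until the source completes, then give that value (followed by OnCompleted) to every current and future subscriber. LastValueSubject<T> and Capture<T> below are hypothetical, single-threaded sketches of those rules on the BCL interfaces, not Rx's AsyncSubject<T>. In later Rx releases this operator is exposed as PublishLast().

```csharp
using System;
using System.Collections.Generic;

// Single-threaded sketch of AsyncSubject<T>-style semantics (not Rx's implementation).
public sealed class LastValueSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _hasValue;
    private T _last;
    private bool _stopped;
    private Exception _error;

    public void OnNext(T value)
    {
        if (_stopped) return;
        _last = value; // remember only the latest value; publish nothing yet
        _hasValue = true;
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        _error = error;
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
        _observers.Clear();
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var observer in _observers.ToArray()) Deliver(observer);
        _observers.Clear();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_stopped)
        {
            // Late subscribers get the cached outcome straight away.
            if (_error != null) observer.OnError(_error); else Deliver(observer);
            return new Unsubscriber(() => { });
        }
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private void Deliver(IObserver<T> observer)
    {
        if (_hasValue) observer.OnNext(_last);
        observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _action;
        public Unsubscriber(Action action) { _action = action; }
        public void Dispose() { var a = _action; _action = null; if (a != null) a(); }
    }
}

// Records what an observer receives.
public sealed class Capture<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

Feeding it 0 to 4 and then completing gives both an early subscriber and a subscriber that arrives after completion the single value 4, mirroring "subscription : 4" in the output above.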
Replay
The Replay extension method allows you to take an existing Observable and give it “replay” semantics as per ReplaySubject<T>. As a reminder, ReplaySubject<T> caches all values so that any late subscribers also get all of the values. In this example two subscriptions are made on time, and then a third subscription is made after the sequence completes. Even though the third subscription is made after OnCompleted, it still gets every value the replay received (value 0 is lost for everyone, because it was published before Replay() was connected).
public void ReplayOnHotExample()
{
  var period = TimeSpan.FromSeconds(1);
  var hot = Observable.Interval(period)
    .Take(3)
    .Publish();
  hot.Connect();
  Thread.Sleep(period); //Run hot and ensure a value is lost.
  var observable = hot.Replay();
  observable.Connect();
  observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
  Thread.Sleep(period);
  observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));

  Console.ReadKey();
  observable.Subscribe(i => Console.WriteLine("third subscription : {0}", i));
  Console.ReadKey();

  /* Output:
   first subscription : 1
   second subscription : 1
   first subscription : 2
   second subscription : 2   
   third subscription : 1
   third subscription : 2
   */
}
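The caching that makes the third subscription work can be sketched the same way: record every value, replay the history to each new subscriber, then forward live values and the final OnCompleted. ReplayingSubject<T> and Trail<T> are hypothetical, single-threaded types on the BCL interfaces; unlike Rx's ReplaySubject<T>, which also accepts buffer-size and time-window limits, this sketch keeps everything.

```csharp
using System;
using System.Collections.Generic;

// Single-threaded, unbounded sketch of ReplaySubject<T>-style caching (not Rx's implementation).
public sealed class ReplayingSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<T> _history = new List<T>();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _stopped;
    private Exception _error;

    public void OnNext(T value)
    {
        if (_stopped) return;
        _history.Add(value); // cache every value for late subscribers
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        _error = error;
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
        _observers.Clear();
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
        _observers.Clear();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in _history.ToArray()) observer.OnNext(value); // replay the past first
        if (_stopped)
        {
            if (_error != null) observer.OnError(_error); else observer.OnCompleted();
            return new Detach(() => { });
        }
        _observers.Add(observer);
        return new Detach(() => _observers.Remove(observer));
    }

    private sealed class Detach : IDisposable
    {
        private Action _action;
        public Detach(Action action) { _action = action; }
        public void Dispose() { var a = _action; _action = null; if (a != null) a(); }
    }
}

public sealed class Trail<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

Subscribing one observer, pushing 1, subscribing a second, pushing 2 and completing, then subscribing a third, reproduces the output above: all three observers end up with 1, 2.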

I hope that gives some insight into Hot and Cold observables. I think this is one of the more mysterious parts of Rx for many newbies, so I hope it helps clear up some of the cool stuff you can do with Rx. I am sure many of you will look forward to finding ways to use this in your next application.
For more information on IConnectableObservable<T> and Hot/Cold streams check out these resources:
Channel 9 video on Hot and Cold observables
Hot and Cold by Bnaya Eshet
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 6 - Scheduling and threading
Forward to next post; Part 8 – Testing Rx

8 comments:

Anonymous said...

Hi Lee,

I'm following your sample about Cold Observables:

var s = Observable.CreateWithDisposable<int>(
observer =>
{
for (int i = 0; i < 100; i++)
{
observer.OnNext(i);
Thread.Sleep(300);
}

observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("--Disposed--"));
});

s.Subscribe(Console.WriteLine);

//MyEnumerable t = new MyEnumerable();
//t.ForEach(i => Console.WriteLine(i));
Console.WriteLine("app continues running while async enumerable is processing");
Console.ReadLine();

As you can see, after each OnNext I'm sleeping for a little while. But I can't see the "app continues running" message until all the elements have been processed.

Isn't the Subscribe method async?!

Thanks a lot.

Lee Campbell said...

Apologies for the delay in my reply. The reason you will not see any progress is that you are subscribing and observing on the same (current) thread, so Subscribe does not return until your loop has finished.

If you change your one line
s.Subscribe(Console.WriteLine);
to
s.SubscribeOn(Scheduler.ThreadPool).Subscribe(Console.WriteLine);
then the loop runs on a thread-pool thread and your message is printed straight away.

Otherwise you are effectively telling the code to block on the current thread while you run your for loop.

An even easier way to do what you have here is to use the built in methods:
var s = Observable.Interval(TimeSpan.FromMilliseconds(300), scheduler)
.Take(100) //Else it will run forever
.Finally(() => Console.WriteLine("--Disposed--"));

Andrea said...

your rx articles are simply the best I have read so far!

A big thanks!!!

..and a little question...
How can I create an observer which gives values with a certain pace?

is it enough to write

....

o.OnNext("1");
Thread.Sleep(30);
o.OnNext("1");
Thread.Sleep(30);
....

thnx!!

Lee Campbell said...

I think this will do exactly what you are after:
Observable.Interval(TimeSpan.FromMilliseconds(300)).Select(_ => 1);

See the 2nd post for an example of the Interval factory method.

Observable.Interval(TimeSpan) will publish incremental values from 0 every period that you supply. This example publishes values every 250 milliseconds:

var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
interval.Subscribe(Console.WriteLine);

I hope that helps

Andrea said...

Sorry,
I left "1" value everywhere.. but I meant:
how to build an arbitrary stream (for example audio wave data) with each value delayed by a fixed amount of time?

...
2
(0.1 sec delay)
5
(0.1 sec delay)
10
(0.1 sec delay)
...


tnx!

Lee Campbell said...

The latest drop of Rx has some new windowing functionality that would help you here. To answer your question properly I would need to know more:
Are all the values ready or are they getting streamed to you?
If they are being streamed and you get many values within the period, do you want to buffer the value or ignore all but the latest?
If you were streamed values and in one period no values came through, do you want to OnNext something, wait until the next value comes and publish it immediately, or only publish on the set period if there has been a value?

Anyway, this will be one of the next topics, so stay tuned.

Andrea said...

I meant to use it for a sort of realtime audio processing with mic and speakers for Silverlight...

anyway, looking forward to the new posts.

GREAT SERIES INDEED!

Dan Harman said...

Thanks for a great tutorial. The RefCount semantics perfectly capture the market data subscription pattern that I want to solve with rx.