Sunday, May 23, 2010

Rx part 2 – Static and Extension methods

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

In the previous post we looked at the core types of the Rx library and how we could get up and running with “Observables”. The key types we discussed were the IObservable<T> interface, IObserver<T>, Subject<T> and its siblings ReplaySubject<T>, BehaviorSubject<T> and AsyncSubject<T>. In this post we are going to explore some of the static methods on the Observable type and some extension methods to the IObservable<T> interface.
We have already seen one extension method in the last post: the overload of Subscribe that allowed us to pass just an Action<T> to be performed when OnNext was invoked, instead of passing an IObserver<T>. The other overloads of Subscribe are also very useful.
public static class ObservableExtensions
{
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
} 
Each of these overloads allows you to pass various combinations of delegates that you want executed for each publication event an IObservable<T> could raise. You will use these overloads a lot but there are plenty of other static methods that are very helpful on the static Observable class.
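As a rough illustration of how the overloads differ, the sketch below subscribes twice to the same Subject<int>: once with only an onNext delegate and once with all three delegates. It assumes the System.Reactive NuGet package rather than the release this post was written against, and the names SubscribeOverloadsDemo and Run are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class SubscribeOverloadsDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Overload taking only onNext: this subscriber never hears about completion.
        subject.Subscribe(i => log.Add("A received " + i));

        // Overload taking onNext, onError and onCompleted.
        subject.Subscribe(
            i => log.Add("B received " + i),
            ex => log.Add("B failed: " + ex.Message),
            () => log.Add("B completed"));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnCompleted();
        return log;
    }

    public static void Main()
    {
        // Prints, one per line:
        // A received 1, B received 1, A received 2, B received 2, B completed
        Run().ForEach(Console.WriteLine);
    }
}
```

Subscriber A uses the smallest overload, so only subscriber B reports the completion.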

Creation and Generation of Observables

The Observable class has several methods for conveniently creating common types of Observables. The simplest are shown below, each with an example of how to use it and what you would have to write to emulate it.
Observable.Empty<T>(). This returns an IObservable<T> that just publishes an OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnCompleted();
//Or
var empty = Observable.Empty<string>();
Observable.Return<T>(T). This method will return an Observable that publishes the value of T supplied and then publish OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnNext("Value");
subject.OnCompleted();
//Or
var obReturn = Observable.Return("Value");
Observable.Never<T>(). This method will return an IObservable<T> but will not publish any events.
var subject = new Subject<string>();
//Or
var never = Observable.Never<string>();
Observable.Throw<T>(Exception). This method just publishes an OnError with the exception passed.
var subject = new ReplaySubject<string>();
subject.OnError(new Exception());
//Or
var throws = Observable.Throw<string>(new Exception());
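To see the four factory methods above side by side, here is a small sketch that records which notifications each one publishes to a subscriber. It assumes the System.Reactive NuGet package; FactoryMethodsDemo and Describe are hypothetical names for this example only, and the sketch relies on these particular sequences publishing synchronously inside Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class FactoryMethodsDemo
{
    // Subscribes, records every notification, then unsubscribes.
    public static string Describe<T>(IObservable<T> source)
    {
        var log = new List<string>();
        var subscription = source.Subscribe(
            v => log.Add("OnNext(" + v + ")"),
            ex => log.Add("OnError(" + ex.Message + ")"),
            () => log.Add("OnCompleted"));
        subscription.Dispose();
        return log.Count == 0 ? "(nothing)" : string.Join(", ", log);
    }

    public static void Main()
    {
        Console.WriteLine(Describe(Observable.Empty<string>()));  // OnCompleted
        Console.WriteLine(Describe(Observable.Return("Value")));  // OnNext(Value), OnCompleted
        Console.WriteLine(Describe(Observable.Never<string>()));  // (nothing)
        Console.WriteLine(Describe(
            Observable.Throw<string>(new Exception("Failed")))); // OnError(Failed)
    }
}
```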
Observable.Create<T>(Func<IObserver<T>,IDisposable>) is a little different to the above creation methods. The method signature itself is a bit nasty, but once you get to know it, it is not that bad. Essentially this method allows you to specify a delegate that will be executed any time a subscription is made. The IObserver<T> that made the subscription will be passed to your delegate so that you can call the OnNext/OnError/OnCompleted methods as you need. Your delegate is a Func that returns an IDisposable. This IDisposable will have its Dispose method called when the subscriber disposes of their subscription. We will cover unsubscribing/disposing in a future post.
The big benefit of using the Create method is that the call to your method does not block, which is the whole point of the Rx framework: the delegate is only executed when an observer subscribes. In this example we show how we might first return an Observable via a standard blocking call, and then we show the correct way to return an Observable without blocking.
private IObservable<string> BlockingMethod()
{
  var subject = new ReplaySubject<string>();
  subject.OnNext("a");
  subject.OnNext("b");
  subject.OnCompleted();
  Thread.Sleep(1000);
  return subject;
}
private IObservable<string> NonBlocking()
{
  return Observable.Create<string>(
      observer =>
      {
        observer.OnNext("a");
        observer.OnNext("b");
        observer.OnCompleted();
        Thread.Sleep(1000);
        return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        //or can return an Action like
        //return () => Console.WriteLine("Observer has unsubscribed");
      });
}
I would imagine that you will use the Observable.Create<T> method a lot once you start using Rx. An obvious application of this method is if you needed to perform a slow request to the network (web service or SQL call). There is also an Observable.Create<T>(Func<IObserver<T>,Action>) method that, instead of invoking the Dispose method, will just invoke the action when the subscriber disposes of their subscription.
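The point that the delegate only runs when somebody subscribes can be sketched with a log of the order in which things happen. This is a minimal example assuming the System.Reactive NuGet package; CreateDemo and Run are hypothetical names, and the log entries are just labels I made up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables; // assumption: System.Reactive NuGet package
using System.Reactive.Linq;

// Hypothetical demo class; not part of Rx.
public static class CreateDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        var source = Observable.Create<string>(observer =>
        {
            // Runs once per subscription, not when Create is called.
            log.Add("Subscribe delegate invoked");
            observer.OnNext("a");
            observer.OnNext("b");
            observer.OnCompleted();
            return Disposable.Create(() => log.Add("Disposed"));
        });

        log.Add("Observable created"); // nothing has been published yet

        var subscription = source.Subscribe(
            value => log.Add("OnNext(" + value + ")"),
            () => log.Add("OnCompleted"));
        subscription.Dispose();
        return log;
    }

    public static void Main()
    {
        Run().ForEach(Console.WriteLine);
    }
}
```

The "Observable created" entry is logged before the delegate's own entry, which shows that calling Create did not execute the delegate.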
Observable.Range(int, int) returns just a range of integers. The first integer is the initial value and the second is the number of values to yield. This example will write the values “10” through to “24” and then “Completed”.
var range = Observable.Range(10, 15);
range.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
Observable.Interval(TimeSpan) will publish incremental values from 0 every period that you supply. This example publishes a value every 250 milliseconds.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
interval.Subscribe(Console.WriteLine);
In this example I use the Observable.Interval method to fake ticking prices.
var rnd = new Random();
var lastPrice = 100.0;
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
    .Select(i =>
      {
          var variation =
              rnd.NextDouble() - 0.5;
          lastPrice += variation;
          return lastPrice;
      });

interval.Subscribe(Console.WriteLine);
The Observable.Start method allows you to turn a long running Func<T> or Action into an Observable. If you use the overload that takes an Action then the returned Observable will be of type IObservable<Unit>. Unit is analogous to void and is a Functional Programming construct. In this case it is just used to publish an acknowledgement that the Action is complete; however, this is rather inconsequential as OnCompleted will be published straight after Unit anyway. If you use the overload that takes a Func<T>, then when the Func returns, its value will be published, followed by OnCompleted. Below is an example of using both overloads.
static void StartAction()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(unit => Console.WriteLine("Unit published"), () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
}

ToObservable<T>(this IEnumerable<T>) is an extension method for IEnumerable<T> that will translate any IEnumerable<T> to an IObservable<T>. It is a simple and convenient method for switching from one paradigm to another.
var enumT =  new List<string>();
enumT.Add("a");
enumT.Add("b");
enumT.Add("c");
var fromEnum = enumT.ToObservable();

fromEnum.Subscribe(Console.WriteLine);
ToEnumerable<T>(this IObservable<T>) effectively is the opposite of the above method. It will take an IObservable<T> and yield its results as an IEnumerable. Again useful for switching paradigms.
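ToEnumerable has no sample above, so here is a minimal sketch of going from the push world back to the pull world. It assumes the System.Reactive NuGet package; ToEnumerableDemo and Squares are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class ToEnumerableDemo
{
    public static List<int> Squares()
    {
        // Push-based sequence: the squares of 1 through 5.
        IObservable<int> source = Observable.Range(1, 5).Select(i => i * i);

        // Pull-based view of the same sequence; enumerating it blocks
        // until the observable has completed.
        IEnumerable<int> pulled = source.ToEnumerable();
        return pulled.ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Squares())); // 1, 4, 9, 16, 25
    }
}
```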
Observable.Generate provides a more elaborate way to construct Observables than Range. In this example I keep it simple and have the Generate method construct me a sequence starting from 5 and incrementing by 3 while the value is less than 15. I also have the values published as strings to show that the type of the State and Result can be different.
var generated = Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString());
generated.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
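One way to get a feel for the four arguments of Generate (initial state, condition, iterate, result selector) is to rebuild Range with it. The sketch below assumes the System.Reactive NuGet package; GenerateDemo and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class GenerateDemo
{
    // The values 10 through 24, as produced by Range.
    public static List<int> ViaRange() =>
        Observable.Range(10, 15).ToEnumerable().ToList();

    // The same values: start at 10, continue while below 25, step by 1.
    public static List<int> ViaGenerate() =>
        Observable.Generate(10, i => i < 25, i => i + 1, i => i)
                  .ToEnumerable().ToList();

    // The sample from the post: int state, string results.
    public static List<string> PostSample() =>
        Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString())
                  .ToEnumerable().ToList();

    public static void Main()
    {
        Console.WriteLine(ViaRange().SequenceEqual(ViaGenerate())); // True
        Console.WriteLine(string.Join(", ", PostSample()));         // 5, 8, 11, 14
    }
}
```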

Checking for existence

The methods below all provide some sort of check to confirm the existence of a published value.
Any<T>(this IObservable<T>) is an extension method that will either check that the IObservable will publish any value at all, or, if you use the overload that takes a Func<T,bool>, check whether any values satisfy your predicate. The interesting thing about Any is that it is non-blocking and so returns an IObservable<bool>. This will only yield one value.
var range = Observable.Range(10, 15);
range.Any().Subscribe(Console.WriteLine);
range.Any(i => i > 100).Subscribe(Console.WriteLine);
Observable.Empty<int>().Any().Subscribe(Console.WriteLine);
Contains<T>(this IObservable<T>, T) is an extension method like Any but will only accept a value of T not a predicate of T. Like Any the result is an IObservable<bool>.
var range = Observable.Range(10, 15);
range.Contains(15).Subscribe(Console.WriteLine);
Observable.Empty<int>().Contains(5).Subscribe(Console.WriteLine);

Filtering and Aggregating

These next few extension methods provide some sort of filter to the underlying stream. These are very similar to the extension methods available to IEnumerable<T>.
First<T>(this IObservable<T>) simply returns the first value of an Observable or throws InvalidOperationException if the stream completes without yielding a value. This is a blocking call that won't return until a value or OnCompleted is published.
var range = Observable.Range(10, 15);
Console.WriteLine(range.First());
//InvalidOperationException("Sequence contains no elements.")
Console.WriteLine(Observable.Empty<int>().First());
There are further extension methods that probably don't warrant further explanation if I assume you are familiar with the existing IEnumerable<T> extension methods. These include
  • FirstOrDefault
  • Last
  • LastOrDefault
  • Single
  • Count
  • Min
  • Max
  • Sum
  • Where
  • GroupBy
While I think that most of these are self-explanatory, what may be worth reinforcing is that some of the scalar methods (those that return bool or T and not IObservable) are blocking calls. This can be very handy for transforming your non-blocking Rx code into sequential blocking code.
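As a quick taste of a few operators from the list, here is a sketch that filters with Where and then aggregates with Count, Sum and Max. It assumes the System.Reactive NuGet package, where these three aggregates return a single-valued IObservable and I block on the result with Wait(); that may well differ from the release this post was written against. OperatorsDemo and its method names are hypothetical.

```csharp
using System;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class OperatorsDemo
{
    // Even numbers between 10 and 24 inclusive.
    public static IObservable<int> Evens() =>
        Observable.Range(10, 15).Where(i => i % 2 == 0);

    // Wait() blocks until the sequence completes and returns its last value.
    public static int CountOfEvens() => Evens().Count().Wait();
    public static int SumOfEvens() => Evens().Sum().Wait();
    public static int MaxOfEvens() => Evens().Max().Wait();

    public static void Main()
    {
        Console.WriteLine(CountOfEvens()); // 8
        Console.WriteLine(SumOfEvens());   // 136
        Console.WriteLine(MaxOfEvens());   // 24
    }
}
```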
The Aggregate & Scan extension method overloads allow you to create your own aggregations. Both take similar arguments for their overloads. One of Scan’s overloads takes a Func<T,T,T>, which is the simpler overload. Effectively, given an accumulator and the current value, you must return the next accumulator. The other overload can take another type for the accumulator and allows you to specify the seed value for the accumulator.
Aggregate has the same overloads. The difference is that Scan will OnNext the result of every Accumulation, Aggregate will only OnNext the last value. Therefore Aggregate is like an AsyncSubject<T> version of Scan.
In this example Scan will publish the running sum of the input sequence, and Aggregate will only publish the final total.
public void Aggregate_can_be_used_to_create_your_own_aggregates()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150))
                          .Take(10);  //Must complete for Aggregate to publish a value.
  var aggregate = interval.Aggregate(0L, (acc, i) => acc + i);
  WriteStreamToConsole(aggregate, "aggregate");
}

public void Scan_allows_custom_rolling_aggregation()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
  var scan = interval.Scan(0L, (acc, i) => acc + i);
  WriteStreamToConsole(scan, "scan");
}
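The WriteStreamToConsole helper used above is not shown in this post, so the version below is only my guess at what such a helper might look like. The sketch also runs Scan and Aggregate over a finite Range so the difference is easy to see. It assumes the System.Reactive NuGet package, and ScanVersusAggregateDemo and its members are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical demo class; not part of Rx.
public static class ScanVersusAggregateDemo
{
    // Guessed stand-in for the helper used in the post's samples.
    public static IDisposable WriteStreamToConsole<T>(IObservable<T> stream, string name)
    {
        return stream.Subscribe(
            value => Console.WriteLine("{0} --> {1}", name, value),
            ex => Console.WriteLine("{0} failed --> {1}", name, ex.Message),
            () => Console.WriteLine("{0} completed", name));
    }

    // Scan publishes every intermediate accumulator value.
    public static List<int> RunningTotals() =>
        Observable.Range(1, 5).Scan(0, (acc, i) => acc + i).ToEnumerable().ToList();

    // Aggregate publishes only the final accumulator value.
    public static List<int> FinalTotal() =>
        Observable.Range(1, 5).Aggregate(0, (acc, i) => acc + i).ToEnumerable().ToList();

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunningTotals())); // 1, 3, 6, 10, 15
        Console.WriteLine(string.Join(", ", FinalTotal()));    // 15
        WriteStreamToConsole(Observable.Range(1, 5).Scan(0, (acc, i) => acc + i), "scan");
    }
}
```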
Take<T>(this IObservable<T>, int) will return you the first number of publications specified by the integer value provided. Take(1) is like First() except that it returns an IObservable so is not blocking. This example yields just the values 10 and 11.
var range = Observable.Range(10, 15);
range.Take(2).Subscribe(Console.WriteLine);
Skip<T>(this IObservable<T>, int) will ignore the first number of publications specified by the integer value provided. So while Take(2) above returned 10 and 11, Skip(2) will ignore 10 and 11 and return the rest of the stream.
var range = Observable.Range(10, 15);
range.Skip(2).Subscribe(Console.WriteLine);
DistinctUntilChanged<T>(this IObservable<T>), formerly HoldUntilChanged, will ignore any value that is the same as the previous value. In this example the values a, b and c are only written to the console once each.
var subject = new Subject<string>();
subject.DistinctUntilChanged().Subscribe(Console.WriteLine);
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("c");
subject.OnNext("c");
subject.OnNext("c");

Buffering and time shifting

In some scenarios your application or user may not be able to keep up with the amount of data that a stream is providing. Alternatively, your user or application may just not need the amount of data that is being published. Rx offers some great extension methods for allowing you to control the rate at which values are published to your stream.
Buffer allows you to buffer a range of values and then re-publish them as a list once the buffer is full. You can buffer a specified number of elements or buffer all the values per timespan. Buffer also offers more advanced overloads.
static void Main(string[] args)
{
    var range = Observable.Range(10, 15);
    range.Buffer(4).Subscribe(
        enumerable =>
           {
               Console.WriteLine("--Buffered values");
               enumerable.ToList().ForEach(Console.WriteLine);
           }, () => Console.WriteLine("Completed"));

    var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
    interval.Buffer(TimeSpan.FromSeconds(1)).Subscribe(
        enumerable =>
        {
            Console.WriteLine("--Buffered values");
            enumerable.ToList().ForEach(Console.WriteLine);
        }, () => Console.WriteLine("Completed"));

    Console.ReadKey();
}
Delay will time-shift the entire Observable by the TimeSpan specified, or until the DateTime specified. This means that if you get a stream with 5 values in the first second and then 10 values in the 3rd second, using Delay with a timespan of 2 seconds will yield those results in seconds 3 and 5. If you use the DateTime overload, the values will be yielded within the first second after the DateTime and then in the 3rd second after the DateTime. This example delays one subscription by 2 seconds and another until one minute from now.
var inOneMinute = DateTime.Now.AddMinutes(1);
var range = Observable.Range(10, 15);
range.Delay(TimeSpan.FromSeconds(2)).Subscribe(Console.WriteLine);
range.Delay(inOneMinute).Subscribe(Console.WriteLine);
Sample<T>(TimeSpan) will just take one sample value for every specified TimeSpan and publish that value. Quite simple, quite nice.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
interval.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
Throttle is similar to the Sample and Buffer methods in that it is a time-based filter. It is more like Sample in that it returns a single value (not an array of values like Buffer). Unlike Sample, the window that the values are sampled over is not fixed; it slides every time the underlying stream publishes a value. The throttled stream will only produce a value when the underlying stream does not publish a value for the given period, in which case the last value will be published. Each time the underlying stream publishes a value the timespan is restarted, and that value is only republished if the timespan elapses before another value arrives. This means that if the underlying stream always produces values more frequently than the throttle period then Throttle will never publish any results.
The most obvious usage of this to me is holding back on sending a "Google suggest" request to the server as a user is still typing. We could throttle the keystrokes so that we only send up the search string when the user pauses for given period.
In this example all values are ignored as the publisher is producing values quicker than the throttle allows.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
interval.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
In this example the rate at which values are produced varies (emulating sporadic keystrokes), so we will get some values written to the console where the delay is more than the throttle period.
var interval = Observable.Create<int>(
    o =>
     {
         for (int i = 0; i < 100; i++)
         {
             o.OnNext(i);
             Thread.Sleep(i % 10 < 5 ? 100 : 300);
         }
         return () => { };
     });
interval.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
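The "Google suggest" idea can be sketched with a Subject standing in for the keystrokes. Each burst of typing is pushed with no pause, then the "user" stops for a second, so only the last value of each burst should make it through the 300 millisecond throttle. This assumes the System.Reactive NuGet package; ThrottleDemo, Run and the search strings are made up, and because the example uses real timers the result does depend on the machine not stalling for long.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;     // assumption: System.Reactive NuGet package
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; not part of Rx.
public static class ThrottleDemo
{
    public static List<string> Run()
    {
        // Throttle publishes from a timer thread, so collect thread-safely.
        var published = new ConcurrentQueue<string>();
        var keystrokes = new Subject<string>();

        using (keystrokes
            .Throttle(TimeSpan.FromMilliseconds(300))
            .Subscribe(published.Enqueue))
        {
            // First burst of typing, then a pause longer than the throttle period.
            foreach (var text in new[] { "r", "rx", "rx " }) keystrokes.OnNext(text);
            Thread.Sleep(1000);

            // Second burst, then another pause.
            foreach (var text in new[] { "rx t", "rx th", "rx thr" }) keystrokes.OnNext(text);
            Thread.Sleep(1000);
        }
        return new List<string>(published);
    }

    public static void Main()
    {
        // Should print the last value of each burst: "rx " and then "rx thr".
        Run().ForEach(Console.WriteLine);
    }
}
```

In a real application the subscriber would send the search request instead of enqueueing the string.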
In later posts, Buffer is covered in more detail and we expand on more complex time based operators. Also when you introduce concurrency (which these time based operators do) you need to start to understand the Scheduling semantics of Rx which are also covered in later posts.
Most of these methods are simple and pretty easy to understand. I found my problem with these was the vast number of them. It also appears that the API is less prone to changes in recent releases. If you want to get to know these methods a bit better check out these links, but most importantly – use them. Have a play in LinqPad or just some console app.
Useful links:
Rx Home
Exploring the Major Interfaces in Rx – MSDN
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
ObservableExtensions class - MSDN
Observable class - MSDN
Observer class - MSDN
Qbservable class - MSDN
Action<T> Delegate - MSDN
Func<T1, T2, TResult> - MSDN
The Rx Wiki site 101 Samples
Enumerate This
LinqPad
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 1 - Introduction to Rx
Forward to next post; Part 3 - Lifetime management – Completing and Unsubscribing

10 comments:

Lee Campbell said...

Whoops, I completely missed out the Aggregate extension method. You can use this method to create your own types of aggregations.

djsun said...

Thanks for great explanation.
Small typo:
The Generate sample line 1 should be
var generated = Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString());

Lee Campbell said...

Cheers djsun,
I imagine the API has changed since this post. I have all the code ready to be posted, but will try to check it runs in both .NET 3.5 & 4.0 on the latest release before releasing.

Edvard Pitka said...

Generate has incorrect signature. It should be

var generated = Observable.Generate(5, i => i < 15,i => i + 3,i=>i.ToString());

Lee Campbell said...

Updated to match the official v1 release, a year after the post was written :)

Sandeep said...

Lee,

I really like your articles on Rx. I have one huge complaint though. I don't really like the black background. Reason: When I am trying out your examples on Visual Studio side-by-side as I read your blog, I get a glare because of the alternating white and black backgrounds to the eye. Please do something about it.

Lee Campbell said...

I'm not sure if this is a joke or not. I will put it on my "TODO list"....at the bottom of the list.
:)
Maybe the upcoming kindle version of the blog will help. Alternatively you could use a blog reader?

Matt said...

Thanks for the great explanation of Observable.Create, the official documentation is pretty thin. One thing though, there is a semantic error in your code example.

it reads:
Observable.Create(
observable =>
{ ...

it should read:
Observable.Create(
observer =>
{ ...

Lee Campbell said...

@Sandeep, you may like the new IntroToRx.com site that is currently being built it has a white theme. Hope you like it :)