Thursday, May 27, 2010

Rx Part 3 – Lifetime management – Completing and Unsubscribing

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

So far we have discovered the basics of the Reactive Framework, which allow us to create, subscribe to and perform some basic aggregations, buffering and time shifting over implementations of IObservable<T>. We have yet to look at how we can unsubscribe from a subscription. If you were to look for an Unsubscribe method in the Rx public API you would not find one. Instead of supplying an Unsubscribe method, Rx returns an IDisposable whenever a subscription is made. This disposable can be thought of as the subscription itself, so disposing it disposes the subscription and effectively unsubscribes. Note that calling Dispose on the result of a Subscribe call does not affect the underlying IObservable<T>, only that one subscription to it. This allows us to call Subscribe many times on a single IObservable<T>, with subscriptions coming and going without affecting each other. In this example we initially have two subscriptions, then dispose of one early; the other continues to receive publications from the underlying IObservable<T>.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");

Console.ReadKey();
/*Outputs:
1st subscription received 0
2nd subscription received 0
1st subscription received 1
2nd subscription received 1
1st subscription received 2
2nd subscription received 2
1st subscription received 3
2nd subscription received 3
2nd subscription received 4
Disposed of 1st subscription
2nd subscription received 5
2nd subscription received 6
2nd subscription received 7

etc...
*/
In the above example it may look as though a single OnNext call from the interval observable feeds both subscriptions. In fact the two subscriptions are independent: each call to Subscribe starts its own sequence, much as it would for an observable built with Observable.Create<T>. In this sample we just pause a bit before making our second subscription. Note that the output differs from the example above: the second subscription starts again from 0.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));

var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/*
Output:
1st subscription received 0
1st subscription received 1
1st subscription received 2
1st subscription received 3
1st subscription received 4
2nd subscription received 0
1st subscription received 5
2nd subscription received 1
1st subscription received 6
1st subscription received 7
2nd subscription received 2
1st subscription received 8
2nd subscription received 3
Disposed of 1st subscription
2nd subscription received 4
2nd subscription received 5
2nd subscription received 6
etc...

*/
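To make the "each subscription is independent" point concrete without any timers, here is a minimal sketch that uses only the BCL interfaces IObservable<T> and IObserver<T>. The types SketchObservable<T>, CallbackObserver<T> and NoOpDisposable are made up for this illustration; they are an assumption about the general shape of an Observable.Create<T> style source, not the way Rx implements it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an Observable.Create<T> style source: it stores a
// subscribe function and runs it afresh for every subscriber.
public sealed class SketchObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public SketchObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

// Minimal observer that forwards each value to a callback.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public CallbackObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Disposable that does nothing; this sketch has no resources to release.
public sealed class NoOpDisposable : IDisposable
{
    public void Dispose() { }
}

public static class IndependentSubscriptionsDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int producersStarted = 0;

        var source = new SketchObservable<long>(observer =>
        {
            // This body runs once per Subscribe call, so every
            // subscriber gets its own sequence starting at 0.
            producersStarted++;
            for (long i = 0; i < 3; i++) observer.OnNext(i);
            observer.OnCompleted();
            return new NoOpDisposable();
        });

        source.Subscribe(new CallbackObserver<long>(v => log.Add("1st received " + v)));
        source.Subscribe(new CallbackObserver<long>(v => log.Add("2nd received " + v)));
        log.Add("producers started: " + producersStarted);
        return log;
    }
}

public static class Program
{
    public static void Main()
    {
        foreach (var line in IndependentSubscriptionsDemo.Run())
            Console.WriteLine(line);
    }
}
```

Both observers see 0, 1, 2 and the producer count ends at 2, which is the same effect as the second subscription to the interval starting again from 0.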
The benefit of using the IDisposable type, instead of creating a new ISubscription/IUnsubscription interface or amending the IObservable<T> interface to have an Unsubscribe method, is that you get all of these things for free:
  • The type already exists
  • People understand the type
  • IDisposable has standard usages and patterns
  • Language support via the using keyword
  • Static analysis tools like FxCop can help you with its usage
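As a rough illustration of the language support mentioned in the list above, the sketch below scopes a subscription with a using block. SketchSource and LoggingObserver are hypothetical types written for this example with BCL interfaces only (they are not part of Rx), and the source is not thread safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source that pushes values to whoever is currently subscribed.
public sealed class SketchSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Subscription(_observers, observer);
    }

    public void Publish(int value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    // The subscription itself: disposing it removes just this observer.
    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Observer that records what it sees under a given name.
public sealed class LoggingObserver : IObserver<int>
{
    private readonly string _name;
    private readonly List<string> _log;

    public LoggingObserver(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public void OnNext(int value) { _log.Add(_name + " received " + value); }
    public void OnError(Exception error) { _log.Add(_name + " error: " + error.Message); }
    public void OnCompleted() { _log.Add(_name + " completed"); }
}

public static class UsingSubscriptionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new SketchSource();
        IDisposable second = source.Subscribe(new LoggingObserver("B", log));

        using (source.Subscribe(new LoggingObserver("A", log)))
        {
            source.Publish(1);
            source.Publish(2);
        } // Dispose is called here, even if the block throws.

        source.Publish(3);   // only B is still subscribed
        second.Dispose();
        source.Publish(4);   // nobody is listening any more
        return log;
    }
}

public static class Program
{
    public static void Main()
    {
        foreach (var line in UsingSubscriptionDemo.Run())
            Console.WriteLine(line);
    }
}
```

Subscription A ends when the using block exits, while B keeps receiving values until it is disposed explicitly, so disposing one subscription leaves the source and the other subscription untouched.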

OnError and OnCompleted

Both OnError and OnCompleted signify the termination of a stream. If your stream publishes an OnError or OnCompleted, it will be the last publication and no further calls to OnNext can be performed. In this example we try to publish an OnNext call after an OnCompleted and the OnNext is ignored.
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);
Of course you could implement your own IObservable<T> that did allow publishing after an OnCompleted or an OnError, however it would not follow the precedent set by the current Subject types and would be a non-standard implementation. I think it is safe to say that the inconsistency would cause unpredictable behaviour in the applications that consumed your code.
An interesting thing to consider is that when a stream completes or errors, you should still dispose of your subscription. This can make for messy code, but we will discuss best practices in a later post.
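The rule that nothing follows OnCompleted or OnError can be sketched with a small hand-rolled subject. SketchSubject and CompletionLoggingObserver are hypothetical types built on the BCL interfaces for illustration only; the real Subject<T> types do more (thread safety, for instance), and this sketch simply ignores anyone who subscribes after termination.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subject that enforces "no publications after termination".
public sealed class SketchSubject : IObservable<int>, IObserver<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();
    private bool _isStopped;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Simplification: subscribers arriving after termination are not added.
        if (!_isStopped) _observers.Add(observer);
        return new Subscription(_observers, observer);
    }

    public void OnNext(int value)
    {
        if (_isStopped) return; // ignored once the stream has terminated
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_isStopped) return;
        _isStopped = true;
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
        _observers.Clear();
    }

    public void OnCompleted()
    {
        if (_isStopped) return;
        _isStopped = true;
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
        _observers.Clear();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        // Safe to call after termination: removing a missing item is a no-op.
        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Observer that records values and terminal notifications.
public sealed class CompletionLoggingObserver : IObserver<int>
{
    private readonly List<string> _log;
    public CompletionLoggingObserver(List<string> log) { _log = log; }
    public void OnNext(int value) { _log.Add(value.ToString()); }
    public void OnError(Exception error) { _log.Add("Error: " + error.Message); }
    public void OnCompleted() { _log.Add("Completed"); }
}

public static class TerminationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new SketchSubject();

        // The using block disposes the subscription even though the
        // stream has already completed by the time the block exits.
        using (subject.Subscribe(new CompletionLoggingObserver(log)))
        {
            subject.OnNext(1);
            subject.OnCompleted();
            subject.OnNext(2);                                      // ignored
            subject.OnError(new InvalidOperationException("late")); // ignored
        }
        return log;
    }
}

public static class Program
{
    public static void Main()
    {
        foreach (var line in TerminationDemo.Run())
            Console.WriteLine(line);
    }
}
```

Only the value 1 and the completion reach the observer; everything published after OnCompleted is dropped, and disposing the already completed subscription does no harm.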
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 2 - Static and extension methods
Forward to next post; Part 4 - Flow control

2 comments:

Anonymous said...

Hi Lee,
Great Rx posts. I wanted to ask your thoughts on the second coding example above.

The 2 subscriptions to the same interval observable return different results. I understand how (you mention the Observable.Create similarity) but I can't see WHY the Rx guys did this. It seems that they've gone for 'ReplaySubject' behaviour rather than 'Subject'. I was just wondering if there was a general convention and if there should be a naming convention to perhaps illustrate this to unsuspecting developers :-). Thoughts?
J

Lee Campbell said...

Your comment about ReplaySubject vs Subject is interesting. While it may look like ReplaySubject semantics, that is only because Observable.Interval has deterministic output. It will always output 0 then 1 then 2 etc...
This may make it look like a replay. However, replay is where the values from a stream are actually cached, so any subsequent subscription gets the very same objects.
With value types like long (from Interval) this is hard to see, but if we project to a reference type we can see that the result differs between Replay and Interval:


public class SomeObject
{
    public long Value { get; set; }
}

var stream1 = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => new SomeObject { Value = i });

var obj1 = stream1.First();
var obj2 = stream1.First();

Assert.IsFalse(Object.ReferenceEquals(obj1, obj2));

var replay = stream1.Replay();

replay.Connect();
obj1 = replay.First();
obj2 = replay.First();

Assert.IsTrue(Object.ReferenceEquals(obj1, obj2));

--End Code--
If you wanted to get the semantics I think you are looking for, then you will want to Publish() and possibly RefCount() your stream. This is covered in Part 7 of the series. It sounds like you are on the right track and asking the right questions, I hope once you have read the entire series it will make more sense why the result is as it is, not what you may initially expect.
HTH
Lee