var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));
Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
Console.ReadKey();
/* Output:
1st subscription received 0
2nd subscription received 0
1st subscription received 1
2nd subscription received 1
1st subscription received 2
2nd subscription received 2
1st subscription received 3
2nd subscription received 3
2nd subscription received 4
Disposed of 1st subscription
2nd subscription received 5
2nd subscription received 6
2nd subscription received 7
etc...
*/
In the above example, it looks as though a single OnNext call from the interval Observable feeds both subscriptions. In fact the two subscriptions are independent; each one gets its own sequence, much as it would from an Observable.Create<T> method. In this sample we just pause a bit before making our second subscription. Note that the output is different from the above example: the second subscription starts again from 0.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));
Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/* Output:
1st subscription received 0
1st subscription received 1
1st subscription received 2
1st subscription received 3
1st subscription received 4
2nd subscription received 0
1st subscription received 5
2nd subscription received 1
1st subscription received 6
1st subscription received 7
2nd subscription received 2
1st subscription received 8
2nd subscription received 3
Disposed of 1st subscription
2nd subscription received 4
2nd subscription received 5
2nd subscription received 6
etc...
*/
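The independence of each subscription can also be seen without timers. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name IndependentSubscriptionsSketch and the runs counter are hypothetical and exist only for illustration. The delegate passed to Observable.Create<T> runs once per subscription, so each subscriber gets its own sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class IndependentSubscriptionsSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int runs = 0; // hypothetical counter: how many times the subscribe delegate has run

        // The delegate below is invoked once for every call to Subscribe.
        var source = Observable.Create<int>(observer =>
        {
            int run = ++runs;
            observer.OnNext(run);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        source.Subscribe(value => log.Add("1st subscription received run " + value));
        source.Subscribe(value => log.Add("2nd subscription received run " + value));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
        // Prints:
        // 1st subscription received run 1
        // 2nd subscription received run 2
    }
}
```

Each subscriber sees a different run number because nothing is shared between the two subscriptions.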
The benefit of using the IDisposable type, instead of creating a new ISubscription/IUnsubscription interface or amending the IObservable<T> interface to have an Unsubscribe method, is that you get all of these things for free:
- The type already exists
- People understand the type
- IDisposable has standard usages and patterns
- Language support via the using keyword
- Static analysis tools like FxCop can help you with its usage
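As an illustration of the language support mentioned in the list above, here is a minimal sketch, assuming the System.Reactive package is referenced; the class name UsingSubscriptionSketch is hypothetical. Because Subscribe returns an IDisposable, a using block can scope the lifetime of the subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class UsingSubscriptionSketch
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new Subject<int>();

        // The subscription is disposed when the using block exits.
        using (subject.Subscribe(value => received.Add(value)))
        {
            subject.OnNext(1);
            subject.OnNext(2);
        }

        // Published after disposal, so the handler above is no longer called.
        subject.OnNext(3);
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2"
    }
}
```

This is the same pattern used for files or database connections, which is part of the appeal of reusing IDisposable here.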
OnError and OnCompleted
Both OnError and OnCompleted signify the completion of a stream. If your stream publishes an OnError or OnCompleted, it will be the last publication and no further calls to OnNext can be performed. In this example we try to publish an OnNext call after an OnCompleted, and the OnNext is ignored.
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);
Of course you could implement your own IObservable<T> that did allow publishing after an OnCompleted or an OnError; however, it would not follow the precedent set by the current Subject types and would be a non-standard implementation. I think it would be safe to say that the inconsistent behaviour would cause unpredictable behaviour in the applications that consumed your code.
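The same termination rule applies to OnError. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name OnErrorTerminatesSketch and the exception message are hypothetical. A value published after OnError never reaches the subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class OnErrorTerminatesSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            value => log.Add("OnNext " + value),
            error => log.Add("OnError " + error.Message));

        subject.OnNext(1);
        subject.OnError(new InvalidOperationException("boom"));
        subject.OnNext(2); // ignored: the stream has already terminated
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
        // Prints:
        // OnNext 1
        // OnError boom
    }
}
```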
An interesting thing to consider is that when a stream completes or errors, you should still dispose of your subscription. This can make for messy code, but we will discuss best practices in a later post.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
2 comments:
Hi Lee,
Great Rx posts. I wanted to ask your thoughts on the second coding example above.
The 2 subscriptions to the same interval observable return different results. I understand how (you mention the Observable.Create similarity) but I can't see WHY the Rx guys did this. It seems that they've gone for 'ReplaySubject' behaviour rather than 'Subject'. I was just wondering if there was a general convention and if there should be a naming convention to perhaps illustrate this to unsuspecting developers :-). Thoughts?
J
Your comment about ReplaySubject vs Subject is interesting. While it may look like ReplaySubject semantics, that is only because Observable.Interval has deterministic output. It will always output 0 then 1 then 2 etc...
This may make it look like it is a replay. However, replay is where the values from a stream are actually cached, so any subsequent subscriptions will get the same object.
Now with value types like long (from Interval) it is hard to see this, but if we project to a reference type we can see that the result is different between Replay and Interval:
public class SomeObject
{
    public long Value { get; set; }
}
var stream1 = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => new SomeObject { Value = i });
var obj1 = stream1.First();
var obj2 = stream1.First();
Assert.IsFalse(Object.ReferenceEquals(obj1, obj2));
var replay = stream1.Replay();
replay.Connect();
obj1 = replay.First();
obj2 = replay.First();
Assert.IsTrue(Object.ReferenceEquals(obj1, obj2));
--End Code--
If you wanted to get the semantics I think you are looking for, then you will want to Publish() and possibly RefCount() your stream. This is covered in Part 7 of the series. It sounds like you are on the right track and asking the right questions. I hope that once you have read the entire series, it will make more sense why the result is as it is and not what you may initially expect.
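As a rough sketch of the Publish() and RefCount() suggestion (not taken from the series itself), the following assumes the System.Reactive package is referenced. The class name SharedStreamSketch and the trigger subject are hypothetical; a Subject replaces Interval only to keep the result independent of timing.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class SomeObject
{
    public long Value { get; set; }
}

public static class SharedStreamSketch
{
    // Returns true when two subscribers to the plain projection get different instances.
    public static bool SeparateSubscriptionsGetDifferentObjects()
    {
        var trigger = new Subject<long>();
        var projected = trigger.Select(i => new SomeObject { Value = i });

        SomeObject first = null, second = null;
        using (projected.Subscribe(o => first = o))
        using (projected.Subscribe(o => second = o))
        {
            trigger.OnNext(0); // each subscription runs its own Select
        }
        return !ReferenceEquals(first, second);
    }

    // Returns true when two subscribers to the published stream get the same instance.
    public static bool SharedSubscriptionGetsSameObject()
    {
        var trigger = new Subject<long>();
        var shared = trigger
            .Select(i => new SomeObject { Value = i })
            .Publish()
            .RefCount();

        SomeObject first = null, second = null;
        using (shared.Subscribe(o => first = o))
        using (shared.Subscribe(o => second = o))
        {
            trigger.OnNext(0); // one underlying subscription feeds both subscribers
        }
        return first != null && ReferenceEquals(first, second);
    }

    public static void Main()
    {
        Console.WriteLine(SeparateSubscriptionsGetDifferentObjects()); // prints "True"
        Console.WriteLine(SharedSubscriptionGetsSameObject());         // prints "True"
    }
}
```

With Publish().RefCount(), the projection runs once per source value and every current subscriber receives that same object, which is closer to the Subject behaviour asked about.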
HTH
Lee