STOP THE PRESS! This series has now been superseded by the online book
www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!
Having
reviewed the scheduling available to us in Rx and now that we are aware of
Hot and Cold observables we have almost enough skills in our tool belt to start using Rx in anger. However many developers wouldn’t dream of starting coding without first being able to write tests to prove their code is in fact satisfying their requirements and providing them with that safety net against regression. Rx poses some interesting problems to our Test Driven community
- Scheduling and therefore Threading are generally avoided in test scenarios as they can introduce race conditions which may lead to non-deterministic tests.
- Tests should run as fast as possible.
- Rx is a new technology/library so naturally as we master it, we will refactor our code. We want to use to tests to ensure our refactoring have not altered the internal behaviour of our code base.
So we want to test our code but don't want to introduce false-negatives, false-positives or non-deterministic tests. Also if we look at the Rx library there are plenty of methods that involve Scheduling so it is hard to ignore. This Linq query shows us that there are at least 32 extension methods that accept an IScheduler as a parameter
var query = from method in typeof (Observable).GetMethods()
from parameter in method.GetParameters()
where typeof (IScheduler).IsAssignableFrom(parameter.ParameterType)
group method by method.Name into m
orderby m.Key
select m.Key;
query.Run(Console.WriteLine);
/*
BufferWithTime, Catch, Concat, Delay, Empty, Generate, GenerateWithTime, Interval
Merge, ObserveOn, OnErrorResumeNext, Prune, Publish, Range, Repeat, Replay
Retry, Return, Sample, Start, StartWith, Subscribe, SubscribeOn, Take, Throttle
Throw, TimeInterval, Timeout, Timer, Timestamp, ToAsync, ToObservable
*/
There are some methods that we already will be familiar with such as ObserveOn and SubscribeOn. Then there are others that will optionally take an IScheduler in one of the method overloads. TDD/TestFirst coders will want to opt for the overload that takes the IScheduler so that we can have some control over scheduling in our tests.
In this example we create a stream that publishes values every second for 5 seconds.
var interval = Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(5);
If we to write a test that ensured that we received 5 values and they were each 1 second apart it would take 5 seconds. That would be no good, I want hundreds if not thousands of tests to run in 5 seconds. A more common time related example that would need tests is setting a timeout.
var stream = Observable.Never<int>();
var exceptionThrown = false;
stream.Timeout(TimeSpan.FromMinutes(1))
.Run(
i => Console.WriteLine("This will never run."),
ex => exceptionThrown = true);
Assert.IsTrue(exceptionThrown);
This test would take 1 minute to run. However if we did some test first code in this style, where we added the timeout after the test, running the test initially to fail (Red-Green-Refactor) would never complete. Hmmmm…..
TestScheduler
To our rescue comes the TestScheduler. This is a recent addition to the Rx library that takes the concept of a virtual scheduler to allow us emulate and control time. Cool.
The concept of a virtual scheduler can sort of be thought of a queue of actions to be executed that are each marked with the point in time they should be executed. Where actions are attempted to be scheduled at the same “time” the second collision will be scheduled or queued after the first action but marked as with the same point in “time” . When we use the TestScheduler we can either “drain the queue” by calling run which will execute all scheduled actions, or we can specify to run all tasks up to a point in time.
In this example we schedule a task on to the queue to be run immediately by using the simple overload. We then execute everything scheduled for the first tick (ie the first action).
var scheduler = new TestScheduler();
var wasExecuted = false;
scheduler.Schedule(() => wasExecuted = true);
Assert.IsFalse(wasExecuted);
scheduler.RunTo(1); //execute 1 tick of queued actions
Assert.IsTrue(wasExecuted);
The TestScheduler type is actually an implementation of the abstract VirtualScheduler<TAbsolute, TRelative> where the TestSchuduler specifies long for both TAbsolute and TRelative. The net result is that the TestScheduler interface looks something like this
public class TestScheduler
{
public TestScheduler();
// Methods
public long FromTimeSpan(TimeSpan timeSpan);
public long Increment(long absolute, long relative);
public void Run();
public void RunTo(long time);
public IDisposable Schedule(Action action);
public IDisposable Schedule(Action action, TimeSpan dueTime);
public IDisposable Schedule(Action action, long dueTime);
public void Sleep(long ticks);
public DateTimeOffset ToDateTimeOffset(long absolute);
// Properties
public DateTimeOffset Now { get; }
public long Ticks { get; }
}
We should already be familiar with what the Schedule methods would do, the other method of interest for this post are the Run() and RunTo(long) methods. Run() will just execute all the actions that have been scheduled. RunTo(long) will execute all the actions that have been scheduled up to the time specified by the long value which represents ticks in this case. Having a quick look into the implementations for Schedule, RunTo and Run give us the insight we need to really understand the virtual schedulers.
public IDisposable Schedule(Action action)
{
return this.Schedule(action, TimeSpan.Zero);
}
public IDisposable Schedule(Action action, TimeSpan dueTime)
{
return this.Schedule(action, this.FromTimeSpan(dueTime));
}
public IDisposable Schedule(Action action, TRelative dueTime)
{
BooleanDisposable disposable = new BooleanDisposable();
TAbsolute local = this.Increment(this.Ticks, dueTime);
Action action2 = delegate {
if (!disposable.IsDisposed)
{
action();
}
};
ScheduledItem<TAbsolute> item = new ScheduledItem<TAbsolute>(action2, local);
this.queue.Enqueue(item);
return disposable;
}
public void RunTo(TAbsolute time)
{
while ((this.queue.Count > 0) && (this.queue.Peek().DueTime.CompareTo(time) <= 0))
{
ScheduledItem<TAbsolute> item = this.queue.Dequeue();
this.Ticks = item.DueTime;
item.Action();
}
}
public void Run()
{
while (this.queue.Count > 0)
{
ScheduledItem<TAbsolute> item = this.queue.Dequeue();
this.Ticks = item.DueTime;
item.Action();
}
}
Obviously this is the internal implementation and would be subject to change, however as the documentation currently is quite weak for the VirtualScheduler and TestScheduler I think this use of reflection is helpful. This example should clear up what happens when items are scheduled at the same time.
var scheduler = new TestScheduler();
long dueTime = 4L;
scheduler.Schedule(() => Console.WriteLine("1"), dueTime);
scheduler.Schedule(() => Console.WriteLine("2"), dueTime);
scheduler.Schedule(() => Console.WriteLine("3"), dueTime+1);
scheduler.Schedule(() => Console.WriteLine("4"), dueTime+1);
Console.WriteLine("RunTo(dueTime)");
scheduler.RunTo(dueTime);
Console.WriteLine("Run()");
scheduler.Run();
/* Output:
RunTo(dueTime)
1
2
Run()
3
4
*/
Testing Rx code
Now that we have learnt a little bit about the TestScheduler, lets look at how we could use it to get our 2 initial code snippets (Interval and TimeOut) to execute as fast as possible but still maintaining the semantics of time. In this example we generate our 5 values one second apart but pass in our TestScheduler to the Interval method.
[TestMethod]
public void Testing_with_test_scheduler()
{
var scheduler = new TestScheduler();
var interval = Observable
.Interval(TimeSpan.FromSeconds(1), scheduler)
.Take(5);
bool isComplete = false;
interval.Subscribe(Console.WriteLine, () => isComplete = true);
scheduler.Run();
Assert.IsTrue(isComplete); //Executes in less than 0.01s "on my machine"
}
While this is mildly interesting, what I think is more important is how we would test a real piece of code. Imagine if you will a Presenter that subscribes to a stream of prices. As prices are published it adds them to a ViewModel. Assuming this is a WPF or Silverlight implementation we take the liberty of enforcing that the subscription be done on the ThreadPool and the observing is executed on the Dispatcher.
public class MyPresenter
{
...
public void Show(string symbol)
{
_myService.PriceStream(symbol)
.SubscribeOn(Scheduler.ThreadPool)
.ObserveOn(Scheduler.Dispatcher)
.Subscribe(price=>_viewModel.Prices.Add(price));
}
...
}
While this snippet of code may do what we want it to do, it will be hard to test as it is accessing the schedulers via static properties. To help my testing, I have created my own interface that exposes the same IScheduler implementations that the Scheduler type does.
public interface ISchedulerService
{
IScheduler CurrentThread { get; }
IScheduler Dispatcher { get; }
IScheduler Immediate { get; }
IScheduler NewThread { get; }
IScheduler ThreadPool { get; }
//IScheduler TaskPool { get; }
}
public sealed class SchedulerService : ISchedulerService
{
public IScheduler CurrentThread { get { return Scheduler.CurrentThread; } }
public IScheduler Dispatcher { get { return Scheduler.Dispatcher; } }
public IScheduler Immediate { get { return Scheduler.Immediate; } }
public IScheduler NewThread { get { return Scheduler.NewThread; } }
public IScheduler ThreadPool { get { return Scheduler.ThreadPool; } }
//public IScheduler TaskPool { get { return Scheduler.TaskPool; } }
}
This now allows me to inject an ISchedulerService which gives me a seam to help my testing. I can now write some tests for my Presenter.
[TestInitialize]
public void SetUp()
{
_myServiceMock = new Mock<IMyService>();
_viewModelMock = new Mock<IViewModel>();
_schedulerService = new TestSchedulers();
var prices = new ObservableCollection<decimal>();
_viewModelMock.SetupGet(vm => vm.Prices).Returns(prices);
_viewModelMock.SetupProperty(vm => vm.IsConnected);
}
[TestMethod]
public void Should_pass_symbol_to_MyService_PriceStream()
{
var expected = "SomeSymbol";
var priceStream = new Subject<decimal>();
_myServiceMock.Setup(svc => svc.PriceStream(It.Is<string>(symbol=>symbol==expected))).Returns(priceStream);
var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show(expected);
_myServiceMock.Verify();
}
[TestMethod]
public void Should_add_to_VM_Prices_when_MyService_publishes_price()
{
decimal expected = 1.23m;
var priceStream = new Subject<decimal>();
_myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);
var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show("SomeSymbol");
_schedulerService.ThreadPool.Schedule(() => priceStream.OnNext(expected)); //Schedule the OnNext
_schedulerService.ThreadPool.RunTo(1); //Execute the OnNext action
_schedulerService.Dispatcher.RunTo(1); //Execute the OnNext Handler (ie adding to the Prices collection)
Assert.AreEqual(1, _viewModelMock.Object.Prices.Count);
Assert.AreEqual(expected, _viewModelMock.Object.Prices.First());
}
These two tests show first a simple expectation that a string value passed to my Show(string) method will be passed to the underlying service. This is not at all relevant to Rx. The next test shows the usage of my implementation of the ISchedulerService specific for testing. It exposes all of the IScheduler properties as instances of TestSchedulers. This now allows me to inject TestSchedulers for testing which in-turn allows me to control the rate at which things are scheduled.
For those of you new to Moq, some of the syntax my be a little bit confusing. Where you see a Setup(..) or SetupGet(..) method call there is just a little bit of Expression magic that tells my mocks to return correct thing when called. The .Object property hanging off my mocks are the dynamically generated implementations of the interfaces. This next test I hope you find the most interesting. Here we really get the full value out of our TestScheduler, by testing timeouts.
[TestMethod]
public void Should_timeout_if_no_prices_for_10_seconds()
{
var timeoutPeriod = TimeSpan.FromSeconds(10);
var priceStream = Observable.Never<decimal>();
_myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);
var sut = new MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show("SomeSymbol");
_schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks - 1);
Assert.IsTrue(_viewModelMock.Object.IsConnected);
_schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks);
Assert.IsFalse(_viewModelMock.Object.IsConnected);
}
The key points to note are:
- We return an Observable.Never for our price stream so that no prices are ever pushed and no OnComplete is published either.
- The _schedulerService is a test fake that returns TestSchedulers for all of it’s schedulers
- We run the ThreadPool TestScheduler up until 1 tick away from our timeout period and ensure that we have not timed out
- We run the ThreadPool TestScheduler up to our timeout period and then ensure that we have timed out.
- The test is sub second even though we are testing for a 10 second timeout!
Generally I only want to have one assertion in each of my tests, but for this example I think it elegantly tests that the Timeout is for 10seconds and not longer or shorter. If I was to remove the first assertion my test would only prove that the timeout was no greater than 10 seconds but could reasonably be set to say 3 seconds. The implementation for this simple presenter is as follows
public class MyPresenter
{
private readonly IMyService _myService;
private readonly IViewModel _viewModel;
private readonly ISchedulerService _schedulerService;
public MyPresenter(IMyService myService, IViewModel viewModel, ISchedulerService schedulerService)
{
_myService = myService;
_schedulerService = schedulerService;
_viewModel = viewModel;
}
public void Show(string symbol)
{
_myService.PriceStream(symbol)
.SubscribeOn(_schedulerService.ThreadPool)
.ObserveOn(_schedulerService.Dispatcher)
.Timeout(TimeSpan.FromSeconds(10), _schedulerService.ThreadPool)
.Subscribe(OnPriceUpdate, ex =>
{
if (ex is TimeoutException)
_viewModel.IsConnected = false;
});
_viewModel.IsConnected = true;
}
private void OnPriceUpdate(decimal price)
{
_viewModel.Prices.Add(price);
}
}
I hope this post on testing helps you bring you skills that you have been developing in Rx to a level where you may be comfortable considering them for production use.
For your reference here are the two test versions of the ISchedulerService I find useful. One will just schedule everything with the ImmediateScheduler and the other uses the TestSchedulers for fine grained control.
public sealed class TestSchedulers : ISchedulerService
{
private readonly TestScheduler _currentThread = new TestScheduler();
private readonly TestScheduler _dispatcher = new TestScheduler();
private readonly TestScheduler _immediate = new TestScheduler();
private readonly TestScheduler _newThread = new TestScheduler();
private readonly TestScheduler _threadPool = new TestScheduler();
#region Implementation of ISchedulerService
IScheduler ISchedulerService.CurrentThread { get { return _currentThread; } }
IScheduler ISchedulerService.Dispatcher { get { return _dispatcher; } }
IScheduler ISchedulerService.Immediate { get { return _immediate; } }
IScheduler ISchedulerService.NewThread { get { return _newThread; } }
IScheduler ISchedulerService.ThreadPool { get { return _threadPool; } }
#endregion
public TestScheduler CurrentThread { get { return _currentThread; } }
public TestScheduler Dispatcher { get { return _dispatcher; } }
public TestScheduler Immediate { get { return _immediate; } }
public TestScheduler NewThread { get { return _newThread; } }
public TestScheduler ThreadPool { get { return _threadPool; } }
}
public sealed class ImmediateSchedulers : ISchedulerService
{
public IScheduler CurrentThread { get { return Scheduler.Immediate; } }
public IScheduler Dispatcher { get { return Scheduler.Immediate; } }
public IScheduler Immediate { get { return Scheduler.Immediate; } }
public IScheduler NewThread { get { return Scheduler.Immediate; } }
public IScheduler ThreadPool { get { return Scheduler.Immediate; } }
}
The full source code is now available either via svn at
http://code.google.com/p/rx-samples/source/checkout or as a
zip file.
Back to the
contents page for
Reactive Extensions for .NET Introduction
Back to the previous post;
Rx Part 7 – Hot and Cold Observables
Forward to next post;
Part 9 – Join, Window, Buffer and Group Join