Saturday, January 11, 2014

ReplaySubject Performance improvements – code changes

In the previous post I talked about some changes that could be made to the ReplaySubject<T> implementation to squeeze large performance gains out of it. That post mainly discussed the results of the changes. In this post I will show some of the code changes themselves.

Analysis of existing implementation

As mentioned in the previous post, all constructor overloads of ReplaySubject<T> eventually called into the same constructor, and just provided default values for the MaxCountBuffer, MaxTimeBuffer and Scheduler. For example:

public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    _bufferSize = bufferSize;
    _window = window;
    _scheduler = scheduler;

    _stopwatch = _scheduler.StartStopwatch();
    _queue = new Queue<TimeInterval<T>>();
    _isStopped = false;
    _error = null;

    _observers = new ImmutableList<ScheduledObserver<T>>();
}

public ReplaySubject(int bufferSize, TimeSpan window)
    : this(bufferSize, window, SchedulerDefaults.Iteration)
{ }

public ReplaySubject()
    : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(IScheduler scheduler)
    : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize, IScheduler scheduler)
    : this(bufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(TimeSpan window, IScheduler scheduler)
    : this(InfiniteBufferSize, window, scheduler)
{ }

public ReplaySubject(TimeSpan window)
    : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{ }

There are a total of 8 constructor overloads, but 7 of them just delegate to the top overload above, passing default values.

Next, let's look at the Subscribe method. Here we see that the passed observer is wrapped in a ScheduledObserver<T>, which is only relevant for time-based buffers. Trim() is also called, which again is only relevant for time-based buffers.

public IDisposable Subscribe(IObserver<T> observer)
{
    var so = new ScheduledObserver<T>(_scheduler, observer);
    var n = 0;
    var subscription = new RemovableDisposable(this, so);
    lock (_gate)
    {
        CheckDisposed();
        Trim(_stopwatch.Elapsed);
        _observers = _observers.Add(so);

        n = _queue.Count;
        foreach (var item in _queue)
            so.OnNext(item.Value);

        if (_error != null)
        {
            n++;
            so.OnError(_error);
        }
        else if (_isStopped)
        {
            n++;
            so.OnCompleted();
        }
    }
    so.EnsureActive(n);
    return subscription;
}

The next part of the code that is interesting is the implementation of the OnNext method.

public void OnNext(T value)
{
    var o = default(ScheduledObserver<T>[]);
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            var now = _stopwatch.Elapsed;
            _queue.Enqueue(new TimeInterval<T>(value, now));
            Trim(now);

            o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }

    if (o != null)
        foreach (var observer in o)
            observer.EnsureActive();
}

There are several things to note here:

  1. The use of an array of ScheduledObserver<T>
  2. The use of the TimeInterval<T> envelope for the value
  3. The Trim() command

Each of the three things above becomes quite dubious when we consider ReplayAll and ReplayOne implementations. A ReplayMany implementation may need a Trim() command, but surely needs neither ScheduledObservers nor time-stamped values.

Next we look at the Trim command itself:

void Trim(TimeSpan now)
{
    while (_queue.Count > _bufferSize)
        _queue.Dequeue();
    while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
        _queue.Dequeue();
}

Again we see that the second while loop is wholly unnecessary for implementations that are not time-based.
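To make the split concrete, here is a minimal sketch of the two loops as separate helpers. TrimSketch, TrimByCount and TrimByAge are hypothetical names for illustration only, and a KeyValuePair<TimeSpan, T> stands in for the TimeInterval<T> envelope; none of this is taken from the Rx code base.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helpers for illustration; not part of Rx.
public static class TrimSketch
{
    // Count-only trimming works on the bare values: no timestamps required.
    public static void TrimByCount<T>(Queue<T> queue, int bufferSize)
    {
        while (queue.Count > bufferSize)
            queue.Dequeue();
    }

    // Time-based trimming needs every value paired with the time it was enqueued.
    // The key of each pair plays the role of TimeInterval<T>.Interval.
    public static void TrimByAge<T>(
        Queue<KeyValuePair<TimeSpan, T>> queue, TimeSpan now, TimeSpan window)
    {
        while (queue.Count > 0 && now - queue.Peek().Key > window)
            queue.Dequeue();
    }
}
```

Only TrimByAge forces the buffer to carry a timestamp per value, which is why the envelope is wasted work when no time window has been requested.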

Each of these small overheads starts to add up. However, where the cost really starts to kick in is in the implementation of ScheduledObserver<T>.

At a minimum, for each OnNext call to a ReplaySubject<T>, the ScheduledObserver<T> performs some condition checks, virtual method calls and some Interlocked operations. These are all fairly cheap operations. It is the unnecessary scheduling that incurs the costs. The scheduling incurs at least the following allocations:

  1. ScheduledItem<TimeSpan, TState>
  2. AnonymousDisposable and the Action
  3. IndexedItem

There are also all the queues that are involved:

  1. ReplaySubject<T>._queue (Queue<TimeInterval<T>>)
  2. ScheduledObserver<T>._queue (ConcurrentQueue<T>)
  3. CurrentThreadScheduler.s_threadLocalQueue (SchedulerQueue<TimeSpan>)

And the concurrency controls:

  1. ReplaySubject<T> uses the IStopwatch from _scheduler.StartStopwatch()
  2. ScheduledObserver<T> will use Interlocked.CompareExchange 3-4 times on a standard OnNext call.
  3. ConcurrentQueue<T> uses an Interlocked.Increment and also a System.Threading.SpinWait when enqueuing a value. It also uses up to 3 SpinWaits and an Interlocked.CompareExchange to dequeue a value.
  4. The recursive scheduling extension method uses lock twice
  5. CurrentThreadScheduler uses a Thread.Sleep()

Alternative implementations

All of the code above is fairly innocent looking when looked at in isolation. However, when we consider what we probably need for a ReplayOne, ReplayMany or ReplayAll implementation, all this extra code might make you blush.

The implementations that have no time consideration now share a base class, and the OnNext implementation in that base class is now simply:

public void OnNext(T value)
{
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            AddValueToBuffer(value);
            Trim();

            var o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
}

The ReplayOne implementation is now reduced to:

private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
    private bool _hasValue;
    private T _value;

    protected override void Trim()
    {
        //NoOp. No need to trim.
    }

    protected override void AddValueToBuffer(T value)
    {
        _hasValue = true;
        _value = value;
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        if (_hasValue)
            observer.OnNext(_value);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _value = default(T);
    }
}

Note that there are no queues, schedulers, allocations etc. We have replaced all of that with simply a field to hold the single value, and a boolean flag to indicate if there has been a value buffered yet.
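Because the listing above depends on internal Rx types, it will not compile when copied out of the post. The following is a stand-alone sketch of the same idea using only the base class library. ReplayOneSketch<T> and RecordingObserver<T> are hypothetical names, and the locking and subscription handling are my assumptions for illustration, not the code that went into Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone sketch (not the Rx source): replays only the latest value.
public sealed class ReplayOneSketch<T> : IObservable<T>, IObserver<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _hasValue;
    private T _value;
    private bool _isStopped;
    private Exception _error;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_isStopped) return;
            _hasValue = true;   // The whole "buffer" is one field plus a flag.
            _value = value;
            // Iterate over a copy so an observer may unsubscribe inside its callback.
            foreach (var observer in _observers.ToArray())
                observer.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_isStopped) return;
            _isStopped = true;
            _error = error;
            foreach (var observer in _observers.ToArray())
                observer.OnError(error);
            _observers.Clear();
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_isStopped) return;
            _isStopped = true;
            foreach (var observer in _observers.ToArray())
                observer.OnCompleted();
            _observers.Clear();
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            // Replay synchronously: no scheduler and no queue involved.
            if (_hasValue) observer.OnNext(_value);
            if (_error != null) observer.OnError(_error);
            else if (_isStopped) observer.OnCompleted();
            else _observers.Add(observer);
        }
        return new Subscription(this, observer);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ReplayOneSketch<T> _parent;
        private readonly IObserver<T> _observer;

        public Subscription(ReplayOneSketch<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_parent._gate)
                _parent._observers.Remove(_observer);
        }
    }
}

// Hypothetical helper used only to exercise the sketch.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

With this sketch, an observer that subscribes after several values have been pushed receives only the most recent one, followed by any later values until it disposes its subscription.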

The ReplayOne class can be this simple because most of the work is done by its base class, ReplayBufferBase:

private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
    private readonly object _gate = new object();
    private bool _isDisposed;
    private bool _isStopped;
    private Exception _error;
    private ImmutableList<IObserver<T>> _observers;

    protected ReplayBufferBase()
    {
        _observers = new ImmutableList<IObserver<T>>();
    }

    protected abstract void Trim();
    protected abstract void AddValueToBuffer(T value);
    protected abstract void ReplayBuffer(IObserver<T> observer);

    public bool HasObservers
    {
        get
        {
            var observers = _observers;
            return observers != null && observers.Data.Length > 0;
        }
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            CheckDisposed();

            if (!_isStopped)
            {
                AddValueToBuffer(value);
                Trim();

                var o = _observers.Data;
                foreach (var observer in o)
                    observer.OnNext(value);
            }
        }
    }

    public void OnError(Exception error) {/*...*/}

    public void OnCompleted() {/*...*/}

    public IDisposable Subscribe(IObserver<T> observer) {/*...*/}

    public void Unsubscribe(IObserver<T> observer) {/*...*/}

    private void CheckDisposed() {/*...*/}

    public void Dispose() {/*...*/}

    protected virtual void Dispose(bool disposing) {/*...*/}
}

The ReplayMany and ReplayAll implementations are slightly more complex as they require a Queue to store the buffered values. Again we add another base class to do most of the work.

private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
    private readonly Queue<T> _queue;

    protected ReplayManyBase(int queueSize)
        : base()
    {
        _queue = new Queue<T>(queueSize);
    }

    protected Queue<T> Queue { get { return _queue; } }

    protected override void AddValueToBuffer(T value)
    {
        _queue.Enqueue(value);
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        foreach (var item in _queue)
            observer.OnNext(item);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _queue.Clear();
    }
}

Now the only differences are the initial buffer size and whether the buffer gets trimmed or not. This leaves us with the final two implementations:

private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
    private readonly int _bufferSize;

    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }

    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
    public ReplayAll()
        : base(0)
    {
    }

    protected override void Trim()
    {
        //NoOp; i.e. Don't trim, keep all values.
    }
}
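The listings above do not show how a constructor would pick between these classes. One plausible dispatch is sketched below. The rules, the ReplayStrategySketch and Kind names, and the use of int.MaxValue as the InfiniteBufferSize sentinel are all assumptions for illustration rather than the code submitted to Rx.

```csharp
using System;

// Hypothetical sketch: the names and rules here are assumptions, not the Rx source.
public static class ReplayStrategySketch
{
    // Assumed sentinel meaning "no count limit".
    public const int InfiniteBufferSize = int.MaxValue;

    public enum Kind { ReplayOne, ReplayMany, ReplayAll, ReplayByTime }

    public static Kind Choose(int bufferSize, TimeSpan window)
    {
        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        // Any finite window needs timestamps, so it stays on the time-aware path.
        if (window != TimeSpan.MaxValue) return Kind.ReplayByTime;

        if (bufferSize == 1) return Kind.ReplayOne;
        if (bufferSize == InfiniteBufferSize) return Kind.ReplayAll;
        return Kind.ReplayMany;
    }
}
```

Under these assumed rules, only callers that actually ask for a time window pay for the scheduler, the stopwatch and the time-stamped envelope.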

Less code, more speed

As Kunu says, "Do less".

By removing a lot of the excess code we are able to massively improve the performance of the ReplaySubject<T> for arguably most use-cases.

3 comments:

Niall said...

I've always thought that Kunu would be a great development coach. He should have his own line of motivational posters.

Anonymous said...

Hi, this is really good work. Do you have this as working code? Taking it from your blog it won't compile due to internal Rx classes. Thanks

Lee Campbell said...

This code was submitted as a change (Pull Request) to the .NET Rx code base. It was accepted and is in version 2.2.4 of the Rx.NET code base.

If you want to have the code compile, you will need to pull down the old (out of date) Rx.NET code base and make the change. However, you get the benefits just by using the latest version of Rx.