In the previous post I talked about some changes that could be made to the ReplaySubject<T> implementation to squeeze large performance gains out of it. That post mainly discussed the results of the changes. In this post I will show some of the changes that I made.
Analysis of existing implementation
As mentioned in the previous post, all constructor overloads of the ReplaySubject<T> eventually called into the same constructor, just providing default values for the MaxCountBuffer, MaxTimeBuffer and Scheduler. For example:
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    _bufferSize = bufferSize;
    _window = window;
    _scheduler = scheduler;
    _stopwatch = _scheduler.StartStopwatch();
    _queue = new Queue<TimeInterval<T>>();
    _isStopped = false;
    _error = null;
    _observers = new ImmutableList<ScheduledObserver<T>>();
}

public ReplaySubject(int bufferSize, TimeSpan window)
    : this(bufferSize, window, SchedulerDefaults.Iteration)
{ }

public ReplaySubject()
    : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(IScheduler scheduler)
    : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize, IScheduler scheduler)
    : this(bufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(TimeSpan window, IScheduler scheduler)
    : this(InfiniteBufferSize, window, scheduler)
{ }

public ReplaySubject(TimeSpan window)
    : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{ }
There are a total of 8 constructor overloads, but 7 of them just delegate to the top overload above, passing default values.
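This delegation is exactly where a specialized design can hook in: instead of funneling every overload into the fully general constructor, the overloads can pick an implementation that matches the arguments. The sketch below is illustrative only; ReplayByTime is a hypothetical name for the time-based implementation, and the exact dispatch rules here are my assumption, not the shipped code:

// Hypothetical sketch: constructors select a specialized implementation
// up front rather than always paying for the general time-based one.
private readonly IReplaySubjectImplementation _implementation;

public ReplaySubject()
    : this(InfiniteBufferSize) { }

public ReplaySubject(int bufferSize)
{
    if (bufferSize == 1)
        _implementation = new ReplayOne();          // single cached value
    else if (bufferSize == InfiniteBufferSize)
        _implementation = new ReplayAll();          // unbounded buffer
    else
        _implementation = new ReplayMany(bufferSize); // count-trimmed buffer
}

public ReplaySubject(TimeSpan window, IScheduler scheduler)
{
    // Only the time-based overloads need the stopwatch and time-based Trim.
    _implementation = new ReplayByTime(InfiniteBufferSize, window, scheduler);
}

All public members of ReplaySubject<T> then simply forward to _implementation.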
Next, let's look at the Subscribe method. Here we see that the passed observer is wrapped in a ScheduledObserver<T>, which is only relevant for time-based buffers. A call to Trim() is also made, which again is only relevant for time-based buffers.
public IDisposable Subscribe(IObserver<T> observer)
{
    var so = new ScheduledObserver<T>(_scheduler, observer);
    var n = 0;
    var subscription = new RemovableDisposable(this, so);
    lock (_gate)
    {
        CheckDisposed();
        Trim(_stopwatch.Elapsed);
        _observers = _observers.Add(so);
        n = _queue.Count;
        foreach (var item in _queue)
            so.OnNext(item.Value);
        if (_error != null)
        {
            n++;
            so.OnError(_error);
        }
        else if (_isStopped)
        {
            n++;
            so.OnCompleted();
        }
    }
    so.EnsureActive(n);
    return subscription;
}
The next interesting part of the code is the implementation of the OnNext method.
public void OnNext(T value)
{
    var o = default(ScheduledObserver<T>[]);
    lock (_gate)
    {
        CheckDisposed();
        if (!_isStopped)
        {
            var now = _stopwatch.Elapsed;
            _queue.Enqueue(new TimeInterval<T>(value, now));
            Trim(now);
            o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
    if (o != null)
        foreach (var observer in o)
            observer.EnsureActive();
}
There are several things to note here:
- The use of an array of ScheduledObserver<T>
- The use of the TimeInterval<T> envelope for the value
- The Trim() command
Each of the three things above becomes quite dubious when we consider ReplayAll and ReplayOne implementations. A ReplayMany implementation may need a Trim() command, but surely does not need ScheduledObservers nor its values time-stamped.
Next we look at the Trim command itself:
void Trim(TimeSpan now)
{
    while (_queue.Count > _bufferSize)
        _queue.Dequeue();
    while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
        _queue.Dequeue();
}
Again we see that the second while loop is wholly unnecessary for non-time-based implementations.
Each of these small overheads starts to add up. However, where the costs really start to kick in is in the implementation of the ScheduledObserver<T>.
At a minimum, for each OnNext call to a ReplaySubject<T>, the ScheduledObserver<T> performs some condition checks, virtual method calls and some Interlocked operations. These are all fairly cheap operations. It is the unnecessary scheduling that incurs the costs. The scheduling incurs at least the following allocations:
- a ScheduledItem<TimeSpan, TState>
- an AnonymousDisposable and the Action it wraps
- an IndexedItem
There are also all the queues that are involved:
- ReplaySubject<T>._queue (a Queue<TimeInterval<T>>)
- ScheduledObserver<T>._queue (a ConcurrentQueue<T>)
- CurrentThreadScheduler.s_threadLocalQueue (a SchedulerQueue<TimeSpan>)
And the concurrency controls:
- ReplaySubject<T> uses the IStopwatch from _scheduler.StartStopwatch()
- ScheduledObserver<T> will use Interlocked.CompareExchange 3-4 times on a standard OnNext call
- ConcurrentQueue<T> uses an Interlocked.Increment and also a System.Threading.SpinWait when enqueuing a value; it also uses up to 3 SpinWaits and an Interlocked.CompareExchange to dequeue a value
- The recursive scheduling extension method uses lock twice
- CurrentThreadScheduler uses a Thread.Sleep()
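To get a feel for what these overheads add up to, a crude micro-benchmark can time raw OnNext throughput. This is only a sketch (it assumes a console project referencing the Rx package); absolute numbers will vary by machine and Rx version, so no figures are claimed here:

// Rough micro-benchmark sketch for ReplaySubject<T>.OnNext throughput.
// Assumes a reference to the Rx (System.Reactive) package.
using System;
using System.Diagnostics;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        const int iterations = 1_000_000;
        var subject = new ReplaySubject<int>(1);
        subject.Subscribe(_ => { });    // one attached observer, no-op handler

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
            subject.OnNext(i);
        sw.Stop();

        Console.WriteLine($"{iterations} OnNext calls took {sw.ElapsedMilliseconds} ms");
    }
}

Running a benchmark like this against the old and new implementations is how the gains discussed in the previous post can be observed.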
Alternative implementations
All of the code above looks fairly innocent in isolation. However, when we consider what we actually need for a ReplayOne, ReplayMany or ReplayAll implementation, all this extra code might make you blush.
The implementations that do not have a time consideration now share a base class, and its updated OnNext implementation is now simply:
public void OnNext(T value)
{
    lock (_gate)
    {
        CheckDisposed();
        if (!_isStopped)
        {
            AddValueToBuffer(value);
            Trim();
            var o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
}
The ReplayOne implementation is now reduced to:
private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
    private bool _hasValue;
    private T _value;

    protected override void Trim()
    {
        //NoOp. No need to trim.
    }

    protected override void AddValueToBuffer(T value)
    {
        _hasValue = true;
        _value = value;
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        if (_hasValue)
            observer.OnNext(_value);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _value = default(T);
    }
}
Note that there are no queues, schedulers, allocations etc. We have replaced all of that with simply a field to hold the single value, and a boolean flag to indicate if there has been a value buffered yet.
This is allowed to become so simple due to the base class ReplayBufferBase:
private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
    private readonly object _gate = new object();
    private bool _isDisposed;
    private bool _isStopped;
    private Exception _error;
    private ImmutableList<IObserver<T>> _observers;

    protected ReplayBufferBase()
    {
        _observers = new ImmutableList<IObserver<T>>();
    }

    protected abstract void Trim();
    protected abstract void AddValueToBuffer(T value);
    protected abstract void ReplayBuffer(IObserver<T> observer);

    public bool HasObservers
    {
        get
        {
            var observers = _observers;
            return observers != null && observers.Data.Length > 0;
        }
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            CheckDisposed();
            if (!_isStopped)
            {
                AddValueToBuffer(value);
                Trim();
                var o = _observers.Data;
                foreach (var observer in o)
                    observer.OnNext(value);
            }
        }
    }

    public void OnError(Exception error) { /*...*/ }
    public void OnCompleted() { /*...*/ }
    public IDisposable Subscribe(IObserver<T> observer) { /*...*/ }
    public void Unsubscribe(IObserver<T> observer) { /*...*/ }
    private void CheckDisposed() { /*...*/ }
    public void Dispose() { /*...*/ }
    protected virtual void Dispose(bool disposing) { /*...*/ }
}
The ReplayMany and ReplayAll implementations are slightly more complex as they require a Queue to store the buffered values. Again we add another base class to do most of the work.
private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
    private readonly Queue<T> _queue;

    protected ReplayManyBase(int queueSize)
        : base()
    {
        _queue = new Queue<T>(queueSize);
    }

    protected Queue<T> Queue { get { return _queue; } }

    protected override void AddValueToBuffer(T value)
    {
        _queue.Enqueue(value);
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        foreach (var item in _queue)
            observer.OnNext(item);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _queue.Clear();
    }
}
Now the only differences are the initial buffer size and whether the buffer gets trimmed or not. This leaves us with the final two implementations:
private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
    private readonly int _bufferSize;

    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }

    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
    public ReplayAll()
        : base(0)
    {
    }

    protected override void Trim()
    {
        //NoOp; i.e. don't trim, keep all values.
    }
}
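Whichever internal implementation the subject selects, the observable behaviour must stay the same. As a quick sanity check, here is how the ReplayOne path behaves through the public Rx API (a simple usage sketch; the subscription uses the default scheduler):

var subject = new ReplaySubject<int>(bufferSize: 1);   // backed by ReplayOne
subject.OnNext(1);
subject.OnNext(2);                                     // overwrites 1 in the single-value buffer
subject.Subscribe(x => Console.WriteLine(x));          // late subscriber receives the buffered 2
subject.OnNext(3);                                     // live value, received as normal

A late subscriber still gets the most recent value replayed, then continues to receive live values, exactly as with the original queue-based implementation.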
Less code, more speed
Like Kunu says "Do less".
By removing a lot of the excess code we are able to massively improve the performance of the ReplaySubject<T> for arguably most use-cases.