tag:blogger.com,1999:blog-4550726464486734162024-02-02T12:50:24.768+00:00LeeCampbellConcept to solutionLee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.comBlogger89125tag:blogger.com,1999:blog-455072646448673416.post-2815923487747494982016-05-22T16:22:00.002+01:002016-05-23T01:22:58.308+01:00Rx.NET community improvements<div dir="ltr" style="text-align: left;" trbidi="on">
In recent months (early to mid 2016) there has been rising confusion and frustration with the lack of openness and direction of the Rx.NET code base. The code being open-sourced and hosted on <a href="https://github.com/Reactive-Extensions/Rx.NET">GitHub</a> in late 2012 was cause for celebration. The community could see the code without the need for a decompiler, they could raise issues, and they could even submit code changes via a <a href="https://help.github.com/articles/using-pull-requests/">Pull Request</a>.<br />
<br />
<h4 style="text-align: left;">
Bright future</h4>
Over time, the community-created <a href="https://github.com/Reactive-Extensions/Rx.NET/issues">issues</a> and <a href="https://github.com/Reactive-Extensions/Rx.NET/pulls">Pull Requests</a> accumulated, but activity in the master branch seemed to <a href="https://github.com/Reactive-Extensions/Rx.NET/commit/2252cb4edbb25aca12005b9a912311edd2f095f3">come to a halt</a> in Oct 2015. However, anticipation was in the air, as Bart De Smet, one of the members of the original team (and who I think is the lead of the Rx team inside Microsoft), announced that <a href="https://vimeo.com/132192255">Rx3.0 was underway at NDC</a> in mid-2015. Features seemed to include a cross-platform version of expression trees - <i>Bonsai Trees</i>. "We are going to ship this later this year" (19:33). The community was very excited.<br />
<br />
Another key event was happening around the same time; Microsoft was making a large change to .NET. .NET Core was to be a massive undertaking that allowed .NET code to run cross platform, not just Windows, Windows RT and Windows Phone, but on Linux and OS X too.<br />
<br />
On top of these key events, there was the growing organic momentum of the Reactive movement. A movement that in my opinion the Rx.NET team had a large hand in popularizing and setting the standard for. Rx had matured in the eyes of the development community. It had been ported from the original implementation in .NET to JVM, JavaScript, C++, Ruby and others. This widespread adoption across the industry gave developers and managers alike confidence in the tool.<br />
<br />
<h4 style="text-align: left;">
Bright future?</h4>
However, as time passed, new cautious developers looking to adopt Rx wanted to know that this technology wasn't at a dead end. With the recent effective end-of-life of technologies such as browser plugins (Silverlight, Flash etc) and <a href="http://blog.parse.com/announcements/moving-on/">Parse</a>, it is understandable that people with long-term plans are looking for assurances. Even intrepid developers were looking for some assurance that things were still moving. After the announcement of Rx3.0, the implication that we would see a change in late 2015, and the .NET Core activity, the questions started coming.<br />
<br />
In the <a href="https://gitter.im/Reactive-Extensions/Rx.NET">gitter chat room</a> for Rx.NET, questions about the next release started as far back as Sept 2015. Then in Jan 2016 came more questions about the next release and whether Rx is still a supported library. And again in February, March, April and May. The tone of the chat room seemed glummer and glummer each time the question was asked, and we realized that no-one from the Rx Team was answering. Even Tamir Dresher, who was writing the <a href="https://www.manning.com/books/reactive-extensions-in-action">next book on Rx</a>, couldn't get any response from Microsoft. To add to the feeling of rejection in the community, we became aware of a private chat room in which members of the Rx Team were engaging with a select few people from outside of Microsoft. While I was aware of one person in this chat who I would consider a key member of the community, most of the other people I would consider key members, with a wealth of experience in Rx and relevant streaming technologies, were left in the dark, excluded.<br />
<br />
Just because we can't get a transparent and comforting answer about the future of Rx from someone at Microsoft doesn't mean the project is dead. However the 18 Pull Requests that have had no interaction from the Rx Team, and the lack of any commits to master in 7 months did leave a lot of people pretty uneasy about what was happening. Not only were we getting radio silence, but this was being contrasted with what appeared to be massive amounts of activity happening in the RxJs space.<br />
<h4 style="text-align: left;">
Bright Future!</h4>
And then <a href="https://twitter.com/ReactiveX/status/733043735613693952">this tweet</a> happened<br />
<blockquote class="tr_bq">
The future of Rx .NET is still bright, and yes we are transitioning it to the .NET Foundation</blockquote>
To me it seemed to be a self-congratulatory tweet. In my opinion, there wasn't much worth celebrating. Some people did some good work porting some Rx code to CoreCLR and did so on a private fork/repo. Not on a feature branch on the main repo where others could watch, but somewhere else. A heated exchange followed on Twitter, for which I apologize. It is not the forum for an exchange like that. But it did prompt me to write what I hope is constructive feedback below.<br />
<br />
<h4 style="text-align: left;">
A more collaborative and transparent future?</h4>
The Rx code base is open source, which I understand doesn't mean a free-for-all. However, I think it is reasonable to expect a little more from a project than what we are getting. Here are some things I think are reasonable expectations:<br />
<br />
<h3 style="text-align: left;">
Be clear about the status of the Repo/Project</h3>
On the <a href="https://github.com/Reactive-Extensions/Rx.NET">front page</a> (Readme.md), state the status of the repo. This should probably include which version is the current latest and which is the current pre-release. Link to them on NuGet and to their tags/branches (like <a href="https://github.com/Reactive-Extensions/RxJS">RxJs</a>). Highlight what the roadmap is (like <a href="https://github.com/aspnet/Home/wiki/Roadmap">ASP.NET</a>).<br />
<br />
Instead of people having to ask in chat rooms, on Twitter and in forums about the status of the project, it should be right there, front and center. Something like<br />
<blockquote class="tr_bq">
<span style="background-color: white; color: #333333; font-family: "helvetica neue" , "helvetica" , "segoe ui" , "arial" , "freesans" , sans-serif , "apple color emoji" , "segoe ui emoji" , "segoe ui symbol"; font-size: 14px; line-height: 22.4px;">We're hoping to release a new batch of functionality later this year, based on internal developments that took place within the Bing organization. As part of this, we're looking at doing future developments in the area of our reactive programming cloud platform in the open here on GitHub. Currently, we're working out the details to transition parts of the technology to the .NET Foundation.</span></blockquote>
Followed up with something like "<i>Transition to the .NET Foundation can be slow, so this may take until Q3 2016. After that we can start porting our internal work in Q4 2016. In the meantime we would love to get your help on the issues that are up for grabs</i>." (the quote is my own and made up). Now we would all know what was going on. It would take 10 minutes to update that <a href="https://github.com/Reactive-Extensions/Rx.NET/blob/master/README.md">readme.md</a> file.<br />
<br />
<h3 style="text-align: left;">
Be clear about how to contribute</h3>
<div>
Contribution is not just about signing a CLA (Contributor License Agreement). It should include some guidance on how to raise an issue, link it to a PR and create some dialogue to validate that the change is in line with the direction of the project. This should include which branch to target: 'master', 'develop' or perhaps a feature branch. It should also set the expectation of what the project custodians will do for the contributor with regards to labeling, setting milestones or closing the issue.</div>
<h3 style="text-align: left;">
Documentation</h3>
<div>
The readme, the wiki and the <a href="http://reactivex.io/">reactivex.io</a> documentation are sub-par. Allow the community to add more links to other helpful resources. There are 2 PRs to update documentation, but they appear to have been ignored.</div>
<div>
<br /></div>
<div>
If <a href="https://msdn.microsoft.com/en-us/library/hh242985(v=vs.103).aspx">this</a> is Microsoft's documentation for Rx, then maybe the link should be front and center. However, it suggests that you "download" the Rx SDK from the Microsoft download center instead of using NuGet, and the samples kick you straight off in the wrong direction by sticking subjects in your face as the first example. I would expect there to be links to external resources like the various books and websites that are out there. A good starting point for the loads of options could be this compilation - <a href="http://stackoverflow.com/questions/1596158/good-introduction-to-the-net-reactive-framework/">http://stackoverflow.com/questions/1596158/good-introduction-to-the-net-reactive-framework/</a>. A link to <a href="http://reactivex.io/">ReactiveX.io</a> would make sense. On that note, the ReactiveX.io samples for .NET are incomplete. Even the basic `<a href="http://reactivex.io/documentation/operators/subscribe.html">subscribe</a>` and `<a href="http://reactivex.io/documentation/operators/create.html">create</a>` methods are missing. </div>
<div>
<br /></div>
<h3 style="text-align: left;">
Clean up the branches</h3>
It appears that master is the only real branch (and gh-pages). So can 'develop' and 'BetterErrors' be merged, deleted or documented?<br />
<br />
Creating a tag that relates to the last commit that was used to create a package is a helpful thing to do. This allows people to identify bugs for specific versions and see if a fix is already in, or if they should consider raising an issue or a PR.<br />
<br />
<h3 style="text-align: left;">
Engage with issues</h3>
<div>
In the last few days there appears to have been a flurry of activity, however I think we can improve in this area. Issues seem to fall into one of three broad categories: "Acceptable", "Maybe Later" and "Not acceptable". For things that won't get accepted into the repository, let's be honest and transparent. Say this doesn't follow the direction of the project, but thanks for your contribution. Then close the issue, or give the author a few days for rebuttal or to close it themselves. For issues that look like they match the currently planned release or would be suitable for a future release, label them as such. The CoreFx team do a great job of this in <a href="https://github.com/dotnet/corefx/issues">their issues</a>. </div>
<div>
<br /></div>
<div>
Labels are not the only tool the team could use to communicate. Milestones are also a great way to give visibility to what is current and what is for later. They also allow you to see how close to complete a milestone is. It seems that this was a prime opportunity to use them. Milestone 2.2.6 could have had just two issues: CI and the CoreCLR build. These issues could have been labeled as <a href="http://up-for-grabs.net/">Up for grabs</a>. The community would have jumped at the task. However, with 18 of the current 23 PRs having had no official interaction, you can see why the community appears aloof when the task is significant but may not even get noticed.</div>
<div>
<br /></div>
<div>
As a concrete challenge to the Rx.NET team: Aim to close or label every issue in the Repo. </div>
<div>
<br /></div>
<div>
Maybe create some new issues, and mark them as up for grabs. Watch the community jump to action.</div>
<div>
<br /></div>
<div>
Which brings me on to my last ask, which is basically the same as for issues, but for PRs. Engage. People who have raised a PR have probably used the library heavily, found an opportunity for change, then forked and cloned the repo. From there they figured out how to get it to build (because it doesn't out of the box in <i>Debug</i> or <i>Release</i> targets). They figured out the code base, made a change and ideally added some tests to support it. Finally they submitted the PR. Best case scenario, I think that would be 4 hours of their life in which they could have been doing something else. I imagine it would be quite a disappointing feeling to have <a href="https://github.com/Reactive-Extensions/Rx.NET/pulls?q=is%3Apr+is%3Aopen+sort%3Acomments-asc">0 comments</a> on your PR.</div>
<br />
<h3 style="text-align: left;">
Symbiotic Relationship</h3>
I am not hating on Rx, nor am I hating on <a href="https://twitter.com/ReactiveX">@ReactiveX</a>; quite the opposite. I think what the Rx (Volta) team have done is brilliant. I enjoy it so much I poured 6 months into writing documentation for it and a Christmas holiday into writing a PR for it. There are others out there too who are equally passionate.<br />
<br />
But like in a loving family, sometimes you need to be the unpopular one and point out that <i>hey, this is not okay</i>.<br />
All I am asking from the Rx.NET team is a little back-and-forth, a little transparency. I would think they have far more to gain from doing this than anyone else. I have worked with hundreds of people that have used Rx. A handful have contributed to RxJava and RxJs but none to Rx.NET, because it is too hard. I think there is a great opportunity for positive change here.<br />
<br />
<blockquote class="tr_bq">
RxJS is almost 100% community run at this point with Microsoft helping steer the project</blockquote>
<br />
Can Rx.NET take a step in this direction? With brains like Bart's at the helm and the army of the willing already out there, then yes, we would have a bright future indeed. Just tell us what you want us to do.</div>
Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com2tag:blogger.com,1999:blog-455072646448673416.post-84771540722038243212016-03-18T00:53:00.002+00:002016-03-18T00:54:27.598+00:00Measuring latency with HdrHistogram<div dir="ltr" style="text-align: left;" trbidi="on">
I had the pleasure last year to meet with Gil Tene, an authority on building high performance software and specifically high performance <a href="https://www.azul.com/" target="_blank">JVM implementations</a>. He gave a brilliant presentation at <a href="https://www.youtube.com/watch?v=9MKY4KypBzg" target="_blank">React San Francisco</a> and then again at <a href="https://yow.eventer.com/yow-2014-1222/how-not-to-measure-latency-by-gil-tene-1710" target="_blank">YOW in Australia</a> on common mistakes made when measuring performance. He explained that measuring latency is not about getting a number, but about identifying the behavior and characteristics of a system.<br />
<br />
Often when we set out to measure the performance of our software we can be guided by NFRs (Non-Functional Requirements) that really don't make too much sense. More than once I have been presented with a requirement that the system must process <i>x</i> requests per <i>time-period</i>, e.g. 5 messages per second. However, as Gil points out, this single number is either unreasonable or misleading. If the system must always operate in a state to support these targets then it may be cost prohibitive. Such a requirement also implies 100% up-time. To work around that, some requirements specify that the mean response time should be <i>y</i>. However this is potentially less useful. By definition, what we are really specifying is that around 50% of requests <i>must</i> see worse performance than the target.<br />
<br />
A useful visualization for pointing out the folly of chasing a mean measurement is illustrated below.<br />
<br />
<img alt="File:Anscombe's quartet 3.svg" data-file-height="720" data-file-width="990" src="//upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Anscombe%27s_quartet_3.svg/800px-Anscombe%27s_quartet_3.svg.png" srcset="//upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Anscombe%27s_quartet_3.svg/1200px-Anscombe%27s_quartet_3.svg.png 1.5x, //upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Anscombe%27s_quartet_3.svg/1600px-Anscombe%27s_quartet_3.svg.png 2x" width="550" />
<br />
[Source - <a href="https://en.wikipedia.org/wiki/Anscombe%27s_quartet">https://en.wikipedia.org/wiki/Anscombe%27s_quartet</a>]<br />
<br />
All of these charts have the same mean value, but clearly show different shapes of data. If you were measuring latency in your application and targeting a mean value, you may be able to hit these targets but still have unhappy customers.<br />
<br />
When discussing single value targets, a mean value can loosely be thought of as just the 50th percentile (strictly speaking the 50th percentile is the median, which only coincides with the mean for symmetric distributions). In the first case the requirement was for the 100th percentile.<br />
<br />
Perhaps what is more useful is to measure and target several values. Maybe the 99th percentile plus targets at 99.9% and 99.99% etc is what you really are looking for.<br />
<br />
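To make the difference concrete, here is a minimal C# sketch that compares the mean with a few percentiles. The sample latencies are made up for illustration, and the <i>Percentile</i> helper is a hypothetical nearest-rank implementation, not part of any library.<br />
<br />
<pre><code>using System;
using System.Linq;

public static class LatencySummary
{
    // Nearest-rank percentile: the smallest sample such that at least
    // 'percentile' percent of all samples are less than or equal to it.
    public static long Percentile(long[] sortedAscending, double percentile)
    {
        int rank = (int)Math.Ceiling(percentile * sortedAscending.Length / 100.0);
        int index = Math.Min(Math.Max(rank, 1), sortedAscending.Length) - 1;
        return sortedAscending[index];
    }

    public static void Main()
    {
        // Made-up sample: 98 fast responses (1 ms) and 2 slow ones (500 ms).
        long[] latenciesMs = Enumerable.Repeat(1L, 98)
            .Concat(new[] { 500L, 500L })
            .OrderBy(latency => latency)
            .ToArray();

        Console.WriteLine("mean   = {0} ms", latenciesMs.Average());
        Console.WriteLine("50th   = {0} ms", Percentile(latenciesMs, 50));
        Console.WriteLine("99th   = {0} ms", Percentile(latenciesMs, 99));
        Console.WriteLine("99.9th = {0} ms", Percentile(latenciesMs, 99.9));
    }
}
</code></pre>
With this made-up sample the mean is about 11 ms, which describes neither the typical 1 ms response nor the 500 ms outliers; the 50th percentile is 1 ms and the 99th percentile is 500 ms.<br />
<br />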
<h3 style="text-align: left;">
Measuring latency with histograms</h3>
Instead of capturing a count and a sum of all latency recorded to then calculate a mean latency, you can capture latency values and assign them to a bucket. The assignment of this value to a bucket is to simply increment the count of that bucket. This now allows us to analyse the spread of latency recordings.<br />
<br />
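As a rough sketch of that idea, assuming fixed-width buckets and a known maximum value, recording a latency is just an array increment. The <i>LinearHistogram</i> type and its members are hypothetical names used only for illustration.<br />
<br />
<pre><code>using System;

public sealed class LinearHistogram
{
    private readonly long _bucketWidth;
    private readonly long[] _counts;

    public LinearHistogram(long maxValue, long bucketWidth)
    {
        _bucketWidth = bucketWidth;
        _counts = new long[maxValue / bucketWidth + 1];
    }

    // Recording a value simply increments the count of the bucket it falls into.
    // Values above maxValue are not handled in this sketch and would throw.
    public void Record(long value)
    {
        _counts[value / _bucketWidth]++;
    }

    public long CountInBucket(int bucketIndex)
    {
        return _counts[bucketIndex];
    }

    public static void Main()
    {
        // Buckets of 10 ms covering 0 to 100 ms.
        var histogram = new LinearHistogram(maxValue: 100, bucketWidth: 10);
        foreach (var latencyMs in new long[] { 3, 7, 12, 15, 18, 95 })
            histogram.Record(latencyMs);

        for (int i = 0; i <= 10; i++)
            Console.WriteLine("from {0} ms: {1}", i * 10, histogram.CountInBucket(i));
    }
}
</code></pre>
<br />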
The example of a histogram from Wikipedia shows how to represent heights by grouping into buckets of 5cm ranges. For each of the 31 Black Cherry trees measured, the height is assigned to a bucket and the count for that bucket is increased. Note that the x axis is linear.<br />
<br />
<img alt="An example histogram of the heights of 31 Black Cherry trees" data-file-height="216" data-file-width="216" src="//upload.wikimedia.org/wikipedia/commons/thumb/d/d9/Black_cherry_tree_histogram.svg/220px-Black_cherry_tree_histogram.svg.png" srcset="//upload.wikimedia.org/wikipedia/commons/thumb/d/d9/Black_cherry_tree_histogram.svg/330px-Black_cherry_tree_histogram.svg.png 1.5x, //upload.wikimedia.org/wikipedia/commons/thumb/d/d9/Black_cherry_tree_histogram.svg/440px-Black_cherry_tree_histogram.svg.png 2x" />
<br />
<br />
A naive implementation of a histogram, however, may require you to pre-plan the number and width of your buckets. Gil Tene has helped out here by creating an implementation of a histogram that is designed specifically for high dynamic ranges, hence its name HdrHistogram.<br />
<br />
When you create an instance of an HdrHistogram you simply specify<br />
<ol style="text-align: left;">
<li>a maximum value that you will support</li>
<li>the precision you want to capture as the number of significant digits</li>
<li>optionally, the minimum value you will support</li>
</ol>
<div>
The internal data structures of the HdrHistogram are such that you can very cheaply specify a maximum value that is an order of magnitude larger than you will expect, thus giving you enough headroom for your recorded values. As the HdrHistogram is designed to measure latency a common usage would be to measure a range from the minimum supported value for the platform (nanoseconds on JVM+Linux, or ticks on .NET+Windows) up to an hour, with a fidelity of 3 significant figures.</div>
<div>
<br /></div>
<br />
<div>
For example, a Histogram could be configured to track the counts of observed integer values between 0 and 36,000,000,000 while maintaining a value precision of 3 significant digits across that range. Value quantization within the range will thus be no larger than 1/1,000th (or 0.1%) of any value. This example Histogram could be used to track and analyze the counts of observed response times ranging between 1 tick (100 nanoseconds) and 1 hour in magnitude, while maintaining a value resolution of 100 nanoseconds up to 100 microseconds, a resolution of 1 millisecond (or better) up to one second, and a resolution of 1 second (or better) up to 1,000 seconds. At its maximum tracked value (1 hour), it would still maintain a resolution of 3.6 seconds (or better).</div>
<div>
<br /></div>
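<div>
The numbers in that example can be checked with a little arithmetic. The sketch below only reproduces the "no larger than 1/1,000th of any value" bound from the description above; it is not how HdrHistogram computes its buckets internally, and <i>WorstCaseResolution</i> is a hypothetical helper.</div>
<pre><code>using System;

public static class HistogramRange
{
    // Upper bound on quantization when values keep the given number of significant digits.
    public static long WorstCaseResolution(long value, int significantDigits)
    {
        long divisor = (long)Math.Pow(10, significantDigits);
        return Math.Max(value / divisor, 1);
    }

    public static void Main()
    {
        // One .NET tick is 100 nanoseconds, so one hour is 36,000,000,000 ticks.
        long oneHourInTicks = TimeSpan.TicksPerHour;
        Console.WriteLine(oneHourInTicks);

        // At the top of the range, 3 significant digits means at most 36,000,000 ticks.
        long resolutionTicks = WorstCaseResolution(oneHourInTicks, 3);
        Console.WriteLine(resolutionTicks / (double)TimeSpan.TicksPerSecond); // 3.6 (seconds)
    }
}
</code></pre>
<div>
<br /></div>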
<h4 style="text-align: left;">
Application of the HdrHistogram</h4>
When Matt (<a href="https://twitter.com/mattbarrett" target="_blank">@mattbarrett</a>) and I presented <a href="https://www.youtube.com/watch?v=Tp5mRlHwZ7M" target="_blank">Reactive User Interfaces</a>, we used the elements of drama and crowd reaction to illustrate the differences between various ways of conflating fast moving data from a server into a client GUI application. To best illustrate the problems of flooding a client with too much data in a server-push system, we used a modestly powered Intel i3 laptop. This worked fairly well in showing the client application coming to its knees when overloaded. However it also occasionally showed Windows coming to its knees too, which was a wee bit too much drama to have on stage during a live presentation.<br />
<br />
Instead we thought it better to provide a static visualization of what was happening in our system when it was overloaded with data from the server. We could then contrast that with alternative implementations showing how we can perform load-shedding on the client. This also meant we could present with a single high powered laptop, instead of bringing the toy i3 along with us just to demo.<br />
<br />
We added a port of the original Java HdrHistogram to our .NET code base. We used it to capture the latency of prices from the server to the client, and then the additional latency for the client to actually dispatch the rendering of the price. As GUI applications are single threaded, if you provide more updates than the GUI can render, there are two things that can happen:<br />
<br />
<ul style="text-align: left;">
<li>updates are queued</li>
<li>updates are conflated</li>
</ul>
<div>
What you do in your client application depends on your requirements. Some systems will need to process every message. In this case they may choose to just allow the updates to be queued. Other systems may allow updates to be conflated. Conflation is the act of taking many and reducing to one. So some systems may be able to conflate many updates and average them or aggregate them. For other systems, it may only be the last message that is important, so the conflation algorithm here would be to only process the last message. Matt discusses this <a href="http://weareadaptive.com/blog/2014/05/05/everything-is-a-stream/">in more detail</a> on the <a href="http://weareadaptive.com/blog">Adaptive Blog</a>.</div>
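<div>
<br /></div>
<div>
As an illustration of the "only the last message matters" style of conflation, here is a minimal sketch that does not use Rx at all. The <i>LatestValueConflater</i> type is hypothetical and is not the code used in ReactiveTrader; it assumes a producer that overwrites a single slot and a consumer that takes whatever is newest.</div>
<pre><code>using System;
using System.Threading;

// Hypothetical holder: producers overwrite the slot, the consumer takes the newest value.
public sealed class LatestValueConflater<T> where T : class
{
    private T _latest;

    // Called by the producer (e.g. the server feed). Older unread values are dropped.
    public void Publish(T value)
    {
        Interlocked.Exchange(ref _latest, value);
    }

    // Called by the consumer (e.g. the UI thread). Returns null if nothing new arrived.
    public T TakeLatest()
    {
        return Interlocked.Exchange(ref _latest, null);
    }
}

public static class ConflationDemo
{
    public static void Main()
    {
        var prices = new LatestValueConflater<string>();
        prices.Publish("1.3051");
        prices.Publish("1.3052");
        prices.Publish("1.3053");

        Console.WriteLine(prices.TakeLatest());         // 1.3053
        Console.WriteLine(prices.TakeLatest() == null); // True
    }
}
</code></pre>
<div>
The three published prices are conflated into one: the consumer only ever sees the most recent value, however far behind it has fallen.</div>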
<div>
<br /></div>
<div>
In the demo for <a href="http://reactivetrader.com/">ReactiveTrader</a> we demo queuing all updates and 3 styles of conflation. When we applied the HdrHistogram to our code base, we were quick to see we actually had a bug in our code base.</div>
<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiTOsPPJdGvlP7qHDbu7t2GgPcbNjqrGCP0oEY7QlNRztfV6eeTePQgd14mNcjYjXbdLBWPUQCxBuKvB8hr1983_Ru2S1IlpIxPJ1VSUaEbuECQVpf1zMwo9eDhm-P_Nr3qeU0Bl-lxF7c/s1600/ReactiveTraderLatencyHistogram.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="202" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiTOsPPJdGvlP7qHDbu7t2GgPcbNjqrGCP0oEY7QlNRztfV6eeTePQgd14mNcjYjXbdLBWPUQCxBuKvB8hr1983_Ru2S1IlpIxPJ1VSUaEbuECQVpf1zMwo9eDhm-P_Nr3qeU0Bl-lxF7c/s640/ReactiveTraderLatencyHistogram.png" width="640" /></a></div>
<br />
We had two problems. The first problem was an assumption that what worked for Silverlight would also work for WPF. As WPF has two threads dedicated to presentation (a UI thread and a dedicated Render thread), we were actually only measuring how long it took for us to put a price on another queue! You can see that the <i>ObserveLatest1</i> and <i>ObserveLatest2</i> (red and yellow) lines show worse performance than just processing all items on the dispatcher. I believe this is due to us just doing more work to conflate before sending to the render thread. Unlike in Silverlight, once we send something to the Render thread in WPF we can no longer measure the time taken to actually render the change. So our measurements here were not really telling us the full story.<br />
<br />
The second problem was that there was actually a bug in the code we copied from our original Silverlight (Rx v1) code. The original code (red line) accidentally used a <i>MultipleAssignmentDisposable</i> instead of a <i>SerialDisposable</i>. The simple change gave us the improvements seen in the yellow line.<br />
<br />
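The difference between the two types is small but important: in Rx, assigning a new inner disposable to a <i>SerialDisposable</i> disposes the previous one, whereas a <i>MultipleAssignmentDisposable</i> leaves the previous one alone. The sketch below uses simplified stand-in classes (hypothetical names, not the real Rx implementations, which are also thread-safe) to show just that behaviour.<br />
<br />
<pre><code>using System;

public sealed class NamedDisposable : IDisposable
{
    private readonly string _name;
    public NamedDisposable(string name) { _name = name; }
    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        IsDisposed = true;
        Console.WriteLine(_name + " disposed");
    }
}

// Stand-in for MultipleAssignmentDisposable: replacing leaves the old value untouched.
public sealed class MultipleAssignmentSlot
{
    public IDisposable Disposable { get; set; }
}

// Stand-in for SerialDisposable: replacing disposes the old value.
public sealed class SerialSlot
{
    private IDisposable _current;

    public IDisposable Disposable
    {
        get { return _current; }
        set
        {
            var previous = _current;
            _current = value;
            if (previous != null) previous.Dispose();
        }
    }
}

public static class SlotDemo
{
    public static void Main()
    {
        var first = new NamedDisposable("first");
        var multiple = new MultipleAssignmentSlot { Disposable = first };
        multiple.Disposable = new NamedDisposable("second");
        Console.WriteLine(first.IsDisposed); // False: the earlier work is still live

        var third = new NamedDisposable("third");
        var serial = new SerialSlot { Disposable = third };
        serial.Disposable = new NamedDisposable("fourth");
        Console.WriteLine(third.IsDisposed); // True: the earlier work was cancelled
    }
}
</code></pre>
One plausible reading of the bug, then, is that superseded work was never cancelled when a newer price arrived, which is exactly what a conflation algorithm is supposed to avoid.<br />
<br />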
We were happy to see that the <i>Conflate </i>and <i>ConstantRate</i> algorithms were measuring great results, which were clearly supported visually when using the application.<br />
<br />
<br />
<br />
To find out more about the brilliant Gil Tene<br />
<ul style="text-align: left;">
<li>see his talks at <a href="https://www.youtube.com/watch?v=9MKY4KypBzg" target="_blank">React San Francisco</a> and at <a href="https://yow.eventer.com/yow-2014-1222/how-not-to-measure-latency-by-gil-tene-1710" target="_blank">YOW in Australia</a></li>
<li>check out his web site - <a href="http://latencytipoftheday.blogspot.co.uk/">http://latencytipoftheday.blogspot.co.uk</a> </li>
<li>follow him on twitter <a href="https://twitter.com/giltene" target="_blank">@giltene</a></li>
<li>see his content on InfoQ - <a href="http://www.infoq.com/author/Gil-Tene">http://www.infoq.com/author/Gil-Tene</a></li>
<li>or look at Azul (<a href="https://www.azul.com/">https://www.azul.com/</a>) and the things they can make Java code do</li>
</ul>
<div>
I am currently working on the final details of a complete port of the original Java HdrHistogram to .NET. You can see my work here - <a href="https://github.com/LeeCampbell/HdrHistogram.NET">https://github.com/LeeCampbell/HdrHistogram.NET</a></div>
</div>
Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com2tag:blogger.com,1999:blog-455072646448673416.post-79746827070904072062014-01-11T10:53:00.001+00:002014-01-11T10:53:20.944+00:00ReplaySubject Performance improvements – code changes<p>In the previous post I talked about some changes that could be made to the <code>ReplaySubject<T></code> implementation to squeeze large performance gains out of it. In that post I mainly discussed the results of the changes. In this post I will show some of the changes that I made.</p> <h4>Analysis of existing implementation</h4> <p>As mentioned in the previous post, all constructor overloads of the <code>ReplaySubject<T></code> eventually called into the same constructor, but just provided default values for the <code>MaxCountBuffer</code>, <code>MaxTimeBuffer</code> and <code>Scheduler</code>. For example:</p> <pre><code>public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    _bufferSize = bufferSize;
    _window = window;
    _scheduler = scheduler;
    _stopwatch = _scheduler.StartStopwatch();
    _queue = new Queue<TimeInterval<T>>();
    _isStopped = false;
    _error = null;
    _observers = new ImmutableList<ScheduledObserver<T>>();
}
public ReplaySubject(int bufferSize, TimeSpan window)
    : this(bufferSize, window, SchedulerDefaults.Iteration)
{ }
public ReplaySubject()
    : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }
public ReplaySubject(IScheduler scheduler)
    : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{ }
public ReplaySubject(int bufferSize, IScheduler scheduler)
    : this(bufferSize, TimeSpan.MaxValue, scheduler)
{ }
public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }
public ReplaySubject(TimeSpan window, IScheduler scheduler)
    : this(InfiniteBufferSize, window, scheduler)
{ }
public ReplaySubject(TimeSpan window)
    : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{ }
</code></pre>
<p>There are a total of 8 constructor overloads, but 7 of them just delegate to the top overload above, passing default values. </p>
<p>Next, let's look at the <code>Subscribe</code> method. Here we see the passed observer is wrapped in a <code>ScheduledObserver<T></code>, which is only relevant for time based buffers. Also a <code>Trim()</code> call is made, which again is only relevant for time based buffers.</p>
<pre><code>public IDisposable Subscribe(IObserver<T> observer)
{
    var so = new ScheduledObserver<T>(_scheduler, observer);
    var n = 0;
    var subscription = new RemovableDisposable(this, so);
    lock (_gate)
    {
        CheckDisposed();
        Trim(_stopwatch.Elapsed);
        _observers = _observers.Add(so);
        n = _queue.Count;
        foreach (var item in _queue)
            so.OnNext(item.Value);
        if (_error != null)
        {
            n++;
            so.OnError(_error);
        }
        else if (_isStopped)
        {
            n++;
            so.OnCompleted();
        }
    }
    so.EnsureActive(n);
    return subscription;
}
</code></pre>
<p>The next part of the code that is interesting is the implementation of the <code>OnNext</code> method.</p>
<pre><code>public void OnNext(T value)
{
    var o = default(ScheduledObserver<T>[]);
    lock (_gate)
    {
        CheckDisposed();
        if (!_isStopped)
        {
            var now = _stopwatch.Elapsed;
            _queue.Enqueue(new TimeInterval<T>(value, now));
            Trim(now);
            o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
    if (o != null)
        foreach (var observer in o)
            observer.EnsureActive();
}
</code></pre>
<p>There are several things to note here:</p>
<ol>
<li>The use of an array of <code>ScheduledObserver<T></code>
</li>
<li>The use of the <code>TimeInterval<T></code> envelope for the value </li>
<li>The <code>Trim()</code> command</li>
</ol>
<p>Each of the three things above becomes quite dubious when we consider ReplayAll and ReplayOne implementations. A ReplayMany implementation may need a <code>Trim()</code> command, but surely does not need <code>ScheduledObservers</code> nor its values time-stamped.</p>
<p>Next we look at the <code>Trim</code> command itself:</p>
<pre><code>void Trim(TimeSpan now)
{
    while (_queue.Count > _bufferSize)
        _queue.Dequeue();
    while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
        _queue.Dequeue();
}
</code></pre>
<p>Again we see that the code for the second while loop is wholly unnecessary for non-time based implementations. </p>
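<p>To illustrate the point, here is a minimal sketch of what a count-only buffer could look like once the time window is taken out of the picture. The <code>CountBoundedBuffer<T></code> type and its members are hypothetical names for illustration; they are not taken from the Rx source.</p>
<pre><code>using System;
using System.Collections.Generic;

// Hypothetical count-only buffer: no stopwatch, no time window, no TimeInterval<T> envelope.
public sealed class CountBoundedBuffer<T>
{
    private readonly int _bufferSize;
    private readonly Queue<T> _queue = new Queue<T>();

    public CountBoundedBuffer(int bufferSize)
    {
        _bufferSize = bufferSize;
    }

    public void Add(T value)
    {
        _queue.Enqueue(value);
        Trim();
    }

    // Only the size check remains; the time based loop is gone.
    private void Trim()
    {
        while (_queue.Count > _bufferSize)
            _queue.Dequeue();
    }

    public T[] Snapshot()
    {
        return _queue.ToArray();
    }
}

public static class BufferDemo
{
    public static void Main()
    {
        var buffer = new CountBoundedBuffer<int>(3);
        for (int i = 1; i <= 5; i++)
            buffer.Add(i);

        Console.WriteLine(string.Join(",", buffer.Snapshot())); // 3,4,5
    }
}
</code></pre>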
<p>Each of these small overheads starts to add up. However, where the cost really starts to kick in is in the implementation of the <code>ScheduledObserver<T></code>.</p>
<p>At a minimum, for each <code>OnNext</code> call to a <code>ReplaySubject<T></code>, the <code>ScheduledObserver<T></code> performs some condition checks, virtual method calls and some Interlocked operations. These are all fairly cheap operations. It is the unnecessary scheduling that incurs the costs. The scheduling incurs at least the following allocations</p>
<ol>
<li><code>ScheduledItem<TimeSpan, TState></code>
</li>
<li><code>AnonymousDisposable</code> and the <code>Action</code>
</li>
<li><code>IndexedItem</code>
</li>
</ol>
<p>There are also all the queues that are involved.
</p>
<ol>
<li><code>ReplaySubject<T>._queue</code> (<code>Queue<TimeInterval<T>></code>)</li>
<li><code>ScheduledObserver<T>._queue</code> (<code>ConcurrentQueue<T></code>)</li>
<li><code>CurrentThreadScheduler.s_threadLocalQueue</code> (<code>SchedulerQueue<TimeSpan></code>)</li>
</ol>
<p>And the concurrency controls</p>
<ol>
<li><code>ReplaySubject</code> uses the <code>IStopwatch</code> from <code>_scheduler.StartStopwatch()</code></li>
<li><code>ScheduledObserver<T></code> will use <code>Interlocked.CompareExchange</code> 3-4 times on a standard <code>OnNext</code> call.
</li>
<li><code>ConcurrentQueue</code> uses an <code>Interlocked.Increment</code> and also a <code>System.Threading.SpinWait</code> when en-queuing a value. It also uses up to 3 <code>SpinWait</code>s and an <code>Interlocked.CompareExchange</code> to de-queue a value.</li>
<li>The recursive scheduling extension method uses <code>lock</code> twice</li>
<li><code>CurrentThreadScheduler</code> uses a <code>Thread.Sleep()</code></li>
</ol>
<h4>Alternative implementations</h4>
<p>All of the code above is fairly innocent looking when looked at in isolation. However, when we consider what we probably need for a ReplayOne, ReplayMany or ReplayAll implementation, all this extra code might make you blush.</p>
<p>The implementations that do not have a time consideration now share a base class, whose <code>OnNext</code> implementation is now simply:</p>
<pre><code>public void OnNext(T value)
{
    lock (_gate)
    {
        CheckDisposed();
        if (!_isStopped)
        {
            AddValueToBuffer(value);
            Trim();
            var o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
}
</code></pre>
<p>The ReplayOne implementation is now reduced to:</p>
<pre><code>private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
    private bool _hasValue;
    private T _value;
    protected override void Trim()
    {
        //NoOp. No need to trim.
    }
    protected override void AddValueToBuffer(T value)
    {
        _hasValue = true;
        _value = value;
    }
    protected override void ReplayBuffer(IObserver<T> observer)
    {
        if (_hasValue)
            observer.OnNext(_value);
    }
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _value = default(T);
    }
}
</code></pre>
<p>Note that there are no queues, schedulers, allocations etc. We have replaced all of that with simply a field to hold the single value, and a boolean flag to indicate if there has been a value buffered yet.</p>
<p>This is allowed to become so simple due to the base class <code>ReplayBufferBase</code>:</p>
<pre><code>private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
    private readonly object _gate = new object();
    private bool _isDisposed;
    private bool _isStopped;
    private Exception _error;
    private ImmutableList<IObserver<T>> _observers;
    protected ReplayBufferBase()
    {
        _observers = new ImmutableList<IObserver<T>>();
    }
    protected abstract void Trim();
    protected abstract void AddValueToBuffer(T value);
    protected abstract void ReplayBuffer(IObserver<T> observer);
    public bool HasObservers
    {
        get
        {
            var observers = _observers;
            return observers != null && observers.Data.Length > 0;
        }
    }
    public void OnNext(T value)
    {
        lock (_gate)
        {
            CheckDisposed();
            if (!_isStopped)
            {
                AddValueToBuffer(value);
                Trim();
                var o = _observers.Data;
                foreach (var observer in o)
                    observer.OnNext(value);
            }
        }
    }
    public void OnError(Exception error) {/*...*/}
    public void OnCompleted() {/*...*/}
    public IDisposable Subscribe(IObserver<T> observer) {/*...*/}
    public void Unsubscribe(IObserver<T> observer) {/*...*/}
    private void CheckDisposed() {/*...*/}
    public void Dispose() {/*...*/}
    protected virtual void Dispose(bool disposing) {/*...*/}
}
</code></pre>
<p>The ReplayMany and ReplayAll implementations are slightly more complex as they require a Queue to store the buffered values. Again we add another base class to do most of the work.</p>
<pre><code>private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
    private readonly Queue<T> _queue;
    protected ReplayManyBase(int queueSize)
        : base()
    {
        _queue = new Queue<T>(queueSize);
    }
    protected Queue<T> Queue { get { return _queue; } }
    protected override void AddValueToBuffer(T value)
    {
        _queue.Enqueue(value);
    }
    protected override void ReplayBuffer(IObserver<T> observer)
    {
        foreach (var item in _queue)
            observer.OnNext(item);
    }
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _queue.Clear();
    }
}
</code></pre>
<p>Now the only differences are the initial buffer size and whether the buffer gets trimmed or not. This leaves us with the final two implementations:</p>
<pre><code>private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
    private readonly int _bufferSize;
    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }
    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
    public ReplayAll()
        : base(0)
    {
    }
    protected override void Trim()
    {
        //NoOp; i.e. Don't trim, keep all values.
    }
}
</code></pre>
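<p>To see the shape of this design in isolation, here is a minimal, self-contained sketch. It is not the Rx code: the names <code>BufferSketch</code>, <code>KeepLastSketch</code>, <code>KeepAllSketch</code> and <code>Snapshot</code> are hypothetical, and the locking, observers and disposal are deliberately left out. It only shows how the base class owns the queue while each flavour supplies its own <code>Trim</code>.</p>
<pre><code>using System;
using System.Collections.Generic;

// Hypothetical, stripped-down analogue of ReplayManyBase (no locking, observers or disposal).
public abstract class BufferSketch<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    protected Queue<T> Queue { get { return _queue; } }

    public void Add(T value)
    {
        _queue.Enqueue(value);
        Trim(); // each flavour decides what, if anything, to discard
    }

    // Copy of the currently buffered values, oldest first.
    public T[] Snapshot() { return _queue.ToArray(); }

    protected abstract void Trim();
}

// Analogue of ReplayMany: keeps only the most recent bufferSize values.
public sealed class KeepLastSketch<T> : BufferSketch<T>
{
    private readonly int _bufferSize;
    public KeepLastSketch(int bufferSize) { _bufferSize = bufferSize; }

    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

// Analogue of ReplayAll: never trims.
public sealed class KeepAllSketch<T> : BufferSketch<T>
{
    protected override void Trim() { }
}

public static class BufferSketchDemo
{
    public static void Main()
    {
        var many = new KeepLastSketch<int>(2);
        var all = new KeepAllSketch<int>();
        for (int i = 1; i <= 5; i++)
        {
            many.Add(i);
            all.Add(i);
        }
        Console.WriteLine(string.Join(",", many.Snapshot())); // prints 4,5
        Console.WriteLine(string.Join(",", all.Snapshot()));  // prints 1,2,3,4,5
    }
}
</code></pre>
<p>The only per-flavour decision is what to throw away, which is why the concrete classes stay so small.</p>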
<h4>Less code, more speed</h4>
<p>Like <a href="http://www.youtube.com/watch?v=PKIpCPS-oZc" title="Forgetting Sarah Marshall - 'The less you do, the more you do' surfing lesson with Kunu" target="_blank">Kunu says</a>, "Do less".</p>
<p>By removing a lot of the excess code we are able to massively improve the performance of the <code>ReplaySubject<T></code> for arguably most use-cases.</p>
<h3>ReplaySubject performance improvements</h3>
<p>Posted by Lee Campbell on 2014-01-04.</p>
<p>Over the last year I have been looking at improving the performance of the ReplaySubject<T> in the .NET implementation of the Reactive Extensions. It has been a fun journey and I am really excited about the results.</p> <h4>How it started</h4> <p>In a project I was working on in early 2013, we were doing our standard pre-release performance checks when @marcuswhit found an unusual spike in allocations and therefore an increase in GC pressure. For the user, this resulted in a drop in performance for the application we had built. We had a look at the culprit code and it was simply the introduction of a Replay operator. The Replay operator was required for the new functionality, but the performance hit was unexpected.</p> <p>Later when <a href="http://www.sharpfellows.com/author/johnrayner.aspx">John Rayner</a> and I looked into the underlying code in the Rx codebase we found that all flavours of a ReplaySubject shared the same code path. This meant that if you were creating a Replay-One sequence, you would incur the same costs that a Replay-By-Time sequence would. These costs are outlined in the CodePlex work item - <a href="https://rx.codeplex.com/workitem/35">https://rx.codeplex.com/workitem/35</a>. To summarize: in my opinion, if you construct a ReplaySubject without a TimeSpan argument, then you shouldn’t have to pay the cost of schedulers and stopwatches. 
In my experience, Replay() and Replay(1) (or Replay-All and Replay-One) make up at least 80% of the uses of Replay.</p> <h4>Performance testing</h4> <p>When I opened up the code base I was able to see the excess code that would run for each variation of the ReplaySubject. I was sure that I could make it faster, but I obviously needed to get some metrics for it. So I created a little performance test harness (<a href="https://github.com/LeeCampbell/RxPerfTests">https://github.com/LeeCampbell/RxPerfTests</a>) that I could use to test implementations of ReplaySubject from various versions of Rx.</p> <p>Running on my little holiday traveller i3 laptop I was able to get a maximum throughput of up to ~225k messages per second on version 2.1.30214.0 of Rx. I then pulled down the latest (v2.2.0.0) code base from GitHub and compiled the Release40 build. This also showed throughput of up to ~225k messages per second. The performance scaled in a fairly linear fashion: doubling the subscriptions seemed to halve the throughput. Here are the results of a test against v2.1.30214.0 for a Replay-All subject.</p><pre>Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
                      Subscriptions
Messages         1         2         4         8        16
1000        218747     92306     56871     23445     13673
10000       215906    109191     54546     26872     13244
100000      200946    105164     52730     26595     13311
1000000     204420    104031     53133     26775     13164
</pre><iframe height="192" src="https://skydrive.live.com/embed?cid=76A2EA424CB9F9AE&resid=76A2EA424CB9F9AE%212371&authkey=AKXLDyASOkMzXg8" frameborder="0" width="320" scrolling="no"></iframe>
<p>Each flavour (Replay-All, Replay-One, Replay-Many, Replay-by-Time and Replay-by-Time-and-Count) showed similar performance characteristics.</p>
<h4>The changes</h4>
<p>I then pulled apart the ReplaySubject on my local git repo of Rx and made some changes. I decided that, as each flavour had different requirements, they could also have different implementations. I couldn’t change the public API, so I opted for creating private nested implementations for each flavour of the ReplaySubject. Each implementation started off as a copy of the original ReplaySubject and was cut back to just the code that was required. Then any duplicate code was refactored back into a base class. I found a few bugs in my own code that were not covered by the existing tests, so I added some more tests.</p>
<p>Using the performance test harness mentioned above, I was able to tweak here and there and get massive improvements. The Replay-by-Time and Replay-by-Time-and-Count flavours showed no performance changes. This is because they basically remained the same code base with just a new layer of indirection. The big improvements came from the Replay-All, Replay-One and Replay-Many flavours. Instead of single subscriptions getting ~225k msg/s, I started seeing throughput of over 9 million msg/s. While this was great, an even better improvement was the scalability profile. Now adding subscriptions had a far lower impact. In the unmodified implementations, adding 16 subscriptions had the effect of reducing the throughput by a factor of 16. With the new implementation, adding 16 subscriptions reduced the throughput by a factor of less than 2.</p><pre>Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
                      Subscriptions
Messages         1         2         4         8        16
1000       9823183   9541985   8583691   7107321   5167959
10000      9863878   9447331   8727527   7175660   5174912
100000     9788280   9356550   8727450   6881744   5154719
1000000    8998729   8885193   8108485   6720683   5031854
</pre><iframe height="192" src="https://skydrive.live.com/embed?cid=76A2EA424CB9F9AE&resid=76A2EA424CB9F9AE%212372&authkey=ABJyIySv2408qgk" frameborder="0" width="320" scrolling="no"></iframe>
<h4>Why such large performance gains?</h4>
<p>While these are micro-benchmarks and need to be taken with a grain of salt, we can still consider why these large improvements are being seen. We can look to another metric that I collect while running these tests: Generation 0 garbage collections. In the case of a single subscription being pushed 100k messages (just integer values as the messages), each flavour of the unmodified code consistently causes 41 or 42 Gen0 GCs to occur. With the new implementations, only Replay-by-Time and Replay-by-Time-and-Count still cause this much GC activity. This is due to them being essentially the old implementations. In all other cases, 100k messages <strong>cause no Gen0 collections</strong>. In fact we have to push up to 1 million messages to get a single Gen0 collection. </p>
<p>Here is a table of the unmodified implementation’s garbage collection profile:</p><pre>Rx v2.1.30214.0 Replay() - GCs
                      Subscriptions
Messages         1         2         4         8        16
1000             0         0         1         3         6
10000            4         8        16        33        64
100000          42        83       164       325       655
1000000        422       838      1664      3321      6550
</pre>
<p>Here is the data for the new implementation:</p><pre>Rx v2.2.0.0(modified) Replay() - GCs
                      Subscriptions
Messages         1         2         4         8        16
1000             0         0         0         0         0
10000            0         0         0         0         0
100000           0         0         0         0         0
1000000          1         1         1         1         1
</pre>
<p>This data makes it very clear that we are just doing a lot less work: fewer allocations and less garbage collection, which means a lot more speed.</p>
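<p>The RxPerfTests harness itself is not reproduced here. As a rough sketch of how these two metrics can be captured, the following uses <code>GC.CollectionCount(0)</code> and <code>Stopwatch</code> from the base class library. The class and method names (<code>GcMeasurementSketch</code>, <code>CountGen0Collections</code>, <code>MessagesPerSecond</code>) are hypothetical, and the boxing loop merely stands in for a workload that allocates per message.</p>
<pre><code>using System;
using System.Diagnostics;

public static class GcMeasurementSketch
{
    // Runs the action and returns how many Gen0 collections occurred while it ran.
    public static int CountGen0Collections(Action action)
    {
        // Start from a clean heap so earlier garbage is not attributed to the action.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        int before = GC.CollectionCount(0);
        action();
        return GC.CollectionCount(0) - before;
    }

    // Pushes messageCount values through publish and returns the observed rate.
    public static double MessagesPerSecond(int messageCount, Action<int> publish)
    {
        var timer = Stopwatch.StartNew();
        for (int i = 0; i < messageCount; i++)
            publish(i);
        timer.Stop();
        return messageCount / timer.Elapsed.TotalSeconds;
    }

    public static void Main()
    {
        const int messages = 100000;
        object last = null;

        // Boxing each int allocates, standing in for per-message garbage.
        int gen0 = CountGen0Collections(() =>
        {
            for (int i = 0; i < messages; i++)
                last = i;
        });
        double rate = MessagesPerSecond(messages, i => last = i);

        Console.WriteLine("Gen0 GCs: {0}, throughput: {1:N0} msg/sec", gen0, rate);
    }
}
</code></pre>
<p>Numbers from a sketch like this vary from run to run and machine to machine, so they are only useful for comparing implementations under the same conditions.</p>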
<p>As most people interested in performance probably don’t run an i3 processor, I also re-ran the tests on my Surface Pro that has an i5 processor. The improvements were still just as good. The i5 could squeeze out just over 400k msg/s on the unmodified code base, and subscriptions still had the same linear effect on performance.</p><pre>Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
                      Subscriptions
Messages         1         2         4         8        16
1000        380431    218871    114416     57253     28110
10000       398996    204956    113002     55781     27993
100000      414960    217336    109840     55951     27422
1000000     418740    217453    110330     56145     26576
</pre>
<p>With the modified code, however, we could now reach over 17 million msg/s. Where 16 subscriptions caused the original implementation to fall down to just under 30k msg/s, we can still reach over 9 million. </p><pre>Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
                      Subscriptions
Messages         1         2         4         8        16
1000      17452007  17452007  16260163  12953368   9363296
10000     18975332  17869907  15928640  13326226   9470594
100000    18110693  18102824  16780494  13748350   9360316
1000000   17682715  17005559  16196903  12687618   9242486
</pre>
<h4>Full Test results</h4>
<h5>i3 processor – Unmodified code</h5><pre>Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 218747 92306 56871 23445 13673
10000 215906 109191 54546 26872 13244
100000 200946 105164 52730 26595 13311
1000000 204420 104031 53133 26775 13164
Rx v2.1.30214.0 Replay() - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 838 1664 3321 6550
Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 214450 110044 55577 28174 14292
10000 182842 102424 53205 27201 13200
100000 202244 103372 53023 26730 12901
1000000 199423 100937 52423 26692 13175
Rx v2.1.30214.0 Replay(1) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 154466 112586 53806 28533 12066
10000 209941 105418 52227 25618 12894
100000 203439 103846 52172 26408 12773
1000000 199373 102434 52428 26218 13154
Rx v2.1.30214.0 Replay(5) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 165085 113025 52570 26510 12423
10000 208303 106808 52318 26740 13070
100000 197664 103754 51916 26164 13254
1000000 198327 103020 52606 26467 13200
Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 837 1664 3319 6547
Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 216216 112792 56599 28629 13872
10000 203111 106995 51609 26650 13174
100000 198527 102207 52097 26304 13025
1000000 198140 103024 49670 26410 13067
Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
</pre>
<h5>i3 processor - Modified code</h5><pre>Rx v2.2.0.0 Replay() - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 9823183 9541985 8583691 7107321 5167959
10000 9863878 9447331 8727527 7175660 5174912
100000 9788280 9356550 8727450 6881744 5154719
1000000 8998729 8885193 8108485 6720683 5031854
Rx v2.2.0.0 Replay() - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 1 1 1 1 1
Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 11560694 11976048 10256410 8077544 5707763
10000 11444266 11689071 10127608 7958615 5674403
100000 11923925 11776760 10193472 8016096 5638377
1000000 11963712 11374908 9795912 7469844 5392897
Rx v2.2.0.0 Replay(1) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 0 0 0 0 0
Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 8748906 8583691 7072136 6289308 4688233
10000 8683571 8504848 7512019 6407381 4731712
100000 8470556 8434904 7535170 6329915 4552138
1000000 8516944 8072641 7322719 5943437 4545893
Rx v2.2.0.0 Replay(5) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 0 0 0 0 0
Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 203145 112941 55796 27764 13894
10000 210778 103922 52273 25893 13369
100000 199651 103438 52908 25945 13168
1000000 201586 103890 51972 26753 12945
Rx v2.2.0.0 Replay(5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 838 1663 3319 6548
Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 207215 111986 56923 28671 13387
10000 200563 106150 52456 25914 13460
100000 195847 103178 53104 26365 13366
1000000 194919 103045 52904 26238 13325
Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
</pre>
<h5>i5 processor - unmodified code</h5><pre>Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 380431 218871 114416 57253 28110
10000 398996 204956 113002 55781 27993
100000 414960 217336 109840 55951 27422
1000000 418740 217453 110330 56145 26576
Rx v2.1.30214.0 Replay() - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 838 1664 3321 6627
Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 344222 223389 113538 52829 29044
10000 417744 219372 109963 55726 27561
100000 415620 216711 110546 56107 28060
1000000 419986 218789 110905 56176 28149
Rx v2.1.30214.0 Replay(1) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 361978 223899 113654 56229 28129
10000 412809 212685 111607 55976 27862
100000 422559 217703 110316 55860 28014
1000000 423255 217803 110782 56183 28099
Rx v2.1.30214.0 Replay(5) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 384675 221239 106288 56164 28373
10000 416682 219457 110083 56036 27671
100000 422329 218200 110706 55608 27383
1000000 422114 217501 110514 56183 26502
Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 838 1664 3320 6551
Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 375094 225119 113670 56691 28672
10000 410477 220168 111093 55885 27876
100000 420379 217921 110347 56104 27049
1000000 420783 217080 111352 56076 28017
Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
</pre>
<h5>i5 processor - modified code</h5><pre>Rx v2.2.0.0 Replay() - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 17452007 17452007 16260163 12953368 9363296
10000 18975332 17869907 15928640 13326226 9470594
100000 18110693 18102824 16780494 13748350 9360316
1000000 17682715 17005559 16196903 12687618 9242486
Rx v2.2.0.0 Replay() - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 1 1 1 1 1
Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 22421525 23364486 20449898 16103060 11049724
10000 20449898 23529412 20733983 16097875 10883761
100000 22837829 22088220 20341327 15736384 10903817
1000000 23659399 23495633 20324584 15783451 10902201
Rx v2.2.0.0 Replay(1) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 0 0 0 0 0
Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 15948963 17094017 15797788 13054830 9578544
10000 16417665 16716817 15852885 12655024 9417083
100000 16009990 16974759 15540016 12802622 9371456
1000000 17090833 16916123 15518023 12824245 9297327
Rx v2.2.0.0 Replay(5) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 0 0 0
10000 0 0 0 0 0
100000 0 0 0 0 0
1000000 0 0 0 0 0
Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 320328 222336 112826 53494 28674
10000 421850 221479 112933 56733 27410
100000 412311 216091 112233 56235 27692
1000000 416718 219312 112004 56672 26932
Rx v2.2.0.0 Replay(5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 42 83 164 325 655
1000000 422 838 1664 3320 6550
Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
Subscriptions
Messages 1 2 4 8 16
1000 388018 222306 114519 57530 27928
10000 415835 222314 109739 57219 28178
100000 421980 215986 113116 56194 28354
1000000 419603 218809 112619 56641 28380
Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
Subscriptions
Messages 1 2 4 8 16
1000 0 0 1 3 6
10000 4 8 16 33 64
100000 41 82 164 329 658
1000000 416 833 1667 3325 6610
</pre>
<h4>TL;DR</h4>
<p>I have submitted a pull request that should give at least 25x the throughput on ReplaySubjects where the buffer has no time component. For multiple subscriptions, performance improvements are much higher, with tests for 16 subscriptions showing ~350x improvement. So if you have any last-value caches, if you use the .Replay() extension method or the Replay(int) overload, and you want to see less garbage collection, then feel free to upvote <a href="https://rx.codeplex.com/workitem/35">this issue</a> on CodePlex, and hopefully we will see the Rx team at Microsoft accept the pull request.</p>
<h3>Rx now open source</h3>
<p>Posted by Lee Campbell on 2012-11-06.</p>
<div dir="ltr" style="text-align: left;" trbidi="on">
Awesome news today that will please many (<a href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/f5da7fb9-e982-45f8-803e-0de4194cf98d">Thargy?</a>): Rx is now open source.<br />
<br />
The new home page has just gone up<br />
<a href="http://rx.codeplex.com/">http://rx.codeplex.com/</a><br />
<br />
Scott Hanselman himself is plugging it<br />
<a href="http://www.hanselman.com/blog/ReactiveExtensionsRxIsNowOpenSource.aspx">http://www.hanselman.com/blog/ReactiveExtensionsRxIsNowOpenSource.aspx</a><br />
<br />
And we even get a mention for the IntroToRx.com site (Yay!)<br />
<br />
<br /></div>
<h3>.NET 4 dynamic feature</h3>
<p>Posted by Lee Campbell on 2012-09-21.</p>
<div style="text-align: left" dir="ltr" trbidi="on">When .NET 4 was coming out a couple of years ago, one of the neat new features was the inclusion of (better support for?) dynamic languages. C# even got some love and the <i>dynamic </i>keyword was added. I was quite excited about the prospect of having dynamic coding features in my C# code base. Now about 3 years later I have finally used the <i>dynamic </i>keyword; once in "real" code and once for debugging purposes.</div> <div style="text-align: left" dir="ltr" trbidi="on"> </div> <div style="text-align: left" dir="ltr" trbidi="on">Now that I have used the feature, it has opened my eyes to other opportunities to leverage it. I hope this helps someone else too.<br><br>The first time I used the dynamic keyword was when I needed to access some COM interop component. My specific example is when I wanted access to the document of a WPF WebBrowser object. The problem is the <a href="http://msdn.microsoft.com/en-us/library/system.windows.controls.webbrowser.document.aspx">WebBrowser.Document property</a> is of type <i>System.Object.</i> As I debug, I can peek into the object and see that it has the <i>title</i> property I want with all the data I need. However, you need to cast it to the COM object to use it in code. I have always been a bit queasy when I need to do COM interop.<br><br>Regardless of my feelings towards COM interop, there could be COM interface version issues depending on the version of the browser installed, and it just seems like a big hammer for simply getting the <i>title</i> from the <i>Document</i> property. 
Well <a href="http://stackoverflow.com/users/136842/kmontgom">this clever clogs</a> had already posted a <a href="http://stackoverflow.com/questions/3914970/wpf-save-webbrowser-html">super simple solution</a> to my problem; just use the <i>dynamic</i> keyword. I know I want the <i>title</i> property, I just don't care which interface is used to get it. Great stuff!<br><br>Today I find myself hacking around a part of a code base that I am unfamiliar with. There seems to be some sort of <a title="Big O notation" href="http://en.wikipedia.org/wiki/Big_O_notation#Orders_of_common_functions">O(n^2)</a> operation introduced into the code base. I want to hack some logging into a generic UserControl that is doing too much work. When I go to log the events I just get the Infragistics* <i>DataRecord</i>'s <i>ToString()</i> implementation filling up my logs. I know that in this case I have a <i>Deal</i> in the data context for the problem I am logging, but it is an open generic type of T in the control. The control has no reference to the assembly that the Deal object lives in, but I want to log the <font face="Courier New">Deal.DealId</font> property. I want to see the <font face="Courier New">DealId</font> so I know if we are looping over the same deal the whole time or if we have jitter or just double handling etc...<br><br>Well you guessed it, where I need to get the <font face="Courier New">DealId</font>, I just assign the DataRecord's DataItem to a dynamic variable and then just log the magical <font face="Courier New">DealId</font> property on that dynamic object. Super! 
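<br><br>A minimal sketch of that logging trick, assuming nothing about the real code base: the <font face="Courier New">Deal</font> class below is a hypothetical stand-in for the type the control cannot reference, and <font face="Courier New">Describe</font> is an illustrative helper name.
<pre><code>using System;

// Hypothetical stand-in for a type that lives in an assembly the logging code cannot reference.
public sealed class Deal
{
    public int DealId { get; set; }
}

public static class DynamicLoggingSketch
{
    // dataItem is only known as object (or an open generic T) at compile time.
    public static string Describe(object dataItem)
    {
        dynamic item = dataItem;        // member lookup is deferred to run time
        return "DealId=" + item.DealId; // throws RuntimeBinderException if there is no DealId member
    }

    public static void Main()
    {
        Console.WriteLine(Describe(new Deal { DealId = 42 })); // prints DealId=42
    }
}
</code></pre>
The compiler does no checking on the <font face="Courier New">DealId</font> access, so a typo only shows up at run time, which is fine for a throwaway diagnostic but worth remembering elsewhere.<br><br>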
(Obviously once we found the issue, I ripped out my hack.)</div> <div style="text-align: left" dir="ltr" trbidi="on"> </div> <div style="text-align: left" dir="ltr" trbidi="on">So I don’t think this is a feature I will use a lot, but it really helped in these two scenarios.</div>
<h3>Intro To Rx on TWC9</h3>
<p>Posted by Lee Campbell on 2012-08-20.</p>
<div dir="ltr" style="text-align: left;" trbidi="on">
<a href="http://channel9.msdn.com/Niners/briankel">Brian Keller</a> and <a href="http://channel9.msdn.com/Niners/Dan">Dan Fernandez</a> mentioned <a href="http://www.introtorx.com/">IntroToRx.com</a> on their <a href="http://channel9.msdn.com/">Channel9</a> show <a href="http://channel9.msdn.com/Shows/This+Week+On+Channel+9/TWC9-August-17-2012">This week on Channel 9</a>. It was somewhat dwarfed by the talk of Win8, VS2012 etc as one would expect.<br />
<br />
For those that don't know, <a href="http://channel9.msdn.com/">Channel9 </a>is Microsoft's virtual/online TV channel for keeping the public in touch with all that is happening around the world relating to Microsoft. <a href="http://channel9.msdn.com/Shows/This+Week+On+Channel+9/">This week on Channel9</a> is a great show that quickly summarizes what has been published on the site for the week. Other shows to check out are the <a href="http://channel9.msdn.com/Shows/The-Defrag-Show">Defrag </a>and <a href="http://channel9.msdn.com/Shows/PingShow">Ping</a>.</div>
<h3>Introduction to Rx on Amazon</h3>
<p>Posted by Lee Campbell on 2012-07-01.</p>
<div dir="ltr" style="text-align: left;" trbidi="on">
<br />
We finally got the book published on Amazon in Kindle format. So if you use Kindle and want that quick & easy way to get offline content in your hands, then this is for you. If you don't own a Kindle, you can still read the book offline with <a href="http://www.amazon.co.uk/gp/feature.html/ref=kcp_short_kindleapps?ie=UTF8&docId=1000425503">Kindle for PC/Mac/iOS/Android</a>.<br />
<br />
Sorry about the 99c (77p) price tag, but I couldn't find a way to get it up there for free, while keeping the agreement non-exclusive.<br />
<br />
<a href="http://www.amazon.com/Introduction-to-Rx-ebook/dp/B008GM3YPM/ref=sr_1_1?ie=UTF8&qid=1341179553&sr=8-1&keywords=Introduction+to+rx">Introduction to Rx @ Amazon.com</a>
<br />
<br />
<a href="http://www.amazon.co.uk/Introduction-to-Rx-ebook/dp/B008GM3YPM/ref=sr_1_1?ie=UTF8&qid=1341179230&sr=8-1">Introduction to Rx @ Amazon.co.uk</a><br />
<br />
It should also be available on the other Amazon sites.<br />
<br />
The completely free version is available for manual download at <a href="http://www.introtorx.com/">www.IntroToRx.com</a></div>
<h3>Rx v2.0 release candidate is available</h3>
<p>Posted by Lee Campbell on 2012-06-21.</p>
<p>Last night at the LDNUG meetup, Bart De Smet gave his presentation on Rx. At the end of the presentation he announced that Rx v2.0 was now officially a release candidate. He then went on to release it to NuGet in front of the team. You could feel the nerdy excitement in the room.</p> <p><img src="https://lh3.googleusercontent.com/-kfKo50Csqu0/T-JjK-iZ23I/AAAAAAAADWw/R992CpqZBP0/w304-h507-k/IMAG0410.jpg"></p> <p>The release notes can be found on the <a href="http://blogs.msdn.com/b/rxteam/archive/2012/06/20/reactive-extensions-v2-0-release-candidate-available-now.aspx">Rx team blog</a>. The post is huge (~48 printed pages). </p> <p>Now I have a race to update the <a href="http://www.introtorx.com/">Introduction to Rx</a> book to have all the v2 features before they actually release it.</p>
<h3>Introduction to Rx online book is available</h3>
<p>Posted by Lee Campbell on 2012-06-21.</p>
<p>I am super excited to announce that the book I have been working on is now live at <a href="http://www.IntroToRx.com">www.IntroToRx.com</a></p> <p>I have been working hard over the last 6 months taking the content from the blogs and giving it a good coat of paint to make it a little more professional than my "thought bombs" that I put up on the blog. This is the main reason why this blog has been quiet for the first half of the year. 
The content from the blog series is 9 chapters/posts, the book however is 17 chapters and has some extra pre/post content too. I cover more (almost all) Rx operators, show you how to think in a functional way to really get Rx working for you, provide more examples and give guidance on best practices.</p> <p>I still have some small updates from my editor to make, and then I will submit it to Amazon. This means that you should be able to get the offline version straight to your Kindle. The book will be free, the website is too, obviously. The content in both is exactly the same so don't feel like there may be little hidden gems that are in one and not the other.</p> <p>I have already been getting good feedback from the small group that have seen it pre-release, so I hope you find value in it too.</p>
<h3>But it is not our competitive advantage!</h3>
<p>Posted by Lee Campbell on 2012-05-29.</p>
<p>The financial industry (Capital Markets specifically) is a fiercely competitive arena of high risk, high return and space age technology. The competition between banks, hedge funds and exchanges leads to a very closed-source style of application development. IT secrets are closely guarded. Non-compete clauses are commonplace and Non Disclosure Agreements are taken very seriously. Like in other industries, time-to-market and rich feature sets increase the likelihood of an application or platform making a profit.</p> <p>To get the best product delivered in the fastest time, the best of breed developers and consultants are wheeled in and money is generally thrown at the problem. All the same old stuff is built from the ground up, hidden away from prying eyes to prevent any market secret getting out. 
Features start getting built: Security (Authentication/Authorization/Auditing), Low latency Messaging, Blotters/Grids controls, Concurrency frameworks, Modular UIs, Blotters, Order books, Matching algorithms, Blotters, Pricing engines, Discounting curves, JSON/FIX/ProtoBuf Serializers, Risk Visualizers, Blotters, Search tools, Research tickers and of course we should build a blotter!</p> <p>It is most certainly the case that all of these things are being done by the direct competitor, by a group of similar people, with similar skill sets, falling into the same problems and building the same bugs. All behind closed doors. All at consultant rates. </p> <p>Managers and business owners feel the need to carefully guard their babies (the project source code) because it is the sum of the effort and more importantly the money invested in the project. To let it out into the public would be (in their minds) the same as leaving a chest of gold open in a public park or their password to the company bank account on a Post-it note. But this is not the case. </p> <p>Let's enumerate the silly realities of the scenario:</p> <ol> <li>For the consultant to get the job they must exhibit the requisite experience delivering said technology. This means the consultant may be just reproducing their effort. Surely you are hiring the consultant because they succeeded on their last project, right? So to get ahead of the competition you have hired the guy that has just finished building their competing product.</li> <li>Once the consultant has built their part of the product, they either have to document it at huge cost, train the team in how it works, or just leave the project keeping all the knowledge of how it works. They move on to the next project, build another blotter, rinse and repeat.</li> <li>Consider the need to apply any modifications (improvements or bug fixes) to the code base. The consultant could be applying their new features to their new version of the feature at competitor X. 
Your current team of developers could be applying it to your product. Either way, either the same general concept is being refined at twice the necessary cost, or features are being added to one, leaving the other behind.</li></ol> <p>For a concrete example, suppose the feature we are talking about is a grid control (commonly called a blotter in finance). Many will argue that the cost of a blotter is not large enough to warrant considering. Not true. I have been on 4 projects now where the Blotter or central grid has been the number one development cost. You can argue that we can just buy a control, but those 3rd party controls are lousy. My position is not a case of <a href="http://en.wikipedia.org/wiki/Not_invented_here">Not Invented Here</a> mentality; I do advocate 3rd party tools like Rx, Moq, Prism, charting tools, Messaging layers etc., <strong>but only when they pay for themselves</strong>. Adopting large 3rd party libraries for a single feature, or needing competing libraries installed because neither quite fits the requirements, needs to be a thing of the past. I have worked with amazing developers that get pulled off otherwise critical work to help work on the blotter. With all that money that is poured into the blotter, we had better keep that blotter source code a trade secret! 
Or should we...?</p> <p><strong>But it is not our competitive advantage!</strong></p> <p>None of the four projects I worked on wanted to be defined by their blotter; they wanted to be defined by </p> <ul> <li>Speed of user input</li> <li>Quality of context sensitive research</li> <li>Timeliness of data</li> <li>User experience</li> <li>Competitive prices</li> <li>Depth of research data</li> <li>Breadth of data across assets</li> <li>Liquidity</li></ul> <p>So now consider that a blotter is not your competitive advantage, and that you as a manager were brave enough, and smart enough, to split your code base into code that is not sensitive and code that contains actual trade secrets. Now when that super expensive consultant builds that blotter for you, it may already be half built because it was open source in the first place. Consider when the consultant leaves and re-implements that at the competitor, they may find bugs and fix them. Great! You just got that hugely expensive consultant to upgrade your product for free! Sure the competitor got the fix too, but you have tighter spreads, right? Lower latency, right? Better customer service, right? Why pour money into another blotter, another concurrent collection library, another export to Excel feature, another login system, another drag-n-drop feature? Spend your money on your competitive advantage.</p> <p>Leaders in the field are coming together now to banish this kind of wasteful behaviour. The <a href="http://lodestonefoundation.wordpress.com/">Lodestone Foundation</a> is looking to open source much of what is burning a hole in capital market IT budgets around the world. They have a sort of <a href="http://lodestonefoundation.wordpress.com/projects/">rough road map</a> of features and areas that they want to address. This is not a silver bullet but it is a huge step in the right direction. 
With people like <a href="http://mechanical-sympathy.blogspot.co.uk/">Martin Thompson</a> involved, it is likely to get traction at least with lowly developers. Martin has already proved that you can open-source key parts of the system while still keeping trade secrets and remaining competitive.</p> <p>Microsoft, led by <a href="http://herbsutter.com/">Herb Sutter</a>, is pushing for the same kind of collaboration in the C++ space. Projects like <a href="http://herbsutter.com/2012/04/30/c-libraries-casablanca/">Casablanca</a> are aiming to reduce the same wasteful practices in the C++ world by providing portable libraries for general consumption.</p> <p>From what I understand (via rumours mainly), the thinking is: if we as an industry can spend less money and time reinventing the same wheel for each project, real effort can be put into valuable technical advancement. The beauty of this is that the huge IT budgets the capital market projects have can now effectively fund R&D for the greater good. If all the code produced is Open-Source, well maintained and has a thriving community, all will benefit regardless of industry. Imagine advances in risk analysis simulations being used to help design safer cars, order matching algorithms used to help DNA research, financial time-series prediction techniques used in speech recognition software.</p> <p>I personally have been working on my own (much smaller scale) project that fits in with these ideals. Time will tell if they end up being complementary (I hope they do). Until then, go sow the seed with your manager that if your data-access layer, unit testing framework and logging framework are all open source, why can't your blotter* be too?</p> <p>*blotter is my bugbear. 
Use whatever industry-agnostic thing your project is building that really is just the same as the competition's as your focal point.</p> <p> <div style="padding-bottom: 0px; margin: 0px; padding-left: 0px; padding-right: 0px; display: inline; float: none; padding-top: 0px" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:e82e0df5-a9d2-45e1-9fdc-ae372e369053" class="wlWriterEditableSmartContent">Technorati Tags: <a href="http://technorati.com/tags/Lodestone" rel="tag">Lodestone</a>,<a href="http://technorati.com/tags/Casablanca" rel="tag">Casablanca</a></div></p> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com3tag:blogger.com,1999:blog-455072646448673416.post-7524942586183659662012-03-23T13:29:00.006+00:002012-03-23T13:39:57.546+00:00Rx v2.0 Beta released<p>Some good news: the Rx team have recently released the beta of Rx 2.0.</p>
<p>It is a move forward and will leave .NET 3.5, .NET4.0, Silverlight 3 and Silverlight 4.0 behind. For this release the supported platforms are .NET4.5 beta (+ Metro), Silverlight 5 and Windows Phone 7.1.</p>
<p>There is a <a href="http://channel9.msdn.com/Shows/Going+Deep/Bart-De-Smet-Inside-Rx-V2-Beta">channel9 video (+1hr) with Bart De Smet</a> explaining the new release. Bart also has a <a href="http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx">blog post</a> on the release too.</p>
<p>As the underlying platform has moved rather drastically, there are some API surface changes, so some old code may break. The good news is that the Rx team have worked hard to squeeze even more performance out of this release. Part of this is support for the async/await pattern (which was already in the experimental releases).</p>
<p>I am looking forward to giving it a crack. Win8 seems faster, Visual Studio 11 seems faster and now Rx is faster. Happy days!</p>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com0tag:blogger.com,1999:blog-455072646448673416.post-42760477159183121662011-06-21T16:22:00.001+01:002012-06-18T17:28:02.992+01:00The definitive list of Rx sites<div dir="ltr" style="text-align: left;" trbidi="on">
Over at the <a href="http://social.msdn.microsoft.com/Forums/en-US/rx" target="_blank" title="Rx forums">Rx forums</a>, <a href="http://social.msdn.microsoft.com/profile/eamon_otuathail/?type=forum" target="_blank" title="Eamon Otuathail">Eamon_OTuathail</a> has created what seems to be <a href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2" target="_blank" title="The list of Rx sites!">the list of Rx sites</a>. It is a list of Blogs and open source projects.<br />
Great stuff. This should save a lot of time for a lot of people.<br />
<a href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2">http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2</a><br />
Humbled to make the grade.<br />
<br />
EDIT: Hopefully <a href="http://www.introtorx.com/">www.IntroToRx.com</a> will also make the grade. It is the online book that evolved from the blog series.</div>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com4tag:blogger.com,1999:blog-455072646448673416.post-51636446775810752382011-06-18T12:00:00.001+01:002011-06-18T12:00:57.326+01:00Isolating custom Library dependencies versions from consumer dependency versions<p><em>This post is more about the CLR and dependency management than it is about Rx. However at this point in Rx’s lifecycle it seems relevant to comment on. The same principles could obviously be applied to open source projects et al.</em></p> <p>I have recently heard of an interesting situation that has no doubt proved troublesome to people before. This problem is particularly interesting with Rx and its recent rate of releases. If you are trying to incorporate Rx into a library of yours and you then want to publish that library, you are effectively forcing users to use the same version of Rx that you do. Considering that a version of Rx comes out say every two months and that often there are <a title="Breaking changes to Rx v1.0.10425" href="http://leecampbell.blogspot.com/2011/06/rx-v1010425breaking-changes.html">breaking changes</a>, this can create quite a mess. It is made more interesting by the fact that, up until recently, releases were not marked as experimental or stable.</p> <p>To illustrate my point, imagine you have a library that wraps a messaging platform. You want to avoid the use of Events and APM, and expose things via IObservable<T>. You feel happy that IObservable<T> is exposed natively in .NET 4 so you should not have to expose your implementations of Rx. You do, however, want to use Rx as it has features you need and don’t want to (re-)write yourself. 
Your standard approaches to package/deployment are:</p> <ul> <li>deploy the parts of the Rx libraries you use with your code. Users can just put them all into a “lib\MyFramework” folder and reference them.</li> <li>exclude the libraries from your code and set Specific Version = False and hope your code will work with the consumer’s version of Rx</li> <li>use a package management tool like NuGet to publish your package and specify the valid versions of Rx your library will work with.</li> <li>rely on things being in the GAC so you can utilise the side-by-side versioning it provides</li> <li>just try and implement the parts of Rx you want and avoid DLL Hell.</li> </ul> <p>I gave this some thought and I think I have come up with a solution that could help library authors protect themselves. While there is the obvious option of using NuGet as part of your dependency management, this does not solve the problem, it just eases the pain. If the customer wants to use the latest version of Rx and you only support the 3 previous versions, your customer is still in some trouble.</p> <p>The theory I had was that I could specify the specific version of Rx I want to reference in my library, the problem being that it may be named the same as the client’s referenced version. Depending on where their references were built to, they could overwrite each other. It seemed the solution was to embed the dependency into my library. </p> <p>This turns out to actually be quite easy. If you have a project in Visual Studio that references your version of Rx, you have to follow these steps:</p> <ol> <li>Ensure you have a file reference (not a project or a GAC reference) to the dependency, in this case System.Reactive.dll</li> <li>Set the reference to be Specific Version = True</li> <li>Set the reference Copy Local = False</li> <li>Embed the dependency into your library. I created a folder in my project called EmbeddedAssemblies. 
I “<em>Add Existing…</em>” to this folder, navigate to the dependencies (just System.Reactive.dll in this case), and then choose “<em>Add as Link…</em>”</li> <li>Set the “<em>Build Action</em>” of  the newly added link to <em>Embedded Resource</em></li> <li>Ensure that the resource is loaded correctly at run time…</li> </ol> <p>The last part of that list proves to be not too hard. You can hook on to the <a title="AppDomain.AssemblyResolve Event - MSDN" href="http://msdn.microsoft.com/en-us/library/system.appdomain.assemblyresolve.aspx">AppDomain.AssemblyResolve</a> event and load your embedded resource. You can return your embedded dependency by reading the byte stream and creating the assembly from it and then returning that in the event handler</p> <pre class="csharpcode"><span class="kwrd">internal</span> <span class="kwrd">static</span> <span class="kwrd">class</span> DependencyResolver
{
<span class="kwrd">private</span> <span class="kwrd">static</span> <span class="kwrd">int</span> _isSet = 0;
<span class="kwrd">internal</span> <span class="kwrd">static</span> <span class="kwrd">void</span> Ensure()
{
<span class="kwrd">if</span> (Interlocked.CompareExchange(<span class="kwrd">ref</span> _isSet, 1, 0) == 0)
{
var thisAssembly = Assembly.GetExecutingAssembly();
var assemblyName = <span class="kwrd">new</span> AssemblyName(thisAssembly.FullName).Name;
var embededAssemblyPrefix = assemblyName + <span class="str">".EmbeddedAssemblies."</span>;
var myEmbeddedAssemblies =
Assembly.GetExecutingAssembly().GetManifestResourceNames()
.Where(name => name.StartsWith(embededAssemblyPrefix))
.Select(resourceName =>
{
<span class="kwrd">using</span> (var stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(resourceName))
{
var assemblyData = <span class="kwrd">new</span> Byte[stream.Length];
stream.Read(assemblyData, 0, assemblyData.Length);
<span class="kwrd">return</span> Assembly.Load(assemblyData);
}
})
.ToDictionary(ass => ass.FullName);
AppDomain.CurrentDomain.AssemblyResolve += (sender, args) =>
{
Assembly assemblyToLoad = <span class="kwrd">null</span>;
myEmbeddedAssemblies.TryGetValue(args.Name, <span class="kwrd">out</span> assemblyToLoad);
<span class="kwrd">return</span> assemblyToLoad;
};
}
}
}</pre>
<p>You can return null from the event handler to say "I don't know how to load this assembly". This gives any other registered handlers a chance to load the assembly in the same way.</p>
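<p>One detail worth guarding against in the resolver above: <em>Stream.Read</em> is allowed to return fewer bytes than requested, so a single call is not guaranteed to fill the buffer. Here is a minimal sketch of a more defensive read. The <em>EmbeddedAssemblyReader</em> class and its method names are hypothetical (they are not part of the sample download); it only relies on <em>Stream.CopyTo</em>, which is available from .NET 4.</p>
<pre class="csharpcode">using System.IO;
using System.Reflection;

// Hypothetical helper, for illustration only.
internal static class EmbeddedAssemblyReader
{
    // Copies the whole stream into memory, however many reads that takes.
    internal static byte[] ReadFully(Stream stream)
    {
        using (var buffer = new MemoryStream())
        {
            stream.CopyTo(buffer);
            return buffer.ToArray();
        }
    }

    // Loads an assembly from a manifest resource of the given owner assembly.
    // Returns null when no resource with that name exists.
    internal static Assembly LoadEmbedded(Assembly owner, string resourceName)
    {
        using (var stream = owner.GetManifestResourceStream(resourceName))
        {
            return stream == null ? null : Assembly.Load(ReadFully(stream));
        }
    }
}</pre>
<p>The <em>Select</em> in the resolver could then call something like <em>EmbeddedAssemblyReader.LoadEmbedded(thisAssembly, resourceName)</em> instead of reading the bytes inline.</p>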
<p>Next, to ensure that my embedded dependency resolver is called, I opted to make a call to it in the static constructor of the key types in my library, e.g.</p>
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">class</span> SomeProvider
{
<span class="kwrd">static</span> SomeProvider()
{
DependencyResolver.Ensure();
}
<span class="rem">//other stuff goes here...</span>
}</pre>
<p>Have a look at the example code by downloading this <a title="MultiVersionSpike1 - code.google.com" href="http://code.google.com/p/rx-samples/downloads/detail?name=MultiVersionSpike1.zip&can=2&q=">zip file</a></p>
<p>Caveats : This is a thought and the code and concepts are only demo quality. I have not used this in production quality code. I also have not verified that the license allows for this style of packaging.</p>
<p>Further links:</p>
<ul>
<li><a title="Using different version of the same dll in one application" href="http://blogs.msdn.com/b/abhinaba/archive/2005/11/30/498278.aspx">Using different version of the same dll in one application</a> – extern alias keyword</li>
<li><a title="Best Practice for Assembly Loading" href="http://msdn.microsoft.com/en-us/library/dd153782.aspx">Best Practice for Assembly Loading</a> – MSDN</li>
<li>Questions around the <a title="ETA for RTM version - Rx Forums" href="http://social.msdn.microsoft.com/Forums/en/rx/thread/95f16367-89df-4e89-bd8d-b4aef55ba969">stability</a> and <a title="golive and redist.txt - Rx Forums" href="http://social.msdn.microsoft.com/Forums/eu/rx/thread/e128d25c-c40f-4cb2-ae09-b4ade19cb98e">license</a> for <a title="Is the latest build a No-Go-Live Preview?" href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/77d01908-873b-4ef6-8ac3-adbd7aee424b">Rx releases</a></li>
<li><a title="Embed application’s depended dlls - Excerpt #2 from CLR via C#, Third Edition" href="http://blogs.msdn.com/b/microsoft_press/archive/2010/02/03/jeffrey-richter-excerpt-2-from-clr-via-c-third-edition.aspx">Embed application’s depended dlls</a> – Jeffrey Richter </li>
</ul>
<div style="padding-bottom: 0px; margin: 0px; padding-left: 0px; padding-right: 0px; display: inline; float: none; padding-top: 0px" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:8d000ad2-3908-42a9-a27b-021a0abccdf6" class="wlWriterEditableSmartContent">Technorati Tags: <a href="http://technorati.com/tags/Reactive-Extensions" rel="tag">Reactive-Extensions</a>,<a href="http://technorati.com/tags/Rx" rel="tag">Rx</a>,<a href="http://technorati.com/tags/Tutorial" rel="tag">Tutorial</a></div> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com3tag:blogger.com,1999:blog-455072646448673416.post-59625598735516412972011-06-05T23:42:00.000+01:002011-06-05T23:42:00.126+01:00Rx v.1.0.10425–Breaking changes!<p>For those that follow Rx, many will have noticed a new drop was made available in late April 2011. This was interesting for numerous reasons:</p> <ol> <li>I believe that it was the first drop since the Rx team moved off the DevLabs site and on to the Microsoft Data site </li> <li>They have supplied two downloads, Supported and Experimental </li> <li>It is available via NuGet </li> <li>It broke most of the sample code on this blog</li> <li>...It has massive breaking changes! </li> </ol> <p>The first thing I noticed was that only the more recent frameworks were in the “supported” release i.e. .NET 4, Silverlight 4 and Windows Phone. Next, when you open up the install directory (which has moved, again) the files are different to previous releases. </p> <p>This is a comparison of the location and files of the old vs. new version (.NET 4 target):</p> <pre class="csharpcode">C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4
System.CoreEx.dll
System.Interactive.dll
System.Linq.Async.dll
System.Reactive.ClientProfile.dll
System.Reactive.dll
System.Reactive.ExtendedProfile.dll
System.Reactive.Testing.dll
C:\Program Files\Microsoft Reactive Extensions SDK\v1.0.10425\Binaries\.NETFramework\v4.0
Microsoft.Reactive.Testing.dll
System.Reactive.dll
System.Reactive.Providers.dll
System.Reactive.Windows.Forms.dll
System.Reactive.Windows.Threading.dll</pre>
<p>Note that the files are now named <em>System.Reactive</em> instead of hijacking the existing BCL namespaces such as <em>System</em>, <em>System.IO</em>, <em>System.Linq</em> etc. Here is a list of the old and new namespaces found in the Rx libraries, with the count of the types in each namespace.</p>
<pre class="csharpcode">C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4
System 6
System.Collections.Generic 21
System.Concurrency 14
System.Diagnostics 1
System.Disposables 8
System.IO 1
System.Joins 34
System.Linq 13
System.Reactive.Testing 4
System.Reactive.Testing.Mocks 10
System.Threading.Tasks 1
System.Web 1
System.Windows.Forms 1
System.Windows.Threading 1
C:\Program Files\Microsoft Reactive Extensions SDK\v1.0.10425\Binaries\.NETFramework\v4.0
Microsoft.Reactive.Testing 7
System 1
System.Reactive 10
System.Reactive.Concurrency 16
System.Reactive.Disposables 9
System.Reactive.Joins 34
System.Reactive.Linq 8
System.Reactive.Subjects 8
System.Reactive.Threading.Tasks 1</pre>
<h4>Summary of impact</h4>
<p>Here is a quick list of the changes that have affected the code from the samples off this blog</p>
<p><em>Unit </em>has been moved to <em>System.Reactive</em>. I imagine to prevent any conflicts with FSharp’s <em>Unit</em>?</p>
<p><em>EventLoopScheduler</em> no longer has a constructor that takes a string for the name of the thread, it instead takes a function to act as a Thread Factory so you can use that to specify a name for the thread instead.</p>
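<p>To show what the thread factory looks like in practice, here is a minimal sketch. It is written against the <em>System.Reactive</em> package as I know it from later releases, so treat the exact shape as an assumption for v1.0.10425; the <em>NamedEventLoop</em> helper is a hypothetical name of my own.</p>
<pre class="csharpcode">using System;
using System.Reactive.Concurrency;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class NamedEventLoop
{
    // Builds an EventLoopScheduler whose single thread carries the given name.
    public static EventLoopScheduler Create(string threadName)
    {
        // The factory receives the ThreadStart the scheduler wants to run
        // and must return the (unstarted) thread to run it on.
        return new EventLoopScheduler(
            start => new Thread(start) { Name = threadName, IsBackground = true });
    }
}</pre>
<p>Usage would be along the lines of <em>NamedEventLoop.Create("PricingLoop").Schedule(() => Console.WriteLine(Thread.CurrentThread.Name))</em>. Marking the thread as a background thread is my choice here so that it does not keep the process alive; the old string constructor did not give you that option.</p>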
<p>Similar methods have been collapsed to overloads. For example, <em>BufferWithTime</em>, <em>BufferWithCount</em> & <em>BufferTimeOrCount</em> are all just <em>Buffer</em> with various overloads. Same goes for <em>WindowWithTime</em>, <em>WindowWithCount</em> & <em>WindowTimeOrCount</em>. <em>GenerateWithTime</em> is now just an overload of <em>Generate</em>. <em>CreateWithDisposable</em> is now just an overload of <em>Create</em>.</p>
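<p>As a rough illustration of the collapsed <em>Buffer</em> overloads, here is a small sketch. It assumes the <em>System.Reactive</em> package from later releases (where <em>ToList</em> and <em>Wait</em> are available); the <em>BufferOverloads</em> class and its method names are hypothetical.</p>
<pre class="csharpcode">using System;
using System.Reactive.Linq;
using System.Text;

// Hypothetical sample class, for illustration only.
public static class BufferOverloads
{
    // Count-based overload: the job BufferWithCount used to do.
    public static string DescribeCountBuffers()
    {
        // Wait blocks until the source completes, then hands back every batch.
        var batches = Observable.Range(1, 5).Buffer(2).ToList().Wait();
        var text = new StringBuilder();
        foreach (var batch in batches)
        {
            text.Append("[" + string.Join(",", batch) + "]");
        }
        return text.ToString();
    }

    // Time-or-count overload: a buffer closes after 1 second or 3 items,
    // whichever comes first. Returns the size of the first buffer.
    public static int FirstTimeOrCountBufferSize()
    {
        return Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Buffer(TimeSpan.FromSeconds(1), 3)
            .Take(1)
            .Wait()
            .Count;
    }
}</pre>
<p>The time-only variant is simply <em>Buffer(TimeSpan.FromSeconds(1))</em>. The same pattern should apply to <em>Window</em>, although I have not shown it here.</p>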
<p>Potentially confusing names have been adjusted. I must admit, I found <em>AsyncSubject</em> an odd name, and also found it odd that <em>Prune</em> was the method you would use to transform an existing observable into one that exhibited AsyncSubject behaviour. The new version of this is the far more appropriately named <em>TakeLast(int)</em>. Just pass a value of 1 to get the old semantics of <em>Prune</em>.</p>
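<p>A tiny sketch of <em>TakeLast(1)</em>, again assuming the <em>System.Reactive</em> package from later releases; <em>TakeLastSample</em> is a hypothetical name. Nothing is published until the source completes, and then only the final value comes through.</p>
<pre class="csharpcode">using System;
using System.Reactive.Linq;

// Hypothetical sample class, for illustration only.
public static class TakeLastSample
{
    // Returns the only value TakeLast(1) yields for the sequence 1..5.
    public static int LastOfRange()
    {
        // Wait blocks until completion and returns the last value seen.
        return Observable.Range(1, 5).TakeLast(1).Wait();
    }
}</pre>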
<p>The <em>Run</em> method has now been renamed to the more obvious (well, to newcomers) <em>ForEach</em>. It is a bit of an enumerable construct, but that is cool.</p>
<p>My big problem has been the disappearance of the IsEmpty extension method. So I had a long think about this and have decided that you can replace it by creating your own extension method that looks like this</p>
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">class</span> ObservableExtensions
{
<span class="rem">//Missing now that System.Interactive has been dropped</span>
<span class="kwrd">public</span> <span class="kwrd">static</span> IObservable<<span class="kwrd">bool</span>> IsEmpty<T>(<span class="kwrd">this</span> IObservable<T> source)
{
<span class="kwrd">return</span> source.Materialize()
.Take(1)
.Select(n => n.Kind == NotificationKind.OnCompleted
|| n.Kind == NotificationKind.OnError);
}
}</pre>
<p>For even more details, check out the <a title="New release: Reactive Extensions 1.0.10425" href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/527002a3-18af-4eda-8e35-760ca0006b98">forum post</a> that has the summary release notes and all sorts of rants about what has changed and what is causing problems for others (+ some helpful tips).</p>
<p><a href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/527002a3-18af-4eda-8e35-760ca0006b98">http://social.msdn.microsoft.com/Forums/en-US/rx/thread/527002a3-18af-4eda-8e35-760ca0006b98</a></p>
<p>This change is not one that you can just take and expect all to be well. However, assuming that you can just update some references and add the missing extensions to your own library, you could be fine. </p>
<h4>Changes to TestScheduler</h4>
<p>The real problems come from the big changes to the testing part of Rx. This appears to have undergone a complete change and most of my existing code does not work. This looks like a big and possibly welcome change to the TestScheduler. </p>
<h5>Problems with the previous TestScheduler</h5>
<p>There were some fundamental problems with the TestScheduler as it was. It had trouble running to a specific point in time if there were no actions scheduled for that point in time. For example, the old TestScheduler would allow you to request that it run to, say, 5 seconds. If there were no actions scheduled, then the clock would not actually advance. You could then schedule an action to happen at 3 seconds (effectively in the past), call Run, and it would execute the action. Hmmm.</p>
<pre class="csharpcode">TestScheduler scheduler = <span class="kwrd">new</span> TestScheduler();
scheduler.Schedule(()=>{Console.WriteLine(<span class="str">"Running at 1seconds"</span>);}, TimeSpan.FromSeconds(1));
scheduler.RunTo( scheduler.FromTimeSpan(TimeSpan.FromSeconds(5)));
<span class="rem">//The next line is scheduled in the past and somehow this all works!</span>
scheduler.Schedule(()=>{Console.WriteLine(<span class="str">"Running at 3seconds"</span>);}, TimeSpan.FromSeconds(3));
scheduler.Run();</pre>
<p>There also seemed to be a lack of functionality for running to a relative point in time. I have used these extension methods to work around this shortcoming.</p>
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">class</span> TestSchedulerExtensions
{
<span class="rem">/// <summary></span>
<span class="rem">/// Runs the scheduler from now to the given TimeSpan. Advances relative to its <c>Now</c> value.</span>
<span class="rem">/// </summary></span>
<span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">void</span> RunNext(<span class="kwrd">this</span> TestScheduler scheduler, TimeSpan interval)
{
var tickInterval = scheduler.FromTimeSpan(interval);
scheduler.RunTo(scheduler.Ticks + tickInterval + 1);
}
<span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">void</span> RunTo(<span class="kwrd">this</span> TestScheduler scheduler, TimeSpan interval)
{
var tickInterval = scheduler.FromTimeSpan(interval);
scheduler.RunTo(tickInterval);
}
<span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">void</span> Step(<span class="kwrd">this</span> TestScheduler scheduler)
{
scheduler.RunTo(scheduler.Ticks + 1);
}
<span class="rem">/// <summary></span>
<span class="rem">/// Provides a fluent interface so that you can write <c>7.Seconds()</c> instead of <c>TimeSpan.FromSeconds(7)</c>.</span>
<span class="rem">/// </summary></span>
<span class="rem">/// <param name="seconds">A number of seconds</param></span>
<span class="rem">/// <returns>Returns a System.TimeSpan that represents the specified number of seconds.</returns></span>
<span class="kwrd">public</span> <span class="kwrd">static</span> TimeSpan Seconds(<span class="kwrd">this</span> <span class="kwrd">int</span> seconds)
{
<span class="kwrd">return</span> TimeSpan.FromSeconds(seconds);
}
}</pre>
<p>However, on initial inspection of the new TestScheduler, there appear to be features that support running to an absolute or relative time. It looks like the post on testing Rx will need a re-write <img style="border-bottom-style: none; border-left-style: none; border-top-style: none; border-right-style: none" class="wlEmoticon wlEmoticon-sadsmile" alt="Sad smile" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhcOCUxBYLqsy821Ro11xZKG0yX8XdRjQoFzc_p9nZdH0CdHJrSoI5Sw9Lts6Hdb3rJQ9kXm4xyhhUo9DQwNAjriev0jv_rGyZsf2C6muXt3GArVlizIKZx2oh2_q7NRiQJbUiiHWOlwLo/?imgmax=800" /></p>
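<p>For what it is worth, here is a sketch of absolute versus relative advancement. It uses the <em>AdvanceTo</em>, <em>AdvanceBy</em> and <em>Clock</em> members as I know them from later <em>Microsoft.Reactive.Testing</em> releases, so the member names in v1.0.10425 itself may differ; <em>VirtualTimeSample</em> is a hypothetical name.</p>
<pre class="csharpcode">using System;
using System.Reactive.Concurrency;
using System.Text;
using Microsoft.Reactive.Testing;

// Hypothetical sample class, for illustration only.
public static class VirtualTimeSample
{
    public static string Run()
    {
        var log = new StringBuilder();
        var scheduler = new TestScheduler();

        // Due 1 second after the current (virtual) time of zero.
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => log.Append("1s;"));

        // Absolute: move the virtual clock to the 5 second mark.
        scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);

        // Scheduled relative to the current clock, so it is due at 8 seconds.
        scheduler.Schedule(TimeSpan.FromSeconds(3), () => log.Append("8s;"));

        // Relative: move forward 2 seconds. The clock advances even though
        // nothing is due in that period.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);
        log.Append("clock=" + TimeSpan.FromTicks(scheduler.Clock).TotalSeconds + ";");

        // One more second reaches the 8 second mark and runs the second action.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        return log.ToString();
    }
}</pre>
<p>If this holds up, the <em>RunNext</em> and <em>RunTo</em> extension methods above become thin wrappers over <em>AdvanceBy</em> and <em>AdvanceTo</em>, or unnecessary altogether.</p>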
<p>
<div style="padding-bottom: 0px; margin: 0px; padding-left: 0px; padding-right: 0px; display: inline; float: none; padding-top: 0px" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:5dda8b86-494b-451a-9dba-f42b361470c2" class="wlWriterEditableSmartContent">Technorati Tags: <a href="http://technorati.com/tags/Reactive-Extensions" rel="tag">Reactive-Extensions</a>,<a href="http://technorati.com/tags/Rx" rel="tag">Rx</a></div></p> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com1tag:blogger.com,1999:blog-455072646448673416.post-8988532309251690452011-05-29T00:00:00.000+01:002011-05-29T00:00:01.101+01:00Rxx - Extension to the Reactive Extensions…<p>Those of you who use the Rx forums will no doubt have found an answer from, or had your question answered by, either James Miles or Dave Sexton. These two have put their mighty brains together to produce some handy extensions to Rx. They went with the fairly obvious (but tongue-in-cheek) name Rxx.</p> <p><a href="http://social.msdn.microsoft.com/Forums/hu-HU/rx/thread/32f6ee34-5edf-4038-894c-ab47fc893a78">http://social.msdn.microsoft.com/Forums/hu-HU/rx/thread/32f6ee34-5edf-4038-894c-ab47fc893a78</a></p> <p>The key features that anyone that has been using Rx for a while will immediately be interested in are the</p> <ul> <li>Tracing/Logging</li> <li>PropertyChanged and PropertyDescriptor extensions</li> <li>FileSystemWatcher</li> <li>and the uber-funky OperationalObservable e.g. var os = xs + ys – zs;</li> </ul> <p>For a more complete list look at the <a title="Rxx - documentation" href="http://rxx.codeplex.com/documentation">documentation</a>.</p> <p>This could well be a community contribution that could help guide the actual Rx implementations that start coming out of Microsoft (like the Community Extensions around for P&P projects). James and Dave seem to be taking this quite seriously. 
They have been checking in regularly (with sensible comments!), constructed the codeplex site very well and have already got their issue trackers running.</p> <p>This is one to keep an eye on.</p> <p><a href="http://rxx.codeplex.com/">http://rxx.codeplex.com/</a></p> <div style="padding-bottom: 0px; margin: 0px; padding-left: 0px; padding-right: 0px; display: inline; float: none; padding-top: 0px" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:9aab118f-c31a-4226-a09f-8687cbfaea8e" class="wlWriterEditableSmartContent">Technorati Tags: <a href="http://technorati.com/tags/Reactive-Extensions" rel="tag">Reactive-Extensions</a>,<a href="http://technorati.com/tags/Rx" rel="tag">Rx</a></div> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com0tag:blogger.com,1999:blog-455072646448673416.post-88143382947776837092011-05-25T23:50:00.001+01:002011-05-26T00:17:18.271+01:00Rx code from Perth Presentation<p>Sorry about the delay in getting this code up. For those who could not make it, my part of the presentation did a bit of an intro and then discussed the testability of Rx and the easy way to deal with streaming data such as pricing in the financial industry.</p> <p><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhN8yeLziBDDNAK3RfwfzvJ97gqPo3ViNncy4oPutrbeRLTC77V-0FeW6zPn7q-H30gHXMQaIWqwMBgBfdATufCrO-fRoQkZZleltiVYBSot3k4jQJFyj5tZJbnDXyu0weV5WL8KWkBX9g/s1600-h/RxSamplesGalleryScreenShot%25255B16%25255D.png"><img style="display: block; float: none; margin-left: auto; margin-right: auto" title="RxSamplesGalleryScreenShot" alt="RxSamplesGalleryScreenShot" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjSfKhbA_sp8gJvCICL0Ms2-wqLEaKPDJBAErCUzvJ1ZGN6-O6MVVO1pfAEzaGm8_3qCqhLwNDfqASe68VH38lzONSF5wat5gPhdVOugtTjutGqQMBmhQacprMH_gUatDiqCvcdeVxO9EM/?imgmax=800" width="630" height="470" /></a></p> <p><a 
href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhjct4brvANsCHdByYsn4BOG92A_oaLxcm2KtYGt17HAjfeKdG4Dk_D0TodRse-9mdcC4ZoBxF1PF4yLqg14XNK-v_c8eKZ2JrG0emL6V3vohUTTXvtHaYRPpu2hpx3gomp_Sh-dMlBDd4/s1600-h/RxSamplesTWAPChartScreenShot%25255B3%25255D.png"><img style="background-image: none; border-bottom: 0px; border-left: 0px; padding-left: 0px; padding-right: 0px; display: block; float: none; margin-left: auto; border-top: 0px; margin-right: auto; border-right: 0px; padding-top: 0px" title="RxSamplesTWAPChartScreenShot" border="0" alt="RxSamplesTWAPChartScreenShot" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgD3_KiXlD-0nS8kVh6smWanTvGQqe52C196-pMlEdEW_p9xmwuX7Bz15Bn_rAap-O6_Uoi6tQpSGQyBpLV3vk9cU2QBtKgpgoNnJk4oWxPTg1iwVKuETHZF7Mjq9rdTUxlVShYwscfA3M/?imgmax=800" width="634" height="474" /></a></p> <p>The key sample from my half of the presentation that raised some interest was the testability of the Photo Gallery ViewModel. The Gallery ViewModel was effectively this:</p> <pre class="csharpcode"><span class="rem">/// <summary></span>
<span class="rem">/// Tested Rx implementation of the ViewModel</span>
<span class="rem">/// </summary></span>
<span class="kwrd">public</span> <span class="kwrd">sealed</span> <span class="kwrd">class</span> RxPhotoGalleryViewModel : INotifyPropertyChanged
{
<span class="kwrd">public</span> RxPhotoGalleryViewModel(IImageService imageService, ISchedulerProvider scheduler)
{
IsLoading = <span class="kwrd">true</span>;
var files = imageService.EnumerateImages()
.ToObservable();
files
.SubscribeOn(scheduler.ThreadPool)
.ObserveOn(scheduler.Dispatcher)
.Subscribe(
imagePath =>
{
Images.Add(imagePath);
},
() =>
{
IsLoading = <span class="kwrd">false</span>;
});
}
<span class="kwrd">private</span> <span class="kwrd">readonly</span> ObservableCollection<<span class="kwrd">string</span>> _images = <span class="kwrd">new</span> ObservableCollection<<span class="kwrd">string</span>>();
<span class="kwrd">public</span> ObservableCollection<<span class="kwrd">string</span>> Images
{
get { <span class="kwrd">return</span> _images; }
}
<span class="kwrd">private</span> <span class="kwrd">bool</span> _isLoading;
<span class="kwrd">public</span> <span class="kwrd">bool</span> IsLoading
{
get { <span class="kwrd">return</span> _isLoading; }
set
{
<span class="kwrd">if</span> (_isLoading != <span class="kwrd">value</span>)
{
_isLoading = <span class="kwrd">value</span>;
InvokePropertyChanged(<span class="str">"IsLoading"</span>);
}
}
}
<span class="preproc">#region</span> Implementation of INotifyPropertyChanged
<span class="kwrd">public</span> <span class="kwrd">event</span> PropertyChangedEventHandler PropertyChanged;
<span class="kwrd">public</span> <span class="kwrd">void</span> InvokePropertyChanged(<span class="kwrd">string</span> propertyName)
{
PropertyChangedEventHandler handler = PropertyChanged;
<span class="kwrd">if</span> (handler != <span class="kwrd">null</span>) handler(<span class="kwrd">this</span>, <span class="kwrd">new</span> PropertyChangedEventArgs(propertyName));
}
<span class="preproc">#endregion</span>
}</pre>
<p>Apart from the constructor, there are really just two properties that expose change notification for the WPF binding engine. The code in the constructor is demo-quality, in the sense that it is not good practice to do so much work in a constructor. Maybe this would be better:</p>
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">sealed</span> <span class="kwrd">class</span> RxPhotoGalleryViewModel
{
<span class="kwrd">private</span> <span class="kwrd">readonly</span> IImageService _imageService;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> ISchedulerProvider _scheduler;
<span class="kwrd">public</span> RxPhotoGalleryViewModel(IImageService imageService, ISchedulerProvider scheduler)
{
_imageService = imageService;
_scheduler = scheduler;
}
<span class="kwrd">public</span> <span class="kwrd">void</span> Start()
{
IsLoading = <span class="kwrd">true</span>;
var files = _imageService.EnumerateImages()
.ToObservable();
files
.SubscribeOn(_scheduler.ThreadPool)
.ObserveOn(_scheduler.Dispatcher)
.Subscribe(
imagePath => Images.Add(imagePath),
() => IsLoading = <span class="kwrd">false</span>);
}
<span class="rem">//....</span>
}</pre>
<p>The test fixture is fairly simple. We pass in a mock implementation of the IImageService and the TestSchedulerProvider similar to the one shown in <a title="Rx Part 8 – Testing Rx" href="http://leecampbell.blogspot.com/2010/10/rx-part-8-testing-rx.html">Rx Part 8 – Testing Rx</a>. </p>
<pre class="csharpcode">[TestClass]
<span class="kwrd">public</span> <span class="kwrd">class</span> RxPhotoGalleryViewModelTests
{
<span class="kwrd">private</span> Mock<IImageService> _imageSrvMock;
<span class="kwrd">private</span> TestSchedulderProvider _testSchedulderProvider;
<span class="kwrd">private</span> List<<span class="kwrd">string</span>> _expectedImages;
[TestInitialize]
<span class="kwrd">public</span> <span class="kwrd">void</span> SetUp()
{
_imageSrvMock = <span class="kwrd">new</span> Mock<IImageService>();
_testSchedulderProvider = <span class="kwrd">new</span> TestSchedulderProvider();
_expectedImages = <span class="kwrd">new</span> List<<span class="kwrd">string</span>> { <span class="str">"one.jpg"</span>, <span class="str">"two.jpg"</span>, <span class="str">"three.jpg"</span> };
_imageSrvMock.Setup(svc => svc.EnumerateImages())
.Returns(_expectedImages);
}
[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_add_ImagesServiceResults_to_Images()
{
<span class="rem">//Arrange</span>
<span class="rem">// done in setup</span>
<span class="rem">//Act</span>
var sut = <span class="kwrd">new</span> RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
_testSchedulderProvider.ThreadPool.Run();
_testSchedulderProvider.Dispatcher.Run();
<span class="rem">//Assert</span>
CollectionAssert.AreEqual(_expectedImages, sut.Images);
}
[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_set_IsLoading_to_true()
{
<span class="rem">//Arrange</span>
<span class="rem">// done in setup</span>
<span class="rem">//Act</span>
var sut = <span class="kwrd">new</span> RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
<span class="rem">//--NOTE-- note the missing TestScheduler.Run() calls. This will stop any observable being processed. Cool.</span>
<span class="rem">//Assert</span>
Assert.IsTrue(sut.IsLoading);
}
[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_set_IsLoading_to_false_when_completed_loading()
{
<span class="rem">//Arrange</span>
<span class="rem">// done in setup</span>
<span class="rem">//Act</span>
var sut = <span class="kwrd">new</span> RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
_testSchedulderProvider.ThreadPool.Run();
_testSchedulderProvider.Dispatcher.Run();
<span class="rem">//Assert</span>
Assert.IsFalse(sut.IsLoading);
}
}</pre>
<p>As we now control the scheduling/concurrency we don't have to try to do anything fancy with Dispatchers,  BackgroundWorkers, ThreadPools or Tasks which are very difficult to perform unit testing on. Check out the pain that I went through to test responsive WPF apps in this post on <a title="Responsive WPF UI - Part 5 -Testing multithreaded WPF code" href="http://leecampbell.blogspot.com/2009/02/responsive-wpf-user-interfaces-part-5.html">Testing Responsive WPF</a> complete with DispatcherFrame and Thread.Sleep(300) in my tests <img style="border-bottom-style: none; border-left-style: none; border-top-style: none; border-right-style: none" class="wlEmoticon wlEmoticon-sadsmile" alt="Sad smile" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjpmqmigfUb0HZPSfEXFyH4hvZYc5XuJEhGEqn2S628NfLdk83RECo2ZJo6fbgvL21BLKT60q8245FY6_2C_Ah5RVtrLO0FTl0sEuZK-Qd9u0s9YumRjgVA3bx0B9a0UkEiT5QmPAPRCLk/?imgmax=800" /></p>
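<p>For readers who have not seen the earlier post, a provider along the following lines would be enough for tests like the ones above. Treat it as a sketch under assumptions: it targets the current System.Reactive and Microsoft.Reactive.Testing NuGet packages (where TestScheduler exposes Start() rather than the Run() used above), it keeps the TestSchedulderProvider spelling from the test fixture, and it only includes the two schedulers this ViewModel asks for. The member list of ISchedulerProvider is my guess based on how the ViewModel uses it.</p>
<pre class="csharpcode">using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

// Assumed shape: only the two schedulers used by RxPhotoGalleryViewModel.
public interface ISchedulerProvider
{
    IScheduler ThreadPool { get; }
    IScheduler Dispatcher { get; }
}

public sealed class TestSchedulderProvider : ISchedulerProvider
{
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();

    // The code under test only ever sees IScheduler...
    IScheduler ISchedulerProvider.ThreadPool { get { return _threadPool; } }
    IScheduler ISchedulerProvider.Dispatcher { get { return _dispatcher; } }

    // ...while the test gets the TestScheduler so it can advance virtual time.
    public TestScheduler ThreadPool { get { return _threadPool; } }
    public TestScheduler Dispatcher { get { return _dispatcher; } }
}</pre>
<p>Nothing scheduled on either scheduler executes until the test calls Start() (or advances the clock), which is what lets the IsLoading test observe the "still loading" state.</p>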
<p>If you want the running code you can either pull the code down via SVN by following this <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or you can <a title="rx-samples_PerthDemo.zip" href="http://code.google.com/p/rx-samples/downloads/detail?name=rx-samples_PerthDemo.zip&can=2&q=">download the zip</a>.</p>
<p>The PowerPoint presentation is <a title="IntroductionToRx.pptx" href="http://code.google.com/p/rx-samples/downloads/detail?name=Introduction%20to%20Rx.pptx&can=2&q=">here</a>.</p>
<p>You may also be interested in the <a title="Rx Design Guidelines" href="http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx">Design Guidelines</a> produced by the Rx team at Microsoft, and in where to get the <a title="Get Started Developing with the Reactive Extensions" href="http://msdn.microsoft.com/en-us/data/gg577610">latest version of Rx</a>.</p>
<p>Back to the <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">contents</a> page for <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">Reactive Extensions for .NET Introduction</a></p>
<p>
<div style="padding-bottom: 0px; margin: 0px; padding-left: 0px; padding-right: 0px; display: inline; float: none; padding-top: 0px" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:1932855f-fcc3-4d7b-8802-7b6fc6e41242" class="wlWriterEditableSmartContent">Technorati Tags: <a href="http://technorati.com/tags/Reactive-Extensions" rel="tag">Reactive-Extensions</a>,<a href="http://technorati.com/tags/Rx" rel="tag">Rx</a>,<a href="http://technorati.com/tags/Tutorial" rel="tag">Tutorial</a></div></p> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com0tag:blogger.com,1999:blog-455072646448673416.post-71049373676534143202011-05-11T05:48:00.003+01:002011-05-11T05:54:52.246+01:00Rx session in Perth, Australia<p>James Miles and I will be giving a presentation on Rx to a Perth audience this Thursday. If you are in town then come on down!</p>
<p>As those that read this blog will know, Rx is my thing at the moment so feel free to bring plenty of questions.</p>
<p>Full details can be found at the <a href="http://perthdotnet.org/">Perth .NET Community of Practice</a>.</p>
<p><a href="http://perthdotnet.org/blogs/events/archive/2011/05/07/introduction-to-the-net-reactive-extensions-rx-with-lee-campbell-and-james-miles.aspx">Introduction to Rx with Lee Campbell and James Miles</a></p>
<p>See you there.<br/>
Lee</p>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com3tag:blogger.com,1999:blog-455072646448673416.post-53162705932356374132011-03-14T07:25:00.001+00:002012-06-18T17:24:39.481+01:00Rx Part 9–Join, Window, Buffer and Group Join<div dir="ltr" style="text-align: left;" trbidi="on">
<b>STOP THE PRESS!</b> This series has now been superseded by the online book <a href="http://www.introtorx.com/">www.IntroToRx.com</a>. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!
<br />
<br />
While this series of posts is labelled as an introduction, this post takes us past the <a href="http://blogs.technet.com/b/ieitpro/archive/2006/09/29/459944.aspx" target="_blank" title="Microsoft's Standard Level Definitions (100 to 400)">100 level</a> style posts. This post looks to tackle the interesting ways of combining multiple streams of data. We have looked at versions of combining streams in earlier posts with SelectMany, Merge, Zip, Concat etc. The operators that we look at in this post are different for several reasons:<br />
<ol>
<li>They are new as of 2011 </li>
<li>Their matching is based on events coinciding with each other based on some given window of time </li>
<li>They offer some pretty powerful stuff that would otherwise be complex to code and just nasty if you went <em>sans-Rx</em>. </li>
</ol>
<h4>
Buffer</h4>
Buffer is not a new operator to us, however it can now be conceptually grouped with the group-join operators. These operators all do something with a stream and a window of time. Each operator will open a window when the source stream produces a value. The way the window is closed and which values are exposed is the main difference between each of the operators. Let us first go back to our old friend BufferWithCount that we saw in the <a href="http://leecampbell.blogspot.com/2010/05/rx-part-2-static-and-extension-methods.html" target="_blank" title="Rx part 2 – Static and Extension methods">second post</a> in <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" target="_blank" title="Reactive Extensions for .NET an Introduction - LeeCampbell">this series</a>. <br />
BufferWithCount will create a window when the first value is produced. It will then put that value into an internal cache. The window will stay open until the count of values has been reached, and each of these values will have been cached. Once the count has been reached the window will close and the cache will be OnNext’ed as an IList<T>. When the next value is produced from the source, the cache is cleared and we start again. This means that BufferWithCount will take an IObservable<T> and return an IObservable<IList<T>>. <br />
<em>Example Buffer with count of 3</em><br />
<pre class="csharpcode">source|-0-1-2-3-4-5-6-7-8-9|
result|-----0-----3-----6-9|
            1     4     7
            2     5     8</pre>
<em>In this marble diagram, I have represented the list of values being returned at a point in time as a column of data. i.e. the values 0, 1 & 2 are all returned in the first buffer.</em><br />
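As a rough sketch of the diagram above, the snippet below buffers the values 0 to 9 in groups of three. It assumes a current release of the System.Reactive NuGet package, where BufferWithCount has been folded into the Buffer overloads; the class and method names are made up for this illustration.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferWithCountExample
{
    // Collects every buffer produced for the values 0..9 with a count of 3.
    public static IList<IList<int>> Run()
    {
        var buffers = new List<IList<int>>();
        Observable.Range(0, 10)
            .Buffer(3) // BufferWithCount(3) in the release used in this post
            .Subscribe(buffer => buffers.Add(buffer));
        return buffers;
    }

    public static void Main()
    {
        foreach (var buffer in Run())
        {
            Console.WriteLine(string.Join(",", buffer));
        }
    }
}</pre>
Run() should return four lists: 0,1,2 then 3,4,5 then 6,7,8 and finally the partial buffer 9, which is flushed when the source completes.<br />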
Once you understand this, it is not much of a leap to understand BufferWithTime. Instead of passing a count we pass a TimeSpan. The closing of the window (and therefore the buffer’s cache) is now dictated by time instead of the count of values produced. This is ever so slightly more complicated as we have now introduced some sort of scheduling. To produce the IList<T> at the correct point in time we need a scheduler assigned to performing the timing. Being able to pass that scheduler in also makes testing this stuff a lot easier.<br />
<em>Example Buffer with time of 5 units</em>
<br />
<pre class="csharpcode">source|-0-1-2-3-4-5-6-7-8-9-|
result|----0----2----5----7-|
           1    3    6    8
                4         9</pre>
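Because the timing is handed to a scheduler, a buffer-by-time query can be exercised in virtual time. The following is a sketch only, assuming the current System.Reactive and Microsoft.Reactive.Testing NuGet packages (where BufferWithTime is an overload of Buffer). The tick values are arbitrary and were chosen so that no value lands exactly on a buffer boundary; the names are illustrative.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class BufferWithTimeExample
{
    public static IList<IList<long>> Run()
    {
        var scheduler = new TestScheduler();
        var buffers = new List<IList<long>>();

        // One value every 20 ticks of virtual time: at 20, 40, 60 and 80.
        Observable.Interval(TimeSpan.FromTicks(20), scheduler)
            .Take(4)
            // Close the buffer every 50 ticks (BufferWithTime in the release used in this post).
            .Buffer(TimeSpan.FromTicks(50), scheduler)
            .Subscribe(buffer => buffers.Add(buffer));

        // Nothing has been produced yet; virtual time only moves when the test says so.
        scheduler.Start();
        return buffers;
    }
}</pre>
With those numbers I would expect two buffers: 0,1 when the first 50-tick window closes, and 2,3 when the source completes. The whole thing runs without any real waiting.<br />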
<h4>
Window</h4>
The Window operators are very similar to the Buffer operators; they only really differ by their return type. Where Buffer would take an IObservable<T> and return an IObservable<IList<T>>, the Window operators return an IObservable<IObservable<T>>. It is also worth noting that the Buffer operators will not yield their buffers until the window closes.<br />
<em>Example of Window with a count of 3</em><br />
<pre class="csharpcode">source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-2|
window1        3-4-5|
window2              6-7-8|
window3                    9|</pre>
<em>Example of Window with time of 5 units</em><br />
<pre class="csharpcode">source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-|
window1      2-3-4|
window2           -5-6-|
window3                7-8-9|</pre>
So the obvious difference here is that with the Window operators you get hold of the values from the source as soon as they are produced, but with the Buffer operators you must wait until the window closes before the values are accessible.<br />
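To see that difference in running code, here is a small sketch that logs what each window does as it happens. It assumes the current System.Reactive package, where WindowWithCount is exposed as Window(int); the class name and log format are illustrative only.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowWithCountExample
{
    public static IList<string> Run()
    {
        var log = new List<string>();
        var windowId = 0;

        Observable.Range(0, 10)
            .Window(3) // WindowWithCount(3) in the release used in this post
            .Subscribe(window =>
            {
                // Each window is itself an IObservable<int>; subscribe to it straight away.
                var id = windowId++;
                log.Add("window" + id + " opened");
                window.Subscribe(
                    value => log.Add("window" + id + " -> " + value),
                    () => log.Add("window" + id + " closed"));
            });

        return log;
    }
}</pre>
Each value is logged as soon as the source produces it, before its window closes. Swapping Window(3) for Buffer(3) would only hand the values over three at a time.<br />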
<h4>
Switch is the Anti Window <img alt="Smile" class="wlEmoticon wlEmoticon-smile" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgCpMSPEXb3RKae-tnFmx8ZCz18oa6Tt0RHuPpjT4NPCzLQGFYyQb6G3iTdh0dqFTRXqYixosPXdwA6LsBUH3gJsslW0i675wVRSLpo6CrYzMVUnUx3sHGI5D_-tno72UOhksW0RuDQDXc/?imgmax=800" style="border-bottom-style: none; border-left-style: none; border-right-style: none; border-top-style: none;" /></h4>
I think it is worth noting, at least from an academic point, that the Window operators produce IObservable<IObservable<T>> and that the Switch operator takes an IObservable<IObservable<T>> and returns an IObservable<T>. As the Window operators ensure that the windows (child streams) do not overlap, we can use the Switch operator to turn a windowed stream back into its original stream.<br />
<pre class="csharpcode"><span class="rem">//is the same as Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)</span>
var switchedWindow = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)
.WindowWithTime(TimeSpan.FromMilliseconds(500))
.Switch();</pre>
<h4>
Join</h4>
Join is not a new method in the Rx library, but the overload we are interested in today is new. From what I can see of the two original overloads that take an array or an IEnumerable of Plan<T>, their usage can be replicated with Merge and Select. They are a bit of a mystery to me.<br />
The overload we are interested in is <br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">static</span> IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
<span class="kwrd">this</span> IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector
)</pre>
Now this is a fairly hairy signature to try and understand in one go, so let’s take it one parameter at a time.<br />
<strong>IObservable<TLeft> left</strong> is the source stream that defines when a window starts. This is just like the Buffer and Window operators, except that every value published from this source opens a new window. In Buffer and Window, some values just fell into an existing window.<br />
I like to think of <strong>IObservable<TRight> right</strong> as the window value stream. While the left stream controls opening the windows, the right stream will try to pair up with a value from the left stream.<br />
Let us imagine that our left stream produces a value, which creates a new window. If the right stream produces a value while the window is open then the <strong>resultSelector</strong> function is called with the two values. This is the crux of Join: pairing two values, one from each stream, that occur within the same window. This then leads us to our next question: when does the window close? The answer to this question is both the power and the complexity of the Join operator. <br />
When <strong>left</strong> produces a value, a window is opened. That value is also then passed to the <strong>leftDurationSelector</strong> function. The result of this function is an IObservable<TLeftDuration>. When that IObservable OnNext’s <em>or</em> Completes then the window for that value is closed. Note that it is irrelevant what the type of <strong>TLeftDuration</strong> is. This initially left me with the feeling that IObservable<TLeftDuration> was all a bit overkill, as you effectively just need some sort of event to say “Closed!”. However, by allowing you to use IObservable you can do some clever stuff, as we will see later.<br />
So let us first imagine a scenario where we have the left stream producing values twice as fast as the right stream. Imagine that we also never close the left windows. We could do this by always returning Observable.Never<Unit>() from the <strong>leftDurationSelector</strong> function. The right windows, on the other hand, close immediately, so each right value is only paired with the left values that have already been produced. This would result in the following pairs being produced.<br />
<pre class="csharpcode">left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---0---0---0
          A   B   C
          1   1   1
          A   B   C
              2   2
              B   C
              3   3
              B   C
                  4
                  C
                  5
                  C</pre>
As you can see the left values are cached and replayed each time the right produces a value. <br />
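The diagram above can be reproduced with a couple of subjects. This is a sketch under assumptions: it uses the current System.Reactive package, the left windows never close, and each right window closes immediately (Observable.Empty<Unit>()), which is what the diagram implies. The class name and the way values are pushed are made up for the example.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinExample
{
    public static IList<string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<string>();
        var pairs = new List<string>();

        left.Join(
                right,
                _ => Observable.Never<Unit>(),  // left windows never close
                _ => Observable.Empty<Unit>(),  // right windows close immediately
                (l, r) => l + r)                // e.g. 0 and "A" become "0A"
            .Subscribe(pair => pairs.Add(pair));

        // Left produces values twice as fast as right, as in the diagram.
        left.OnNext(0);
        left.OnNext(1); right.OnNext("A");
        left.OnNext(2);
        left.OnNext(3); right.OnNext("B");
        left.OnNext(4);
        left.OnNext(5); right.OnNext("C");

        return pairs;
    }
}</pre>
Run() should yield 0A and 1A when A arrives, then 0B to 3B, then 0C to 5C: twelve pairs in total, and no pair such as 2A, because A's window had already closed by the time 2 was produced.<br />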
Now it seems fairly obvious that if I immediately closed the window by returning Observable.Empty<Unit>() or perhaps Observable.Return(0), the windows would effectively never be open, so no pairs would ever get produced. However, what could I do to make sure that these windows did not overlap, so that once a second value was produced I would no longer see the first value? Well, if we returned the <strong>left</strong> stream from the <strong>leftDurationSelector</strong>, that could do it. But wait, when we return the <strong>left</strong> stream from the <strong>leftDurationSelector</strong> it would try to create another subscription, and that may introduce side effects. The quick answer to that is to Publish and RefCount the <strong>left</strong> stream. If we do that the results look more like this.<br />
<pre class="csharpcode">left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C</pre>
This made me think that I could use Join to produce my own version of CombineLatest that we saw in the <a href="http://leecampbell.blogspot.com/2010/06/rx-part-5-combining-multiple.html" target="_blank" title="RX Part 5 – Combining multiple IObservable streams">5th post in the series</a>. If I had the values from left expire when the next value from left was OnNext’ed then I would be well on my way. However, I need the same thing to happen for the right. Luckily the Join operator also provides us with a <strong>rightDurationSelector</strong> that works just like the <strong>leftDurationSelector</strong>. This is simple to implement: all I need to do is return a reference to the same left stream when a left value is produced, and then do the same for the right. The code looks like this.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">static</span> IObservable<TResult> MyCombineLatest<TLeft, TRight, TResult>
(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TRight, TResult> resultSelector)
{
var refcountedLeft = left.Publish().RefCount();
var refcountedRight = right.Publish().RefCount();
<span class="kwrd">return</span> Observable.Join(
refcountedLeft,
refcountedRight,
<span class="kwrd">value</span> => refcountedLeft,
<span class="kwrd">value</span> => refcountedRight,
resultSelector);
}</pre>
While the code above is not production quality (it would need to have some gates in place to mitigate race conditions), it shows us the power that we could get with Join; we can actually use it to create other operators!<br />
<h4>
GroupJoin</h4>
When the Join operator pairs up values that coincide within a window, it always passes just the left value and the right value to the <strong>resultSelector</strong>. The <strong>GroupJoin</strong> operator takes this one step further by passing the left value immediately to the <strong>resultSelector</strong> with an IObservable of the right values that occur within the window. Its signature is very similar to Join, but note the difference in the <strong>resultSelector</strong> Func.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">static</span> IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
<span class="kwrd">this</span> IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, IObservable<TRight>, TResult> resultSelector
)</pre>
If we went back to our first Join example where we had <br />
<ul>
<li>the <strong>left</strong> producing values twice as fast as the right, </li>
<li>the left never expiring </li>
<li>the right immediately expiring </li>
</ul>
this is what the result may look like<br />
<pre class="csharpcode">left              |-0-1-2-3-4-5|
right             |---A---B---C|
0th window values   --A---B---C|
1st window values     A---B---C|
2nd window values       --B---C|
3rd window values         B---C|
4th window values           --C|
5th window values             C|</pre>
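A sketch of that scenario, for anyone who wants to poke at it: each left value gets its own window of right values, which is recorded in a dictionary keyed by the left value. It assumes the current System.Reactive package; the class name, the anonymous type and the dictionary are all just scaffolding for the illustration.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupJoinExample
{
    public static IDictionary<int, List<string>> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<string>();
        var windows = new Dictionary<int, List<string>>();

        left.GroupJoin(
                right,
                _ => Observable.Never<Unit>(),  // the left never expires
                _ => Observable.Empty<Unit>(),  // the right expires immediately
                (l, rights) => new { Left = l, Rights = rights })
            .Subscribe(window =>
            {
                // Record every right value that falls into this left value's window.
                var values = new List<string>();
                windows[window.Left] = values;
                window.Rights.Subscribe(r => values.Add(r));
            });

        // Left produces values twice as fast as right, as in the diagram.
        left.OnNext(0);
        left.OnNext(1); right.OnNext("A");
        left.OnNext(2);
        left.OnNext(3); right.OnNext("B");
        left.OnNext(4);
        left.OnNext(5); right.OnNext("C");

        return windows;
    }
}</pre>
The windows for 0 and 1 should each see A, B and C; those for 2 and 3 see B and C; and those for 4 and 5 see only C.<br />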
Now we could switch it around so that the left expires immediately and the right never expires. The result may then look like this.<br />
<pre class="csharpcode">left              |-0-1-2-3-4-5|
right             |---A---B---C|
0th window values   |
1st window values     A|
2nd window values       A|
3rd window values         AB|
4th window values           AB|
5th window values             ABC|</pre>
This starts to make things interesting. Sharp readers may have noticed that with GroupJoin you could effectively re-create your own Join by doing something like this<br />
<pre class="csharpcode"><span class="kwrd">public</span> IObservable<TResult> MyJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector)
{
<span class="kwrd">return</span> Observable.GroupJoin
(
left,
right,
leftDurationSelector,
rightDurationSelector,
(leftValue, rightValues)=>rightValues.Select(rightValue=>resultSelector(leftValue, rightValue))
)
.Merge();
}</pre>
I was even able to knock up my own version of WindowWithTime with the code below.<br />
<pre class="csharpcode"><span class="kwrd">public</span> IObservable<IObservable<T>> MyWindowWithTime<T>(IObservable<T> source, TimeSpan windowPeriod)
{
<span class="kwrd">return</span> Observable.CreateWithDisposable<IObservable<T>>(o =>
{
var windower = <span class="kwrd">new</span> Subject<<span class="kwrd">long</span>>();
var intervals = Observable.Concat(
Observable.Return(0L),
Observable.Interval(windowPeriod)
)
.Publish()
.RefCount();
var subscription = Observable.GroupJoin
(
windower,
source.Do(_ => { }, windower.OnCompleted),
_ => windower,
_ => Observable.Empty<Unit>(),
(left, sourceValues) => sourceValues
)
.Subscribe(o);
var intervalSubscription = intervals.Subscribe(windower);
<span class="kwrd">return</span> <span class="kwrd">new</span> CompositeDisposable(subscription, intervalSubscription);
});
}</pre>
Yeah, it is not so pretty, but it is an academic exercise to showcase GroupJoin. Those that have read Bart De Smet’s <a href="http://blogs.bartdesmet.net/blogs/bart/archive/2010/01/01/the-essence-of-linq-minlinq.aspx" target="_blank" title="The essence of Linq - MiniLinq">excellent MinLINQ post</a> (and <a href="http://channel9.msdn.com/Shows/Going+Deep/Bart-De-Smet-MinLINQ-The-Essence-of-LINQ" target="_blank" title="The essence of Linq - MiniLinq - Channel9">follow up video</a>) can see that GroupJoin could almost be added to the 3 basic operators Cata, Ana and Bind.
<br />
GroupJoin and the other window operators can make otherwise fiddly and difficult tasks a cinch to put together. For example, those in the Finance game can now pretty much use GroupJoin to create their own VWAP and TWAP extension methods. Nice!<br />
The full source code is now available either via svn at <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or as a <a href="http://rx-samples.googlecode.com/files/rx-samples_v009.zip">zip file</a>.<br />
Back to the <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">contents</a> page for <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">Reactive Extensions for .NET Introduction</a><br />
Back to the previous post; <a href="http://leecampbell.blogspot.com/2010/10/rx-part-8-testing-rx.html" title="Part 8 - Testing Rx - Lee Campbell">Rx Part 8 - Testing Rx</a><br />
<div class="wlWriterEditableSmartContent" id="scid:0767317B-992E-4b12-91E0-4F059A8CECA8:08d0ad63-e148-4b19-a1f8-76b606922211" style="display: inline; float: none; margin: 0px; padding-bottom: 0px; padding-left: 0px; padding-right: 0px; padding-top: 0px;">
Technorati Tags: <a href="http://technorati.com/tags/Reactive-Extensions" rel="tag">Reactive-Extensions</a>,<a href="http://technorati.com/tags/Rx" rel="tag">Rx</a>,<a href="http://technorati.com/tags/Tutorial" rel="tag">Tutorial</a></div>
</div>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com6tag:blogger.com,1999:blog-455072646448673416.post-18074061501281237272011-02-20T17:07:00.001+00:002011-02-20T17:07:19.615+00:00Changes to the Rx API<p>Readers of the <a title="Reactive Extensions for .NET - An introduction" href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">Reactive Extensions series</a> on this blog will have noticed recently that not all of the code always works* if you download the <a title="DevLabs: Reactive Extensions for .NET (Rx)" href="http://msdn.microsoft.com/en-us/devlabs/ee794896" target="_blank">Rx assemblies</a> and then copy the code off the blog. This is due to the quite frequent changes to the API. I personally have 7 different versions of the libraries and I don’t think I have downloaded them all as they have been released! Also, the blog posts were started all the way back in May 2010, so it is fair that there has been some movement in the API.</p> <p>*If you get the code from <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or as a <a href="http://rx-samples.googlecode.com/files/rx-samples_v008.zip">zip file</a> then it will work, as the correct version of the assembly is included.</p> <p>For fun I thought I would try to exercise my LINQ skills and write a quick diff tool so I can see what in the public API is actually changing on me. I threw this class together in LINQPad:</p> <pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">class</span> AssemblyDiff
{
<span class="kwrd">private</span> <span class="kwrd">readonly</span> <span class="kwrd">string</span> _oldAssemblyPath;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> <span class="kwrd">string</span> _newAssemblyPath;
<span class="kwrd">public</span> AssemblyDiff(<span class="kwrd">string</span> oldAssemblyPath, <span class="kwrd">string</span> newAssemblyPath)
{
_oldAssemblyPath = oldAssemblyPath;
_newAssemblyPath = newAssemblyPath;
}
<span class="kwrd">public</span> IEnumerable<String> NewMethodNames()
{
<span class="kwrd">return</span> MethodNameDelta(GetDeclaredMethods(_newAssemblyPath), GetDeclaredMethods(_oldAssemblyPath));
}
<span class="kwrd">public</span> IEnumerable<String> DeprecatedMethodNames()
{
<span class="kwrd">return</span> MethodNameDelta(GetDeclaredMethods(_oldAssemblyPath), GetDeclaredMethods(_newAssemblyPath));
}
<span class="kwrd">public</span> <span class="kwrd">static</span> IEnumerable<String> MethodNameDelta(IEnumerable<MethodInfo> original, IEnumerable<MethodInfo> modified)
{
<span class="kwrd">return</span> from methodName <span class="kwrd">in</span> original.Select(MethodName).Except(modified.Select(MethodName))
orderby methodName
select methodName;
}
<span class="kwrd">public</span> IEnumerable<MethodInfo> NewMethods()
{
var oldMethods = GetDeclaredMethods(_oldAssemblyPath);
var currentMethods = GetDeclaredMethods(_newAssemblyPath);
<span class="kwrd">return</span> MethodDelta(oldMethods, currentMethods);
}
<span class="kwrd">public</span> IEnumerable<MethodInfo> DeprecatedMethods()
{
var oldMethods = GetDeclaredMethods(_oldAssemblyPath);
var currentMethods = GetDeclaredMethods(_newAssemblyPath);
<span class="kwrd">return</span> MethodDelta(currentMethods, oldMethods);
}
<span class="kwrd">public</span> <span class="kwrd">static</span> IEnumerable<MethodInfo> MethodDelta(IEnumerable<MethodInfo> original, IEnumerable<MethodInfo> changed)
{
var existingTypes = original.Select(m => m.ReflectedType.FullName)
.Distinct()
.ToList();
<span class="kwrd">return</span> from method <span class="kwrd">in</span> changed.Except(original, <span class="kwrd">new</span> MethodSignatureComparer())
<span class="kwrd">where</span> existingTypes.Contains(method.ReflectedType.FullName)
orderby method.ReflectedType.Name, method.Name
select method;
}
<span class="kwrd">public</span> IEnumerable<Type> NewTypes()
{
var currentTypes = GetTypes(_newAssemblyPath);
var oldTypes = GetTypes(_oldAssemblyPath);
<span class="kwrd">return</span> from type <span class="kwrd">in</span> currentTypes
<span class="kwrd">where</span> !oldTypes.Select (t => t.FullName).Contains(type.FullName)
select type;
}
<span class="kwrd">public</span> IEnumerable<Type> DeprecatedTypes()
{
var currentTypes = GetTypes(_newAssemblyPath);
var oldTypes = GetTypes(_oldAssemblyPath);
<span class="kwrd">return</span> from type <span class="kwrd">in</span> oldTypes
<span class="kwrd">where</span> !currentTypes.Select (t => t.FullName).Contains(type.FullName)
select type;
}
<span class="kwrd">private</span> <span class="kwrd">static</span> IEnumerable<MethodInfo> GetAllMethods(<span class="kwrd">string</span> path)
{
<span class="kwrd">return</span> from type <span class="kwrd">in</span> GetTypes(path)
from method <span class="kwrd">in</span> type.GetMethods()
<span class="kwrd">where</span> method.IsPublic
select method;
}
<span class="kwrd">private</span> <span class="kwrd">static</span> IEnumerable<MethodInfo> GetDeclaredMethods(<span class="kwrd">string</span> path)
{
<span class="kwrd">return</span> GetAllMethods(path).Where(method => method.DeclaringType == method.ReflectedType);
}
<span class="kwrd">private</span> <span class="kwrd">static</span> IEnumerable<Type> GetTypes(<span class="kwrd">string</span> path)
{
<span class="kwrd">return</span> from file <span class="kwrd">in</span> Directory.EnumerateFiles(path, <span class="str">"*.dll"</span>)
from module <span class="kwrd">in</span> Assembly.LoadFrom(file).GetModules()
from type <span class="kwrd">in</span> module.GetTypes()
<span class="kwrd">where</span> type.IsPublic
select type;
}
<span class="kwrd">private</span> <span class="kwrd">static</span> <span class="kwrd">string</span> MethodName(MethodInfo m)
{
<span class="kwrd">return</span> <span class="kwrd">string</span>.Format(<span class="str">"{0}.{1}"</span>, m.ReflectedType.Name, m.Name);
}
<span class="kwrd">public</span> <span class="kwrd">static</span> <span class="kwrd">string</span> MethodSignature(MethodInfo m)
{
<span class="rem">//return m.ToString();</span>
var ps = m.GetParameters();
var args = ps.Select(p=>ParameterSignature(p, ps.Length));
var argsDelimited = <span class="kwrd">string</span>.Join(<span class="str">","</span>, args);
<span class="kwrd">return</span> <span class="kwrd">string</span>.Format(<span class="str">"{0} {1}.{2}({3})"</span>, m.ReturnType.Name, m.ReflectedType.Name, m.Name, argsDelimited);
}
<span class="kwrd">private</span> <span class="kwrd">static</span> <span class="kwrd">string</span> ParameterSignature(ParameterInfo parameter, <span class="kwrd">int</span> parameterCount)
{
var modifier = <span class="str">""</span>;<span class="rem">//out/ref/params/</span>
var defaultValue = <span class="str">""</span>;
<span class="kwrd">if</span>(parameter.IsOut) modifier = <span class="str">"out "</span>;
<span class="kwrd">if</span>(parameter.IsOptional)
{
modifier = <span class="str">"optional "</span>;
defaultValue = Convert.ToString(parameter.DefaultValue); <span class="rem">//null-safe: an optional parameter may default to null</span>
}
<span class="kwrd">if</span>(parameter.IsRetval) modifier += <span class="str">"isretval "</span>;
<span class="kwrd">if</span>(parameter.IsIn) modifier += <span class="str">"IsIn "</span>;
<span class="kwrd">if</span>(parameter.IsLcid) modifier += <span class="str">"IsLcid "</span>;
<span class="kwrd">if</span>(parameter.Position== parameterCount-1 && parameter.ParameterType.IsArray)
{
modifier = <span class="str">"params "</span>;
}
<span class="kwrd">return</span> <span class="kwrd">string</span>.Format(<span class="str">"{0}{1}{2}"</span>, modifier,parameter.Name, defaultValue);
}
<span class="kwrd">private</span> <span class="kwrd">class</span> MethodSignatureComparer : IEqualityComparer<MethodInfo>
{
<span class="kwrd">public</span> <span class="kwrd">bool</span> Equals(MethodInfo lhs, MethodInfo rhs)
{
<span class="kwrd">return</span> <span class="kwrd">string</span>.Equals(lhs.ToString(), rhs.ToString());
}
<span class="kwrd">public</span> <span class="kwrd">int</span> GetHashCode(MethodInfo method)
{
<span class="kwrd">return</span> method.ToString().GetHashCode();
}
}
}</pre>
<p>I then used it like this:</p>
<pre class="csharpcode">var old = <span class="str">@"C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2787.0\Net4"</span>;
var current = <span class="str">@"C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\Net4"</span>;
var dllDiff = <span class="kwrd">new</span> AssemblyDiff(old, current);
<span class="str">"New Methods"</span>.Dump();
dllDiff.NewMethodNames().Dump();
<span class="str">"Deprecated Methods"</span>.Dump();
dllDiff.DeprecatedMethodNames().Dump();
<span class="str">"New Types"</span>.Dump();
dllDiff.NewTypes().Select (t => t.FullName).Dump();
<span class="str">"Deprecated Types"</span>.Dump();
dllDiff.DeprecatedTypes().Select (t => t.FullName).Dump();
<span class="str">"New overloads"</span>.Dump();
dllDiff.NewMethods().Select(AssemblyDiff.MethodSignature).Dump();
<span class="str">"Deprecated overloads"</span>.Dump();
dllDiff.DeprecatedMethods().Select(AssemblyDiff.MethodSignature).Dump();</pre>
<p>This gives me the following neat output:</p>
<p><strong>New Methods (7 Items)</strong></p>
<ul>
<li>ConnectableObservable`2.Connect</li>
<li>ConnectableObservable`2.Subscribe</li>
<li>Observable.GroupJoin</li>
<li>Observable.Multicast</li>
<li>Observable.Window</li>
<li>Qbservable.GroupJoin</li>
<li>Qbservable.Window</li>
</ul>
<p><strong>Deprecated Methods (6 Items)</strong></p>
<ul>
<li>ConnectableObservable`1.Connect</li>
<li>ConnectableObservable`1.Subscribe</li>
<li>Observable.Prune</li>
<li>Observable.Replay</li>
<li>Qbservable.Prune</li>
<li>Qbservable.Replay</li>
</ul>
<p><strong>New Types (1 Item)</strong></p>
<ul>
<li>System.Collections.Generic.ConnectableObservable`2</li>
</ul>
<p><strong>Deprecated Types (0 Items)</strong></p>
<p><strong>New overloads (30 Items)</strong></p>
<ul>
<li>IEnumerable`1 EnumerableEx.Generate(initialState,condition,iterate,resultSelector)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan)</li>
<li>IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count)</li>
<li>IObservable`1 Observable.GroupJoin(left,right,leftDurationSelector,rightDurationSelector,resultSelector)</li>
<li>IObservable`1 Observable.If(condition,thenSource)</li>
<li>IObservable`1 Observable.Join(left,right,leftDurationSelector,rightDurationSelector,resultSelector)</li>
<li>IConnectableObservable`1 Observable.Multicast(source,subject)</li>
<li>IObservable`1 Observable.Publish(source,subject)</li>
<li>IObservable`1 Observable.Publish(source,subject,selector)</li>
<li>IObservable`1 Observable.Window(source,windowOpenings,windowClosingSelector)</li>
<li>IObservable`1 Observable.Window(source,windowClosingSelector,scheduler)</li>
<li>IObservable`1 Observable.Window(source,windowClosingSelector)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan)</li>
<li>IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count)</li>
<li>IQbservable`1 Qbservable.GroupJoin(left,right,leftDurationSelector,rightDurationSelector,resultSelector)</li>
<li>IQbservable`1 Qbservable.If(provider,condition,thenSource)</li>
<li>IQbservable`1 Qbservable.Join(left,right,leftDurationSelector,rightDurationSelector,resultSelector)</li>
<li>IQbservable`1 Qbservable.Publish(source,subject,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source,subject)</li>
<li>IQbservable`1 Qbservable.Window(source,windowOpenings,windowClosingSelector)</li>
<li>IQbservable`1 Qbservable.Window(source,windowClosingSelector,scheduler)</li>
<li>IQbservable`1 Qbservable.Window(source,windowClosingSelector)</li>
</ul>
<p><strong>Deprecated overloads (71 Items)</strong></p>
<ul>
<li>IEnumerable`1 EnumerableEx.Generate(initialState,condition,resultSelector,iterate)</li>
<li>IEnumerable`1 EnumerableEx.Generate(function)</li>
<li>IEnumerable`1 EnumerableEx.Generate(initialState,resultSelector,iterate)</li>
<li>IEnumerable`1 EnumerableEx.Generate(initial,resultSelector,iterate)</li>
<li>IEnumerable`1 EnumerableEx.Generate(initial,condition,resultSelector,iterate)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift)</li>
<li>IObservable`1 Observable.BufferWithTime(source,timeSpan)</li>
<li>IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)</li>
<li>IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count)</li>
<li>IConnectableObservable`1 Observable.Prune(source)</li>
<li>IConnectableObservable`1 Observable.Prune(source,scheduler)</li>
<li>IObservable`1 Observable.Prune(source,selector)</li>
<li>IObservable`1 Observable.Prune(source,selector,scheduler)</li>
<li>IObservable`1 Observable.Publish(source1,source2,selector)</li>
<li>IObservable`1 Observable.Publish(source1,source2,selector,scheduler)</li>
<li>IObservable`1 Observable.Publish(source1,source2,source3,selector)</li>
<li>IObservable`1 Observable.Publish(source1,source2,source3,selector,scheduler)</li>
<li>IObservable`1 Observable.Publish(source1,source2,source3,source4,selector)</li>
<li>IObservable`1 Observable.Publish(source1,source2,source3,source4,selector,scheduler)</li>
<li>IConnectableObservable`1 Observable.Publish(source,initialValue)</li>
<li>IConnectableObservable`1 Observable.Publish(source,initialValue,scheduler)</li>
<li>IObservable`1 Observable.Publish(source,selector,initialValue)</li>
<li>IObservable`1 Observable.Publish(source,selector,initialValue,scheduler)</li>
<li>IConnectableObservable`1 Observable.Publish(source)</li>
<li>IConnectableObservable`1 Observable.Publish(source,scheduler)</li>
<li>IObservable`1 Observable.Publish(source,selector)</li>
<li>IObservable`1 Observable.Publish(source,selector,scheduler)</li>
<li>IConnectableObservable`1 Observable.Replay(source)</li>
<li>IConnectableObservable`1 Observable.Replay(source,scheduler)</li>
<li>IObservable`1 Observable.Replay(source,selector)</li>
<li>IObservable`1 Observable.Replay(source,selector,scheduler)</li>
<li>IConnectableObservable`1 Observable.Replay(source,window)</li>
<li>IObservable`1 Observable.Replay(source,selector,window)</li>
<li>IConnectableObservable`1 Observable.Replay(source,window,scheduler)</li>
<li>IObservable`1 Observable.Replay(source,selector,window,scheduler)</li>
<li>IConnectableObservable`1 Observable.Replay(source,bufferSize,scheduler)</li>
<li>IObservable`1 Observable.Replay(source,selector,bufferSize,scheduler)</li>
<li>IConnectableObservable`1 Observable.Replay(source,bufferSize)</li>
<li>IObservable`1 Observable.Replay(source,selector,bufferSize)</li>
<li>IConnectableObservable`1 Observable.Replay(source,bufferSize,window)</li>
<li>IObservable`1 Observable.Replay(source,selector,bufferSize,window)</li>
<li>IConnectableObservable`1 Observable.Replay(source,bufferSize,window,scheduler)</li>
<li>IObservable`1 Observable.Replay(source,selector,bufferSize,window,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift)</li>
<li>IQbservable`1 Qbservable.BufferWithTime(source,timeSpan)</li>
<li>IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)</li>
<li>IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count)</li>
<li>IQbservable`1 Qbservable.Prune(source,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Prune(source,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source,selector,initialValue)</li>
<li>IQbservable`1 Qbservable.Publish(source,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Publish(source,selector,initialValue,scheduler)</li>
<li>IQbservable`1 Qbservable.Publish(source,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,source3,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,source3,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,source3,source4,selector)</li>
<li>IQbservable`1 Qbservable.Publish(source1,source2,source3,source4,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,bufferSize)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,bufferSize,window)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,bufferSize,window,scheduler)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,scheduler)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,window)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,window,scheduler)</li>
<li>IQbservable`1 Qbservable.Replay(source,selector,bufferSize,scheduler)</li>
</ul>
<p>From memory, Generate has been a constant source of change, which has confused some readers who are using a different version of the library from the one the Part 2 post was written against. This diff script goes to show that it is still undergoing changes.</p>
<p><img style="border-bottom-style: none; border-right-style: none; border-top-style: none; border-left-style: none" class="wlEmoticon wlEmoticon-smile" alt="Smile" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEivliXti4SXnRhjFCnkXLQZkHJVLTaZPFSIxIss6djs2ZJly7PSDonhA-tGpJm4WSepvLNngUpEvTE1GuorUPxoZUhggmK0tRSkIrRgYzRXMoAVK6Dt2Rr2Yh5ScWY9sL5A7j7W4l6iptA/?imgmax=800" /></p>
<p>Links:</p>
<p><a title="Reactive Extensions for .NET -An Introduction" href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html">Reactive Extensions for .NET an Introduction</a></p>
<p><a title="DevLabs: Reactive Extensions for .NET (Rx)" href="http://msdn.microsoft.com/en-us/devlabs/ee794896">DevLabs: Reactive Extensions for .NET (Rx)</a></p> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com1tag:blogger.com,1999:blog-455072646448673416.post-76510188926202465022011-02-13T15:02:00.002+00:002011-02-20T16:26:12.906+00:00Silverlight testing<p>I am putting this out there to see if I can get some traction with other Test Driven Silverlight coders. If you are one of these people, you will know of the strife in your day-to-day coding. For those who don't know what I mean, these are the three options a Silverlight developer has with regards to test driven coding: </p> <p>1) Use the <a href="http://code.msdn.microsoft.com/silverlightut">Silverlight Unit Test Framework</a>. The problem with this is that you lose any integrated development support, it is amazingly slow (in the area of 1-5 tests per second), and it doesn't have any useful build tool support (coverage, TFS, TeamCity). Massive Fail. </p> <p>2) Cross compile to .NET 4 all of your Models, ViewModels, Controllers, Modules, Presenters (i.e. everything that is not a View or a Control). Now write unit tests against this project. This means you get back to fast tests (100s of tests per second) but take a hit on compiling twice, managing the project linking, and having twice as many projects floating around. </p> <p>3) What I imagine is the most popular option: just don't write any tests. </p> <p>Looking at what most of the requests are for tells me most people are using Silverlight for marketing websites that stream rich content. Business applications have yet to stake any dominance. What I am hoping is that anyone reading this who feels my pain will go to <a href="http://dotnet.uservoice.com/forums/4325-silverlight-feature-suggestions/suggestions/313397-unit-testing-integrated-in-visual-studio-and-msbui?ref=title">this link</a> and vote.
It seems that these polls really have an effect; DataTemplates appear to be part of SL 5 due to massive demand. I am hoping that Microsoft can focus on getting the underlying framework right before they go off and give us a 3D-multitouch-proximity aware API :) </p> <p><a href="http://dotnet.uservoice.com/forums/4325-silverlight-feature-suggestions/suggestions/313397-unit-testing-integrated-in-visual-studio-and-msbui?ref=title">http://dotnet.uservoice.com/forums/4325-silverlight-feature-suggestions/suggestions/313397-unit-testing-integrated-in-visual-studio-and-msbui?ref=title</a></p> Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com2tag:blogger.com,1999:blog-455072646448673416.post-36937909435692091762010-11-02T14:16:00.003+00:002012-06-18T17:26:34.410+01:00Rx design guidelines<div dir="ltr" style="text-align: left;" trbidi="on">
The Rx team have released a PDF specifying their Design Guidelines to use when coding with the Reactive Extensions.
The original post is here
<a href="http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx">http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx</a>
The PDF is here
<a href="http://go.microsoft.com/fwlink/?LinkID=205219">http://go.microsoft.com/fwlink/?LinkID=205219</a>
Great stuff. Next; the FxCop rules for static analysis?<br />
<br />
More guidance at <a href="http://www.introtorx.com/">IntroToRx.com</a> in the <a href="http://www.introtorx.com/Content/v1.0.10621.0/18_UsageGuidelines.html">Usage Guidelines appendix</a></div>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com1tag:blogger.com,1999:blog-455072646448673416.post-32260803423346163522010-10-26T08:38:00.003+01:002012-06-18T17:24:30.219+01:00Rx Part 8 - Testing Rx<div dir="ltr" style="text-align: left;" trbidi="on">
<b>STOP THE PRESS!</b> This series has now been superseded by the online book <a href="http://www.introtorx.com/">www.IntroToRx.com</a>. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!
<br />
<br />
Having <a href="http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html" target="_blank" title="Part 6 - Scheduling and Threading in Rx">reviewed the scheduling</a> available to us in Rx and now that we are aware of <a href="http://leecampbell.blogspot.com/2010/08/rx-part-7-hot-and-cold-observables.html" target="_blank" title="Part 7 - Hot and Cold Observables">Hot and Cold observables</a> we have almost enough skills in our tool belt to start using Rx in anger. However, many developers wouldn’t dream of starting coding without first being able to write tests to prove their code is in fact satisfying their requirements and providing them with that safety net against regression. Rx poses some interesting problems to our Test Driven community:<br />
<ul>
<li>Scheduling and therefore Threading are generally avoided in test scenarios as they can introduce race conditions which may lead to non-deterministic tests. </li>
<li>Tests should run as fast as possible. </li>
<li>Rx is a new technology/library so naturally as we master it, we will refactor our code. We want to use tests to ensure our refactoring has not altered the behaviour of our code base. </li>
</ul>
So we want to test our code but don't want to introduce false negatives, false positives or non-deterministic tests. Also, if we look at the Rx library there are plenty of methods that involve scheduling, so it is hard to ignore. This LINQ query shows us that there are at least 32 extension methods that accept an IScheduler as a parameter:<br />
<pre class="csharpcode">var query = from method <span class="kwrd">in</span> <span class="kwrd">typeof</span> (Observable).GetMethods()
from parameter <span class="kwrd">in</span> method.GetParameters()
<span class="kwrd">where</span> <span class="kwrd">typeof</span> (IScheduler).IsAssignableFrom(parameter.ParameterType)
group method by method.Name into m
orderby m.Key
select m.Key;
query.Run(Console.WriteLine);
<span class="rem">/* </span>
<span class="rem">BufferWithTime, Catch, Concat, Delay, Empty, Generate, GenerateWithTime, Interval</span>
<span class="rem">Merge, ObserveOn, OnErrorResumeNext, Prune, Publish, Range, Repeat, Replay</span>
<span class="rem">Retry, Return, Sample, Start, StartWith, Subscribe, SubscribeOn, Take, Throttle</span>
<span class="rem">Throw, TimeInterval, Timeout, Timer, Timestamp, ToAsync, ToObservable</span>
<span class="rem">*/</span></pre>
There are some methods that we will already be familiar with, such as ObserveOn and SubscribeOn. Then there are others that will optionally take an IScheduler in one of the method overloads. TDD/TestFirst coders will want to opt for the overload that takes the IScheduler so that we can have some control over scheduling in our tests.<br />
In this example we create a stream that publishes values every second for 5 seconds. <br />
<pre class="csharpcode">var interval = Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(5);</pre>
If we were to write a test that ensured that we received 5 values and that they were each 1 second apart, it would take 5 seconds to run. That would be no good; I want hundreds if not thousands of tests to run in 5 seconds. A more common time-related example that would need tests is setting a timeout.<br />
<pre class="csharpcode">var stream = Observable.Never<<span class="kwrd">int</span>>();
var exceptionThrown = <span class="kwrd">false</span>;
stream.Timeout(TimeSpan.FromMinutes(1))
.Run(
i => Console.WriteLine(<span class="str">"This will never run."</span>),
ex => exceptionThrown = <span class="kwrd">true</span>);
Assert.IsTrue(exceptionThrown);</pre>
This test would take 1 minute to run. Worse, if we wrote this code test-first, adding the Timeout only after seeing the test fail (Red-Green-Refactor), the initial run of the test would never complete. Hmmmm…..<br />
<h4>
TestScheduler</h4>
To our rescue comes the TestScheduler. This is a recent addition to the Rx library that uses the concept of a virtual scheduler to allow us to emulate and control time. Cool.<br />
A virtual scheduler can be thought of as a queue of actions to be executed, each marked with the point in time at which it should be executed. Where actions are scheduled for the same “time”, the second is queued after the first but marked with the same point in “time”. When we use the TestScheduler we can either “drain the queue” by calling Run(), which will execute all scheduled actions, or we can run all actions up to a given point in time.<br />
In this example we schedule a task on to the queue to be run immediately by using the simple overload. We then execute everything scheduled up to the first tick (i.e. the first action).<br />
<pre class="csharpcode">var scheduler = <span class="kwrd">new</span> TestScheduler();
var wasExecuted = <span class="kwrd">false</span>;
scheduler.Schedule(() => wasExecuted = <span class="kwrd">true</span>);
Assert.IsFalse(wasExecuted);
scheduler.RunTo(1); <span class="rem">//execute 1 tick of queued actions</span>
Assert.IsTrue(wasExecuted);</pre>
The TestScheduler type is actually an implementation of the abstract VirtualScheduler<TAbsolute, TRelative> where the TestScheduler specifies long for both TAbsolute and TRelative. The net result is that the TestScheduler interface looks something like this:<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">class</span> TestScheduler
{
<span class="kwrd">public</span> TestScheduler();
<span class="rem">// Methods</span>
<span class="kwrd">public</span> <span class="kwrd">long</span> FromTimeSpan(TimeSpan timeSpan);
<span class="kwrd">public</span> <span class="kwrd">long</span> Increment(<span class="kwrd">long</span> absolute, <span class="kwrd">long</span> relative);
<span class="kwrd">public</span> <span class="kwrd">void</span> Run();
<span class="kwrd">public</span> <span class="kwrd">void</span> RunTo(<span class="kwrd">long</span> time);
<span class="kwrd">public</span> IDisposable Schedule(Action action);
<span class="kwrd">public</span> IDisposable Schedule(Action action, TimeSpan dueTime);
<span class="kwrd">public</span> IDisposable Schedule(Action action, <span class="kwrd">long</span> dueTime);
<span class="kwrd">public</span> <span class="kwrd">void</span> Sleep(<span class="kwrd">long</span> ticks);
<span class="kwrd">public</span> DateTimeOffset ToDateTimeOffset(<span class="kwrd">long</span> absolute);
<span class="rem">// Properties</span>
<span class="kwrd">public</span> DateTimeOffset Now { get; }
<span class="kwrd">public</span> <span class="kwrd">long</span> Ticks { get; }
}</pre>
We should already be familiar with what the Schedule methods do; the other methods of interest for this post are Run() and RunTo(long). Run() will just execute all the actions that have been scheduled. RunTo(long) will execute all the actions that have been scheduled up to the time specified by the long value, which represents ticks in this case. A quick look into the implementations of Schedule, RunTo and Run gives us the insight we need to really understand the virtual schedulers.<br />
<pre class="csharpcode"><span class="kwrd">public</span> IDisposable Schedule(Action action)
{
<span class="kwrd">return</span> <span class="kwrd">this</span>.Schedule(action, TimeSpan.Zero);
}
<span class="kwrd">public</span> IDisposable Schedule(Action action, TimeSpan dueTime)
{
<span class="kwrd">return</span> <span class="kwrd">this</span>.Schedule(action, <span class="kwrd">this</span>.FromTimeSpan(dueTime));
}
<span class="kwrd">public</span> IDisposable Schedule(Action action, TRelative dueTime)
{
BooleanDisposable disposable = <span class="kwrd">new</span> BooleanDisposable();
TAbsolute local = <span class="kwrd">this</span>.Increment(<span class="kwrd">this</span>.Ticks, dueTime);
Action action2 = <span class="kwrd">delegate</span> {
<span class="kwrd">if</span> (!disposable.IsDisposed)
{
action();
}
};
ScheduledItem<TAbsolute> item = <span class="kwrd">new</span> ScheduledItem<TAbsolute>(action2, local);
<span class="kwrd">this</span>.queue.Enqueue(item);
<span class="kwrd">return</span> disposable;
}
<span class="kwrd">public</span> <span class="kwrd">void</span> RunTo(TAbsolute time)
{
<span class="kwrd">while</span> ((<span class="kwrd">this</span>.queue.Count > 0) && (<span class="kwrd">this</span>.queue.Peek().DueTime.CompareTo(time) <= 0))
{
ScheduledItem<TAbsolute> item = <span class="kwrd">this</span>.queue.Dequeue();
<span class="kwrd">this</span>.Ticks = item.DueTime;
item.Action();
}
}
<span class="kwrd">public</span> <span class="kwrd">void</span> Run()
{
<span class="kwrd">while</span> (<span class="kwrd">this</span>.queue.Count > 0)
{
ScheduledItem<TAbsolute> item = <span class="kwrd">this</span>.queue.Dequeue();
<span class="kwrd">this</span>.Ticks = item.DueTime;
item.Action();
}
}</pre>
Obviously this is the internal implementation and is subject to change; however, as the documentation for the VirtualScheduler and TestScheduler is currently quite weak, I think this use of reflection is helpful. This example should clear up what happens when items are scheduled at the same time.<br />
<pre class="csharpcode">var scheduler = <span class="kwrd">new</span> TestScheduler();
<span class="kwrd">long</span> dueTime = 4L;
scheduler.Schedule(() => Console.WriteLine(<span class="str">"1"</span>), dueTime);
scheduler.Schedule(() => Console.WriteLine(<span class="str">"2"</span>), dueTime);
scheduler.Schedule(() => Console.WriteLine(<span class="str">"3"</span>), dueTime+1);
scheduler.Schedule(() => Console.WriteLine(<span class="str">"4"</span>), dueTime+1);
Console.WriteLine(<span class="str">"RunTo(dueTime)"</span>);
scheduler.RunTo(dueTime);
Console.WriteLine(<span class="str">"Run()"</span>);
scheduler.Run();
<span class="rem">/* Output:</span>
<span class="rem">RunTo(dueTime)</span>
<span class="rem">1</span>
<span class="rem">2</span>
<span class="rem">Run()</span>
<span class="rem">3</span>
<span class="rem">4</span>
<span class="rem">*/</span></pre>
<h4>
Testing Rx code</h4>
Now that we have learnt a little bit about the TestScheduler, let's look at how we could use it to get our 2 initial code snippets (Interval and Timeout) to execute as fast as possible while still maintaining the semantics of time. In this example we generate our 5 values one second apart but pass in our TestScheduler to the Interval method.<br />
<pre class="csharpcode">[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Testing_with_test_scheduler()
{
var scheduler = <span class="kwrd">new</span> TestScheduler();
var interval = Observable
.Interval(TimeSpan.FromSeconds(1), scheduler)
.Take(5);
<span class="kwrd">bool</span> isComplete = <span class="kwrd">false</span>;
interval.Subscribe(Console.WriteLine, () => isComplete = <span class="kwrd">true</span>);
scheduler.Run();
Assert.IsTrue(isComplete); <span class="rem">//Executes in less than 0.01s "on my machine"</span>
}</pre>
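The timeout snippet can be treated the same way. The following is only a sketch: it assumes the Timeout overload that accepts an IScheduler (Timeout appears in the list of scheduler-aware methods above), swaps the blocking Run call for Subscribe so that the test method keeps control, and uses a made-up test name. Using directives are omitted, as in the other snippets.<br />
<pre class="csharpcode">[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Timeout_with_test_scheduler()
{
var scheduler = <span class="kwrd">new</span> TestScheduler();
var stream = Observable.Never<<span class="kwrd">int</span>>();
var exceptionThrown = <span class="kwrd">false</span>;
<span class="rem">//Assumption: the Timeout overload taking an IScheduler, so the timer is queued on our virtual scheduler</span>
stream.Timeout(TimeSpan.FromMinutes(1), scheduler)
.Subscribe(
i => Console.WriteLine(<span class="str">"This will never run."</span>),
ex => exceptionThrown = <span class="kwrd">true</span>);
Assert.IsFalse(exceptionThrown); <span class="rem">//nothing is executed until we run the scheduler</span>
scheduler.Run(); <span class="rem">//drain the queue, which includes the timeout action</span>
Assert.IsTrue(exceptionThrown);
}</pre>
Because the one minute timeout is only ever a virtual minute, draining the scheduler's queue should fire it without any real waiting.<br />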
While this is mildly interesting, what I think is more important is how we would test a real piece of code. Imagine if you will a Presenter that subscribes to a stream of prices. As prices are published it adds them to a ViewModel. Assuming this is a WPF or Silverlight implementation we take the liberty of enforcing that the subscription be done on the ThreadPool and the observing is executed on the Dispatcher.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">class</span> MyPresenter
{
...
<span class="kwrd">public</span> <span class="kwrd">void</span> Show(<span class="kwrd">string</span> symbol)
{
_myService.PriceStream(symbol)
.SubscribeOn(Scheduler.ThreadPool)
.ObserveOn(Scheduler.Dispatcher)
.Subscribe(price=>_viewModel.Prices.Add(price));
}
...
}</pre>
While this snippet of code may do what we want it to do, it will be hard to test as it is accessing the schedulers via static properties. To help my testing, I have created my own interface that exposes the same IScheduler implementations that the Scheduler type does.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">interface</span> ISchedulerService
{
IScheduler CurrentThread { get; }
IScheduler Dispatcher { get; }
IScheduler Immediate { get; }
IScheduler NewThread { get; }
IScheduler ThreadPool { get; }
<span class="rem">//IScheduler TaskPool { get; }</span>
}
<span class="kwrd">public</span> <span class="kwrd">sealed</span> <span class="kwrd">class</span> SchedulerService : ISchedulerService
{
<span class="kwrd">public</span> IScheduler CurrentThread { get { <span class="kwrd">return</span> Scheduler.CurrentThread; } }
<span class="kwrd">public</span> IScheduler Dispatcher { get { <span class="kwrd">return</span> Scheduler.Dispatcher; } }
<span class="kwrd">public</span> IScheduler Immediate { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
<span class="kwrd">public</span> IScheduler NewThread { get { <span class="kwrd">return</span> Scheduler.NewThread; } }
<span class="kwrd">public</span> IScheduler ThreadPool { get { <span class="kwrd">return</span> Scheduler.ThreadPool; } }
<span class="rem">//public IScheduler TaskPool { get { return Scheduler.TaskPool; } }</span>
}</pre>
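With that interface in place, the presenter can take its schedulers through the constructor instead of reaching for the static properties. A minimal sketch, assuming constructor injection: the members of IMyService and IViewModel and the private field names are assumptions, inferred from how the tests in this post use them.<br />
<pre class="csharpcode"><span class="rem">//Hypothetical shapes, inferred from the mocks used in the tests</span>
<span class="kwrd">public</span> <span class="kwrd">interface</span> IMyService
{
IObservable<<span class="kwrd">decimal</span>> PriceStream(<span class="kwrd">string</span> symbol);
}
<span class="kwrd">public</span> <span class="kwrd">interface</span> IViewModel
{
ObservableCollection<<span class="kwrd">decimal</span>> Prices { get; }
<span class="kwrd">bool</span> IsConnected { get; set; } <span class="rem">//type is a guess</span>
}
<span class="kwrd">public</span> <span class="kwrd">class</span> MyPresenter
{
<span class="kwrd">private</span> <span class="kwrd">readonly</span> IMyService _myService;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> IViewModel _viewModel;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> ISchedulerService _schedulerService;
<span class="kwrd">public</span> MyPresenter(IMyService myService, IViewModel viewModel, ISchedulerService schedulerService)
{
_myService = myService;
_viewModel = viewModel;
_schedulerService = schedulerService;
}
<span class="kwrd">public</span> <span class="kwrd">void</span> Show(<span class="kwrd">string</span> symbol)
{
<span class="rem">//Same pipeline as before, but the schedulers now come from the injected service</span>
_myService.PriceStream(symbol)
.SubscribeOn(_schedulerService.ThreadPool)
.ObserveOn(_schedulerService.Dispatcher)
.Subscribe(price=>_viewModel.Prices.Add(price));
}
}</pre>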
This now allows me to inject an ISchedulerService which gives me a seam to help my testing. I can now write some tests for my Presenter.<br />
<pre class="csharpcode">[TestInitialize]
<span class="kwrd">public</span> <span class="kwrd">void</span> SetUp()
{
_myServiceMock = <span class="kwrd">new</span> Mock<IMyService>();
_viewModelMock = <span class="kwrd">new</span> Mock<IViewModel>();
_schedulerService = <span class="kwrd">new</span> TestSchedulers();
var prices = <span class="kwrd">new</span> ObservableCollection<<span class="kwrd">decimal</span>>();
_viewModelMock.SetupGet(vm => vm.Prices).Returns(prices);
_viewModelMock.SetupProperty(vm => vm.IsConnected);
}
[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_pass_symbol_to_MyService_PriceStream()
{
var expected = <span class="str">"SomeSymbol"</span>;
var priceStream = <span class="kwrd">new</span> Subject<<span class="kwrd">decimal</span>>();
_myServiceMock.Setup(svc => svc.PriceStream(It.Is<<span class="kwrd">string</span>>(symbol=>symbol==expected))).Returns(priceStream);
var sut = <span class="kwrd">new</span> MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show(expected);
_myServiceMock.VerifyAll(); <span class="rem">//Verify() alone only checks setups marked Verifiable()</span>
}
[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_add_to_VM_Prices_when_MyService_publishes_price()
{
<span class="kwrd">decimal</span> expected = 1.23m;
var priceStream = <span class="kwrd">new</span> Subject<<span class="kwrd">decimal</span>>();
_myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<<span class="kwrd">string</span>>())).Returns(priceStream);
var sut = <span class="kwrd">new</span> MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show(<span class="str">"SomeSymbol"</span>);
_schedulerService.ThreadPool.Schedule(() => priceStream.OnNext(expected)); <span class="rem">//Schedule the OnNext</span>
_schedulerService.ThreadPool.RunTo(1); <span class="rem">//Execute the OnNext action</span>
_schedulerService.Dispatcher.RunTo(1); <span class="rem">//Execute the OnNext Handler (ie adding to the Prices collection)</span>
Assert.AreEqual(1, _viewModelMock.Object.Prices.Count);
Assert.AreEqual(expected, _viewModelMock.Object.Prices.First());
}</pre>
The first of these two tests checks a simple expectation: a string value passed to my Show(string) method is passed on to the underlying service. This is not at all relevant to Rx. The second test shows the usage of my test-specific implementation of ISchedulerService, which exposes every IScheduler property as a TestScheduler instance. Injecting it lets me control the rate at which things are scheduled. <br />
For those of you new to Moq, some of the syntax may be a little confusing. Where you see a Setup(..) or SetupGet(..) method call, there is just a little bit of Expression magic that tells my mocks to return the correct thing when called. The .Object property hanging off each mock is the dynamically generated implementation of the interface. I hope you find this next test the most interesting. Here we really get the full value out of our TestScheduler, by testing timeouts.<br />
<pre class="csharpcode">[TestMethod]
<span class="kwrd">public</span> <span class="kwrd">void</span> Should_timeout_if_no_prices_for_10_seconds()
{
var timeoutPeriod = TimeSpan.FromSeconds(10);
var priceStream = Observable.Never<<span class="kwrd">decimal</span>>();
_myServiceMock.Setup(svc => svc.PriceStream(It.IsAny<<span class="kwrd">string</span>>())).Returns(priceStream);
var sut = <span class="kwrd">new</span> MyPresenter(_myServiceMock.Object, _viewModelMock.Object, _schedulerService);
sut.Show(<span class="str">"SomeSymbol"</span>);
_schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks - 1);
Assert.IsTrue(_viewModelMock.Object.IsConnected);
_schedulerService.ThreadPool.RunTo(timeoutPeriod.Ticks);
Assert.IsFalse(_viewModelMock.Object.IsConnected);
}</pre>
The key points to note are:<br />
<ol>
<li>We return an Observable.Never for our price stream so that no prices are ever pushed and no OnCompleted is published either. </li>
<li>The _schedulerService is a test fake that returns a TestScheduler for each of its schedulers </li>
<li>We run the ThreadPool TestScheduler up until 1 tick away from our timeout period and ensure that we have not timed out </li>
<li>We run the ThreadPool TestScheduler up to our timeout period and then ensure that we have timed out. </li>
<li>The test is sub second even though we are testing for a 10 second timeout! </li>
</ol>
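The Rx releases I have used since this post was written expose AdvanceTo(long) on TestScheduler (in the Microsoft.Reactive.Testing assembly) where the tests above call RunTo. As a rough sketch only, assuming a current Rx.NET package and using a hypothetical TimeoutSketch class in place of the presenter, the same one-tick-either-side check could look like this:<br />
<pre class="csharpcode">using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

//Hypothetical helper for illustration; it is not part of the sample project.
public static class TimeoutSketch
{
    //Returns { timed out one tick early?, timed out at the timeout period? }
    public static bool[] Run()
    {
        var scheduler = new TestScheduler();
        var timeoutPeriod = TimeSpan.FromSeconds(10);
        var timedOut = false;

        //A stream that never publishes, so only the timeout can end it.
        Observable.Never<decimal>()
            .Timeout(timeoutPeriod, scheduler)
            .Subscribe(_ => { }, ex => timedOut = ex is TimeoutException);

        scheduler.AdvanceTo(timeoutPeriod.Ticks - 1); //virtual time, no real waiting
        var oneTickEarly = timedOut;

        scheduler.AdvanceTo(timeoutPeriod.Ticks);
        var atTimeout = timedOut;

        return new[] { oneTickEarly, atTimeout };
    }
}</pre>
Because the clock is virtual, the two AdvanceTo calls return straight away; the ten seconds are never actually waited for.<br />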
Generally I only want to have one assertion in each of my tests, but for this example I think it elegantly tests that the timeout is exactly 10 seconds, neither longer nor shorter. If I were to remove the first assertion, my test would only prove that the timeout was no greater than 10 seconds; it could just as well be set to, say, 3 seconds. The implementation for this simple presenter is as follows:<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">class</span> MyPresenter
{
<span class="kwrd">private</span> <span class="kwrd">readonly</span> IMyService _myService;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> IViewModel _viewModel;
<span class="kwrd">private</span> <span class="kwrd">readonly</span> ISchedulerService _schedulerService;
<span class="kwrd">public</span> MyPresenter(IMyService myService, IViewModel viewModel, ISchedulerService schedulerService)
{
_myService = myService;
_schedulerService = schedulerService;
_viewModel = viewModel;
}
<span class="kwrd">public</span> <span class="kwrd">void</span> Show(<span class="kwrd">string</span> symbol)
{
_myService.PriceStream(symbol)
.SubscribeOn(_schedulerService.ThreadPool)
.ObserveOn(_schedulerService.Dispatcher)
.Timeout(TimeSpan.FromSeconds(10), _schedulerService.ThreadPool)
.Subscribe(OnPriceUpdate, ex =>
{
<span class="kwrd">if</span> (ex <span class="kwrd">is</span> TimeoutException)
_viewModel.IsConnected = <span class="kwrd">false</span>;
});
_viewModel.IsConnected = <span class="kwrd">true</span>;
}
<span class="kwrd">private</span> <span class="kwrd">void</span> OnPriceUpdate(<span class="kwrd">decimal</span> price)
{
_viewModel.Prices.Add(price);
}
}</pre>
I hope this post on testing helps you bring the skills that you have been developing in Rx to a level where you are comfortable considering them for production use.<br />
For your reference here are the two test versions of the ISchedulerService I find useful. One will just schedule everything with the ImmediateScheduler and the other uses the TestSchedulers for fine grained control.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">sealed</span> <span class="kwrd">class</span> TestSchedulers : ISchedulerService
{
<span class="kwrd">private</span> <span class="kwrd">readonly</span> TestScheduler _currentThread = <span class="kwrd">new</span> TestScheduler();
<span class="kwrd">private</span> <span class="kwrd">readonly</span> TestScheduler _dispatcher = <span class="kwrd">new</span> TestScheduler();
<span class="kwrd">private</span> <span class="kwrd">readonly</span> TestScheduler _immediate = <span class="kwrd">new</span> TestScheduler();
<span class="kwrd">private</span> <span class="kwrd">readonly</span> TestScheduler _newThread = <span class="kwrd">new</span> TestScheduler();
<span class="kwrd">private</span> <span class="kwrd">readonly</span> TestScheduler _threadPool = <span class="kwrd">new</span> TestScheduler();
<span class="preproc">#region</span> Implementation of ISchedulerService
IScheduler ISchedulerService.CurrentThread { get { <span class="kwrd">return</span> _currentThread; } }
IScheduler ISchedulerService.Dispatcher { get { <span class="kwrd">return</span> _dispatcher; } }
IScheduler ISchedulerService.Immediate { get { <span class="kwrd">return</span> _immediate; } }
IScheduler ISchedulerService.NewThread { get { <span class="kwrd">return</span> _newThread; } }
IScheduler ISchedulerService.ThreadPool { get { <span class="kwrd">return</span> _threadPool; } }
<span class="preproc">#endregion</span>
<span class="kwrd">public</span> TestScheduler CurrentThread { get { <span class="kwrd">return</span> _currentThread; } }
<span class="kwrd">public</span> TestScheduler Dispatcher { get { <span class="kwrd">return</span> _dispatcher; } }
<span class="kwrd">public</span> TestScheduler Immediate { get { <span class="kwrd">return</span> _immediate; } }
<span class="kwrd">public</span> TestScheduler NewThread { get { <span class="kwrd">return</span> _newThread; } }
<span class="kwrd">public</span> TestScheduler ThreadPool { get { <span class="kwrd">return</span> _threadPool; } }
}
<span class="kwrd">public</span> <span class="kwrd">sealed</span> <span class="kwrd">class</span> ImmediateSchedulers : ISchedulerService
{
<span class="kwrd">public</span> IScheduler CurrentThread { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
<span class="kwrd">public</span> IScheduler Dispatcher { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
<span class="kwrd">public</span> IScheduler Immediate { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
<span class="kwrd">public</span> IScheduler NewThread { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
<span class="kwrd">public</span> IScheduler ThreadPool { get { <span class="kwrd">return</span> Scheduler.Immediate; } }
}</pre>
The full source code is now available either via svn at <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or as a <a href="http://rx-samples.googlecode.com/files/rx-samples_v009.zip" target="_blank" title="rx-samples source code">zip file</a>.<br />
Back to the <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" title="Reactive Extensions for .NET Introduction - Lee Campbell">contents</a> page for <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" title="Reactive Extensions for .NET Introduction - Lee Campbell">Reactive Extensions for .NET Introduction</a><br />
Back to the previous post; <a href="http://leecampbell.blogspot.com/2010/08/rx-part-7-hot-and-cold-observables.html" title="Rx Part 7 – Hot and Cold Observables">Rx Part 7 – Hot and Cold Observables</a><br />
Forward to next post; <a href="http://leecampbell.blogspot.com/2011/03/rx-part-9join-window-buffer-and-group.html">Part 9 – Join, Window, Buffer and Group Join</a><br />
</div>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com4tag:blogger.com,1999:blog-455072646448673416.post-8872839929554557312010-08-24T17:38:00.002+01:002012-06-18T17:20:46.160+01:00Reactive Extensions for .NET an Introduction<div dir="ltr" style="text-align: left;" trbidi="on">
<b>STOP THE PRESS!</b> This series has now been superseded by the online book <a href="http://www.introtorx.com/">www.IntroToRx.com</a>. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!<br />
<br />
<br />
Welcome to my series introducing <a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" target="_blank" title="Reactive Extensions to .NET">Reactive Extensions to .NET</a> (Rx). This series is aimed at any .NET developer curious about the IObservable<T> and IObserver<T> interfaces that have popped up in .NET 4. However, Rx is a library, so <strike>Silverlight 3</strike> and Silverlight 4 developers, as well as .NET 3.5, Windows Phone and JavaScript coders, can download the library. Rx is big. It is big in all sorts of ways: <br />
<ol>
<li>The way that it tackles the Observer pattern is bold. </li>
<li>The way it tackles concurrency is quite a shift from how I have done it before. </li>
<li>The number of (extension) methods is huge. </li>
<li>The way in which it integrates with LINQ to leverage LINQ's composability and declarative style. </li>
<li>The fact that any .NET developer should care. UI developer, backend algorithm coder or integrator; it helps all of us. </li>
<li>The future plans are even more grand, but that is a different series altogether :-) </li>
</ol>
In this series I will introduce you to: <br />
<ul>
<li>the new types that Rx provides </li>
<li>the extension methods and how to use them </li>
<li>how to manage subscriptions to "streams" of data </li>
<li>how to deal with concurrency to your advantage and avoid the common old pitfalls </li>
<li>how to compose, aggregate and transform streams </li>
<li>how to build workflows with Rx </li>
<li>some tips and tricks I have picked up while using Rx over the past months. </li>
</ul>
So <a href="http://msdn.microsoft.com/en-us/data/gg577609" target="_blank" title="Reactive Extensions to .NET">download the assemblies</a> to reference, fire up Visual Studio and let's get started: <br />
<ul>
<li><a href="http://leecampbell.blogspot.com/2010/05/intro-to-rx.html" title="Part 1 - Introduction to Rx">Part 1 - Introduction to Rx</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/05/rx-part-2-static-and-extension-methods.html" title="Part 2 - Static and extension methods">Part 2 - Static and extension methods</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/05/rx-part-3-lifetime-management.html" title="Part 3 - Lifetime management – Completing and Unsubscribing">Part 3 - Lifetime management – Completing and Unsubscribing</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/05/rx-part-4-flow-control.html" title="Part 4 - Flow control">Part 4 - Flow control</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/06/rx-part-5-combining-multiple.html" title="Part 5 - Combining multiple IObservable streams">Part 5 - Combining multiple IObservable streams</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html" title="Part 6 - Scheduling and threading">Part 6 - Scheduling and threading</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/08/rx-part-7-hot-and-cold-observables.html" title="Part 7 - Hot and Cold observables">Part 7 - Hot and Cold observables</a> </li>
<li><a href="http://leecampbell.blogspot.com/2010/10/rx-part-8-testing-rx.html" target="_blank" title="Part 8 - Testing Rx">Part 8 – Testing Rx</a> </li>
<li><a href="http://leecampbell.blogspot.com/2011/03/rx-part-9join-window-buffer-and-group.html" title="Part 9 – Join, Window, Buffer and Group Join">Part 9 – Join, Window, Buffer and Group Join</a> </li>
</ul>
The full source code is now available either via SVN at <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or as a <a href="http://rx-samples.googlecode.com/files/rx-samples_v009.zip" title="Introduction to Rx code samples.">zip file</a>.<br />
Edit – This series of posts was first created in mid 2010. The Rx framework has gone through numerous changes during and after the writing of these posts. I am constantly making an effort to ensure that the blog stays as current as I can make it –Lee Campbell<br />
</div>Lee Campbellhttp://www.blogger.com/profile/16932445715757919177noreply@blogger.com20tag:blogger.com,1999:blog-455072646448673416.post-72386536852031034732010-08-19T12:31:00.001+01:002012-06-18T17:24:20.840+01:00Rx Part 7 – Hot and Cold Observables<div dir="ltr" style="text-align: left;" trbidi="on">
<b>STOP THE PRESS!</b> This series has now been superseded by the online book <a href="http://www.introtorx.com/">www.IntroToRx.com</a>. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!
<br />
<br />
In this post we will look at how to describe and handle 2 styles of observable streams: <br />
<ol>
<li>Streams that are passive and start publishing on request, </li>
<li>Streams that are active and publish regardless of subscriptions. </li>
</ol>
In this sense passive streams are called <em>Cold</em> and active streams are called <em>Hot</em>. You can draw some similarities between implementations of the IObservable<T> interface and implementations of the IEnumerable<T> interface with regard to Hot and Cold. With IEnumerable<T> you could have an “on demand” collection via the yield return syntax, or you could have eager evaluation by populating a List<T>, for example, and returning that (as per the example below).<br />
<pre class="csharpcode">Do(LazyEvaluation());
Do(EagerEvaluation());
<span class="kwrd">private</span> <span class="kwrd">void</span> Do(IEnumerable<<span class="kwrd">int</span>> list)
{
<span class="kwrd">foreach</span> (var i <span class="kwrd">in</span> list)
{
Console.WriteLine(<span class="str">"Read out first value of {0}"</span>, i);
<span class="kwrd">break</span>;
}
}
<span class="kwrd">public</span> IEnumerable<<span class="kwrd">int</span>> LazyEvaluation()
{
Console.WriteLine(<span class="str">"About to return 1"</span>);
<span class="kwrd">yield</span> <span class="kwrd">return</span> 1;
Console.WriteLine(<span class="str">"About to return 2"</span>);<span class="rem">//Never called in this example</span>
<span class="kwrd">yield</span> <span class="kwrd">return</span> 2;
}
<span class="kwrd">public</span> IEnumerable<<span class="kwrd">int</span>> EagerEvaluation()
{
var result = <span class="kwrd">new</span> List<<span class="kwrd">int</span>>();
Console.WriteLine(<span class="str">"About to return 1"</span>);
result.Add(1);
Console.WriteLine(<span class="str">"About to return 2"</span>);<span class="rem">//executed but not used.</span>
result.Add(2);
<span class="kwrd">return</span> result;
}</pre>
Implementations of IObservable<T> can exhibit similar variations in style.<br />
Examples of Hot observables that could publish regardless of whether there are any subscribers would be:<br />
<ul>
<li>Mouse movements </li>
<li>Timer events </li>
<li>broadcasts like ESB channels or UDP network packets. </li>
<li>price ticks from a trading exchange </li>
</ul>
Some examples of Cold observables would be:<br />
<ul>
<li>subscription to a queue </li>
<li>when Rx is used for an asynchronous request </li>
<li>on demand streams </li>
</ul>
In this post we will look at 3 scenarios in which cold, hot and both cold & hot are implemented.<br />
<h4>
Cold Observables</h4>
In this first example we have a requirement to fetch a list of products from a service. In our implementation we choose to return an IObservable<string>; as we get the results we publish them until we have the full list, and then we publish an OnCompleted. This is a pretty simple example.<br />
<pre class="csharpcode"><span class="kwrd">private</span> <span class="kwrd">static</span> IObservable<<span class="kwrd">string</span>> GetProducts()
{
<span class="kwrd">return</span> Observable.CreateWithDisposable<<span class="kwrd">string</span>>(
o =>
{
<span class="kwrd">using</span>(var conn = <span class="kwrd">new</span> SqlConnection(<span class="str">@"Data Source=.\SQLSERVER;Initial Catalog=AdventureWorksLT2008;Integrated Security=SSPI;"</span>))
<span class="kwrd">using</span> (var cmd = <span class="kwrd">new</span> SqlCommand(<span class="str">"Select Name FROM SalesLT.ProductModel"</span>, conn))
{
conn.Open();
SqlDataReader reader = cmd.ExecuteReader(CommandBehavior.CloseConnection);
<span class="kwrd">while</span> (reader.Read())
{
o.OnNext(reader.GetString(0));
}
o.OnCompleted();
<span class="kwrd">return</span> Disposable.Create(()=>Console.WriteLine(<span class="str">"--Disposed--"</span>));
}
});
}</pre>
This style of API would allow for a non blocking call to fetch the list of products and would inform the consumer of when the list was complete. This is fairly common stuff, but note that every time this is called, the database will be accessed again. <br />
In the example above I use the Disposable.Create factory method. This factory method just creates an implementation of IDisposable that executes a given action when disposed. This is perfect for doing a Console.WriteLine once the subscription has been disposed.<br />
In the example below, we have a consumer of our above code, but it explicitly only wants up to 3 values (the full set has 128 values). This code illustrates that the Take(3) expression will restrict what the consumer receives, but the GetProducts() method will still publish all of the values.<br />
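To make the idea concrete, here is a minimal sketch of the kind of object such a factory could hand back. ActionDisposable is a hypothetical name of my own and this is not the type Rx uses internally; it only shows the "run an action on Dispose" technique, with the assumption that a second Dispose call should do nothing.<br />
<pre class="csharpcode">using System;
using System.Threading;

//Hypothetical type for illustration; Rx ships its own implementation.
public sealed class ActionDisposable : IDisposable
{
    private Action _onDispose;

    public ActionDisposable(Action onDispose)
    {
        _onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose));
    }

    public void Dispose()
    {
        //Swap the action out atomically so it runs at most once,
        //even if Dispose is called again or from another thread.
        var action = Interlocked.Exchange(ref _onDispose, null);
        action?.Invoke();
    }
}</pre>
Usage mirrors the factory call above: new ActionDisposable(() => Console.WriteLine("--Disposed--")) writes the message the first time it is disposed and stays silent after that.<br />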
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> ColdSample()
{
var productStream = GetProducts().Take(3);
productStream.Subscribe(Console.WriteLine);
Console.ReadLine();
}</pre>
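The same effect can be seen without a database. The sketch below is written against a current Rx.NET package, where Observable.Create accepts the delegate that CreateWithDisposable took in the build used for this post. TakeSketch and its counters are hypothetical names for illustration. It counts how many values the producer loop pushes compared with how many the Take(3) consumer receives.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

//Hypothetical helper for illustration only.
public static class TakeSketch
{
    //Returns { values the producer pushed, values the consumer received }
    public static int[] Run()
    {
        var produced = 0;
        var source = Observable.Create<int>(o =>
        {
            //A synchronous loop, like the data reader loop in GetProducts().
            for (var i = 0; i < 10; i++)
            {
                produced++;   //counts every value the producer tries to push
                o.OnNext(i);
            }
            o.OnCompleted();
            return Disposable.Empty;
        });

        var received = new List<int>();
        source.Take(3).Subscribe(received.Add);

        return new[] { produced, received.Count };
    }
}</pre>
The producer runs its whole loop (10 pushes) while the consumer sees only 3 values, because nothing inside the loop checks whether the subscription has already been disposed.<br />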
<h4>
Hot Observables</h4>
Trying to come up with an example for Hot Observables has been a real pain. I have started off with examples with some sort of context (streaming stock prices or weather information) but this all seemed to detract from the real working of the code. So I think it is best to step through this slowly with a contrived demo and build it up to a piece of code you might actually want to use.<br />
Let us start with subscribing to an Interval. In the example below we subscribe to the same Observable that is created via the Interval extension method. The delay between the two subscriptions should demonstrate that while they are subscribed to the same observable instance, it is not the same logical stream of data.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> SimpleColdSample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period);
observable.Subscribe(i => Console.WriteLine(<span class="str">"first subscription : {0}"</span>, i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine(<span class="str">"second subscription : {0}"</span>, i));
Console.ReadKey();
<span class="rem">/* Output:</span>
<span class="rem"> first subscription : 0</span>
<span class="rem"> first subscription : 1</span>
<span class="rem"> second subscription : 0</span>
<span class="rem"> first subscription : 2</span>
<span class="rem"> second subscription : 1</span>
<span class="rem"> first subscription : 3</span>
<span class="rem"> second subscription : 2 </span>
<span class="rem"> */</span>
}</pre>
<h5>
Publish and Connect</h5>
If I want to be able to share the actual stream of data and not just the instance of the observable, I can use the Publish() extension method. This will return an IConnectableObservable<T>, which extends IObservable<T> by adding the single Connect() method. By using the Publish() then the Connect() method, we can get this functionality.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> SimpleConnectSample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine(<span class="str">"first subscription : {0}"</span>, i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine(<span class="str">"second subscription : {0}"</span>, i));
Console.ReadKey();
<span class="rem">/* Output:</span>
<span class="rem"> first subscription : 0</span>
<span class="rem"> first subscription : 1</span>
<span class="rem"> second subscription : 1</span>
<span class="rem"> first subscription : 2</span>
<span class="rem"> second subscription : 2 </span>
<span class="rem"> */</span>
}</pre>
In the example above the <em>observable</em> variable is an IConnectableObservable<T>, and calling Connect() subscribes it to the underlying stream (the Observable.Interval). In this case the first subscription is made quickly enough to catch the first item published, but the second subscription subscribes late and misses it. We could move the invocation of the Connect() method until after both subscriptions have been made, so that even with the Thread.Sleep we won't subscribe to the underlying stream until both subscriptions are in place. This would be done as follows:<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> SimpleConnectSample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine(<span class="str">"first subscription : {0}"</span>, i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine(<span class="str">"second subscription : {0}"</span>, i));
observable.Connect();
Console.ReadKey();
<span class="rem">/* Output:</span>
<span class="rem"> first subscription : 0</span>
<span class="rem"> second subscription : 0</span>
<span class="rem"> first subscription : 1</span>
<span class="rem"> second subscription : 1</span>
<span class="rem"> first subscription : 2</span>
<span class="rem"> second subscription : 2 </span>
<span class="rem"> */</span>
}</pre>
You can probably imagine how this could be quite useful where an application had the need to share streams of data. In a trading application if you wanted to consume a price stream for a certain asset in more than one place, you would want to reuse that stream and not have to make another subscription to the server providing that data. Publish() and Connect() are perfect solutions for this. <br />
<h5>
Disposal of connections and subscriptions</h5>
What does become interesting is how disposal is performed. What was not covered above is that the Connect() method returns an IDisposable. By disposing of the “connection” you can turn the stream on and off (Connect() to turn it on and then disposing of the connection to turn it off). In this example we see that the stream can be connected and disconnected multiple times.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> ConnectAndDisposeSample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine(<span class="str">"subscription : {0}"</span>, i));
var exit = <span class="kwrd">false</span>;
<span class="kwrd">while</span> (!exit)
{
Console.WriteLine(<span class="str">"Press enter to connect, esc to exit."</span>);
var key = Console.ReadKey(<span class="kwrd">true</span>);
<span class="kwrd">if</span>(key.Key== ConsoleKey.Enter)
{
var connection = observable.Connect(); <span class="rem">//--Connects here--</span>
Console.WriteLine(<span class="str">"Press any key to dispose of connection."</span>);
Console.ReadKey();
connection.Dispose(); <span class="rem">//--Disconnects here--</span>
}
<span class="kwrd">if</span>(key.Key==ConsoleKey.Escape)
{
exit = <span class="kwrd">true</span>;
}
}
<span class="rem">/* Output:</span>
<span class="rem"> Press enter to connect, esc to exit.</span>
<span class="rem"> Press any key to dispose of connection.</span>
<span class="rem"> subscription : 0</span>
<span class="rem"> subscription : 1</span>
<span class="rem"> subscription : 2</span>
<span class="rem"> Press enter to connect, esc to exit.</span>
<span class="rem"> Press any key to dispose of connection.</span>
<span class="rem"> subscription : 0</span>
<span class="rem"> subscription : 1</span>
<span class="rem"> subscription : 2</span>
<span class="rem"> Press enter to connect, esc to exit. </span>
<span class="rem"> */</span>
}</pre>
Let us finally consider automatic disposal of a connection. It would be commonplace for a single stream to be shared between subscriptions, as per the price stream example mentioned above. It would, however, also be commonplace for the developer to want the stream running hot only while there are subscriptions to it. It therefore seems obvious that there should be a mechanism not only for automatically connecting (once a subscription has been made), but also for disconnecting (once there are no more subscriptions) from a stream. First let us look at what happens to a stream when we connect with no subscribers, and then later unsubscribe:<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> OrphanedStreamExample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period)
.Do(l => Console.WriteLine(<span class="str">"Publishing {0}"</span>, l)) <span class="rem">//produce Side effect to show it is running.</span>
.Publish();
observable.Connect();
Console.WriteLine(<span class="str">"Press any key to subscribe"</span>);
Console.ReadKey();
var subscription = observable.Subscribe(i => Console.WriteLine(<span class="str">"subscription : {0}"</span>, i));
Console.WriteLine(<span class="str">"Press any key to unsubscribe."</span>);
Console.ReadKey();
subscription.Dispose();
Console.WriteLine(<span class="str">"Press any key to exit."</span>);
Console.ReadKey();
<span class="rem">/* Output:</span>
<span class="rem"> Press any key to subscribe</span>
<span class="rem"> Publishing 0</span>
<span class="rem"> Publishing 1</span>
<span class="rem"> Press any key to unsubscribe.</span>
<span class="rem"> Publishing 2</span>
<span class="rem"> subscription : 2</span>
<span class="rem"> Publishing 3</span>
<span class="rem"> subscription : 3</span>
<span class="rem"> Press any key to exit.</span>
<span class="rem"> Publishing 4</span>
<span class="rem"> Publishing 5</span>
<span class="rem"> */</span>
}</pre>
A few things to note here:<br />
<ol>
<li>I use the <em>Do</em> extension method to create side effects on the stream (ie writing to the console). This allows us to see when the stream is actually connected. </li>
<li>We connect first and then subscribe, which means we can be publishing without any subscriptions. </li>
<li>We dispose of our subscription but don’t dispose of the connection which means the stream will still be running. This means we will be publishing even though there are no subscriptions. </li>
</ol>
<h5>
RefCount</h5>
Taking the last example, if we just comment out the line that makes the connection and then add one further extension method, RefCount, to the creation of our observable, we have magically implemented all of our requirements. RefCount will take an IConnectableObservable<T> and turn it back into an IObservable<T>, and automatically implement the connect and disconnect behaviour we are looking for.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> RefCountExample()
{
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period)
.Do(l => Console.WriteLine(<span class="str">"Publishing {0}"</span>, l)) <span class="rem">//produce Side effect to show it is running.</span>
.Publish()
.RefCount();
<span class="rem">//observable.Connect(); Use RefCount instead now</span>
Console.WriteLine(<span class="str">"Press any key to subscribe"</span>);
Console.ReadKey();
var subscription = observable.Subscribe(i => Console.WriteLine(<span class="str">"subscription : {0}"</span>, i));
Console.WriteLine(<span class="str">"Press any key to unsubscribe."</span>);
Console.ReadKey();
subscription.Dispose();
Console.WriteLine(<span class="str">"Press any key to exit."</span>);
Console.ReadKey();
<span class="rem">/* Output:</span>
<span class="rem"> Press any key to subscribe</span>
<span class="rem"> Press any key to unsubscribe.</span>
<span class="rem"> Publishing 0</span>
<span class="rem"> subscription : 0</span>
<span class="rem"> Publishing 1</span>
<span class="rem"> subscription : 1</span>
<span class="rem"> Publishing 2</span>
<span class="rem"> subscription : 2</span>
<span class="rem"> Press any key to exit.</span>
<span class="rem"> */</span>
}</pre>
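To make the connect and disconnect behaviour visible without relying on timers, here is a minimal sketch that swaps Observable.Interval for a Subject we push values into by hand. The names RefCountSketch, Run and log are made up for illustration, and the using directives assume a release of Rx that ships as the System.Reactive package, which is newer than the build this post was written against.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, for illustration only.
public static class RefCountSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Observable.Create lets us record when the underlying sequence
        // is subscribed to (connected) and when that subscription is disposed.
        var observable = Observable.Create<int>(observer =>
            {
                log.Add("connected");
                var inner = source.Subscribe(observer);
                return Disposable.Create(() =>
                {
                    inner.Dispose();
                    log.Add("disconnected");
                });
            })
            .Publish()
            .RefCount();

        source.OnNext(0); // no subscriptions yet, so nothing is connected

        var first = observable.Subscribe(i => log.Add("first : " + i));   // first subscription connects
        var second = observable.Subscribe(i => log.Add("second : " + i)); // shares the same connection
        source.OnNext(1);

        first.Dispose();  // one subscription remains, so we stay connected
        source.OnNext(2);

        second.Dispose(); // last subscription disposed, so RefCount disconnects
        source.OnNext(3); // nothing is listening any more

        return log;
    }
}</pre>
Calling RefCountSketch.Run() should give a log of: connected, first : 1, second : 1, second : 2, disconnected. The values 0 and 3 never show up because they are pushed while there are no subscriptions, so RefCount holds no connection to the underlying sequence at those points.<br />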
<h4>
Other Connectable Observables</h4>
While this is a post about Hot and Cold Observables, I think it is worth mentioning the other ways IConnectableObservable<T> can pop up.<br />
<h5>
Prune</h5>
The Prune method is effectively a non-blocking .Last() call. You can think of it as an AsyncSubject<T> wrapping your target Observable: subscribers receive only the last value of the observable, and only once it completes. Later releases of Rx expose this operator under the name PublishLast.<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> PruneExample()
{
    var period = TimeSpan.FromSeconds(1);
    var observable = Observable.Interval(period)
        .Take(5)
        .Do(l => Console.WriteLine(<span class="str">"Publishing {0}"</span>, l)) <span class="rem">//produce Side effect to show it is running.</span>
        .Prune();
    observable.Connect();
    Console.WriteLine(<span class="str">"Press any key to subscribe"</span>);
    Console.ReadKey();
    var subscription = observable.Subscribe(i => Console.WriteLine(<span class="str">"subscription : {0}"</span>, i));
    Console.WriteLine(<span class="str">"Press any key to unsubscribe."</span>);
    Console.ReadKey();
    subscription.Dispose();
    Console.WriteLine(<span class="str">"Press any key to exit."</span>);
    Console.ReadKey();
    <span class="rem">/* Output:</span>
    <span class="rem"> Press any key to subscribe</span>
    <span class="rem"> Publishing 0</span>
    <span class="rem"> Publishing 1</span>
    <span class="rem"> Press any key to unsubscribe.</span>
    <span class="rem"> Publishing 2</span>
    <span class="rem"> Publishing 3</span>
    <span class="rem"> Publishing 4</span>
    <span class="rem"> subscription : 4</span>
    <span class="rem"> Press any key to exit.</span>
    <span class="rem"> */</span>
}</pre>
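The AsyncSubject comparison can be sketched directly by multicasting a source through an AsyncSubject by hand. This is only an illustration of the equivalent semantics, not a claim about how Prune itself is built. The names AsyncSubjectSketch, Run and log are invented for the example, a Subject stands in for the timer so the result does not depend on timing, and the using directives assume the System.Reactive package.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, for illustration only.
public static class AsyncSubjectSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Multicast through an AsyncSubject: only the final value is kept,
        // and it is handed out once the source completes.
        var last = source.Multicast(new AsyncSubject<int>());
        last.Connect();

        source.OnNext(0);
        last.Subscribe(
            i => log.Add("early : " + i),
            () => log.Add("early completed"));
        source.OnNext(1);
        source.OnNext(2);

        log.Add("source completing");
        source.OnCompleted(); // the early subscription receives 2 only now

        // A subscription made after completion still receives the last value.
        last.Subscribe(
            i => log.Add("late : " + i),
            () => log.Add("late completed"));

        return log;
    }
}</pre>
Calling AsyncSubjectSketch.Run() should give a log of: source completing, early : 2, early completed, late : 2, late completed. Nothing is delivered before the source completes, which is the non-blocking .Last() behaviour described above.<br />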
<h5>
Replay </h5>
The Replay extension method allows you to take an existing Observable and give it “replay” semantics as per the ReplaySubject<T>. As a reminder, the ReplaySubject<T> will cache all values so that any late subscribers will also get all of the values. In this example two subscriptions are made on time, and a third can be made after the sequence completes. Even though the third subscription is made after OnCompleted, it still receives every value that was published once the Replay was connected (the value 0 was published by the hot source before then, so it is never cached).<br />
<pre class="csharpcode"><span class="kwrd">public</span> <span class="kwrd">void</span> ReplayOnHotExample()
{
    var period = TimeSpan.FromSeconds(1);
    var hot = Observable.Interval(period)
        .Take(3)
        .Publish();
    hot.Connect();
    Thread.Sleep(period); <span class="rem">//Run hot and ensure a value is lost.</span>
    var observable = hot.Replay();
    observable.Connect();
    observable.Subscribe(i => Console.WriteLine(<span class="str">"first subscription : {0}"</span>, i));
    Thread.Sleep(period);
    observable.Subscribe(i => Console.WriteLine(<span class="str">"second subscription : {0}"</span>, i));
    Console.ReadKey();
    observable.Subscribe(i => Console.WriteLine(<span class="str">"third subscription : {0}"</span>, i));
    Console.ReadKey();
    <span class="rem">/* Output:</span>
    <span class="rem"> first subscription : 1</span>
    <span class="rem"> second subscription : 1</span>
    <span class="rem"> first subscription : 2</span>
    <span class="rem"> second subscription : 2</span>
    <span class="rem"> third subscription : 1</span>
    <span class="rem"> third subscription : 2</span>
    <span class="rem"> */</span>
}</pre>
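Because the example above depends on sleeps and key presses, the same caching behaviour can be sketched with a Subject that we drive by hand. As before, this is a sketch under assumptions: ReplaySketch, Run and log are made-up names, and the using directives assume the System.Reactive package.<br />
<pre class="csharpcode">using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, for illustration only.
public static class ReplaySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();
        var observable = source.Replay();

        source.OnNext(0);     // published before Connect, so it is never cached
        observable.Connect(); // caching starts here

        source.OnNext(1);
        observable.Subscribe(i => log.Add("first : " + i)); // receives the cached 1 straight away
        source.OnNext(2);
        source.OnCompleted();

        // Subscribing after completion still yields every cached value.
        observable.Subscribe(
            i => log.Add("late : " + i),
            () => log.Add("late completed"));

        return log;
    }
}</pre>
Calling ReplaySketch.Run() should give a log of: first : 1, first : 2, late : 1, late : 2, late completed. The value 0 is missing for the same reason it is missing from the output above: it was published before the Replay was connected.<br />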
<br />
I hope that gives some insight into <em>Hot</em> and <em>Cold</em> observables. I think this is one of the more mysterious parts of Rx for many newbies, so I hope it helps clear up some of the cool stuff you can do with Rx. I am sure many of you will look forward to finding ways to use this in your next application.<br />
For more information on IConnectableObservable<T> and Hot/Cold streams check out these resources:<br />
<a href="http://channel9.msdn.com/posts/J.Van.Gogh/Rx-API-in-depth-Hot-and-Cold-observables/" target="_blank" title="Rx API in depth Hot and Cold Observables on channel9.msdn.com">channel 9 video on Hot and Cold observables</a><br />
<a href="http://blogs.microsoft.co.il/blogs/bnaya/archive/2010/03/13/rx-for-beginners-part-9-hot-vs-cold-observable.aspx" target="_blank" title="RX for beginners -Part 9 hot vs cold observable">Hot and Cold by Bnaya Eshet</a><br />
The full source code is now available either via svn at <a href="http://code.google.com/p/rx-samples/source/checkout">http://code.google.com/p/rx-samples/source/checkout</a> or as a <a href="http://rx-samples.googlecode.com/files/rx-samples_v009.zip" target="_blank" title="rx-samples source code">zip file</a>.<br />
Back to the <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" title="Reactive Extensions for .NET Introduction - Lee Campbell">contents</a> page for <a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" title="Reactive Extensions for .NET Introduction - Lee Campbell">Reactive Extensions for .NET Introduction</a><br />
Back to the previous post; <a href="http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html">Part 6 - Scheduling and threading</a><br />
Forward to next post; <a href="http://leecampbell.blogspot.com/2010/10/rx-part-8-testing-rx.html">Part 8 – Testing Rx</a><br />
</div>