Jeremy Likness' Blog

Coroutines for Asynchronous Sequential Workflows using Reactive Extensions (Rx)

I've been doing quite a bit with Reactive Extensions (Rx) for Silverlight lately. One idea that I keep exploring is the concept of creative intuitive sequential workflows for asynchronous operations. You can read about my explorations using Wintellect's own Power Threading Library in this post along with a simple solution using an interface and enumerators in part 2 of that series. I'm tackling the problem from a different angle.

First, my disclaimers: this is more of an exploratory, "Here's what can be done" post. Please don't take this as best practice or guidance, but more of another way of looking at the solution as well as an opportunity to dive deeper into Reactive Extensions. Also understand I am by no means an expert with Rx so I welcome feedback related to this experiment.

The concept is straightforward: there are often times we want an asynchronous set of operations to perform sequentially. Perhaps you must load a list from a service, then load the selected item, then trigger an animation. This can be done either by chaining the completed events or nesting lambda expressions, but is there a cleaner way?

To me, a cleaner solution would make it easy to see what happens sequentially, and also easy to fire the sequence. If it becomes too complex to force the sequential workflow, it buys us nothing from the traditional methods of wiring into completed events or nesting lambda expressions.

So, I started with the idea of having a helper object that I could feed operations to in sequence, then fire it off. What would that look like?

First, I made an interface to keep track of operations as they run and complete. This is by no means a perfect design but for now it's a sort of "recrusive container" for work done. The interface looks like this:

public interface ISequentialResult 
{
    List<ISequentialResult> Results { get; set; }
    object Value { get; set; }        
}

The result is simple: it aggregates all previous results, and contains a pointer to whatever is processing the current action in the sequence. Simple enough. Now on to a base class:

public abstract class BaseResult<T> : ISequentialResult 
{
    protected IObserver<ISequentialResult> Observer { get; set; }

    protected BaseResult(IObserver<ISequentialResult> observer)
    {
        Results = new List<ISequentialResult>();
        Observer = observer;
    }

    public List<ISequentialResult> Results { get; set;}

    public object Value { get; set; }       

    public T TypedValue
    {
        get { return (T) Value; }
        set { Value = value; }
    }
}

The base class does a little more for us. First, it introduces the concept of an IObserver. The observer is simply a class that receives notifications, and the type is the type of the notification. In this case, we allow for a derived type that will create the observer as well as take our "object" value and type it to a specific value. You'll see how the observer works with our sequences in a bit.

Next, I created three implementations. One is just a default that does nothing, and the other two handle Click events and Storyboard interactions. Let's take a look at the handler for storyboards:

public class StoryboardResult : BaseResult<Storyboard>  
{
    public StoryboardResult(IObserver<ISequentialResult> observer, Storyboard sb) : base(observer)
    {
        TypedValue = sb;
        sb.Completed += SbCompleted;
        sb.Begin();
    }

    void SbCompleted(object sender, EventArgs e)
    {
        TypedValue.Completed -= SbCompleted;
        Observer.OnNext(this);
        Observer.OnCompleted();
    }       
}

You'll notice a few interesting things. We take in the story board, wire into its Completed event, then fire it off (we could have changed the interface to have an Execute method to give this more flexibility). The story board completion is the important piece to consider. First, the event is unhooked. Then, we pass the result to the observer. The observer is waiting for ISequentialResult notifications. We provide it with one, but only after the story board is completed. We also tell the observer "we're done" (an observer can listen for multiple push notifications, so we could have created a handler that never completed, or one that aggregated multiple storyboards and only completed when all story boards were done).

With that in mind, take a look at the Click handler:

public class ClickResult : BaseResult<ButtonBase>
{
    public ClickResult(IObserver<ISequentialResult> observer, ButtonBase button) : base(observer)
    {
        TypedValue = button;
        button.Click += ButtonClick;
    }

    void ButtonClick(object sender, System.Windows.RoutedEventArgs e)
    {
        TypedValue.Click -= ButtonClick;
        Observer.OnNext(this);
        Observer.OnCompleted();                 
    }
}

This is very similar to the storyboard. There is nothing to "kick off" but instead this result simply registers itself whenever the target is clicked. Why would we do that? I'll get to the example in a minute.

The most interesting part of workflow is the sequencer itself. This is the class that will manage the asychronous events and coordinate them. The key is that while they run sequentially, they also run asynchronously so there is no blocking.

Here is the sequencer class:

public class Sequencer : IDisposable 
{
    private IDisposable _sequence;
    private ISequentialResult _result;

    private readonly List<Func<IObserver<ISequentialResult>,ISequentialResult,ISequentialResult>> _sequences
        = new List<Func<IObserver<ISequentialResult>, ISequentialResult,ISequentialResult>>();

    public void AddSequence(Func<IObserver<ISequentialResult>, ISequentialResult,ISequentialResult> sequence)
    {
        _sequences.Add(sequence);
    }
        
    public void Run(Action<Exception> exception, Action<ISequentialResult> completed)
    {
        _sequence = Observable.Iterate(_Sequencer)
            .Subscribe(
                next => { },
                exception,
                ()=>completed(_result));
    }

    private IEnumerable<IObservable<Object>> _Sequencer()
    {
        var x = 0;

        _result = new DefaultResult(null) {Value = this};

        while (x (
                observer =>
                    {
                        var newResult = sequence(observer, _result);
                        newResult.Results.Add(_result);
                        newResult.Results.AddRange(_result.Results);
                        _result = newResult;
                        return () => { };
                    }
                ).Start();

            yield return step;                

            x++;
        }

        _result.Results.Add(_result);
    }

    public void Dispose()
    {
        _sequence.Dispose();
    }
}

There's a lot going on here, so let's break it down.

First, you'll notice two references: one to a disposable object, and one to a result. More on those in a bit.

The list may seem confusing at first, but it's only because of all of the type specifiers. Each "step" is represented by a function. The sequence will call the function with an observer and the previous result, and expect to get a new result back. Think of it as "here's who is watching, and what happened the last time ... now give me what you have."

A method is provided to add these steps to the sequence.

The run method is interesting. This is where we set up our disposable reference because we're creating a long-running observer that we want to dispose of when the sequence itself is disposed. The iterate function takes a list of observable sequences and observes them sequentially. We provide an enumerator that feeds the observable sequences, and for each sequence in the "outer loop" (the main algorithm that drives the sequential work flow) we simply drive through the collection. If an exception is encountered, we'll call back with the exception. When completed, we call back with the final result.

To better understand what's going on, take a look at the enumerator. For each iteration, we create a new observable stream. We call the function I mentioned earlier and pass the observer in. When we receive the result, we stack it recursively and store it, then iterate to the next in line. The yield statement ensures the sequential operation completes (when we call the OnCompleted in our result) before the next step begins.

That's the complicated part: setting up the core framework. Now comes the easy part: plugging into it.

In the XAML I placed a large red rectangle. There are three storyboards tied to the rectangle. One changes the colors, one shrinks it, and one twists it using the plane projection. There are also three buttons. One button kicks off the story boards. One button kicks off a sequential workflow that will run the story boards in order. The final button resets the story boards by calling Stop on them.

Let's have some fun. First, kicking off the story boards is simple enough:

private void Button_Click(object sender, RoutedEventArgs e)
{
    StoryTwist.Begin();
    StoryShrink.Begin();
    StoryColor.Begin();
}

The reset button is also easy:

private void Button_Click_2(object sender, RoutedEventArgs e)
{
    StoryTwist.Stop();
    StoryShrink.Stop();
    StoryColor.Stop();
}

Now for the sequential storyboards. This is where things should get easier for us. Instead of having to wire in several completed events or nesting lambdas, let's see what it looks like to run them in order using our sequencer:

private void Button_Click_1(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    _sequence = new Sequencer();
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryColor));
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryShrink));
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryTwist));
    _sequence.Run(ex => MessageBox.Show(ex.Message),
                    result =>
                        {
                            foreach (var storyboard in
                                result.Results.Select(sequence => sequence.Value).OfType<Storyboard>())
                            {
                                storyboard.Stop();
                            }
                            _sequence.Dispose();
                            _sequence = null;
                        });
}       

So what's going on? If we have a previous sequence running, we dispose it so we can start fresh. Then, we simply add each story board in the order we want it to fire (the observer is provided to us by the sequencer). Finally, we run it. If there is an error, show it. When the sequence is done, we iterate the results and find any result that was a story board and call the "stop" method. This means after the sequence completes, it will automatically restore the rectangle to its original state.

Finally, to show just how powerful it is to drive sequential workflows without blocking, I added one more method:

private void _WaitForThreeClicks()
{
    _buttonSequence = new Sequencer();
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.Run(ex=>MessageBox.Show(ex.Message),
        result=>
            {
                MessageBox.Show("You clicked the Reset button 3 times!");
                _buttonSequence.Dispose();
                _WaitForThreeClicks();
            });
}

This is a fun method. It literally creates three sequences, all "waiting" for the reset button to be clicked. After the sequence completes, we show a message indicating that 3 clicks happened, then restart the sequence recursively. I'll call it the first time just after IinitializeComponent:

public MainPage()
{
    InitializeComponent();  
    _WaitForThreeClicks();
}      

There you have it! When you run the code, you'll get this:

Notice that you can keep clicking the sequence to start it over: there is no blocking. And whatever order you decide to click on other buttons, the reset button always faithfully shows a "3 click" message on the third click.

You can download the full source code for this solution here.
Jeremy Likness

On Aug 22 2010 12:43 AMBy jlikness MEF, Async, .NET FrameworkWith 2 Comments

Comments (2)

  1. HI , I am very interested in your article. can you share me your code for this article? I found something wrong with class sequence when I use RX2.0, there is no Iterate() or Start() Method

Leave a Comment

Archives

Tags

Blogs