Event Filters - Dispatching

A common scenario with event driven architectures is that you want a property to be thread safe for writing, but you want all events raised by that property to be raised on the UI threa.

MetaProperties supports this through the DispatchingEventFilter. This event filter is included in the MetaProperties.EventFilters assembly.

It will asynchronously dispatch the task of raising the container's events to a particular thread. This means that the event order is guaranteed (if you are using the default RaiseEventsInsideLock behaviour of the MultiThreadedWriteObservable`1) but events such as PropertyChanging are less useful as the property value is likely to have already changed by the time you receive the event.

Should you require that the events are synchronously dispatched there is also a SynchronousDispatchingEventFilter. With this filter the PropertyChanging event will be processed before the property value has changed, however you should also consider the extra risks involved with raising the events while still holding onto the container's lock on another thread.

Below is a class which contains one thread safe property (using MultiThreadedWriteObservable`1) with a DispatchingEventFilter attached:

    public class MyThreadSafeObject : MetaPropertyHost
    {
        // We use a container which is thread safe for writing.
        private readonly MultiThreadedWriteObservable<int> observableData
            = new MultiThreadedWriteObservable<int>();

        // Expose the property as normal.
        public int Data
        {
            get { return this.observableData.Value; }
            set { this.observableData.Value = value; }
        }

        // We add an event filter to the exposed MetaProperty.
        [SetEventFilter(typeof(DispatchingEventFilter))]
        public IMetaProperty<int> DataProperty
        {
            get { return this.observableData.MetaProperty; }
        }
    }


We attach the event filter using the SetEventFilterAttribute class. This attribute takes one or more Type objects in an order which will form your event filter pipeline.

If you are deriving from MetaPropertyHost then these attributes on publicly exposed MetaProperties will automatically be picked up and passed into into the Initialize method of the IMetaProperty.

You can therefore configure the event pipeline with manually initialized MetaProperties by passing in the attributes yourself:

    public class MyThreadSafeObject : MetaPropertyHost
    {
        // We use a container which is thread safe for writing.
        private readonly MultiThreadedWriteObservable<int> observableData
            = new MultiThreadedWriteObservable<int>();

        public MyThreadSafeObject()
        {
            // Initialize the MetaProperty manually, passing in the attributes.
            this.Initialize(
                this.observableData.MetaProperty,
                new SetEventFilterAttribute(typeof(DispatchingEventFilter)));
        }
    }


The code below tests the first implementation of MyThreadSafeObject by creating a synchronization context and storing the thread IDs of the threads used while performing critical actions:

    // This class will contain the results of our test.
    public class Results
    {
        private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);

        public int SyncContextThreadId { get; set; }
        public int SetterThreadId { get; set; }
        public int EventThreadId { get; set; }

        public AutoResetEvent WaitHandle
        {
            get { return this.waitHandle; }
        }
    }

    // Here we run our test.
    public void DispatchingEventFilterExample()
    {
        Results results = new Results();

        // We create a synchronization context for the event filter to dispatch to.
        using (var syncContext = new SingleThreadSynchronizationContext())
        {
            // Run the main test in the synchronization context's thread.
            syncContext.Send(() => this.DispatchingEventFilterExampleRunner(results));

            // Wait for the test to finish.
            results.WaitHandle.WaitOne();
        }

        // Verify that we executed on the expected threads.
        Assert.AreEqual(results.SyncContextThreadId, results.EventThreadId);
        Assert.AreNotEqual(results.SyncContextThreadId, results.SetterThreadId);

        Assert.AreNotEqual(0, results.SyncContextThreadId);
        Assert.AreNotEqual(0, results.SetterThreadId);
        Assert.AreNotEqual(0, results.EventThreadId);
    }

    // This runs the main part of the test on the synchronization context thread.
    public void DispatchingEventFilterExampleRunner(Results results)
    {
        // Save the synchronization context thread ID.
        results.SyncContextThreadId = Thread.CurrentThread.ManagedThreadId;

        // Create our test object.
        MyThreadSafeObject threadSafeObject = new MyThreadSafeObject();

        // When the data value changes, we save the thread ID and
        // set the wait handle.
        threadSafeObject.DataProperty.ValueChanged +=
            (s, e) =>
            {
                results.EventThreadId = Thread.CurrentThread.ManagedThreadId;
                results.WaitHandle.Set();
            };

        // Now we queue up a task on the thread pool that will save the 
        // thread ID (to verify it isn't the synchronization context thread)
        // and set the data.
        ThreadPool.QueueUserWorkItem(
            o =>
            {
                results.SetterThreadId = Thread.CurrentThread.ManagedThreadId;
                threadSafeObject.Data = 1;
            });
    }

Using a custom synchronization context

By default the DispatchingEventFilter will use the synchronization context found in SynchronizationContext.Current when it is instantiated. You can override this behaviour using an ISynchronizationContextContainer derived class.

This interface is defined below:

    public interface ISynchronizationContextContainer
    {
        SynchronizationContext SynchronizationContext { get; }
    }


It just contains one single getter property which returns the synchronization context. Where this context comes from is up to your implementation, but it could come from a service locator or a static member of a class.

Below is an example ISynchronizationContextContainer implementation which uses a static field to store the context:

    public class MainSynchronizationContextContainer 
        : ISynchronizationContextContainer
    {
        private static SynchronizationContext StaticSynchronizationContext;

        public static void SetSynchronizationContext(
            SynchronizationContext synchronizationContext)
        {
            StaticSynchronizationContext = synchronizationContext;
        }

        public SynchronizationContext SynchronizationContext
        {
            get
            {
                return StaticSynchronizationContext;
            }
        }
    }


You specify to use this container with the SetSynchronizationContextContainerAttribute class:

    public class MyThreadSafeObject : MetaPropertyHost
    {
        // We use a container which is thread safe for writing.
        private readonly MultiThreadedWriteObservable<int> observableData 
            = new MultiThreadedWriteObservable<int>();

        // Expose the property as normal.
        public int Data
        {
            get { return this.observableData.Value; }
            set { this.observableData.Value = value; }
        }

        // We add an event filter to the exposed MetaProperty.
        [SetEventFilter(typeof(DispatchingEventFilter))]
        [SetSynchronizationContextContainer(typeof(MainSynchronizationContextContainer))]
        public IMetaProperty<int> DataProperty
        {
            get { return this.observableData.MetaProperty; }
        }
    }


The code below adapts the previous test by creating the MyThreadSafeObject on a thread not associated with the synchronization context, and indeed creates the object before the synchronization context has even been created. We then call the SetSynchronizationContext method on the MainSynchronizationContextContainer after creating the context and proceed as normal, but passing the thread safe object into the DispatchingEventFilterExampleRunner method.

    public void DispatchingEventFilterExample()
    {
        Results results = new Results();

        // Create our test object.
        MyThreadSafeObject threadSafeObject = new MyThreadSafeObject();

        // We create a synchronization context for the event filter to dispatch to.
        using (var syncContext = new SingleThreadSynchronizationContext())
        {
            // Configure the synchronization context container.
            MainSynchronizationContextContainer.SetSynchronizationContext(syncContext);

            // Run the main test in the synchronization context's thread.
            syncContext.Send(() => this.DispatchingEventFilterExampleRunner(threadSafeObject, results));

            // Wait for the test to finish.
            results.WaitHandle.WaitOne();
        }

        // Verify that we executed on the expected threads.
        Assert.AreEqual(results.SyncContextThreadId, results.EventThreadId);
        Assert.AreNotEqual(results.SyncContextThreadId, results.SetterThreadId);

        Assert.AreNotEqual(0, results.SyncContextThreadId);
        Assert.AreNotEqual(0, results.SetterThreadId);
        Assert.AreNotEqual(0, results.EventThreadId);
    }

    // This runs the main part of the test on the synchronization context thread.
    public void DispatchingEventFilterExampleRunner(
        MyThreadSafeObject threadSafeObject, Results results)
    {
        // Save the synchronization context thread ID.
        results.SyncContextThreadId = Thread.CurrentThread.ManagedThreadId;
        
        // When the data value changes, we save the thread ID and
        // set the wait handle.
        threadSafeObject.DataProperty.ValueChanged +=
            (s, e) =>
            {
                results.EventThreadId = Thread.CurrentThread.ManagedThreadId;
                results.WaitHandle.Set();
            };

        // Now we queue up a task on the thread pool that will save the 
        // thread ID (to verify it isn't the synchronization context thread)
        // and set the data.
        ThreadPool.QueueUserWorkItem(
            o =>
            {
                results.SetterThreadId = Thread.CurrentThread.ManagedThreadId;
                threadSafeObject.Data = 1;
            });
    }


The assertions still pass because the synchronization context is requested from the MainSynchronizationContextContainer every time an event is fired.

The synchronization contexts used in the above examples are included in the MetaProperties.SynchronizationContexts assembly and are based on the synchronization contexts provided by this MSDN magazine article:

http://msdn.microsoft.com/en-us/magazine/cc163321.aspx

Last edited Mar 9, 2010 at 1:18 PM by jamesthurley, version 3

Comments

No comments yet.