Synchronous requests

Jun 27, 2014 at 3:10 PM
Hi,

Thanks for this project.

I'm a bit new to Bloomberg, its API and this emulator. I prefer to use synchronous requests, but I can't get the emulator to work.

For example, if you run the code in SimpleSubscriptionExample.cs that you get in the real blapi .NET download, it fails.

Is the synchronous side something that has to be implemented yet?

thanks
Francois
Coordinator
Jun 30, 2014 at 6:21 PM
Francois,

I implemented market data subscriptions only as asynchronous operations. I did this because market data events are fundamentally asynchronous in nature. For the other four requests types (historical data requests, etc.), I implemented them as synchronous operations because they are fundamentally synchronous in nature: send one request, receive one response.

The actual BB API implements all of these five requests both synchronously and asynchronously. I think that internally it handles all requests asynchronously no matter what. Right now I have no plans on implementing synchronous market data subscription emulation. I might change my mind if enough users request such a feature.

For an example, take a look at my C# market data example here: https://bemu.codeplex.com/SourceControl/latest#Examples/MarketDataRequest.cs You can use this as a guide for writing asynchronous events. It's not that difficult in C#.

Please let me know if you have any other questions.

-Jordan
Jul 1, 2014 at 8:06 AM
Thanks. Yes, asynchronous isn't that difficult, but I have a specific situation where I want to use synchronous.

I'm developing an app for different users, each with access to a different source of financial indicators. Some use Bloomberg, some use I-Net Bridge and some use an internal data warehouse. I want to create a common interface and implement this interface for each data source, so that the user can easily switch between sources. This is much simpler to implement synchronously (some sources don't support asynchronous requests).

F
Coordinator
Jul 1, 2014 at 2:25 PM
Edited Jul 1, 2014 at 2:28 PM
Francois,

Have you considered putting a synchronous wrapper on top of the asynchronous BB code? This would turn asynchronous responses into synchronous responses. I threw together some code that demonstrates one way to do this. In this code, I have a wrapper that uses a Queue to hold messages produced by BB market data events. The wrapper exposes a blocking GetNextMessage() method and two non-blocking TryGetNextMessage() methods that read from the private Queue.

(If I was using .Net 4.0 or higher, I'd use a BlockingQueue<T> instead)
namespace Examples_csh
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;

    using Bloomberglp.Blpapi;

    //This declaration specifies that EventHandler refers to Bloomberglp.Blpapi.EventHandler and not System.EventHandler.  The Bloomberg API named this ambiguously.
    using EventHandler = Bloomberglp.Blpapi.EventHandler;

    public static class MarketDataRequestSync
    {
        public static void RunExample()
        {
            Console.WriteLine("Starting example");

            using (SyncWrapper sync = new SyncWrapper())
            {
                TimeSpan cutoff = TimeSpan.FromSeconds(10);
                System.Diagnostics.Stopwatch stop = System.Diagnostics.Stopwatch.StartNew();

                while (stop.Elapsed < cutoff) //run for 10 seconds
                {
                    TickType tick = sync.GetNextMessage(); //acts synchronously
                    Console.WriteLine(tick.ToString());
                }
            }

            Console.WriteLine("Stopping example");
        }

        private class TickType
        {
            public string Security { get; set; }
            public string Field { get; set; }
            public double Value { get; set; }

            public override string ToString()
            {
                return string.Format("{0}: {1} = {2}", this.Security, this.Field, this.Value);
            }
        }

        private class SyncWrapper : IDisposable
        {
            private readonly Session _session;
            private readonly List<string> _fields;
            private readonly Queue<TickType> _messageQ; //I'll use this as a blocking queue
            private readonly object _syncroot = new object();
            private readonly System.Threading.AutoResetEvent _waiter = new System.Threading.AutoResetEvent(false);

            public SyncWrapper()
            {
                this._fields = new List<string>(new string[] { "BID", "ASK" });
                this._messageQ = new Queue<TickType>();

                SessionOptions sessionOptions = new SessionOptions();
                sessionOptions.ServerHost = "localhost";
                sessionOptions.ServerPort = 8194;

                this._session = new Session(sessionOptions, new EventHandler(this.ProcessEvent));
                this._session.StartAsync();
            }

            public void Dispose()
            {
                this._session.Dispose();
            }

            public TickType GetNextMessage() //blocks
            {
                lock (this._syncroot) //protect this._messageQ
                {
                    if (this._messageQ.Any())
                        return this._messageQ.Dequeue();
                }

                this._waiter.WaitOne(); //wait for the messageQ to have content
                return this.GetNextMessage();
            }

            public bool TryGetNextMessage(out TickType result) //non-blocking
            {
                lock (this._syncroot) //protect this._messageQ
                {
                    if (this._messageQ.Any())
                    {
                        result = this._messageQ.Dequeue();
                        return true;
                    }
                }

                result = null;
                return false;
            }

            public bool TryGetNextMessage(TimeSpan timeout, out TickType result) //non-blocking, timeout
            {
                lock (this._syncroot) //protect this._messageQ
                {
                    if (this._messageQ.Any())
                    {
                        result = this._messageQ.Dequeue();
                        return true;
                    }
                }

                this._waiter.WaitOne(timeout); //wait for the messageQ to have content, or timeout
                return this.TryGetNextMessage(out result);
            }

            private void ProcessEvent(Event evt, Session session)
            {
                switch (evt.Type)
                {
                    case Event.EventType.SESSION_STATUS: //use this to open the service
                        foreach (Message message in evt.GetMessages())
                        {
                            if (message.MessageType.Equals("SessionStarted"))
                            {
                                try
                                {
                                    session.OpenServiceAsync("//blp/mktdata", new CorrelationID(-9999));
                                }
                                catch (Exception)
                                {
                                    Console.Error.WriteLine("Could not open //blp/mktdata for async");
                                }
                            }
                        }
                        break;

                    case Event.EventType.SERVICE_STATUS: //use this to subscribe to ticker feeds
                        List<Subscription> slist = new List<Subscription>();
                        slist.Add(new Subscription("SPY US EQUITY", this._fields));
                        slist.Add(new Subscription("MSFT US EQUITY", this._fields));

                        session.Subscribe(slist);
                        break;

                    case Event.EventType.SUBSCRIPTION_DATA:
                    case Event.EventType.RESPONSE:
                    case Event.EventType.PARTIAL_RESPONSE:
                        const bool excludeNullElements = true;
                        foreach (Message message in evt.GetMessages())
                        {
                            string security = message.TopicName;
                            foreach (var field in this._fields.Where(w => message.HasElement(w, excludeNullElements)))
                            {
                                double value = message.GetElementAsFloat64(field);

                                lock (this._syncroot) //protect this._messageQ
                                {
                                    this._messageQ.Enqueue(new TickType() { Security = security, Field = field, Value = value });
                                    this._waiter.Set();
                                }
                            }
                        }
                        break;
                }
            }
        }

    }
}
Jul 2, 2014 at 11:48 AM
Thanks very much.

I've been playing a bit with the Nito.Async package to synchronise and async thread and landed up with a very similar approach. But in the end I found that I misunderstood what reference and market data was for. I could get my 'latest price' info from the refdata after all.

Thanks
F
Coordinator
Jul 3, 2014 at 5:05 PM
I see. One quick note, if you find yourself requesting reference data often, you might be better off using market data subscriptions instead. The reason is that reference data requests use up your Bloomberg data caps more quickly than market data subscriptions.

Note that my emulator doesn't emulate data caps. You can make any number of requests.

Please let me know if you have any other questions.