Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tia / bbg / bbg_com.py
Size: Mime:
"""
methods for using the bloomberg COM API v3 from python

Written by Brian P. Smith (brian.p.smith@gmail.com)
"""
from pythoncom import PumpWaitingMessages
from win32com.client import DispatchWithEvents, constants, CastTo
from collections import defaultdict, namedtuple
from datetime import datetime, timedelta
from pandas import DataFrame, to_datetime, concat, Panel
import numpy as np

SecurityErrorAttrs = ['security', 'source', 'code', 'category', 'message', 'subcategory']
SecurityError = namedtuple('SecurityError', SecurityErrorAttrs)
FieldErrorAttrs = ['security', 'field', 'source', 'code', 'category', 'message', 'subcategory']
FieldError = namedtuple('FieldError', FieldErrorAttrs)

# poor mans debugging
DEBUG = False


class XmlHelper(object):
    @staticmethod
    def security_iter(nodearr):
        """ provide a security data iterator by returning a tuple of (Element, SecurityError) which are mutually exclusive """
        assert nodearr.Name == 'securityData' and nodearr.IsArray
        for i in range(nodearr.NumValues):
            node = nodearr.GetValue(i)
            err = XmlHelper.get_security_error(node)
            result = (None, err) if err else (node, None)
            yield result

    @staticmethod
    def message_iter(evt):
        """ provide a message iterator which checks for a response error prior to returning """
        iter = evt.CreateMessageIterator()
        while iter.Next():
            msg = iter.Message
            if DEBUG:
                print(msg.Print)
            if msg.AsElement.HasElement('responseError'):
                raise Exception(msg.AsElement.GetValue('message'))
            yield msg

    @staticmethod
    def get_sequence_value(node):
        """Convert an element with DataType Sequence to a DataFrame.
        Note this may be a naive implementation as I assume that bulk data is always a table
        """
        assert node.Datatype == 15
        data = defaultdict(list)
        cols = []
        for i in range(node.NumValues):
            row = node.GetValue(i)
            if i == 0:  # Get the ordered cols and assume they are constant
                cols = [str(row.GetElement(_).Name) for _ in range(row.NumElements)]

            for cidx in range(row.NumElements):
                col = row.GetElement(cidx)
                data[str(col.Name)].append(XmlHelper.as_value(col))
        return DataFrame(data, columns=cols)

    @staticmethod
    def as_value(ele):
        """ convert the specified element as a python value """
        dtype = ele.Datatype
        #print '%s = %s' % (ele.Name, dtype)
        if dtype in (1, 2, 3, 4, 5, 6, 7, 9, 12):
            # BOOL, CHAR, BYTE, INT32, INT64, FLOAT32, FLOAT64, BYTEARRAY, DECIMAL)
            return ele.Value
        elif dtype == 8:  # String
            val = ele.Value
            if val:
                # us centric :)
                val = val.encode('ascii', 'replace')
            return str(val)
        elif dtype == 10:  # Date
            v = ele.Value
            return datetime(year=v.year, month=v.month, day=v.day).date() if v else np.nan
        elif dtype == 11:  # Time
            v = ele.Value
            return datetime(hour=v.hour, minute=v.minute, second=v.second).time() if v else np.nan
        elif dtype == 13:  # Datetime
            v = ele.Value
            return datetime(year=v.year, month=v.month, day=v.day, hour=v.hour, minute=v.minute, second=v.second)
        elif dtype == 14:  # Enumeration
            raise NotImplementedError('ENUMERATION data type needs implemented')
        elif dtype == 16:  # Choice
            raise NotImplementedError('CHOICE data type needs implemented')
        elif dtype == 15:  # SEQUENCE
            return XmlHelper.get_sequence_value(ele)
        else:
            raise NotImplementedError('Unexpected data type %s. Check documentation' % dtype)

    @staticmethod
    def get_child_value(parent, name, allow_missing=0):
        """ return the value of the child element with name in the parent Element """
        if not parent.HasElement(name):
            if allow_missing:
                return np.nan
            else:
                raise Exception('failed to find child element %s in parent' % name)
        else:
            return XmlHelper.as_value(parent.GetElement(name))

    @staticmethod
    def get_child_values(parent, names):
        """ return a list of values for the specified child fields. If field not in Element then replace with nan. """
        vals = []
        for name in names:
            if parent.HasElement(name):
                vals.append(XmlHelper.as_value(parent.GetElement(name)))
            else:
                vals.append(np.nan)
        return vals

    @staticmethod
    def as_security_error(node, secid):
        """ convert the securityError element to a SecurityError """
        assert node.Name == 'securityError'
        src = XmlHelper.get_child_value(node, 'source')
        code = XmlHelper.get_child_value(node, 'code')
        cat = XmlHelper.get_child_value(node, 'category')
        msg = XmlHelper.get_child_value(node, 'message')
        subcat = XmlHelper.get_child_value(node, 'subcategory')
        return SecurityError(security=secid, source=src, code=code, category=cat, message=msg, subcategory=subcat)

    @staticmethod
    def as_field_error(node, secid):
        """ convert a fieldExceptions element to a FieldError or FieldError array """
        assert node.Name == 'fieldExceptions'
        if node.IsArray:
            return [XmlHelper.as_field_error(node.GetValue(_), secid) for _ in range(node.NumValues)]
        else:
            fld = XmlHelper.get_child_value(node, 'fieldId')
            info = node.GetElement('errorInfo')
            src = XmlHelper.get_child_value(info, 'source')
            code = XmlHelper.get_child_value(info, 'code')
            cat = XmlHelper.get_child_value(info, 'category')
            msg = XmlHelper.get_child_value(info, 'message')
            subcat = XmlHelper.get_child_value(info, 'subcategory')
            return FieldError(security=secid, field=fld, source=src, code=code, category=cat, message=msg,
                              subcategory=subcat)

    @staticmethod
    def get_security_error(node):
        """ return a SecurityError if the specified securityData element has one, else return None """
        assert node.Name == 'securityData' and not node.IsArray
        if node.HasElement('securityError'):
            secid = XmlHelper.get_child_value(node, 'security')
            err = XmlHelper.as_security_error(node.GetElement('securityError'), secid)
            return err
        else:
            return None

    @staticmethod
    def get_field_errors(node):
        """ return a list of FieldErrors if the specified securityData element has field errors """
        assert node.Name == 'securityData' and not node.IsArray
        nodearr = node.GetElement('fieldExceptions')
        if nodearr.NumValues > 0:
            secid = XmlHelper.get_child_value(node, 'security')
            errors = XmlHelper.as_field_error(nodearr, secid)
            return errors
        else:
            return None


def debug_event(evt):
    print('unhandled event: %s' % evt.EventType)
    if evt.EventType in [constants.RESPONSE, constants.PARTIAL_RESPONSE]:
        print('messages:')
        for msg in XmlHelper.message_iter(evt):
            print(msg.Print)


class ResponseHandler(object):
    def do_init(self, handler):
        """ will be called prior to waiting for the message """
        self.waiting = True
        self.exc_info = None
        self.handler = handler

    def set_evt_handler(self, handler):
        self.handler = handler

    def OnProcessEvent(self, evt):
        try:
            evt = CastTo(evt, 'Event')
            if not self.handler:
                debug_event(evt)

            if evt.EventType == constants.RESPONSE:
                self.handler.on_event(evt, is_final=True)
                self.waiting = False
            elif evt.EventType == constants.PARTIAL_RESPONSE:
                self.handler.on_event(evt, is_final=False)
            else:
                self.handler.on_admin_event(evt)
        except Exception:
            import sys

            self.waiting = False
            self.exc_info = sys.exc_info()

    @property
    def has_deferred_exception(self):
        return self.exc_info is not None

    def raise_deferred_exception(self):
        raise self.exc_info[1].with_traceback(self.exc_info[2])

    def do_cleanup(self):
        self.waiting = False
        self.exc_info = None
        self.handler = None


class Request(object):
    def __init__(self, ignore_security_error=0, ignore_field_error=0):
        self.field_errors = []
        self.security_errors = []
        self.ignore_security_error = ignore_security_error
        self.ignore_field_error = ignore_field_error

    @property
    def has_exception(self):
        if not self.ignore_security_error and len(self.security_errors) > 0:
            return True
        if not self.ignore_field_error and len(self.field_errors) > 0:
            return True

    def raise_exception(self):
        if not self.ignore_security_error and len(self.security_errors) > 0:
            msgs = ['(%s, %s, %s)' % (s.security, s.category, s.message) for s in self.security_errors]
            raise Exception('SecurityError: %s' % ','.join(msgs))
        if not self.ignore_field_error and len(self.field_errors) > 0:
            msgs = ['(%s, %s, %s, %s)' % (s.security, s.field, s.category, s.message) for s in self.field_errors]
            raise Exception('FieldError: %s' % ','.join(msgs))
        raise Exception('Programmer Error: No exception to raise')

    def get_bbg_request(self, svc, session):
        raise NotImplementedError()

    def get_bbg_service_name(self):
        raise NotImplementedError()

    def on_event(self, evt, is_final):
        raise NotImplementedError()

    def on_admin_event(self, evt):
        pass

    def execute(self):
        Terminal.execute_request(self)
        return self

    @staticmethod
    def apply_overrides(request, omap):
        """ add the given overrides (omap) to bloomberg request """
        if omap:
            for k, v in omap.items():
                o = request.GetElement('overrides').AppendElment()
                o.SetElement('fieldId', k)
                o.SetElement('value', v)


class ReferenceDataRequest(Request):
    def __init__(self, symbols, fields, overrides=None, response_type='frame', ignore_security_error=0,
                 ignore_field_error=0):
        """
        response_type: (frame, map) how to return the results
        """
        assert response_type in ('frame', 'map')
        Request.__init__(self, ignore_security_error=ignore_security_error, ignore_field_error=ignore_field_error)
        self.symbols = isinstance(symbols, str) and [symbols] or symbols
        self.fields = isinstance(fields, str) and [fields] or fields
        self.overrides = overrides or {}
        # response related
        self.response = {} if response_type == 'map' else defaultdict(list)
        self.response_type = response_type

    def __repr__(self):
        fmtargs = dict(clz=self.__class__.__name__,
                       symbols=','.join(self.symbols),
                       fields=','.join(self.fields),
                       overrides=','.join(['%s=%s' % (k, v) for k, v in self.overrides.items()]),
                       rt=self.response_type,
                       ise=self.ignore_security_error and True or False,
                       ife=self.ignore_field_error and True or False,
                       )
        return ('<{clz}([{symbols}], [{fields}], overrides={overrides}, response_type={rt}, ignore_security_error={ise},'
                + 'ignore_field_error={ife}').format(**fmtargs)

    def get_bbg_service_name(self):
        return '//blp/refdata'

    def get_bbg_request(self, svc, session):
        # create the bloomberg request object
        request = svc.CreateRequest('ReferenceDataRequest')
        [request.GetElement('securities').AppendValue(sec) for sec in self.symbols]
        [request.GetElement('fields').AppendValue(fld) for fld in self.fields]
        Request.apply_overrides(request, self.overrides)
        return request

    def on_security_node(self, node):
        sid = XmlHelper.get_child_value(node, 'security')
        farr = node.GetElement('fieldData')
        fdata = XmlHelper.get_child_values(farr, self.fields)
        assert len(fdata) == len(self.fields), 'field length must match data length'
        if self.response_type == 'map':
            self.response[sid] = fdata
        else:
            self.response['security'].append(sid)
            [self.response[f].append(d) for f, d in zip(self.fields, fdata)]
            # Add any field errors if
        ferrors = XmlHelper.get_field_errors(node)
        ferrors and self.field_errors.extend(ferrors)

    def on_event(self, evt, is_final):
        """ this is invoked from in response to COM PumpWaitingMessages - different thread """
        for msg in XmlHelper.message_iter(evt):
            for node, error in XmlHelper.security_iter(msg.GetElement('securityData')):
                if error:
                    self.security_errors.append(error)
                else:
                    self.on_security_node(node)

        if is_final and self.response_type == 'frame':
            index = self.response.pop('security')
            frame = DataFrame(self.response, columns=self.fields, index=index)
            frame.index.name = 'security'
            self.response = frame

    @property
    def response_as_series(self):
        """ Return the response as a single series """
        assert len(self.symbols) == 1, 'expected single request'
        if self.response_type == 'frame':
            return self.response.loc[self.symbols[0]]
        else:
            return pandas.Series(self.response[self.symbols])

    @property
    def response_as_field_values(self):
        assert len(self.symbols) == 1
        series = self.response_as_series
        vals = [series[f] for f in self.fields]
        return vals


class HistoricalDataRequest(Request):
    def __init__(self, symbols, fields, start=None, end=None, period='DAILY', overrides=None, ignore_security_error=0,
                 ignore_field_error=0):
        """Historical data request for bloomberg.

        Parameters
        ----------
        symbols : string or list
        fields : string or list
        start : start date (if None then use 1 year ago)
        end : end date (if None then use today)
        period : ('DAILY', 'WEEKLY', 'MONTHLY', 'QUARTERLY', 'SEMI_ANNUALLY', 'YEARLY')
        ignore_field_errors : bool
        ignore_security_errors : bool
        """
        Request.__init__(self, ignore_security_error=ignore_security_error, ignore_field_error=ignore_field_error)
        assert period in ('DAILY', 'WEEKLY', 'MONTHLY', 'QUARTERLY', 'SEMI_ANNUALLY', 'YEARLY')
        self.symbols = isinstance(symbols, str) and [symbols] or symbols
        self.fields = isinstance(fields, str) and [fields] or fields
        self.overrides = overrides or {}
        if start is None:
            start = datetime.today() - timedelta(365)
        if end is None:
            end = datetime.today()
        self.start = to_datetime(start)
        self.end = to_datetime(end)
        self.period = period
        # response related
        self.response = {}

    def __repr__(self):
        fmtargs = dict(clz=self.__class__.__name__,
                       symbols=','.join(self.symbols),
                       fields=','.join(self.fields),
                       start=self.start.strftime('%Y-%m-%d'),
                       end=self.end.strftime('%Y-%m-%d'),
                       period=self.period,
                       )
        return '<{clz}([{symbols}], [{fields}], start={start}, end={end}, period={period}'.format(**fmtargs)

    def get_bbg_service_name(self):
        return '//blp/refdata'

    def get_bbg_request(self, svc, session):
        # create the bloomberg request object
        request = svc.CreateRequest('HistoricalDataRequest')
        [request.GetElement('securities').AppendValue(sec) for sec in self.symbols]
        [request.GetElement('fields').AppendValue(fld) for fld in self.fields]
        request.Set('startDate', self.start.strftime('%Y%m%d'))
        request.Set('endDate', self.end.strftime('%Y%m%d'))
        request.Set('periodicitySelection', self.period)
        Request.apply_overrides(request, self.overrides)
        return request

    def on_security_data_node(self, node):
        """process a securityData node - FIXME: currently not handling relateDate node """
        sid = XmlHelper.get_child_value(node, 'security')
        farr = node.GetElement('fieldData')
        dmap = defaultdict(list)
        for i in range(farr.NumValues):
            pt = farr.GetValue(i)
            [dmap[f].append(XmlHelper.get_child_value(pt, f, allow_missing=1)) for f in ['date'] + self.fields]
        idx = dmap.pop('date')
        frame = DataFrame(dmap, columns=self.fields, index=idx)
        frame.index.name = 'date'
        self.response[sid] = frame

    def on_event(self, evt, is_final):
        """ this is invoked from in response to COM PumpWaitingMessages - different thread """
        for msg in XmlHelper.message_iter(evt):
            # Single security element in historical request
            node = msg.GetElement('securityData')
            if node.HasElement('securityError'):
                secid = XmlHelper.get_child_value(node, 'security')
                self.security_errors.append(XmlHelper.as_security_error(node.GetElement('securityError'), secid))
            else:
                self.on_security_data_node(node)

    def response_as_single(self, copy=0):
        """ convert the response map to a single data frame with Multi-Index columns """
        arr = []
        for sid, frame in self.response.items():
            if copy:
                frame = frame.copy()
            'security' not in frame and frame.insert(0, 'security', sid)
            arr.append(frame.reset_index().set_index(['date', 'security']))
        return concat(arr).unstack()

    def response_as_panel(self, swap=False):
        panel = Panel(self.response)
        if swap:
            panel = panel.swapaxes('items', 'minor')
        return panel


class IntrdayBarRequest(Request):
    def __init__(self, symbol, interval, start=None, end=None, event='TRADE'):
        """Intraday bar request for bloomberg

        Parameters
        ----------
        symbols : string
        interval : number of minutes
        start : start date
        end : end date (if None then use today)
        event : (TRADE,BID,ASK,BEST_BID,BEST_ASK)
        """
        Request.__init__(self)
        assert event in ('TRADE', 'BID', 'ASK', 'BEST_BID', 'BEST_ASK')
        assert isinstance(symbol, str)
        if start is None:
            start = datetime.today() - timedelta(30)
        if end is None:
            end = datetime.today()

        self.symbol = symbol
        self.interval = interval
        self.start = to_datetime(start)
        self.end = to_datetime(end)
        self.event = event
        # response related
        self.response = defaultdict(list)

    def __repr__(self):
        fmtargs = dict(clz=self.__class__.__name__,
                       symbol=self.symbol,
                       interval=self.interval,
                       start=self.start.strftime('%Y-%m-%d'),
                       end=self.end.strftime('%Y-%m-%d'),
                       event=self.event,
                       )
        return '<{clz}([{symbol}], interval={interval}, start={start}, end={end}, event={event}'.format(**fmtargs)


    def get_bbg_service_name(self):
        return '//blp/refdata'

    def get_bbg_request(self, svc, session):
        # create the bloomberg request object
        start, end = self.start, self.end
        request = svc.CreateRequest('IntradayBarRequest')
        request.Set('security', self.symbol)
        request.Set('interval', self.interval)
        request.Set('eventType', self.event)
        request.Set('startDateTime',
                    session.CreateDatetime(start.year, start.month, start.day, start.hour, start.minute))
        request.Set('endDateTime', session.CreateDatetime(end.year, end.month, end.day, end.hour, end.minute))
        return request

    def on_event(self, evt, is_final):
        """ this is invoked from in response to COM PumpWaitingMessages - different thread """
        response = self.response
        for msg in XmlHelper.message_iter(evt):
            bars = msg.GetElement('barData').GetElement('barTickData')
            for i in range(bars.NumValues):
                bar = bars.GetValue(i)
                ts = bar.GetElement(0).Value
                response['time'].append(datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute))
                response['open'].append(bar.GetElement(1).Value)
                response['high'].append(bar.GetElement(2).Value)
                response['low'].append(bar.GetElement(3).Value)
                response['close'].append(bar.GetElement(4).Value)
                response['volume'].append(bar.GetElement(5).Value)
                response['events'].append(bar.GetElement(6).Value)

        if is_final:
            idx = response.pop('time')
            self.response = DataFrame(response, columns=['open', 'high', 'low', 'close', 'volume', 'events'], index=idx)


class Terminal(object):
    @classmethod
    def execute_request(cls, request):
        session = DispatchWithEvents('blpapicom.ProviderSession.1', ResponseHandler)
        session.Start()
        try:
            svcname = request.get_bbg_service_name()
            if not session.OpenService(svcname):
                raise Exception('failed to open service %s' % svcname)

            svc = session.GetService(svcname)
            asbbg = request.get_bbg_request(svc, session)
            session.SendRequest(asbbg)
            session.do_init(request)
            while session.waiting:
                PumpWaitingMessages()
            session.has_deferred_exception and session.raise_deferred_exception()
            request.has_exception and request.raise_exception()
            return request
        finally:
            session.Stop()
            session.do_cleanup()


if __name__ == '__main__':
    # 5 days ago
    import pandas

    d = pandas.datetools.BDay(-4).apply(datetime.now())
    m = pandas.datetools.BMonthBegin(-2).apply(datetime.now())

    def banner(msg):
        print('*' * 25)
        print(msg)
        print('*' * 25)

    banner('ReferenceDataRequest: single security, single field, frame response')
    req = ReferenceDataRequest('msft us equity', 'px_last', response_type='frame')
    print(req.execute().response)

    banner('ReferenceDataRequest: single security, single field, map response')
    req = ReferenceDataRequest('msft us equity', 'px_last', response_type='map')
    print(req.execute().response)

    banner('ReferenceDataRequest: multi-security, multi-field')
    req = ReferenceDataRequest(['eurusd curncy', 'msft us equity'], ['px_open', 'px_last'])
    print(req.execute().response)

    banner('ReferenceDataRequest: single security, multi-field (with bulk), frame response')
    req = ReferenceDataRequest('eurusd curncy', ['px_last', 'fwd_curve'])
    req.execute()
    print(req.response)
    # DataFrame within a DataFrame
    print(req.response.fwd_curve[0].tail())

    banner('ReferenceDataRequest: multi security, multi-field, bad field')
    req = ReferenceDataRequest(['eurusd curncy', 'msft us equity'], ['px_last', 'fwd_curve'], ignore_field_error=1)
    req.execute()
    print(req.response)

    banner('HistoricalDataRequest: multi security, multi-field, daily data')
    req = HistoricalDataRequest(['eurusd curncy', 'msft us equity'], ['px_last', 'px_open'], start=d)
    req.execute()
    print(req.response)
    print('--------- AS SINGLE TABLE ----------')
    print(req.response_as_single())

    banner('HistoricalDataRequest: multi security, multi-field, weekly data')
    req = HistoricalDataRequest(['eurusd curncy', 'msft us equity'], ['px_last', 'px_open'], start=m, period='WEEKLY')
    req.execute()
    print(req.response)
    print('--------- AS SINGLE TABLE ----------')
    print(req.response_as_single())
    print('--------- AS PANEL (id indexed) ----------')
    print(req.response_as_panel())
    print('--------- AS PANEL (field indexed) ----------')
    print(req.response_as_panel(swap=1))

    banner('IntrdayBarRequest: every hour')
    req = IntrdayBarRequest('eurusd curncy', 60, start=d)
    req.execute()
    print(req.response[-10:])

    #
    # HOW TO
    #
    # - Retrieve an fx vol surface:  BbgReferenceDataRequest('eurusd curncy', 'DFLT_VOL_SURF_MID')
    # - Retrieve a fx forward curve:  BbgReferenceDataRequest('eurusd curncy', 'FWD_CURVE')
    # - Retrieve dividends:  BbgReferenceDataRequest('csco us equity', 'BDVD_PR_EX_DTS_DVD_AMTS_W_ANN')