Repository URL to install this package:
|
Version:
0.3.2 ▾
|
"""
methods for using the bloomberg COM API v3 from python
Written by Brian P. Smith (brian.p.smith@gmail.com)
"""
from pythoncom import PumpWaitingMessages
from win32com.client import DispatchWithEvents, constants, CastTo
from collections import defaultdict, namedtuple
from datetime import datetime, timedelta
from pandas import DataFrame, to_datetime, concat, Panel
import numpy as np
SecurityErrorAttrs = [
"security",
"source",
"code",
"category",
"message",
"subcategory",
]
SecurityError = namedtuple("SecurityError", SecurityErrorAttrs)
FieldErrorAttrs = [
"security",
"field",
"source",
"code",
"category",
"message",
"subcategory",
]
FieldError = namedtuple("FieldError", FieldErrorAttrs)
# poor mans debugging
DEBUG = False
class XmlHelper(object):
@staticmethod
def security_iter(nodearr):
""" provide a security data iterator by returning a tuple of (Element, SecurityError) which are mutually exclusive """
assert nodearr.Name == "securityData" and nodearr.IsArray
for i in range(nodearr.NumValues):
node = nodearr.GetValue(i)
err = XmlHelper.get_security_error(node)
result = (None, err) if err else (node, None)
yield result
@staticmethod
def message_iter(evt):
""" provide a message iterator which checks for a response error prior to returning """
iter = evt.CreateMessageIterator()
while iter.Next():
msg = iter.Message
if DEBUG:
print(msg.Print)
if msg.AsElement.HasElement("responseError"):
raise Exception(msg.AsElement.GetValue("message"))
yield msg
@staticmethod
def get_sequence_value(node):
"""Convert an element with DataType Sequence to a DataFrame.
Note this may be a naive implementation as I assume that bulk data is always a table
"""
assert node.Datatype == 15
data = defaultdict(list)
cols = []
for i in range(node.NumValues):
row = node.GetValue(i)
if i == 0: # Get the ordered cols and assume they are constant
cols = [str(row.GetElement(_).Name) for _ in range(row.NumElements)]
for cidx in range(row.NumElements):
col = row.GetElement(cidx)
data[str(col.Name)].append(XmlHelper.as_value(col))
return DataFrame(data, columns=cols)
@staticmethod
def as_value(ele):
""" convert the specified element as a python value """
dtype = ele.Datatype
# print '%s = %s' % (ele.Name, dtype)
if dtype in (1, 2, 3, 4, 5, 6, 7, 9, 12):
# BOOL, CHAR, BYTE, INT32, INT64, FLOAT32, FLOAT64, BYTEARRAY, DECIMAL)
return ele.Value
elif dtype == 8: # String
val = ele.Value
if val:
# us centric :)
val = val.encode("ascii", "replace")
return str(val)
elif dtype == 10: # Date
v = ele.Value
return (
datetime(year=v.year, month=v.month, day=v.day).date() if v else np.nan
)
elif dtype == 11: # Time
v = ele.Value
return (
datetime(hour=v.hour, minute=v.minute, second=v.second).time()
if v
else np.nan
)
elif dtype == 13: # Datetime
v = ele.Value
return datetime(
year=v.year,
month=v.month,
day=v.day,
hour=v.hour,
minute=v.minute,
second=v.second,
)
elif dtype == 14: # Enumeration
raise NotImplementedError("ENUMERATION data type needs implemented")
elif dtype == 16: # Choice
raise NotImplementedError("CHOICE data type needs implemented")
elif dtype == 15: # SEQUENCE
return XmlHelper.get_sequence_value(ele)
else:
raise NotImplementedError(
"Unexpected data type %s. Check documentation" % dtype
)
@staticmethod
def get_child_value(parent, name, allow_missing=0):
""" return the value of the child element with name in the parent Element """
if not parent.HasElement(name):
if allow_missing:
return np.nan
else:
raise Exception("failed to find child element %s in parent" % name)
else:
return XmlHelper.as_value(parent.GetElement(name))
@staticmethod
def get_child_values(parent, names):
""" return a list of values for the specified child fields. If field not in Element then replace with nan. """
vals = []
for name in names:
if parent.HasElement(name):
vals.append(XmlHelper.as_value(parent.GetElement(name)))
else:
vals.append(np.nan)
return vals
@staticmethod
def as_security_error(node, secid):
""" convert the securityError element to a SecurityError """
assert node.Name == "securityError"
src = XmlHelper.get_child_value(node, "source")
code = XmlHelper.get_child_value(node, "code")
cat = XmlHelper.get_child_value(node, "category")
msg = XmlHelper.get_child_value(node, "message")
subcat = XmlHelper.get_child_value(node, "subcategory")
return SecurityError(
security=secid,
source=src,
code=code,
category=cat,
message=msg,
subcategory=subcat,
)
@staticmethod
def as_field_error(node, secid):
""" convert a fieldExceptions element to a FieldError or FieldError array """
assert node.Name == "fieldExceptions"
if node.IsArray:
return [
XmlHelper.as_field_error(node.GetValue(_), secid)
for _ in range(node.NumValues)
]
else:
fld = XmlHelper.get_child_value(node, "fieldId")
info = node.GetElement("errorInfo")
src = XmlHelper.get_child_value(info, "source")
code = XmlHelper.get_child_value(info, "code")
cat = XmlHelper.get_child_value(info, "category")
msg = XmlHelper.get_child_value(info, "message")
subcat = XmlHelper.get_child_value(info, "subcategory")
return FieldError(
security=secid,
field=fld,
source=src,
code=code,
category=cat,
message=msg,
subcategory=subcat,
)
@staticmethod
def get_security_error(node):
""" return a SecurityError if the specified securityData element has one, else return None """
assert node.Name == "securityData" and not node.IsArray
if node.HasElement("securityError"):
secid = XmlHelper.get_child_value(node, "security")
err = XmlHelper.as_security_error(node.GetElement("securityError"), secid)
return err
else:
return None
@staticmethod
def get_field_errors(node):
""" return a list of FieldErrors if the specified securityData element has field errors """
assert node.Name == "securityData" and not node.IsArray
nodearr = node.GetElement("fieldExceptions")
if nodearr.NumValues > 0:
secid = XmlHelper.get_child_value(node, "security")
errors = XmlHelper.as_field_error(nodearr, secid)
return errors
else:
return None
def debug_event(evt):
print("unhandled event: %s" % evt.EventType)
if evt.EventType in [constants.RESPONSE, constants.PARTIAL_RESPONSE]:
print("messages:")
for msg in XmlHelper.message_iter(evt):
print(msg.Print)
class ResponseHandler(object):
def do_init(self, handler):
""" will be called prior to waiting for the message """
self.waiting = True
self.exc_info = None
self.handler = handler
def set_evt_handler(self, handler):
self.handler = handler
def OnProcessEvent(self, evt):
try:
evt = CastTo(evt, "Event")
if not self.handler:
debug_event(evt)
if evt.EventType == constants.RESPONSE:
self.handler.on_event(evt, is_final=True)
self.waiting = False
elif evt.EventType == constants.PARTIAL_RESPONSE:
self.handler.on_event(evt, is_final=False)
else:
self.handler.on_admin_event(evt)
except Exception:
import sys
self.waiting = False
self.exc_info = sys.exc_info()
@property
def has_deferred_exception(self):
return self.exc_info is not None
def raise_deferred_exception(self):
raise self.exc_info[1].with_traceback(self.exc_info[2])
def do_cleanup(self):
self.waiting = False
self.exc_info = None
self.handler = None
class Request(object):
def __init__(self, ignore_security_error=0, ignore_field_error=0):
self.field_errors = []
self.security_errors = []
self.ignore_security_error = ignore_security_error
self.ignore_field_error = ignore_field_error
@property
def has_exception(self):
if not self.ignore_security_error and len(self.security_errors) > 0:
return True
if not self.ignore_field_error and len(self.field_errors) > 0:
return True
def raise_exception(self):
if not self.ignore_security_error and len(self.security_errors) > 0:
msgs = [
"(%s, %s, %s)" % (s.security, s.category, s.message)
for s in self.security_errors
]
raise Exception("SecurityError: %s" % ",".join(msgs))
if not self.ignore_field_error and len(self.field_errors) > 0:
msgs = [
"(%s, %s, %s, %s)" % (s.security, s.field, s.category, s.message)
for s in self.field_errors
]
raise Exception("FieldError: %s" % ",".join(msgs))
raise Exception("Programmer Error: No exception to raise")
def get_bbg_request(self, svc, session):
raise NotImplementedError()
def get_bbg_service_name(self):
raise NotImplementedError()
def on_event(self, evt, is_final):
raise NotImplementedError()
def on_admin_event(self, evt):
pass
def execute(self):
Terminal.execute_request(self)
return self
@staticmethod
def apply_overrides(request, omap):
""" add the given overrides (omap) to bloomberg request """
if omap:
for k, v in omap.items():
o = request.GetElement("overrides").AppendElment()
o.SetElement("fieldId", k)
o.SetElement("value", v)
class ReferenceDataRequest(Request):
def __init__(
self,
symbols,
fields,
overrides=None,
response_type="frame",
ignore_security_error=0,
ignore_field_error=0,
):
"""
response_type: (frame, map) how to return the results
"""
assert response_type in ("frame", "map")
Request.__init__(
self,
ignore_security_error=ignore_security_error,
ignore_field_error=ignore_field_error,
)
self.symbols = isinstance(symbols, str) and [symbols] or symbols
self.fields = isinstance(fields, str) and [fields] or fields
self.overrides = overrides or {}
# response related
self.response = {} if response_type == "map" else defaultdict(list)
self.response_type = response_type
def __repr__(self):
fmtargs = dict(
clz=self.__class__.__name__,
symbols=",".join(self.symbols),
fields=",".join(self.fields),
overrides=",".join(["%s=%s" % (k, v) for k, v in self.overrides.items()]),
rt=self.response_type,
ise=self.ignore_security_error and True or False,
ife=self.ignore_field_error and True or False,
)
return (
"<{clz}([{symbols}], [{fields}], overrides={overrides}, response_type={rt}, ignore_security_error={ise},"
+ "ignore_field_error={ife}"
).format(**fmtargs)
def get_bbg_service_name(self):
return "//blp/refdata"
def get_bbg_request(self, svc, session):
# create the bloomberg request object
request = svc.CreateRequest("ReferenceDataRequest")
[request.GetElement("securities").AppendValue(sec) for sec in self.symbols]
[request.GetElement("fields").AppendValue(fld) for fld in self.fields]
Request.apply_overrides(request, self.overrides)
return request
def on_security_node(self, node):
sid = XmlHelper.get_child_value(node, "security")
farr = node.GetElement("fieldData")
fdata = XmlHelper.get_child_values(farr, self.fields)
assert len(fdata) == len(self.fields), "field length must match data length"
if self.response_type == "map":
self.response[sid] = fdata
else:
self.response["security"].append(sid)
[self.response[f].append(d) for f, d in zip(self.fields, fdata)]
# Add any field errors if
ferrors = XmlHelper.get_field_errors(node)
ferrors and self.field_errors.extend(ferrors)
def on_event(self, evt, is_final):
""" this is invoked from in response to COM PumpWaitingMessages - different thread """
for msg in XmlHelper.message_iter(evt):
for node, error in XmlHelper.security_iter(msg.GetElement("securityData")):
if error:
self.security_errors.append(error)
else:
self.on_security_node(node)
if is_final and self.response_type == "frame":
index = self.response.pop("security")
frame = DataFrame(self.response, columns=self.fields, index=index)
frame.index.name = "security"
self.response = frame
@property
def response_as_series(self):
""" Return the response as a single series """
assert len(self.symbols) == 1, "expected single request"
if self.response_type == "frame":
return self.response.ix[self.symbols[0]]
else:
return pandas.Series(self.response[self.symbols])
@property
def response_as_field_values(self):
assert len(self.symbols) == 1
series = self.response_as_series
vals = [series[f] for f in self.fields]
return vals
class HistoricalDataRequest(Request):
def __init__(
self,
symbols,
fields,
start=None,
end=None,
period="DAILY",
overrides=None,
ignore_security_error=0,
ignore_field_error=0,
):
"""Historical data request for bloomberg.
Parameters
----------
symbols : string or list
fields : string or list
start : start date (if None then use 1 year ago)
end : end date (if None then use today)
period : ('DAILY', 'WEEKLY', 'MONTHLY', 'QUARTERLY', 'SEMI_ANNUALLY', 'YEARLY')
ignore_field_errors : bool
ignore_security_errors : bool
"""
Request.__init__(
self,
ignore_security_error=ignore_security_error,
ignore_field_error=ignore_field_error,
)
assert period in (
"DAILY",
"WEEKLY",
"MONTHLY",
"QUARTERLY",
"SEMI_ANNUALLY",
"YEARLY",
)
self.symbols = isinstance(symbols, str) and [symbols] or symbols
self.fields = isinstance(fields, str) and [fields] or fields
self.overrides = overrides or {}
if start is None:
start = datetime.today() - timedelta(365)
if end is None:
end = datetime.today()
self.start = to_datetime(start)
self.end = to_datetime(end)
self.period = period
# response related
self.response = {}
def __repr__(self):
fmtargs = dict(
clz=self.__class__.__name__,
symbols=",".join(self.symbols),
fields=",".join(self.fields),
start=self.start.strftime("%Y-%m-%d"),
end=self.end.strftime("%Y-%m-%d"),
period=self.period,
)
return "<{clz}([{symbols}], [{fields}], start={start}, end={end}, period={period}".format(
**fmtargs
)
def get_bbg_service_name(self):
return "//blp/refdata"
def get_bbg_request(self, svc, session):
# create the bloomberg request object
request = svc.CreateRequest("HistoricalDataRequest")
[request.GetElement("securities").AppendValue(sec) for sec in self.symbols]
[request.GetElement("fields").AppendValue(fld) for fld in self.fields]
request.Set("startDate", self.start.strftime("%Y%m%d"))
request.Set("endDate", self.end.strftime("%Y%m%d"))
request.Set("periodicitySelection", self.period)
Request.apply_overrides(request, self.overrides)
return request
def on_security_data_node(self, node):
"""process a securityData node - FIXME: currently not handling relateDate node """
sid = XmlHelper.get_child_value(node, "security")
farr = node.GetElement("fieldData")
dmap = defaultdict(list)
for i in range(farr.NumValues):
pt = farr.GetValue(i)
[
dmap[f].append(XmlHelper.get_child_value(pt, f, allow_missing=1))
for f in ["date"] + self.fields
]
idx = dmap.pop("date")
frame = DataFrame(dmap, columns=self.fields, index=idx)
frame.index.name = "date"
self.response[sid] = frame
def on_event(self, evt, is_final):
""" this is invoked from in response to COM PumpWaitingMessages - different thread """
for msg in XmlHelper.message_iter(evt):
# Single security element in historical request
node = msg.GetElement("securityData")
if node.HasElement("securityError"):
secid = XmlHelper.get_child_value(node, "security")
self.security_errors.append(
XmlHelper.as_security_error(node.GetElement("securityError"), secid)
)
else:
self.on_security_data_node(node)
def response_as_single(self, copy=0):
""" convert the response map to a single data frame with Multi-Index columns """
arr = []
for sid, frame in self.response.items():
if copy:
frame = frame.copy()
"security" not in frame and frame.insert(0, "security", sid)
arr.append(frame.reset_index().set_index(["date", "security"]))
return concat(arr).unstack()
def response_as_panel(self, swap=False):
panel = Panel(self.response)
if swap:
panel = panel.swapaxes("items", "minor")
return panel
class IntrdayBarRequest(Request):
def __init__(self, symbol, interval, start=None, end=None, event="TRADE"):
"""Intraday bar request for bloomberg
Parameters
----------
symbols : string
interval : number of minutes
start : start date
end : end date (if None then use today)
event : (TRADE,BID,ASK,BEST_BID,BEST_ASK)
"""
Request.__init__(self)
assert event in ("TRADE", "BID", "ASK", "BEST_BID", "BEST_ASK")
assert isinstance(symbol, str)
if start is None:
start = datetime.today() - timedelta(30)
if end is None:
end = datetime.today()
self.symbol = symbol
self.interval = interval
self.start = to_datetime(start)
self.end = to_datetime(end)
self.event = event
# response related
self.response = defaultdict(list)
def __repr__(self):
fmtargs = dict(
clz=self.__class__.__name__,
symbol=self.symbol,
interval=self.interval,
start=self.start.strftime("%Y-%m-%d"),
end=self.end.strftime("%Y-%m-%d"),
event=self.event,
)
return "<{clz}([{symbol}], interval={interval}, start={start}, end={end}, event={event}".format(
**fmtargs
)
def get_bbg_service_name(self):
return "//blp/refdata"
def get_bbg_request(self, svc, session):
# create the bloomberg request object
start, end = self.start, self.end
request = svc.CreateRequest("IntradayBarRequest")
request.Set("security", self.symbol)
request.Set("interval", self.interval)
request.Set("eventType", self.event)
request.Set(
"startDateTime",
session.CreateDatetime(
start.year, start.month, start.day, start.hour, start.minute
),
)
request.Set(
"endDateTime",
session.CreateDatetime(end.year, end.month, end.day, end.hour, end.minute),
)
return request
def on_event(self, evt, is_final):
""" this is invoked from in response to COM PumpWaitingMessages - different thread """
response = self.response
for msg in XmlHelper.message_iter(evt):
bars = msg.GetElement("barData").GetElement("barTickData")
for i in range(bars.NumValues):
bar = bars.GetValue(i)
ts = bar.GetElement(0).Value
response["time"].append(
datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute)
)
response["open"].append(bar.GetElement(1).Value)
response["high"].append(bar.GetElement(2).Value)
response["low"].append(bar.GetElement(3).Value)
response["close"].append(bar.GetElement(4).Value)
response["volume"].append(bar.GetElement(5).Value)
response["events"].append(bar.GetElement(6).Value)
if is_final:
idx = response.pop("time")
self.response = DataFrame(
response,
columns=["open", "high", "low", "close", "volume", "events"],
index=idx,
)
class Terminal(object):
@classmethod
def execute_request(cls, request):
session = DispatchWithEvents("blpapicom.ProviderSession.1", ResponseHandler)
session.Start()
try:
svcname = request.get_bbg_service_name()
if not session.OpenService(svcname):
raise Exception("failed to open service %s" % svcname)
svc = session.GetService(svcname)
asbbg = request.get_bbg_request(svc, session)
session.SendRequest(asbbg)
session.do_init(request)
while session.waiting:
PumpWaitingMessages()
session.has_deferred_exception and session.raise_deferred_exception()
request.has_exception and request.raise_exception()
return request
finally:
session.Stop()
session.do_cleanup()
if __name__ == "__main__":
# 5 days ago
import pandas
d = pandas.datetools.BDay(-4).apply(datetime.now())
m = pandas.datetools.BMonthBegin(-2).apply(datetime.now())
def banner(msg):
print("*" * 25)
print(msg)
print("*" * 25)
banner("ReferenceDataRequest: single security, single field, frame response")
req = ReferenceDataRequest("msft us equity", "px_last", response_type="frame")
print(req.execute().response)
banner("ReferenceDataRequest: single security, single field, map response")
req = ReferenceDataRequest("msft us equity", "px_last", response_type="map")
print(req.execute().response)
banner("ReferenceDataRequest: multi-security, multi-field")
req = ReferenceDataRequest(
["eurusd curncy", "msft us equity"], ["px_open", "px_last"]
)
print(req.execute().response)
banner(
"ReferenceDataRequest: single security, multi-field (with bulk), frame response"
)
req = ReferenceDataRequest("eurusd curncy", ["px_last", "fwd_curve"])
req.execute()
print(req.response)
# DataFrame within a DataFrame
print(req.response.fwd_curve[0].tail())
banner("ReferenceDataRequest: multi security, multi-field, bad field")
req = ReferenceDataRequest(
["eurusd curncy", "msft us equity"],
["px_last", "fwd_curve"],
ignore_field_error=1,
)
req.execute()
print(req.response)
banner("HistoricalDataRequest: multi security, multi-field, daily data")
req = HistoricalDataRequest(
["eurusd curncy", "msft us equity"], ["px_last", "px_open"], start=d
)
req.execute()
print(req.response)
print("--------- AS SINGLE TABLE ----------")
print(req.response_as_single())
banner("HistoricalDataRequest: multi security, multi-field, weekly data")
req = HistoricalDataRequest(
["eurusd curncy", "msft us equity"],
["px_last", "px_open"],
start=m,
period="WEEKLY",
)
req.execute()
print(req.response)
print("--------- AS SINGLE TABLE ----------")
print(req.response_as_single())
print("--------- AS PANEL (id indexed) ----------")
print(req.response_as_panel())
print("--------- AS PANEL (field indexed) ----------")
print(req.response_as_panel(swap=1))
banner("IntrdayBarRequest: every hour")
req = IntrdayBarRequest("eurusd curncy", 60, start=d)
req.execute()
print(req.response[-10:])
#
# HOW TO
#
# - Retrieve an fx vol surface: BbgReferenceDataRequest('eurusd curncy', 'DFLT_VOL_SURF_MID')
# - Retrieve a fx forward curve: BbgReferenceDataRequest('eurusd curncy', 'FWD_CURVE')
# - Retrieve dividends: BbgReferenceDataRequest('csco us equity', 'BDVD_PR_EX_DTS_DVD_AMTS_W_ANN')