Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
blp / blp.py
Size: Mime:
import datetime
import itertools
import logging
import queue
import threading
from numbers import Number
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Sequence, Union

import blpapi
import pandas
import pytz

BBG_MONTH_MAP = dict(zip("FGHJKMNQUVXZ", range(1, 13)))
MONTH_BBG_MAP = {v: k for k, v in BBG_MONTH_MAP.items()}


_EVENT_DICT = {
    blpapi.Event.ADMIN: "blpapi.Event.ADMIN",
    blpapi.Event.AUTHORIZATION_STATUS: "blpapi.Event.AUTHORIZATION_STATUS",
    blpapi.Event.PARTIAL_RESPONSE: "blpapi.Event.PARTIAL_RESPONSE",
    blpapi.Event.REQUEST: "blpapi.Event.REQUEST",
    blpapi.Event.REQUEST_STATUS: "blpapi.Event.REQUEST_STATUS",
    blpapi.Event.RESOLUTION_STATUS: "blpapi.Event.RESOLUTION_STATUS",
    blpapi.Event.RESPONSE: "blpapi.Event.RESPONSE",
    blpapi.Event.SERVICE_STATUS: "blpapi.Event.SERVICE_STATUS",
    blpapi.Event.SESSION_STATUS: "blpapi.Event.SESSION_STATUS",
    blpapi.Event.SUBSCRIPTION_DATA: "blpapi.Event.SUBSCRIPTION_DATA",
    blpapi.Event.SUBSCRIPTION_STATUS: "blpapi.Event.SUBSCRIPTION_STATUS",
    blpapi.Event.TIMEOUT: "blpapi.Event.TIMEOUT",
    blpapi.Event.TOKEN_STATUS: "blpapi.Event.TOKEN_STATUS",
    blpapi.Event.TOPIC_STATUS: "blpapi.Event.TOPIC_STATUS",
    blpapi.Event.UNKNOWN: "blpapi.Event.UNKNOWN",
}

_AUTHORIZATION_SUCCESS = blpapi.Name("AuthorizationSuccess")
_AUTHORIZATION_FAILURE = blpapi.Name("AuthorizationFailure")
_TOKEN_SUCCESS = blpapi.Name("TokenGenerationSuccess")
_TOKEN_FAILURE = blpapi.Name("TokenGenerationFailure")
_SESSION_TERMINATED = blpapi.Name("SessionTerminated")
_SESSION_DOWN = blpapi.Name("SessionConnectionDown")
_MARKET_DATA_EVENTS = blpapi.Name("MarketDataEvents")

logger = logging.getLogger(__name__)


class BlpSession:
    def __init__(self, event_handler: Optional[Callable], host: str, port: int, app: Optional[str] = None, **kwargs):
        """Manage a Bloomberg session.

        A BlpSession is used for managing the lifecycle of a connection to a blpapi.Session. This includes managing
        session options, event handlers and authentication.

        Args:
            event_handler: Event handler, if None this session is synchronous. See blpapi.Session.
            host: Host to connect session on
            port: Port to connect session to
            app: The app to use for the session identity
            **kwargs: Keyword arguments used in blpapi.SessionOptions

        """
        self.event_handler = event_handler
        self.session_options = self.create_session_options(host, port, app=app, **kwargs)
        if event_handler:
            self.session = blpapi.Session(options=self.session_options, eventHandler=event_handler)
        else:
            self.session = blpapi.Session(options=self.session_options, eventHandler=None)
        self.identity = None

    def __repr__(self):
        host, port = (
            self.session_options.serverHost(),
            self.session_options.serverPort(),
        )
        return "{} with <address={}:{}><identity={!r}><eventHandler={!r}>".format(
            type(self), host, port, self.identity, self.event_handler
        )

    @staticmethod
    def create_session_options(host: str, port: int, app: Optional[str] = None, **kwargs) -> blpapi.SessionOptions:
        """Create blpapi.SessionOptions class used in blpapi.Session.

        Args:
            host: Host to connect session on
            port: Port to connection session to
            app: The app to use for the session identity
            **kwargs: Keyword args passed to the blpapi.SessionOpts, if authentication is needed use
                setAuthenticationOptions

        Returns: A blpapi.SessionOptions

        """
        session_options = blpapi.SessionOptions()
        kwargs["setServerHost"] = host
        kwargs["setServerPort"] = port
        if app:
            kwargs["setSessionIdentityOptions"] = blpapi.AuthOptions.createWithApp(app)
        # logging and subscription logic does not currently support multiple correlationIds
        kwargs["setAllowMultipleCorrelatorsPerMsg"] = False
        kwargs.setdefault("setAutoRestartOnDisconnection", True)
        kwargs.setdefault("setNumStartAttempts", 1)
        kwargs.setdefault("setRecordSubscriptionDataReceiveTimes", True)
        for key in kwargs:
            getattr(session_options, key)(kwargs[key])
        return session_options

    def authenticate(self, timeout: int = 0) -> None:
        """Authenticate the blpapi.Session.

        Args:
            timeout: Milliseconds to wait for service before the blpapi.EventQueue returns a blpapi.Event.TIMEOUT

        """
        token_event_queue = blpapi.EventQueue()
        self.session.generateToken(eventQueue=token_event_queue)

        event = token_event_queue.nextEvent(timeout)
        for n, msg in enumerate(event):
            if msg.messageType() == _TOKEN_SUCCESS:
                logger.info(f"TOKEN_STATUS - Message {n} - {msg}")
                auth_service = self.session.getService("//blp/apiauth")
                auth_request = auth_service.createAuthorizationRequest()
                auth_request.set("token", msg.getElementAsString("token"))
                identity = self.session.createIdentity()
                logger.info(f"Send authorization request\n{auth_request}")
                self.session.sendAuthorizationRequest(auth_request, identity)
            elif msg.messageType() == _TOKEN_FAILURE:
                raise ConnectionError(f"TOKEN_STATUS - Message {n} - {msg}")

        event = token_event_queue.nextEvent(timeout)
        for n, msg in enumerate(event):
            if msg.messageType() == _AUTHORIZATION_FAILURE:
                raise ConnectionError(f"RESPONSE - Message {n} - {msg}")
            elif msg.messageType() == _AUTHORIZATION_SUCCESS:
                logger.info(f"RESPONSE - Message {n} - {msg}")
                self.identity = identity


class BlpStream(BlpSession):
    event_handler: "EventHandler"

    def __init__(self, host: str = "localhost", port: int = 8194, **kwargs):
        """A class to manage an asynchronous Bloomberg streaming session.

        Args:
            host: Host to connect session on
            port: Port to connection session to
            **kwargs: keyword arguments used in blpapi.SessionOptions

        """
        super().__init__(EventHandler(), host, port, **kwargs)

    def __enter__(self):
        if not self.session.start():
            raise ConnectionError(f"Failed to start {self!r}")
        if self.session_options.authenticationOptions():
            if not self.session.openService("//blp/apiauth"):
                raise ConnectionError(f"Failed to start //blp/apiauth for {self!r}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exiting gracefully, log correspondingly
        self.event_handler.closed.set()
        self.session.stop()
        logger.debug(f"Closed {self!r}")

    def subscribe(self, sub: Dict, timeout: int = 5) -> Dict:
        """Subscribe to instruments.

        Args:
            sub: Dictionary used to build a blpapi.SubscriptionList of subscriptions
            timeout: Seconds before getting result from the queue.Queue returns a queue.Empty exception

        Returns: A dictionary where the key is the blpapi.correlationId value
        and the value is True if successful, False if there was an error and
        a list of invalid fields if there are exceptions

        Examples:
            >>> bs = BlpStream()
            >>> with bs: # doctest: +SKIP
            ...     bs.subscribe(
            ...       {
            ...          'USDCAD Curncy': {'fields': ['LAST_PRICE']},
            ...          'EURUSD Curncy': {'fields': ['LAST_PRICE', 'BAD_FIELD']},
            ...          'BAD_TICKER': {'fields': ['LAST_PRICE']}
            ...       }
            ...     )
             {'USDCAD Curncy': True, 'EURUSD Curncy': ['BAD_FIELD'], 'BAD_TICKER': False}

        """

        logger.info(f"Subscribing to: {sub}")
        sublist = self.dict_to_sub(sub)
        self.session.subscribe(sublist, identity=self.identity)
        return dict(self.event_handler.subscription_queue.get(timeout=timeout) for _ in range(sublist.size()))

    def resubscribe(self, sub: Dict, timeout: int = 5) -> Dict:
        """Resubscribe to all instruments.

        Args:
            sub: Dictionary used to build a blpapi.SubscriptionList of subscriptions
            timeout: Seconds before getting result from the queue.Queue returns a queue.Empty exception

        Returns: A dictionary where the key is the blpapi.correlationId value and the value is True if successful,
        False if there was an error and a list of invalid fields if there are exceptions

        Notes: The documentation indicates that resubscribing to instruments with invalid correlationIds will be ignored
        however this does not seem to work in practice. All current activate subscriptions must be resubscribed to.
        This may be a quirk of DAPI.

        """
        logger.info(f"Resubscribing to: {sub}")
        sublist = self.dict_to_sub(sub)
        self.session.resubscribe(sublist)
        return dict(self.event_handler.subscription_queue.get(timeout=timeout) for _ in range(sublist.size()))

    def unsubscribe(self, sub: Dict, timeout: int = 5) -> Dict:
        """Unsubscribe from all instruments.

        Args:
            sub: Dictionary used to build a blpapi.SubscriptionList of subscriptions
            timeout: Seconds before getting result from the queue.Queue returns a queue.Empty exception

        Notes: The documentation is somewhat ambiguous but the docs suggest you can unsubscribe from only some
        subscriptions however this does not seem to work in practice. This may be a quirk of DAPI.

        """
        logger.info(f"Unsubscribing to: {sub}")
        sublist = self.dict_to_sub(sub)
        self.session.unsubscribe(sublist)
        return dict(self.event_handler.subscription_queue.get(timeout=timeout) for _ in range(sublist.size()))

    def events(self, timeout: Optional[int] = None) -> Generator:
        """Generator of events.

        Args:
            timeout: Seconds before getting result from the queue.Queue returns a queue.Empty exception

        Returns: A generator of events

        Examples:
            >>> bs = BlpStream()
            >>> events = []
            >>> with bs: # doctest: +SKIP
            ...   subs = bs.subscribe({'USDCAD Curncy': {'fields': ['LAST_PRICE']}})
            ...   for ev in bs.events():
            ...       events.append(ev)
            ...       if len(events) > 2: break

        """
        logger.debug("Enter events")
        while True:
            event = self.event_handler.data_queue.get(timeout=timeout)
            yield event

    @staticmethod
    def dict_to_sub(sub: Dict, use_topic: bool = True) -> blpapi.SubscriptionList:
        """Convert dictionary to blpapi.SubscriptionList.

        Args:
            sub: A dictionary containing topics (Bloomberg securities) with nested dictionaries containg optional
              keys 'fields', 'options', 'correlationID'.
            use_topic (boolean): Indicate whether to use the topic as the correlationID if None
              (otherwise will be Bloomberg automatically generated)

        Returns: A blpapi.SubscriptionList

        """
        subscription = blpapi.SubscriptionList()
        for topic in sub:
            fields = sub[topic].get("fields", None)
            options = sub[topic].get("options", None)
            cid = sub[topic].get("correlationID", None)
            if cid is None and use_topic:
                cid = blpapi.CorrelationId(topic)
            subscription.add(topic, fields=fields, options=options, correlationId=cid)
        return subscription


class EventHandler:
    """A default implementation of an eventHandler used in blpapi.Session."""

    def __init__(self):
        self.data_queue = queue.Queue()
        self.subscription_queue = queue.Queue()
        self.closed = threading.Event()

    def __call__(self, event: blpapi.Event, _):
        try:
            if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
                self.marketdata_event(event)
            elif event.eventType() == blpapi.Event.SESSION_STATUS:
                self.session_status_event(event)
            elif event.eventType() == blpapi.Event.SUBSCRIPTION_STATUS:
                self.subscription_status_event(event)
            else:
                self.other_event(event)
        except Exception as e:
            logger.exception(e)
            # as per blpapi.Session docs, this will kill the whole process
            raise e

    def session_status_event(self, event):
        for n, msg in enumerate(event):
            if msg.messageType() == _SESSION_TERMINATED and not self.closed.is_set():
                logger.error(f"SESSION_STATUS - Message {n} - {msg}")
            elif msg.messageType() == _SESSION_DOWN:
                logger.warning(f"SESSION_STATUS - Message {n} - {msg}")
            else:
                logger.info(f"SESSION_STATUS - Message {n} - {msg}")

    def subscription_status_event(self, event):
        for n, msg in enumerate(event):
            cid = msg.correlationIds()[0].value()
            subscription_element = element_to_dict(msg.asElement())
            if "SubscriptionFailure" in subscription_element:
                logger.error(f"SUBSCRIPTION_STATUS - cid: {cid!r} - Message {n} - {msg}")
                self.subscription_queue.put((cid, False))
            elif "SubscriptionStarted" in subscription_element:
                exceptions = subscription_element["SubscriptionStarted"]["exceptions"]
                if len(exceptions) > 0:
                    logger.warning(f"SUBSCRIPTION_STATUS - cid: {cid!r} - Message {n} - {msg}")
                    bad_fields = [exception["exceptions"]["fieldId"] for exception in exceptions]
                    self.subscription_queue.put((cid, bad_fields))
                else:
                    logger.debug(f"SUBSCRIPTION_STATUS - cid: {cid!r} - Message {n} - {msg}")
                    self.subscription_queue.put((cid, True))
            elif "SubscriptionTerminated" in subscription_element:
                logger.debug(f"SUBSCRIPTION_STATUS - cid: {cid!r} - Message {n} - {msg}")
                self.subscription_queue.put((cid, True))
            else:
                logger.debug(f"SUBSCRIPTION_STATUS - cid: {cid!r} - Message {n} - {msg}")

    def marketdata_event(self, event):
        event_name = _EVENT_DICT[event.eventType()]
        for n, msg in enumerate(event):
            if msg.messageType() == _MARKET_DATA_EVENTS:
                self.data_queue.put(message_to_dict(msg))
            else:
                self.other_message(event_name, n, msg)

    @classmethod
    def other_event(cls, event):
        event_name = _EVENT_DICT[event.eventType()]
        for n, msg in enumerate(event):
            cls.other_message(event_name, n, msg)

    @staticmethod
    def other_message(event_name, n, msg):
        logger.info(f"{event_name} - Message {n} - {msg}")


def datetime_converter(value: Union[str, datetime.date, datetime.datetime]) -> pandas.Timestamp:
    ts = pandas.Timestamp(value)
    if ts.tz:
        ts = ts.tz_convert(pytz.FixedOffset(ts.tz.getOffsetInMinutes()))
    return ts


def element_to_value(elem: blpapi.Element) -> Union[pandas.Timestamp, str, Number, None]:
    """Convert a blpapi.Element to its value defined in its datatype with some possible coercisions.

    datetime.datetime -> pandas.Timestamp
    datetime.date -> pandas.Timestamp
    blp.name.Name -> str
    null value -> None
    ValueError Exception -> None

    Args:
        elem: Element to convert

    Returns: A value

    """
    if elem.isNull():
        return None
    else:
        try:
            value = elem.getValue()
            if isinstance(value, blpapi.name.Name):
                return str(value)
            if isinstance(value, datetime.datetime) or isinstance(value, datetime.date):
                return datetime_converter(value)

            return value
        except ValueError:
            return None


def _element_to_dict(elem: Union[str, blpapi.Element]) -> Any:
    if isinstance(elem, str):
        return elem
    dtype = elem.datatype()
    if dtype == blpapi.DataType.CHOICE:
        return {f"{elem.name()}": _element_to_dict(elem.getChoice())}
    elif elem.isArray():
        return [_element_to_dict(v) for v in elem.values()]
    elif dtype == blpapi.DataType.SEQUENCE:
        return {f"{elem.name()}": {f"{e.name()}": _element_to_dict(e) for e in elem.elements()}}
    else:
        return element_to_value(elem)


def element_to_dict(elem: blpapi.Element) -> Dict:
    """Convert a blpapi.Element to an equivalent dictionary representation.

    Args:
        elem: A blpapi.Element

    Returns: A dictionary representation of blpapi.Element

    """
    return _element_to_dict(elem)


def message_to_dict(msg: blpapi.Message) -> Dict:
    """Convert a blpapi.Message to a dictionary representation.

    Args:
        msg: A blpapi.Message

    Returns: A dictionary with relevant message metadata and data

    """
    return {
        "fragmentType": msg.fragmentType() if hasattr(msg, "fragmentType") else None,
        "correlationIds": [cid.value() for cid in msg.correlationIds()],
        "messageType": f"{msg.messageType()}",
        "timeReceived": _get_time_received(msg),
        "element": element_to_dict(msg.asElement()),
    }


def _get_time_received(msg: blpapi.Message) -> Optional[pandas.Timestamp]:
    try:
        return datetime_converter(msg.timeReceived())
    except ValueError:
        return None


def dict_to_req(request: blpapi.Request, request_data: Dict) -> blpapi.Request:
    """Populate request with data from request_data.

    Args:
        request: Request to populate
        request_data: Data used for populating the request

    Returns: A blpapi.Request

    Notes: An example request data dictionary is

      rdata = {'fields': ['SETTLE_DT'], 'securities': ['AUD Curncy'],
               'overrides': [{'overrides': {'fieldId': 'REFERENCE_DATE', 'value': '20180101'}}]}

    """
    for key, value in request_data.items():
        elem = request.getElement(key)
        if elem.datatype() == blpapi.DataType.SEQUENCE:
            for elem_dict in value:
                if elem.isArray():
                    el = elem.appendElement()
                    for k, vv in elem_dict[key].items():
                        el.setElement(k, vv)
                else:
                    elem.setElement(elem_dict, value[elem_dict])
        elif elem.isArray():
            for v in value:
                elem.appendValue(v)
        elif elem.datatype() == blpapi.DataType.CHOICE:
            for k, v in value.items():
                c = elem.getElement(k)
                if c.isArray():
                    for v_i in v:
                        c.appendValue(v_i)
                else:
                    c.setValue(v)
        else:
            elem.setValue(value)
    return request


class BlpQuery(BlpSession):

    _SERVICES = {
        "HistoricalDataRequest": "//blp/refdata",
        "ReferenceDataRequest": "//blp/refdata",
        "IntradayTickRequest": "//blp/refdata",
        "IntradayBarRequest": "//blp/refdata",
        "FieldInfoRequest": "//blp/apiflds",
        "FieldListRequest": "//blp/apiflds",
        "instrumentListRequest": "//blp/instruments",
        "GetFills": "//blp/emsx.history",
    }

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8194,
        timeout: int = 10000,
        parser: Optional[Callable] = None,
        field_column_map: Optional[Dict[str, Callable]] = None,
        **kwargs,
    ):
        """A class to manage a synchronous Bloomberg request/response session.

        Args:
            host: Host to connect session on
            port: Port to connection session to
            timeout: Default milliseconds to wait for service before the blpapi.EventQueue returns blpapi.Event.TIMEOUT
            parser: Callable which parses response and request_data
            field_column_map: A map from bloomberg field name to a callable. It is used by the collectors to ensure
              the correct data type. If a field is missing from the map the default pandas type inference is used.
            **kwargs: Keyword arguments used in blpapi.SessionOptions

        Examples:
            >>> BlpQuery()
            <class 'blp.blp.BlpQuery'> with <address=localhost:8194><identity=None><eventHandler=None>

        """
        # TODO change so parser and collector are same object
        if parser is None:
            parser = BlpParser()
        if field_column_map is None:
            field_column_map = dict()
        self.parser = parser
        self._field_column_map = field_column_map
        event_handler = None
        self._started = False
        self._services: Dict[str, blpapi.Service] = {}
        self.timeout = timeout
        super().__init__(event_handler, host, port, **kwargs)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def start(self):
        """Start the blpapi.Session and open relevant services."""
        if not self._started:
            self._started = self.session.start()
            if not self._started:
                msg = next(iter(self.session.tryNextEvent()))
                logger.info(f"Failed to connect to Bloomberg:\n{msg}")
                raise ConnectionError(f"Failed to start {self!r}")
            logger.debug(f"Started {self!r}")
            logger.debug(f"{next(iter(self.session.tryNextEvent()))}{next(iter(self.session.tryNextEvent()))}")
        for service in set(self._SERVICES.values()):
            if not self.session.openService(service):
                raise ConnectionError(f"Unknown service {service!r}")
            logger.debug(f"Service {service!r} opened")
            logger.debug(f"{next(iter(self.session.tryNextEvent()))}")
            self._services[service] = self.session.getService(service)
        return self

    def stop(self):
        self.session.stop()

    @staticmethod
    def _pass_through(x, _):
        yield x

    def query(
        self,
        request_data: Dict,
        parse: Optional[Callable] = None,
        collector: Optional[Callable] = None,
        timeout: int = None,
    ):
        """Request and parse Bloomberg data.

        Args:
            request_data: A dictionary representing a blpapi.Request, specifying both the service and the data
            parse: Callable which takes a dictionary response and request and yields 0 or more values. If None, use
              default parser. If False, do not parse the response
            collector: Callable which takes an iterable
            timeout: Milliseconds to wait for service before the blpapi.EventQueue returns a blpapi.Event.TIMEOUT

        Returns: A result from collector, if collector=None default is a itertools.chain

        Examples:
            >>> bq = BlpQuery().start() # doctest: +SKIP

            A historical data request collected into a list

            >>> rd = {
            ...  'HistoricalDataRequest': {
            ...    'securities': ['CL1 Comdty'],
            ...    'fields': ['PX_LAST'],
            ...    'startDate': '20190102',
            ...    'endDate': '20190102'
            ...   }
            ... }
            >>> bq.query(rd, collector=list) # doctest: +SKIP
            [
             {
              'security':'CL1 Comdty',
              'fields':['PX_LAST'],
              'data':[{'date': Timestamp('2019-01-02 00:00:00'), 'PX_LAST':46.54}]
             }
            ]

            A historical data request with no parsing collected into a list

            >>> bq.query(rd, collector=list, parse=False) # doctest: +SKIP
            [
               {
                  'eventType':5,
                  'eventTypeName':'blpapi.Event.RESPONSE',
                  'messageNumber':0,
                  'message':{
                     'fragmentType':0,
                     'correlationIds':[8],
                     'messageType':'HistoricalDataResponse',
                     'topicName':'',
                     'timeReceived':None,
                     'element':{
                        'HistoricalDataResponse':{
                           'securityData':{
                              'security':'CL1 Comdty',
                              'eidData':[],
                              'sequenceNumber':0,
                              'fieldExceptions':[],
                              'fieldData':[{'fieldData':{'date': Timestamp('2019-01-02 00:00:00'),'PX_LAST':46.54}}]
                           }
                        }
                     }
                  }
               }
            ]

        """
        if timeout is None:
            timeout = self.timeout
        if parse is False:
            parse = self._pass_through
        elif parse is None:
            parse = self.parser
        data_queue = blpapi.EventQueue()
        request = self.create_request(request_data)
        self.send_request(request, data_queue)
        res = (parse(data, request_data) for data in self.get_response(data_queue, timeout))
        res = itertools.chain.from_iterable(res)  # type: ignore
        if collector:
            res = collector(res)
        return res

    def create_request(self, request_data: Dict) -> blpapi.Request:
        """Create a blpapi.Request.

        Args:
            request_data: A dictionary representing a blpapi.Request, specifying both the service and the data

        Returns: blpapi.Request

        Examples:
            >>> bq = BlpQuery().start() # doctest: +SKIP
            >>> bq.create_request({
            ...  'HistoricalDataRequest': {
            ...    'securities': ['CL1 Comdty'],
            ...    'fields': ['PX_LAST'],
            ...    'startDate': '20190102',
            ...    'endDate': '20190102'
            ...   }
            ... }) # doctest: +SKIP

        """
        operation = list(request_data.keys())[0]
        service = self._services[self._SERVICES[operation]]
        request = service.createRequest(operation)
        rdata = request_data[operation]
        request = dict_to_req(request, rdata)
        return request

    def send_request(
        self,
        request: blpapi.Request,
        data_queue: blpapi.EventQueue,
        correlation_id: Optional[blpapi.CorrelationId] = None,
    ) -> blpapi.CorrelationId:
        """Send a request who's data will be populated into data_queue.

        Args:
            request: Request to send
            data_queue: Queue which response populates
            correlation_id: Id associated with request/response

        Returns: blpapi.CorrelationId associated with the request

        """
        logger.debug(
            f"Sent {request} with identity={self.identity!r}, correlationId={correlation_id!r}, event_queue={data_queue!r}",  # noqa: E501
        )
        cid = self.session.sendRequest(request, self.identity, correlation_id, data_queue)
        return cid

    def get_response(self, data_queue: blpapi.EventQueue, timeout: Optional[int] = None) -> Generator:
        """Yield dictionary representation of blpapi.Messages from a blpapi.EventQueue.

        Args:
            data_queue: Queue which contains response
            timeout: Milliseconds to wait for service before the blpapi.EventQueue returns a blpapi.Event.TIMEOUT

        Returns: A generator of messages translated into a dictionary representation

        """
        if timeout is None:
            timeout = self.timeout
        while True:
            event = data_queue.nextEvent(timeout=timeout)
            event_type = event.eventType()
            event_type_name = _EVENT_DICT[event_type]
            if event_type == blpapi.Event.TIMEOUT:
                raise ConnectionError(f"Unexpected blpapi.Event.TIMEOUT received by {self!r}")
            for n, msg in enumerate(event):
                logger.debug(f"Message {n} in {event_type_name}:{msg}")
                response = {
                    "eventType": event_type,
                    "eventTypeName": event_type_name,
                    "messageNumber": n,
                    "message": message_to_dict(msg),
                }
                yield response
            if event_type == blpapi.Event.RESPONSE:
                return

    def cast_columns(self, df: pandas.DataFrame, fields: Iterable) -> pandas.DataFrame:
        res = {}
        for field in fields:
            col_data = df.get(field)
            if field in self._field_column_map:
                col = self._field_column_map[field]
                col_data = col(col_data)
            res[field] = col_data
        # handle the case where all values are None
        try:
            return pandas.DataFrame(res)
        except ValueError as e:
            if df.empty:
                return pandas.DataFrame(columns=res.keys())
            else:
                raise e

    def bdh(
        self,
        securities: Sequence[str],
        fields: List[str],
        start_date: str,
        end_date: str,
        overrides: Optional[Sequence] = None,
        options: Optional[Dict] = None,
    ) -> pandas.DataFrame:
        """Bloomberg historical data request.

        Args:
            securities: list of strings of securities
            fields: list of strings of fields
            start_date: start date as '%Y%m%d'
            end_date: end date as '%Y%m%d'
            overrides: List of tuples containing the field to override and its value
            options: key value pairs to to set in request

        Returns: A pandas.DataFrame with columns ['date', 'security', fields[0], ...]

        """
        query = create_historical_query(securities, fields, start_date, end_date, overrides, options)
        res = self.query(query, self.parser, self.collect_to_bdh)
        dfs = []
        for sec in res:
            dfs.append(res[sec].assign(security=sec))
        df = (
            pandas.concat(dfs)
            .sort_values(by="date", axis=0)
            .loc[:, ["date", "security"] + fields]
            .reset_index(drop=True)
        )
        return df

    def collect_to_bdh(self, responses: Iterable) -> Dict[str, pandas.DataFrame]:
        """Collector for bdh()."""
        dfs: Dict = {}
        for response in responses:
            security = response["security"]
            fields = response["fields"] + ["date"]
            # have not seen example where a HistoricalDataResponse for a single security is broken across
            # multiple PARTIAL_RESONSE/RESPONSE but API docs are vague about whether technically possible
            sec_dfs = dfs.get(security, [])
            df = pandas.DataFrame(response["data"])
            df = self.cast_columns(df, fields)
            sec_dfs.append(df)
            dfs[security] = sec_dfs
        for sec in dfs:
            df_list = dfs[sec]
            if len(df_list) > 1:
                dfs[sec] = pandas.concat(df_list).sort_values(by="date", axis=0, ignore_index=True)
            else:
                dfs[sec] = df_list[0]

        return dfs

    def bdp(
        self,
        securities: Sequence[str],
        fields: List[str],
        overrides: Optional[Sequence] = None,
        options: Optional[Dict] = None,
    ) -> pandas.DataFrame:
        """Bloomberg reference data point request.

        Args:
            securities: list of strings of securities
            fields: list of strings of fields
            overrides: list of tuples containing the field to override and its value
            options: key value pairs to to set in request

        Returns: A pandas.DataFrame where columns are ['security', field[0], ...]

        """
        query = create_reference_query(securities, fields, overrides, options)
        df = self.query(query, self.parser, self.collect_to_bdp)
        df = df.loc[:, ["security"] + fields]
        return df

    def collect_to_bdp(self, responses: Iterable) -> pandas.DataFrame:
        """Collector for bdp()."""
        rows = []
        fields = {"security"}
        for response in responses:
            data = response["data"]
            # possible some fields are missing for different securities
            fields = fields.union(set(response["fields"]))
            for _, value in data.items():
                if isinstance(value, list):
                    raise TypeError(f"Bulk reference data not supported, expected singleton values but received {data}")
            data["security"] = response["security"]
            rows.append(data)
        df = pandas.DataFrame(rows)
        return self.cast_columns(df, fields)

    def bds(
        self, security: str, field: str, overrides: Optional[Sequence] = None, options: Optional[Dict] = None
    ) -> pandas.DataFrame:
        """Bloomberg reference data set request.

        Args:
            security: String representing security
            field: String representing field
            overrides: List of tuples containing the field to override and its value
            options: key value pairs to to set in request

        Returns: A pandas.DataFrame where columns are data element names

        """
        query = create_reference_query(security, field, overrides, options)
        return self.query(query, self.parser, self.collect_to_bds)

    def collect_to_bds(self, responses: Iterable) -> pandas.DataFrame:
        """Collector for bds()."""
        rows = []
        field = None
        for response in responses:
            keys = list(response["data"].keys())
            if len(keys) > 1:
                raise ValueError(f"responses must have only one field, received {keys}")
            if field is not None and field != keys[0]:
                raise ValueError(f"responses contain different fields, {field} and {keys[0]}")
            field = keys[0]
            data = response["data"][field]
            try:
                rows.extend(data)
            except TypeError:
                raise TypeError(f"response data must be bulk reference data, received {response['data']}")
        df = pandas.DataFrame(rows)
        return self.cast_columns(df, df.columns)

    def collect_many_to_bds(self, responses) -> Dict:
        """Collector to nested dictionary of DataFrames.

        Top level keys are securities, next level keys are fields and values are DataFrame in bds() form

        """
        res: Dict = {}
        for response in responses:
            security = response["security"]
            sec_dict = res.get(security, {})
            for field in response["data"]:
                data = response["data"][field]
                rows = sec_dict.get(field, [])
                rows.extend(data)
                sec_dict[field] = rows
            res[security] = sec_dict
        for s in res:
            for f in res[s]:
                # what does res[s][f] look like? can it be passed to to_series directly?
                df = pandas.DataFrame(res[s][f])
                res[s][f] = self.cast_columns(df, df.columns)
        return res

    def bdib(
        self,
        security: str,
        event_type: str,
        interval: int,
        start_datetime: str,
        end_datetime: str,
        overrides: Optional[Sequence] = None,
        options: Optional[Dict] = None,
    ) -> pandas.DataFrame:
        """Bloomberg intraday bar data request.

        Args:
            security: Security name
            event_type: Event type {TRADE, BID, ASK, BEST_BID, BEST_ASK}
            interval: Length in minutes of bars {1,...1440}
            start_datetime: UTC datetime as '%Y%-m%-dTHH:MM:SS'
            end_datetime: UTC datetime as '%Y-%m-%dTHH:MM:SS'
            overrides: List of tuples containing the field to override and its value
            options: Key value pairs to to set in request

        Returns: A pandas.DataFrame where columns are
        ['time', open', 'high', 'low', 'close', 'volume', 'numEvents', 'value'] time is UTC and value is the dollar
        volume (I believe)

        """
        query = create_intraday_bar_query(
            security,
            event_type,
            interval,
            start_datetime,
            end_datetime,
            overrides,
            options,
        )
        res = self.query(query, self.parser, self.collect_to_bdib)
        dfs = []
        for key in res:
            dfs.append(res[key])
        df = pandas.concat(dfs, ignore_index=True).sort_values(by="time", axis=0)
        return df

    def collect_to_bdib(self, responses: Iterable) -> Dict:
        """Collector for bdib()."""
        dfs: Dict = {}
        for response in responses:
            security = response["security"]
            event = response["events"][0]
            # accounts for situation where response is broken up over a RESPONSE and PARTIAL_RESPONSE.
            # also supports passing multiple IntradayBarResponses
            key = (security, event)
            bar_data = dfs.get(key, [])
            bar_data.extend(response["data"])
            dfs[key] = bar_data
        fields = ["time", "open", "high", "low", "close", "volume", "numEvents", "value"]
        for key in dfs:
            df = pandas.DataFrame(dfs[key])
            df = self.cast_columns(df, fields)
            dfs[key] = df.sort_values(by="time", axis=0, ignore_index=True)
        return dfs

    def bdit(
        self,
        security: str,
        event_types: Sequence[str],
        start_datetime: str,
        end_datetime: str,
        overrides: Optional[Sequence] = None,
        options: Optional[Dict] = None,
    ) -> pandas.DataFrame:
        """Bloomberg tick data request.

        Args:
            security: Security name
            event_types: List of event types
              {TRADE, BID, ASK, BID_BEST, ASK_BEST, BID_YIELD, ASK_YIELD, MID_PRICE, AT_TRADE, BEST_BID}
            start_datetime: UTC datetime as '%Y%-m%-dTHH:MM:SS'
            end_datetime: UTC datetime as '%Y-%m-%dTHH:MM:SS'
            overrides : List of tuples containing the field to override and its value
            options: Key value pairs to to set in request

        Returns: A pandas.DataFrame where columns are ['time', 'type', 'value', 'size'], time is in UTC. Certain options
        may add various columns

        Examples:
            >>> bq = BlpQuery().start() # doctest: +SKIP
            >>> df = bq.bdit('USDCAD Curncy', ['TRADE'], '2019-04-23T08:00:00', '2019-04-23T13:00:00') # doctest: +SKIP

        Notes: Various options exist to add certain fields, such as ("includeConditionCodes", True) and
        ("includeExchangeCodes", True). See Bloomberg Schema for more info.

        """
        query = create_intraday_tick_query(security, event_types, start_datetime, end_datetime, overrides, options)
        res = self.query(query, self.parser, self.collect_to_bdit)
        dfs = []
        for key in res:
            dfs.append(res[key])
        df = pandas.concat(dfs, ignore_index=True).sort_values(by="time", axis=0, ignore_index=True)
        return df

    def collect_to_bdit(self, responses: Iterable) -> Dict:
        """Collector for bdit()."""
        dfs: Dict = {}
        for response in responses:
            security = response["security"]
            events = tuple(response["events"])
            # accounts for situation where response is broken up over a RESPONSE and PARTIAL_RESPONSE.
            # also supports passing multiple IntradayTickResponses
            key = (security, events)
            bar_data = dfs.get(key, [])
            bar_data.extend(response["data"])
            dfs[key] = bar_data
        fields = ["time", "type", "value", "size"]
        for key in dfs:
            df = pandas.DataFrame(dfs[key])
            cols = fields + [c for c in df.columns if c not in fields]
            df = self.cast_columns(df, cols)
            dfs[key] = df.sort_values(by="time", axis=0, ignore_index=True)
        return dfs


class BlpParser:
    """A callable class with a default response parsing implementation.

    The parse method parses the responses from BlpQuery.get_response into a simplified representation the can easily
    be collected using collectors in BlpQuery.

    Args:
        processor_steps: A list of processors which take in a response and request_data and returns a
          validated and possibly modified response. Processors are called sequentially at the start of parse()
        raise_security_errors: If True, raise errors when response contains an INVALID_SECURITY error, otherwise
          log as a warning. This is ignored if ``processor_steps`` is not None.

    """

    def __init__(self, processor_steps: Optional[Sequence] = None, raise_security_errors: bool = True):

        if processor_steps is None and raise_security_errors:
            processor_steps = [
                self._validate_event,
                self._validate_response_type,
                self._validate_response_error,
                self._validate_security_error,
                self._process_field_exception,
            ]
        elif processor_steps is None:
            processor_steps = [
                self._validate_event,
                self._validate_response_type,
                self._validate_response_error,
                self._warn_security_error,
                self._process_field_exception,
            ]
        self._processor_steps = processor_steps

    @staticmethod
    def _validate_event(response, _):
        if response["eventType"] not in (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE):
            raise TypeError(f"Unknown eventType: {response}")
        return response

    @staticmethod
    def _validate_response_type(response, _):
        rtype = list(response["message"]["element"].keys())[0]
        known_responses = (
            "ReferenceDataResponse",
            "HistoricalDataResponse",
            "IntradayBarResponse",
            "IntradayTickResponse",
            "fieldResponse",
            "InstrumentListResponse",
            "GetFillsResponse",
        )
        if rtype not in known_responses:
            raise TypeError(f"Unknown {rtype!r}, must be in {known_responses}")
        return response

    @staticmethod
    def _validate_response_error(response, request):
        rtype = list(response["message"]["element"].keys())[0]
        if "responseError" in response["message"]["element"][rtype]:
            raise TypeError(f"Response contains responseError\nresponse: {response}\nrequest: {request}")
        return response

    @staticmethod
    def _process_field_exception(response, _):
        rtype = list(response["message"]["element"].keys())[0]
        response_data = response["message"]["element"][rtype]
        if rtype in (
            "IntradayBarResponse",
            "IntradayTickResponse",
            "fieldResponse",
            "InstrumentListResponse",
            "GetFillsResponse",
        ):
            return response
        if rtype == "HistoricalDataResponse":
            response_data = [response_data]
        for sec_data in response_data:
            field_exceptions = sec_data["securityData"]["fieldExceptions"]
            for fe in field_exceptions:
                fe = fe["fieldExceptions"]
                einfo = fe["errorInfo"]["errorInfo"]
                if einfo["category"] == "BAD_FLD" and einfo["subcategory"] == "NOT_APPLICABLE_TO_REF_DATA":
                    field = fe["fieldId"]
                    sec_data["securityData"]["fieldData"]["fieldData"][field] = None
                else:
                    raise TypeError(f"Response for {sec_data['securityData']['security']} contains fieldException {fe}")
        return response

    @staticmethod
    def _validate_fields_exist(response, request_data):
        rtype = list(response["message"]["element"].keys())[0]
        if rtype != "HistoricalDataResponse":
            return response

        fields = set(request_data["HistoricalDataRequest"]["fields"])
        sec_data = response["message"]["element"]["HistoricalDataResponse"]["securityData"]
        if not sec_data["fieldData"] and fields:
            raise TypeError(f"fieldData for {sec_data['security']!r} is missing fields {fields!r}")
        for fd in sec_data["fieldData"]:
            fd = fd["fieldData"]
            diff = fields.difference(fd.keys())
            if diff:
                raise TypeError(f"fieldData for {sec_data['security']!r} is missing fields {diff!r} in {fd!r}")

    @staticmethod
    def _validate_security_error(response, _):
        rtype = list(response["message"]["element"].keys())[0]
        response_data = response["message"]["element"][rtype]
        if rtype in (
            "IntradayBarResponse",
            "IntradayTickResponse",
            "fieldResponse",
            "InstrumentListResponse",
            "GetFillsResponse",
        ):
            return response
        if rtype == "HistoricalDataResponse":
            response_data = [response_data]
        for sec_data in response_data:
            data = sec_data["securityData"]
            if "securityError" in data:
                raise TypeError(f"Response for {data['security']!r} contains securityError {data['securityError']}")
        return response

    @staticmethod
    def _warn_security_error(response, _):
        rtype = list(response["message"]["element"].keys())[0]
        response_data = response["message"]["element"][rtype]
        if rtype in (
            "IntradayBarResponse",
            "IntradayTickResponse",
            "fieldResponse",
            "InstrumentListResponse",
            "GetFillsResponse",
        ):
            return response
        if rtype == "HistoricalDataResponse":
            response_data = [response_data]
        for sec_data in response_data:
            data = sec_data["securityData"]
            if "securityError" in data:
                logger.warning(f"Response for {data['security']!r} contains securityError {data['securityError']}")
        return response

    def __call__(self, response, request_data):
        """A default parser to parse dictionary representation of response.

        Parses data response to a generator of dictionaries or raises a TypeError if the response type is unknown.
        There is support for ReferenceDataResponse, HistoricalDataResponse, IntradayBarResponse, IntradayTickResponse,
        fieldResponse, InstrumentListResponse and GetFillsResponse. Parsed dictionaries have the following forms:

        .. code-block:: text

            1. ReferenceDataResponse
                Schema: {'security': <str>, 'fields': <list of str>, 'data': <dict of field:value>}
                Examples:
                    {'security': 'SPY US Equity', 'fields': ['NAME'], 'data': {'NAME': 'SPDR S&P 500 ETF TRUST'}}
                    {
                        'security': 'C 1 Comdty',
                        'fields': ['FUT_CHAIN'],
                        'data': {'FUT_CHAIN': [
                            {'Security Description': 'C H10 Comdty'},
                            {'Security Description': 'C K10 Comdty'}
                        ]}
                    }

            2. HistoricalDataResponse
              Schema: {'security': <str>, 'fields': <list of str>, 'data': <list of dict of field:value>}
              Examples:
                  {
                    'security': 'SPY US Equity',
                    'fields': ['PX_LAST'],
                    'data': [
                      {'date': pandas.Timestamp(2018, 1, 2), 'PX_LAST': 268.77},
                      {'date': pandas.Timestamp(2018, 1, 3), 'PX_LAST': 270.47}
                    ]
                  }

            3. IntradayBarResponse
              Schema: {'security': <str>, 'events': [<str>],
                       'data': <list of {'time': <pandas.Timestamp>, 'open': <float>, 'high': <float>, 'low': <float>,
                                         'close': <float>, 'volume': <int>, 'numEvents': <int>, 'value': <float>}}
                      }
              Examples:
                  {
                    'security': 'CL1 Comdty',
                    'data': [{'time': pandas.Timestamp('2019-04-24 08:00:00'), 'open': 65.85, 'high': 65.89,
                              'low': 65.85, 'close': 65.86, 'volume': 565, 'numEvents': 209, 'value': 37215.16}],
                    'events': ['TRADE']
                  }

            4. IntradayTickResponse
              Schema: {'security': <str>, 'events': <list of str>,
                       'data': <list of  {'time': <pandas.Timestamp>, 'type': <str>, 'value': <float>, 'size': <int>}>}
              Examples:
                  {
                     'security': 'CL1 Comdty',
                     'data': [
                       {'time': pandas.Timestamp('2019-04-24 08:00:00'), 'type': 'BID', 'value': 65.85, 'size': 4},
                       {'time': pandas.Timestamp('2019-04-24 08:00:00'), 'type': 'BID', 'value': 65.85, 'size': 41},
                       {'time': pandas.Timestamp('2019-04-24 08:00:00'), 'type': 'ASK', 'value': 65.86, 'size': 50},
                     ],
                     'events': ['BID', 'ASK']
                  }

            5. fieldResponse
              Schema: {'id': <list of str>, data: {<str>: {field: value}}}
              Examples:
                  {
                    'id': ['PX_LAST', 'NAME'],
                    'data': {
                      'DS002': {
                        'mnemonic': 'NAME',
                        'description': 'Name',
                        'datatype': 'String',
                        'categoryName': [],
                        'property': [],
                        'overrides': [],
                        'ftype': 'Character'
                      },
                     'PR005': {
                       'mnemonic': 'PX_LAST',
                       'description': 'Last Price',
                       'datatype': 'Double',
                       'categoryName': [],
                       'property': [],
                       'overrides': ['PX628', 'DY628',...]
                       'ftype': 'Price'
                      }
                    }
                  }

            6. InstrumentListResponse
            Schema: {'security': <str>, 'description': <str>}
            Examples:
                {
                   'security': 'T<govt>,
                   'description': 'United States Treasury Note/Bond (Multiple Matches)'
                }

            7. GetFillsResponse
            Schema: {'Fills': <dict>}
            Examples:
                {
                    'fills': [
                        {'Ticker': 'GCZ9', 'Exchange': 'CMX', 'Type': 'MKT', ...},
                        {'Ticker': 'SIZ9', 'Exchange': 'CMX', 'Type': 'LMT', ...}
                    ]
                }

        Args:
            response (dict): Representation of a blpapi.Message
            request_data (dict): A dictionary representing a blpapi.Request

        Returns: A generator of responses parsed to dictionaries

        """
        for processor in self._processor_steps:
            response = processor(response, request_data)

        rtype = list(response["message"]["element"].keys())[0]
        if rtype == "ReferenceDataResponse":
            sec_data_parser = self._parse_reference_security_data
        elif rtype == "HistoricalDataResponse":
            sec_data_parser = self._parse_historical_security_data
        elif rtype == "IntradayBarResponse":
            sec_data_parser = self._parse_bar_security_data
        elif rtype == "IntradayTickResponse":
            sec_data_parser = self._parse_tick_security_data
        elif rtype == "fieldResponse":
            sec_data_parser = self._parse_field_info_data
        elif rtype == "InstrumentListResponse":
            sec_data_parser = self._parse_instrument_info_data
        elif rtype == "GetFillsResponse":
            sec_data_parser = self._parse_fills_data
        else:
            known_responses = (
                "ReferenceDataResponse",
                "HistoricalDataResponse",
                "IntradayBarResponse",
                "IntradayTickResponse",
                "fieldResponse",
                "InstrumentListResponse",
                "GetFillsResponse",
            )
            raise TypeError(f"Unknown {rtype!r}, must be in {known_responses}")

        return sec_data_parser(response, request_data)

    @staticmethod
    def _parse_reference_security_data(response, request_data):
        rtype = list(response["message"]["element"].keys())[0]
        response_data = response["message"]["element"][rtype]
        req_type = list(request_data.keys())[0]
        for sec_data in response_data:
            result = {
                "security": sec_data["securityData"]["security"],
                "fields": request_data[req_type]["fields"],
            }
            field_data = sec_data["securityData"]["fieldData"]["fieldData"]
            data = {}
            for field in field_data.keys():
                # bulk reference data
                if isinstance(field_data[field], list):
                    rows = []
                    for fd in field_data[field]:
                        datum = {}
                        for name, value in fd[field].items():
                            datum[name] = value
                        rows.append(datum)
                    data[field] = rows
                # reference data
                else:
                    data[field] = field_data[field]
            result["data"] = data
            yield result

    @staticmethod
    def _parse_historical_security_data(response, request_data):
        rtype = list(response["message"]["element"].keys())[0]
        response_data = [response["message"]["element"][rtype]]
        req_type = list(request_data.keys())[0]
        for sec_data in response_data:
            result = {
                "security": sec_data["securityData"]["security"],
                "fields": request_data[req_type]["fields"],
            }
            field_data = sec_data["securityData"]["fieldData"]
            data = []
            for fd in field_data:
                data.append(fd["fieldData"])
            result["data"] = data
            yield result

    @staticmethod
    def _parse_bar_security_data(response, request_data):
        rtype = list(response["message"]["element"].keys())[0]
        bar_data = response["message"]["element"][rtype]["barData"]["barTickData"]
        data = []
        for bd in bar_data:
            data.append(bd["barTickData"])
        req_type = list(request_data.keys())[0]
        result = {
            "security": request_data[req_type]["security"],
            "data": data,
            "events": [request_data[req_type]["eventType"]],
        }
        yield result

    @staticmethod
    def _parse_tick_security_data(response, request_data):
        rtype = list(response["message"]["element"].keys())[0]
        bar_data = response["message"]["element"][rtype]["tickData"]["tickData"]
        data = []
        for bd in bar_data:
            data.append(bd["tickData"])
        req_type = list(request_data.keys())[0]
        result = {
            "security": request_data[req_type]["security"],
            "data": data,
            "events": request_data[req_type]["eventTypes"],
        }
        yield result

    @staticmethod
    def _parse_field_info_data(response, request_data):
        rtype = "fieldResponse"
        field_data = response["message"]["element"][rtype]
        data = {}
        for fd in field_data:
            datum = fd["fieldData"]["fieldInfo"]["fieldInfo"]
            data[fd["fieldData"]["id"]] = datum
        if "FieldInfoRequest" in request_data:
            ids = request_data["FieldInfoRequest"]["id"]
        else:
            ids = list(data.keys())
        result = {"id": ids, "data": data}
        yield result

    @staticmethod
    def _parse_instrument_info_data(response, _):
        data = response["message"]["element"]["InstrumentListResponse"]["results"]
        for datum in data:
            result = datum["results"]
            yield result

    @staticmethod
    def _parse_fills_data(response, _):
        data = response["message"]["element"]["GetFillsResponse"]["Fills"]
        result = []
        for datum in data:
            result.append(datum["Fills"])
        yield {"Fills": result}


def create_query(request_type: str, values: Dict, overrides: Optional[Sequence] = None) -> Dict:
    """Create a request dictionary used to construct a blpapi.Request.

    Args:
        request_type: Type of request
        values: key value pairs to set in the request
        overrides: List of tuples containing the field to override and its value

    Returns: A dictionary representation of a blpapi.Request

    Examples:

        Reference data request

        >>> create_query(
        ...   'ReferenceDataRequest',
        ...   {'securities': ['CL1 Comdty', 'CO1 Comdty'], 'fields': ['PX_LAST']}
        ... )
        {'ReferenceDataRequest': {'securities': ['CL1 Comdty', 'CO1 Comdty'], 'fields': ['PX_LAST']}}

        Reference data request with overrides

        >>> create_query(
        ...   'ReferenceDataRequest',
        ...   {'securities': ['AUD Curncy'], 'fields': ['SETTLE_DT']},
        ...   [('REFERENCE_DATE', '20180101')]
        ... )  # noqa: E501
        {'ReferenceDataRequest': {'securities': ['AUD Curncy'], 'fields': ['SETTLE_DT'], 'overrides': [{'overrides': {'fieldId': 'REFERENCE_DATE', 'value': '20180101'}}]}}

        Historical data request

        >>> create_query(
        ...   'HistoricalDataRequest',
        ...   {
        ...    'securities': ['CL1 Comdty'],
        ...    'fields': ['PX_LAST', 'VOLUME'],
        ...    'startDate': '20190101',
        ...    'endDate': '20190110'
        ...   }
        ... )  # noqa: E501
        {'HistoricalDataRequest': {'securities': ['CL1 Comdty'], 'fields': ['PX_LAST', 'VOLUME'], 'startDate': '20190101', 'endDate': '20190110'}}

    """
    request_dict: Dict = {request_type: {}}
    for key in values:
        request_dict[request_type][key] = values[key]
    ovrds = []
    if overrides:
        for field, value in overrides:
            ovrds.append({"overrides": {"fieldId": field, "value": value}})
        request_dict[request_type]["overrides"] = ovrds
    return request_dict


def create_historical_query(
    securities: Union[str, Sequence[str]],
    fields: Union[str, Sequence[str]],
    start_date: str,
    end_date: str,
    overrides: Optional[Sequence] = None,
    options: Optional[Dict] = None,
) -> Dict:
    """Create a HistoricalDataRequest dictionary request.

    Args:
        securities: list of strings of securities
        fields: list of strings of fields
        start_date: start date as '%Y%m%d'
        end_date: end date as '%Y%m%d'
        overrides: List of tuples containing the field to override and its value
        options: key value pairs to to set in request

    Returns: A dictionary representation of a blpapi.Request

    """
    if isinstance(securities, str):
        securities = [securities]
    if isinstance(fields, str):
        fields = [fields]
    values = {
        "securities": securities,
        "fields": fields,
        "startDate": start_date,
        "endDate": end_date,
    }
    if options:
        values.update(options)
    return create_query("HistoricalDataRequest", values, overrides)


def create_reference_query(
    securities: Union[str, Sequence[str]],
    fields: Union[str, Sequence[str]],
    overrides: Optional[Sequence] = None,
    options: Optional[Dict] = None,
) -> Dict:
    """Create a ReferenceDataRequest dictionary request.

    Args:
        securities: list of strings of securities
        fields: list of strings of fields
        overrides: List of tuples containing the field to override and its value
        options: key value pairs to to set in request

    Returns: A dictionary representation of a blpapi.Request

    """
    if isinstance(securities, str):
        securities = [securities]
    if isinstance(fields, str):
        fields = [fields]
    values = {"securities": securities, "fields": fields}
    if options:
        values.update(options)
    return create_query("ReferenceDataRequest", values, overrides)


def create_intraday_tick_query(
    security: str,
    event_types: Sequence[str],
    start_datetime: str,
    end_datetime: str,
    overrides: Optional[Sequence] = None,
    options: Optional[Dict] = None,
) -> Dict:
    """Create an IntradayTickRequest.

    Args:
        security: Security name
        event_types: List of event types
          {TRADE, BID, ASK, BID_BEST, ASK_BEST, BID_YIELD, ASK_YIELD, MID_PRICE, AT_TRADE, BEST_BID}
        start_datetime: UTC datetime as '%Y%-m%-dTHH:MM:SS'
        end_datetime: UTC datetime as '%Y-%m-%dTHH:MM:SS'
        overrides: List of tuples containing the field to override and its value
        options: Key value pairs to to set in request

    Returns: A dictionary representation of a blpapi.Request

    """
    if isinstance(event_types, str):
        event_types = [event_types]
    values = {
        "security": security,
        "eventTypes": event_types,
        "startDateTime": start_datetime,
        "endDateTime": end_datetime,
    }
    if options:
        values.update(options)
    return create_query("IntradayTickRequest", values, overrides)


def create_intraday_bar_query(
    security: str,
    event_type: str,
    interval: int,
    start_datetime: str,
    end_datetime: str,
    overrides: Optional[Sequence] = None,
    options: Optional[Dict] = None,
) -> Dict:
    """Create an IntradayBarRequest dictionary request.

    Args:
        security: Security name
        event_type: Event type {TRADE, BID, ASK, BEST_BID, BEST_ASK}
        interval: Length in minutes of bars {1,...1440}
        start_datetime: UTC datetime as '%Y%-m%-dTHH:MM:SS'
        end_datetime: UTC datetime as '%Y-%m-%dTHH:MM:SS'
        overrides: List of tuples containing the field to override and its value
        options: Key value pairs to to set in request

    Returns: A dictionary representation of a blpapi.Request

    """
    values = {
        "security": security,
        "eventType": event_type,
        "startDateTime": start_datetime,
        "endDateTime": end_datetime,
        "interval": interval,
    }
    if options:
        values.update(options)
    return create_query("IntradayBarRequest", values, overrides)


def create_fills_query(start_datetime: str, end_datetime: str, uuids: Sequence[str]) -> Dict:
    """Create a GetFills dictionary request.

    Args:
        start_datetime: UTC datetime as '%Y%-m%-dTHH:MM:SS'
        end_datetime: UTC datetime as '%Y-%m-%dTHH:MM:SS'
        uuids: List of user uuids to get fills associated with

    Returns: A dictionary representation of a blpapi.Request

    """
    return {"GetFills": {"FromDateTime": start_datetime, "ToDateTime": end_datetime, "Scope": {"Uuids": uuids}}}


def create_field_list_query(field_type: Optional[str] = None, field_documentation: bool = True) -> Dict:
    """Create a FieldListRequest dictionary request.

    Args:
        field_type: One of {'All', 'Static', 'RealTime'}
        field_documentation: Return field documentation

    Returns: A dictionary representation of a blpapi.Request

    """
    field_type = field_type or "All"
    return {"FieldListRequest": {"fieldType": field_type, "returnFieldDocumentation": field_documentation}}


def create_instrument_list_query(values: Optional[Dict] = None) -> Dict:
    """Create an instrumentListRequest dictionary request.

    Args:
        values: Values to set in request

    Returns: A dictionary representation of a blpapi.Request

    """
    values = values or {}
    return create_query("instrumentListRequest", values)