Repository URL to install this package:
|
Version:
3.26.1.1 ▾
|
blpapi
/
session.py
|
|---|
# session.py
"""Provide consumer session to get Bloomberg Service.
This component implements a consumer session for getting services.
"""
from __future__ import annotations
from weakref import ref, ReferenceType
from typing import Optional, Callable, Any, List
import threading
from .names import Names
import ctypes
from enum import Enum
import logging
import asyncio
import os
import sys
import time
import traceback
from .message import Message
from .abstractsession import AbstractSession
from .event import Event
from . import exception
from .exception import _ExceptionUtil
from . import internals
from .correlationid import CorrelationId
from .sessionoptions import SessionOptions
from .requesttemplate import RequestTemplate
from .utils import get_handle, MetaClassForClassesWithEnums
from . import typehints # pylint: disable=unused-import
from .version import version
# pylint: disable=too-many-arguments,protected-access,bare-except
class SubscriptionPreprocessMode(Enum):
    """The modes that can be used for the :meth:`Session.subscribe()` and
    :meth:`Session.resubscribe()` operations.

    The mode selects how invalid entries of a :class:`SubscriptionList` are
    reported to the caller: either by raising on the first invalid entry, or
    by returning one error object per invalid entry.
    """

    FAIL_ON_FIRST_ERROR = 1
    """In this mode, an exception is raised on the first invalid entry in the
    :class:`SubscriptionList`, and the entire batch will be considered failed.
    Otherwise, all the subscriptions will move forward and the method returns
    ``None``."""
    RETURN_INDIVIDUAL_ERRORS = 2
    """In this mode, instead of raising exceptions, a list of
    :class:`SubscriptionPreprocessError` is returned, each representing an
    error due to an invalid subscription in the :class:`SubscriptionList`. The
    valid subscriptions will move forward."""
class Session(AbstractSession, metaclass=MetaClassForClassesWithEnums):
r"""Consumer session for making requests for Bloomberg services.
This class provides a consumer session for making requests for Bloomberg
services. For information on generic session operations, see the parent
class: :class:`AbstractSession`.
Sessions manage access to services either by requests and responses or
subscriptions. A Session can dispatch events and replies in either a
synchronous or asynchronous mode. The mode of a Session is determined when
it is constructed and cannot be changed subsequently.
A Session is asynchronous if an ``eventHandler`` argument is supplied when
it is constructed. The ``nextEvent()`` method may not be called. All
incoming events are delivered to the ``eventHandler`` supplied on
construction.
If supplied, ``eventHandler`` must be a callable object that takes two
arguments: received :class:`Event` and related session.
A Session is synchronous if an ``eventHandler`` argument is not supplied
when it is constructed. The :meth:`nextEvent()` method must be called to
read incoming events.
Several methods in Session take a :class:`CorrelationId` parameter. The
application may choose to supply its own :class:`CorrelationId` values or
allow the Session to create values. If the application supplies its own
:class:`CorrelationId` values it must manage their lifetime such that the
same value is not reused for more than one operation at a time. The
lifetime of a :class:`CorrelationId` begins when it is supplied in a method
invoked on a Session and ends either when it is explicitly cancelled using
:meth:`cancel()` or :meth:`unsubscribe()`, when a :attr:`~Event.RESPONSE`
:class:`Event` (not a :attr:`~Event.PARTIAL_RESPONSE`) containing it is
received or when a :attr:`~Event.SUBSCRIPTION_STATUS` :class:`Event` which
indicates that the subscription it refers to has been terminated is
received.
When using an asynchronous Session, the application must be aware that
because the callbacks are generated from another thread, they may be
processed before the call which generates them has returned. For example,
the :attr:`~Event.SESSION_STATUS` :class:`Event` generated by a
:meth:`startAsync()` may be processed before :meth:`startAsync()` has
returned (even though :meth:`startAsync()` itself will not block).
This becomes more significant when Session generated
:class:`CorrelationId`\s are in use. For example, if a call to
:meth:`subscribe()` which returns a Session generated
:class:`CorrelationId` has not completed before the first :class:`Event`\s
which contain that :class:`CorrelationId` arrive the application may not be
able to interpret those events correctly. For this reason, it is preferable
to use user generated :class:`CorrelationId`\s when using asynchronous
Sessions. This issue does not arise when using a synchronous Session as
long as the calls to :meth:`subscribe()` etc. are made on the same thread
as the calls to :meth:`nextEvent()`.
Events of a synchronous Session can be consumed using `for event in session`
statement. Note, this is not possible with an asynchronous Session
(an Exception is raised). If the user calls :meth:`nextEvent()` during or
after iterating over :class:`Session`, an Exception is raised.
A synchronous Session supports async compatible usage. The Session exposes an
awaitable function `awaitEvent`, which returns the next event without blocking,
and the possibility to iterate asynchronously over the Session, using the syntax
`async for event in session`. This enables iteration over the generated Events
without blocking. If the user calls :meth:`awaitEvent()` during or after
asynchronously iterating over the :class:`Session`, an Exception is raised.
The class attributes represent the states in which a subscription can be.
"""
UNSUBSCRIBED = internals.SUBSCRIPTIONSTATUS_UNSUBSCRIBED
"""No longer active, terminated by API."""
SUBSCRIBING = internals.SUBSCRIPTIONSTATUS_SUBSCRIBING
"""Initiated but no updates received."""
SUBSCRIBED = internals.SUBSCRIPTIONSTATUS_SUBSCRIBED
"""Updates are flowing."""
CANCELLED = internals.SUBSCRIPTIONSTATUS_CANCELLED
"""No longer active, terminated by Application."""
PENDING_CANCELLATION = internals.SUBSCRIPTIONSTATUS_PENDING_CANCELLATION
"""No longer active, terminated by Application."""
def __init__(
    self,
    options: Optional[SessionOptions] = None,
    eventHandler: Optional[Callable[[Event, Session], None]] = None,
    eventDispatcher: Optional["typehints.EventDispatcher"] = None,
) -> None:
    """Create a consumer :class:`Session`.

    Args:
        options: Options to construct the session with. When ``None``, a
            default-constructed :class:`SessionOptions` is used.
        eventHandler: Handler for events generated by the session. Takes
            two arguments - received event and related session
        eventDispatcher: An optional dispatcher for events.

    Raises:
        InvalidArgumentException: If ``eventHandler`` is ``None`` and
            the ``eventDispatcher`` is not ``None``

    If ``eventHandler`` is not ``None`` then this :class:`Session` will
    operate in asynchronous mode, otherwise the :class:`Session` will
    operate in synchronous mode.

    If ``eventDispatcher`` is ``None`` then the :class:`Session` will
    create a default :class:`EventDispatcher` for this :class:`Session`
    which will use a single thread for dispatching events. For more control
    over event dispatching a specific instance of :class:`EventDispatcher`
    can be supplied. This can be used to share a single
    :class:`EventDispatcher` amongst multiple :class:`Session` objects.

    If an ``eventDispatcher`` is supplied which uses more than one thread
    the :class:`Session` will ensure that events which should be ordered
    are passed to callbacks in a correct order. For example, partial
    response to a request or updates to a single subscription.

    Each ``eventDispatcher`` uses its own thread or pool of threads so if
    you want to ensure that a session which receives very large messages
    and takes a long time to process them does not delay a session that
    receives small messages and processes each one very quickly then give
    each one a separate ``eventDispatcher``.

    Note:
        In case of unhandled exception in ``eventHandler``, the exception
        traceback will be printed to ``sys.stderr`` and application will be
        terminated with nonzero exit code.
    """
    # https://docs.python.org/3/library/sys.html#sys.argv
    # argv[0] is the script name (it is operating system dependent whether
    # this is a full pathname or not). If the command was executed using
    # the -c command line option to the interpreter, argv[0] is set to the
    # string '-c'. If no script name was passed to the Python interpreter,
    # argv[0] is the empty string.
    taskName = os.path.basename(sys.argv[0])
    if taskName == "__main__.py":
        # Try to get a parent folder name for '__main__.py'.
        taskNameDir = os.path.basename(
            os.path.dirname(os.path.abspath(sys.argv[0]))
        )
        if taskNameDir:
            taskName = taskNameDir
    # Only report a task name when one is actually known: skip the empty
    # string (no script) and '-c' (inline command).
    if taskName and taskName != "-c":
        internals.blpapi_UserAgentInfo_setUserTaskName(taskName)
    internals.blpapi_UserAgentInfo_setNativeSdkLanguageAndVersion(
        "Python", version()
    )
    # Consumer thread; only created below when an eventHandler is supplied.
    self._consumerThread = None
    # Set once tryNextEvent reports ERROR_ILLEGAL_STATE (see _tryNextEvent);
    # also handed to the consumer thread as its stop condition.
    self.__lastEventPopped = threading.Event()
    self.__handler: Optional[Callable[[Event, Session], None]] = None
    # Used list to be able to share data between threads. With an int that is not possible
    self.__handler_thread_id: List[int] = []
    if (eventHandler is None) and (eventDispatcher is not None):
        raise exception.InvalidArgumentException(
            "eventDispatcher is specified but eventHandler is None", 0
        )
    if options is None:
        options = SessionOptions()
    # Note __handle in Session is not the __handle
    # in/of AbstractSession base class
    self.__handle = internals.Session_createHelper(
        get_handle(options),
        get_handle(eventDispatcher),
    )
    if eventHandler is not None:
        # pylint: disable=unused-private-member
        self.__handler = eventHandler
        # The thread only receives a weak reference so that it does not
        # keep this Session alive by itself.
        session_weakref = ref(self)
        # Passing the function as static method to avoid creating an extra reference
        self._consumerThread = threading.Thread(
            target=Session._thread_poll,
            args=(
                session_weakref,
                self.__lastEventPopped,
                eventHandler,
                self.__handler_thread_id,
                self.__handle,
            ),
            daemon=True,
        )

    # Destructor callback passed to the AbstractSession base class.
    def _dtor(handle: Any) -> None:
        internals.Session_destroyHelper(handle, None)

    AbstractSession.__init__(
        self,
        self.__handle,
        internals.blpapi_Session_getAbstractSession(self.__handle),
        _dtor,
    )
@staticmethod
def _thread_poll(
    session_weakref: ReferenceType[Session],
    stopCondition: threading.Event,
    handler: Callable[[Event, Session], None],
    thread_ids: List[int],
    handle: Optional[ctypes.c_void_p],
) -> None:
    r"""Body of the consumer thread of a handler-driven session.

    Repeatedly polls the session for events through
    :meth:`_scoped_event_consume` (which sleeps briefly when no event is
    available, so the CPU is not fully occupied) and passes each retrieved
    event to ``handler``.

    Args:
        session_weakref: Weak reference to the owning session, so that this
            thread does not keep the session alive.
        stopCondition: Event that ends the loop once set. It is set here
            when no further events will ever be produced, and may also be
            set from outside this thread.
        handler: Callable invoked with each event and the session.
        thread_ids: Shared list to which the identifier of this thread is
            appended.
        handle: Raw session handle used for polling.

    If anything raises while polling or handling an event, the traceback is
    printed to ``sys.stderr``, the session (if still alive) is asked to
    stop, and the process is terminated with exit code ``1``. Termination
    is guaranteed even if reporting or stopping the session fails.
    """
    # Save the thread ID so that stop() can detect that it is being called
    # from the handler thread and convert itself into stopAsync().
    thread_ids.append(threading.get_ident())
    try:
        # we stop either if session is being destroyed -- signal from outside
        # or after seeing a last event -- signal from this thread
        while not stopCondition.is_set():
            if Session._scoped_event_consume(
                session_weakref, handler, handle
            ):
                stopCondition.set()
    except:
        # Capture and report the failure first, so that the diagnostics are
        # not lost if shutting the session down fails as well.
        exception_data = sys.exc_info()
        try:
            traceback.print_exc()
            # NOTE(review): a blpapi severity constant is used as the Python
            # logging level here -- confirm it maps to the intended level.
            logging.log(
                internals.blpapi_Logging_SEVERITY_INFO, f"{exception_data}"
            )
            session = session_weakref()
            if session is not None:
                # Shutting down gracefully if processing of event fails
                # and the session is still alive
                session._consumerThread = None
                session.stop()
        finally:
            # Always terminate, even if reporting or session.stop() raised.
            os._exit(1)
@staticmethod
def _scoped_event_consume(
    session_weakref: ReferenceType[Session],
    handler: Callable[[Event, Session], None],
    handle: Optional[ctypes.c_void_p],
) -> bool:
    """Try to poll a single event from the session on the consumer thread.

    ``_thread_poll`` must not hold a strong reference to the session: if it
    did, the reference count of the session would never drop to zero, even
    when user code no longer references it, and the destructor would never
    be called. The session is therefore only dereferenced temporarily,
    within the scope of this function.

    Args:
        session_weakref: Weak reference to the owning session.
        handler: Callable invoked with the retrieved event and the session.
        handle: Raw session handle passed to ``blpapi_Session_tryNextEvent``.

    Returns:
        ``True`` if tryNextEvent will never produce additional events
        (it reported ``ERROR_ILLEGAL_STATE``), ``False`` otherwise.
    """
    retCode, event = internals.blpapi_Session_tryNextEvent(handle)
    if retCode:
        if retCode == internals.ERROR_ILLEGAL_STATE:
            return True
        time.sleep(0.005)  # Avoid high CPU consumption
        return False
    session = session_weakref()
    # If we want to dereference a destroyed Session, we get None
    # as of https://docs.python.org/3/library/weakref.html#weakref.ref
    # NOTE(review): when the session is already gone the retrieved raw event
    # is not wrapped in an Event -- confirm it does not need releasing.
    if session is not None:
        event = Event(event, {session})
        handler(event, session)
    return False
def _session_handle(self) -> Any:
    """Return the raw session handle. For internal implementation only.

    ``_handle()`` returns a ``blpapi_AbstractSession_t *``; this accessor
    serves the places where a ``blpapi_Session_t *`` is needed instead.
    """
    sessionHandle = self.__handle
    return sessionHandle
def start(self) -> bool:
    """Start this :class:`Session` in synchronous mode.

    Returns:
        ``True`` if the :class:`Session` started successfully,
        ``False`` otherwise.

    Blocks until the :class:`Session` has either started or failed to
    start. A :attr:`~Event.SESSION_STATUS` :class:`Event` is generated
    before :meth:`start()` returns. A :class:`Session` may only be started
    once.
    """
    return self.__start(is_async=False)
def startAsync(self) -> bool:
    """Start this :class:`Session` in asynchronous mode.

    Returns:
        ``True`` if the process to start a :class:`Session` began
        successfully, ``False`` otherwise.

    Only begins the start-up process. The application must monitor events
    for the :attr:`~Event.SESSION_STATUS` :class:`Event` that is generated
    once the :class:`Session` has started or has failed to start. That
    :class:`Event` may be processed by the registered ``eventHandler``
    before :meth:`startAsync()` has returned. A :class:`Session` may only
    be started once.
    """
    return self.__start(is_async=True)
def __start(self, is_async: bool) -> bool:
    """Start the session and, if applicable, its consumer thread.

    Args:
        is_async: Whether to use the asynchronous start call.

    Returns:
        ``True`` if the underlying start call returned ``0``.

    When a consumer thread and an event handler exist, the thread is
    started on success. On failure the queue is drained on the calling
    thread instead, delivering the events to the event handler until the
    SESSION_STARTUP_FAILURE message has been seen.
    """
    starter = (
        internals.blpapi_Session_startAsync
        if is_async
        else internals.blpapi_Session_start
    )
    started_successfully = starter(self.__handle) == 0

    if not self._consumerThread or self.__handler is None:
        # No consumer thread / handler to manage.
        return started_successfully

    if started_successfully:
        self._consumerThread.start()
    else:
        self.__deliver_events_without_consumerThread()
    return started_successfully
def __deliver_events_without_consumerThread(self) -> None:
    """Hand pending events to the event handler on the calling thread.

    Used when start-up failed: rather than starting the consumer thread
    just to cover this case, the queue is drained here. Every event is
    passed to the handler, and draining ends after the event that carries
    the SESSION_STARTUP_FAILURE message has been delivered.
    """
    assert self.__handler is not None
    while True:
        event = self._nextEvent()
        self.__handler(event, self)
        if event.eventType() != Event.SESSION_STATUS:
            continue
        if any(
            message.messageType() == Names.SESSION_STARTUP_FAILURE
            for message in event
        ):
            return
def stop(self) -> bool:
    """Stop operation of this :class:`Session` and wait until it stops.

    Returns:
        ``True`` if the :class:`Session` stopped successfully,
        ``False`` otherwise.

    Blocks until all callbacks to ``eventHandler`` objects relating to this
    :class:`Session` which are currently in progress have completed
    (including the callback to handle the :class:`~Event.SESSION_STATUS`
    :class:`Event` this call generates). Once this returns no further
    callbacks to ``eventHandlers`` will occur.

    When called from within an ``eventHandler`` callback, the call is
    silently converted to a :meth:`stopAsync()` in order to prevent
    deadlock. Once a :class:`Session` has been stopped it can only be
    destroyed.
    """
    # Called from the handler thread? Then only begin the stop.
    handlerThreadIds = self.__handler_thread_id
    if handlerThreadIds and threading.get_ident() == handlerThreadIds[0]:
        return self.stopAsync()

    stopped = internals.blpapi_Session_stop(self.__handle) == 0

    # Wait for the consumer thread, if one is running.
    consumer = self._consumerThread
    if consumer is not None and consumer.is_alive():
        consumer.join()
    return stopped
def stopAsync(self) -> bool:
    """Begin the process to stop this Session and return immediately.

    Returns:
        ``True`` if the process to stop a :class:`Session` began
        successfully, ``False`` otherwise.

    The application must monitor events for the
    :attr:`~Event.SESSION_STATUS` :class:`Event` that is generated once the
    :class:`Session` has been stopped. No further callbacks to
    ``eventHandlers`` occur after that :class:`Event`. Once a
    :class:`Session` has been stopped it can only be destroyed.
    """
    returnCode = internals.blpapi_Session_stopAsync(self.__handle)
    return returnCode == 0
async def _awaitEvent(self) -> Event:
    """Await the next event of a synchronous session without blocking.

    Polls :meth:`_tryNextEvent`, yielding to the event loop for a short
    time whenever no event is available.

    Raises:
        InvalidStateException: If the session has an event handler.
        StopAsyncIteration: Once the last event has been popped.
    """
    if self.__handler is not None:
        raise exception.InvalidStateException(
            "The use of generators is not supported on asynchronous sessions",
            internals.ERROR_ILLEGAL_STATE,
        )

    lastEventPopped = self.__lastEventPopped
    event = None
    while not lastEventPopped.is_set():
        event = self._tryNextEvent()
        if event is not None:
            break
        # Nothing available yet: let other coroutines run.
        await asyncio.sleep(0.005)

    if lastEventPopped.is_set():
        raise StopAsyncIteration
    assert event is not None
    return event
async def awaitEvent(self) -> Event:
    """Await the next event of this session.

    Returns:
        Event: Next available event for this session

    Raises:
        InvalidStateException: If invoked on a session created in
            asynchronous mode

    Waits until an event is available or the session is closed, without
    blocking. On an asynchronous :class:`Session` this raises the same
    error as the synchronous ``nextEvent()``.
    """
    nextAvailable = await self._awaitEvent()
    return nextAvailable
def _nextEvent(self, timeout: int = 0) -> Event:
    """Fetch the next event through the native ``nextEvent`` binding.

    Unlike :meth:`nextEvent`, this helper does not check whether the
    session has an event handler, so it can also be used internally.

    Args:
        timeout: Timeout passed on to the native call.

    Returns:
        The retrieved event, wrapped in an :class:`Event` tied to this
        session. A non-zero return code of the native call is raised as an
        exception.
    """
    returnCode, rawEvent = internals.blpapi_Session_nextEvent(
        self.__handle, timeout
    )
    _ExceptionUtil.raiseOnError(returnCode)
    wrapped = Event(rawEvent, {self})
    return wrapped
def nextEvent(self, timeout: int = 0) -> Event:
    """Return the next available event, waiting for it if necessary.

    Args:
        timeout: Timeout threshold in milliseconds

    Returns:
        Event: Next available event for this session

    Raises:
        InvalidStateException: If invoked on a session created in
            asynchronous mode

    When no :class:`Event` is available, this blocks for up to ``timeout``
    milliseconds for one to arrive. A ``timeout`` of ``0`` (the default)
    means :meth:`nextEvent()` does not time out and only returns once the
    next :class:`Event` is available.

    If :meth:`nextEvent()` returns due to a timeout it will return an event
    of type :attr:`~Event.TIMEOUT`.
    """
    if self.__handler is None:
        return self._nextEvent(timeout)
    raise exception.InvalidStateException(
        "The use of generators is not supported on asynchronous sessions",
        internals.ERROR_ILLEGAL_STATE,
    )
def _tryNextEvent(self) -> Optional[Event]:
    """Fetch an event through the native ``tryNextEvent`` binding, if any.

    Unlike :meth:`tryNextEvent`, this helper does not check whether the
    session has an event handler, so it can also be used internally.

    Returns:
        The retrieved event wrapped in an :class:`Event`, or ``None`` when
        the native call returned a non-zero code. If that code is
        ``ERROR_ILLEGAL_STATE``, the "last event popped" flag is set as
        well.
    """
    returnCode, rawEvent = internals.blpapi_Session_tryNextEvent(
        self.__handle
    )
    if not returnCode:
        return Event(rawEvent, {self})
    if returnCode == internals.ERROR_ILLEGAL_STATE:
        self.__lastEventPopped.set()
    return None
def tryNextEvent(self) -> Optional[Event]:
    """Return the next event if one is available, without blocking.

    Returns:
        Event: Next available event for this session, or ``None`` if no
        event is available. ``None`` is also returned when the session was
        created with an event handler.

    This method never blocks.
    """
    return None if self.__handler is not None else self._tryNextEvent()
@staticmethod
def _createErrorAppender(
    errorList: List[SubscriptionPreprocessError],
) -> Callable:
    """Build a callback that records preprocess errors into ``errorList``.

    Args:
        errorList: List that receives one
            :class:`SubscriptionPreprocessError` per callback invocation.

    Returns:
        A callable taking the correlation id, subscription string, numeric
        error code and description of a single error.
    """

    def errorAppender(
        correlationId: CorrelationId,
        subscriptionString: str,
        errorCode: int,
        description: str,
    ) -> None:
        # Convert the numeric code into the error-code enum before storing.
        code = SubscriptionPreprocessError.ErrorCode(errorCode)
        preprocessError = SubscriptionPreprocessError(
            correlationId, subscriptionString, code, description
        )
        errorList.append(preprocessError)

    return errorAppender
def subscribe(
    self,
    subscriptionList: "typehints.SubscriptionList",
    identity: Optional["typehints.Identity"] = None,
    requestLabel: Optional[str] = None,
    mode: SubscriptionPreprocessMode = SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR,
) -> Optional[List[SubscriptionPreprocessError]]:
    """Begin subscriptions for each entry in the specified list.

    Args:
        subscriptionList: List of subscriptions to begin; must be an object
            of type :class:`SubscriptionList`
        identity: Identity used for authorization
        requestLabel: String which will be recorded along with any
            diagnostics for this operation
        mode: Mode to use for this operation.
            See :class:`SubscriptionPreprocessMode` for an explanation of
            the available modes.

    Returns:
        If mode is :attr:`~SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR`,
        then ``None`` is returned. If mode is
        :attr:`~SubscriptionPreprocessMode.RETURN_INDIVIDUAL_ERRORS`,
        then a list of :class:`SubscriptionPreprocessError` is
        returned.

    A :attr:`~Event.SUBSCRIPTION_STATUS` :class:`Event` will be generated
    for each entry in the ``subscriptionList``.

    When ``identity`` is not provided, the default authorization
    information is used: the session identity, if it has been authorized.
    """
    # Lookup by value; a value outside the enum raises ValueError here.
    subMode = SubscriptionPreprocessMode(mode)

    if subMode == SubscriptionPreprocessMode.RETURN_INDIVIDUAL_ERRORS:
        errorList: List[SubscriptionPreprocessError] = []
        errorAppender = Session._createErrorAppender(errorList)
        returnCode = internals.blpapi_Session_subscribeEx_helper(
            self.__handle,
            get_handle(subscriptionList),
            get_handle(identity),
            requestLabel,
            errorAppender,
        )
        _ExceptionUtil.raiseOnError(returnCode)
        return errorList

    if subMode != SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR:
        raise exception.InvalidArgumentException(
            "Unsupported SubscriptionPreprocessMode: '{}'".format(mode), 0
        )

    returnCode = internals.blpapi_Session_subscribe(
        self.__handle,
        get_handle(subscriptionList),
        get_handle(identity),
        requestLabel,
    )
    _ExceptionUtil.raiseOnError(returnCode)
    return None
def unsubscribe(
    self, subscriptionList: "typehints.SubscriptionList"
) -> None:
    """Cancel subscriptions from the specified ``subscriptionList``.

    Args:
        subscriptionList: List of subscriptions to cancel; must be an
            object of type :class:`SubscriptionList`

    Every entry whose correlation ID identifies a current subscription is
    cancelled; entries whose correlation ID does not identify a current
    subscription are ignored.

    Once this call returns the correlation ids in the ``subscriptionList``
    will not be seen in any subsequent :class:`Message` obtained from a
    ``MessageIterator`` by calling ``next()``. However, a :class:`Message`
    currently pointed to by a ``MessageIterator`` when
    :meth:`unsubscribe()` is called is not affected, even if it carries one
    of those correlation IDs, and any :class:`Message` to which the
    application has retained a reference may still contain one of them.

    For these reasons, although technically an application is free to
    re-use the correlation IDs as soon as this method returns, it is
    preferable not to aggressively re-use correlation IDs, particularly
    with an asynchronous :class:`Session`.
    """
    returnCode = internals.blpapi_Session_unsubscribe(
        self.__handle, get_handle(subscriptionList), None
    )
    _ExceptionUtil.raiseOnError(returnCode)
def resubscribe(
    self,
    subscriptionList: "typehints.SubscriptionList",
    requestLabel: Optional[str] = None,
    resubscriptionId: Optional[int] = None,
    mode: SubscriptionPreprocessMode = SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR,
) -> Optional[List[SubscriptionPreprocessError]]:
    """Modify subscriptions in ``subscriptionList``.

    Args:
        subscriptionList: List of subscriptions to modify; must be an
            object of type :class:`SubscriptionList`
        requestLabel: String which will be recorded along with any
            diagnostics for this operation
        resubscriptionId: An id that will be included in the event
            generated from this operation
        mode: Mode to use for this operation.
            See :class:`SubscriptionPreprocessMode` for an explanation of
            the available modes.

    Returns:
        If mode is :attr:`~SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR`,
        then ``None`` is returned. If mode is
        :attr:`~SubscriptionPreprocessMode.RETURN_INDIVIDUAL_ERRORS`,
        then a list of :class:`SubscriptionPreprocessError` is
        returned.

    Each subscription in ``subscriptionList`` is modified to reflect the
    modified options specified for it.

    For each entry whose correlation ID identifies a current subscription,
    the modified options replace the current options and a
    :attr:`~Event.SUBSCRIPTION_STATUS` :class:`Event` (containing the
    ``resubscriptionId`` if specified) will be generated in the event
    stream before the first update based on the new options. Entries whose
    correlation ID does not identify a current subscription are ignored.
    """
    # Lookup by value; a value outside the enum raises ValueError here.
    subMode = SubscriptionPreprocessMode(mode)
    hasResubscriptionId = resubscriptionId is not None

    if subMode == SubscriptionPreprocessMode.FAIL_ON_FIRST_ERROR:
        if hasResubscriptionId:
            returnCode = internals.blpapi_Session_resubscribeWithId(
                self.__handle,
                get_handle(subscriptionList),
                resubscriptionId,  # an int, not a cid
                requestLabel,
            )
        else:
            returnCode = internals.blpapi_Session_resubscribe(
                self.__handle,
                get_handle(subscriptionList),
                requestLabel,
            )
        _ExceptionUtil.raiseOnError(returnCode)
        return None

    if subMode == SubscriptionPreprocessMode.RETURN_INDIVIDUAL_ERRORS:
        errorList: List[SubscriptionPreprocessError] = []
        errorAppender = Session._createErrorAppender(errorList)
        if hasResubscriptionId:
            returnCode = internals.blpapi_Session_resubscribeWithIdEx_helper(
                self.__handle,
                get_handle(subscriptionList),
                resubscriptionId,  # an int, not a cid
                requestLabel,
                errorAppender,
            )
        else:
            returnCode = internals.blpapi_Session_resubscribeEx_helper(
                self.__handle,
                get_handle(subscriptionList),
                requestLabel,
                errorAppender,
            )
        _ExceptionUtil.raiseOnError(returnCode)
        return errorList

    raise exception.InvalidArgumentException(
        "Unsupported SubscriptionPreprocessMode: '{}'".format(mode), 0
    )
def setStatusCorrelationId(
    self,
    service: "typehints.Service",
    correlationId: CorrelationId,
    identity: Optional["typehints.Identity"] = None,
) -> None:
    """Set the Correlation id on which service status messages will be
    received.

    Args:
        service: The service from which the status messages
            are received
        correlationId: Correlation id to associate with the
            service status messages
        identity: Identity used for authorization

    Note:
        No service status messages are received prior to this call
    """
    returnCode = internals.blpapi_Session_setStatusCorrelationId(
        self.__handle,
        get_handle(service),
        get_handle(identity),
        correlationId,
    )
    _ExceptionUtil.raiseOnError(returnCode)
def sendRequest(
    self,
    request: "typehints.Request",
    identity: Optional["typehints.Identity"] = None,
    correlationId: Optional[CorrelationId] = None,
    eventQueue: Optional["typehints.EventQueue"] = None,
    requestLabel: Optional[str] = None,
) -> CorrelationId:
    """Send the specified ``request``.

    Args:
        request: Request to send
        identity: Identity used for authorization
        correlationId: Correlation id to associate with the
            request
        eventQueue: Event queue on which the events related to
            this operation will arrive
        requestLabel: String which will be recorded along with any
            diagnostics for this operation

    Returns:
        CorrelationId: The actual correlation id associated with the
        request

    If ``correlationId`` is supplied it is used, otherwise a new
    :class:`CorrelationId` is created; the one actually used is returned.
    If ``eventQueue`` is supplied, all events relating to this
    :class:`Request` will arrive on that :class:`EventQueue`.

    A successful request will generate zero or more
    :class:`~Event.PARTIAL_RESPONSE` messages followed by exactly one
    :class:`~Event.RESPONSE` message. Once the final
    :class:`~Event.RESPONSE` message has been received the
    :class:`CorrelationId` associated with this request may be re-used. If
    the request fails at any stage a :class:`~Event.REQUEST_STATUS` will be
    generated after which the :class:`CorrelationId` associated with the
    request may be re-used.

    When ``identity`` is not provided, the session identity will be used if
    it has been authorized.
    """
    usedCorrelationId = (
        CorrelationId() if correlationId is None else correlationId
    )
    returnCode = internals.blpapi_Session_sendRequest(
        self.__handle,
        get_handle(request),
        usedCorrelationId,
        get_handle(identity),
        get_handle(eventQueue),
        requestLabel,
    )
    _ExceptionUtil.raiseOnError(returnCode)
    # Register this session with the queue that will receive the events.
    if eventQueue is not None:
        eventQueue._registerSession(self)
    return usedCorrelationId
def sendRequestTemplate(
    self,
    requestTemplate: RequestTemplate,
    correlationId: Optional[CorrelationId] = None,
) -> CorrelationId:
    """Send a request defined by the specified ``requestTemplate``.

    Args:
        requestTemplate: Template that defines the
            request
        correlationId: Correlation id to associate with the
            request

    Returns:
        CorrelationId: The actual correlation id used for the request is
        returned.

    If ``correlationId`` is supplied it is used, otherwise a new
    :class:`CorrelationId` is created.

    A successful request will generate zero or more
    :attr:`~Event.PARTIAL_RESPONSE` events followed by exactly one
    :attr:`~Event.RESPONSE` event. Once the final :attr:`~Event.RESPONSE`
    event has been received the ``CorrelationId`` associated with this
    request may be re-used. If the request fails at any stage a
    :attr:`~Event.REQUEST_STATUS` will be generated after which the
    ``CorrelationId`` associated with the request may be re-used.
    """
    usedCorrelationId = (
        CorrelationId() if correlationId is None else correlationId
    )
    returnCode = internals.blpapi_Session_sendRequestTemplate(
        self.__handle, get_handle(requestTemplate), usedCorrelationId
    )
    _ExceptionUtil.raiseOnError(returnCode)
    return usedCorrelationId
def createSnapshotRequestTemplate(
    self,
    subscriptionString: str,
    correlationId: CorrelationId,
    identity: Optional["typehints.Identity"] = None,
) -> RequestTemplate:
    """Create a snapshot request template for a subscription.

    The template is used to get the subscription data specified by
    ``subscriptionString``, authorized with the specified ``identity``.

    Args:
        subscriptionString: String that specifies the subscription
        correlationId: Correlation id to associate with
            events generated by this operation
        identity: Optional. Identity used for authorization. When it is
            ``None``, the session identity will be used if it has been
            authorized.

    Returns:
        RequestTemplate: Created request template

    Raises:
        Exception: If one or more of the following conditions is not met:
            the session is established, ``subscriptionString`` is a valid
            subscription string and ``correlationId`` is not used in this
            session.
        InvalidArgumentException: If neither ``correlationId`` nor
            ``identity`` is a :class:`CorrelationId`.

    Status updates about the state of the created request template, and
    about the implied subscription associated with it, are delivered by
    :attr:`~Event.SUBSCRIPTION_STATUS` events carrying the provided
    ``correlationId``.

    The benefit of the snapshot request templates is that these requests
    may be serviced from a cache, so the user may expect to see
    significantly lower response time.

    A created request template is in one of 3 possible states:
    ``Pending``, ``Available``, or ``Terminated``. Right after creation a
    request template is in the ``Pending`` state.

    ``Pending``: the user may send a request using this template, but
    there are no guarantees about response time since the cache is not
    available yet. A request template may transition into the ``Pending``
    state only from the ``Available`` state; in this case the
    ``RequestTemplatePending`` message is generated.

    ``Available``: all requests will be serviced from a cache and the user
    may expect to see significantly reduced latency. Note that a snapshot
    request template can transition out of the ``Available`` state
    concurrently with requests being sent, so no guarantee of service from
    the cache can be provided. A request template may transition into the
    ``Available`` state only from the ``Pending`` state; in this case the
    ``RequestTemplateAvailable`` message is generated. This message will
    also contain information about the currently used connection in the
    ``boundTo`` field. Note that it is possible to get the
    ``RequestTemplateAvailable`` message with new connection information
    even if a request template is already in the ``Available`` state.

    ``Terminated``: sending a request will always result in a failure
    response. A request template may transition into this state from any
    other state. This is a final state and it is guaranteed that the last
    message associated with the provided ``correlationId`` will be the
    ``RequestTemplateTerminated`` message, which is generated when a
    request template transitions into this state. When that happens, all
    outstanding requests will be failed and appropriate messages will be
    generated for each request. After receiving the
    ``RequestTemplateTerminated`` message, ``correlationId`` may be
    reused.

    Note that resources used by a snapshot request template are released
    only when the request template transitions into the ``Terminated``
    state or when the session is destroyed. In order to release resources
    when a request template is not needed anymore, the user should call
    the :meth:`cancel()` method unless the ``RequestTemplateTerminated``
    message was already received due to some problems. When the last copy
    of a ``RequestTemplate`` object goes out of scope and there are no
    outstanding requests left, the snapshot request template will be
    destroyed automatically. If the last copy of a ``RequestTemplate``
    object goes out of scope while there are still some outstanding
    requests left, the snapshot service request template will be destroyed
    automatically when the last request gets a final response.
    """
    # The last two parameters used to be declared in the opposite order,
    # so older call sites may still pass them swapped. Work out which
    # argument is the CorrelationId and which is the identity before
    # making the native call.
    # mypy regards the swapped branch as unreachable, hence the
    # "# type: ignore" markers.
    # A CorrelationId is mandatory (only the identity may be None), so in
    # the swapped case ``identity`` must be the CorrelationId.
    if isinstance(correlationId, CorrelationId):
        cid, authIdentity = correlationId, identity
    elif isinstance(identity, CorrelationId):  # type: ignore
        cid, authIdentity = identity, correlationId  # type: ignore
    else:
        raise exception.InvalidArgumentException("Invalid CorrelationId", 0)

    rc, template = internals.blpapi_Session_createSnapshotRequestTemplate(
        self.__handle,
        subscriptionString,
        get_handle(authIdentity),
        cid,
    )
    _ExceptionUtil.raiseOnError(rc)
    return RequestTemplate(template)
def _enable_safe_iteration(self) -> None:
r"""Invalidate :class:`Event` getters in :class:`Session`
to avoid causing race conditions at runtime
"""
def nextEvent_blocker(*_: Any) -> None:
raise exception.InvalidStateException(
"Cannot get Events after iteration has started",
internals.ERROR_ILLEGAL_STATE,
)
async def awaitEvent_blocker(*_: Any) -> None:
nextEvent_blocker()
setattr(self, "nextEvent", nextEvent_blocker)
setattr(self, "awaitEvent", awaitEvent_blocker)
setattr(self, "tryNextEvent", lambda *_: None)
def __aiter__(self) -> Session:
r"""Makes :class:`Session` an `AsyncIterable` object,
allowing user to get event asynchronously"""
self._enable_safe_iteration()
return self
async def __anext__(self) -> Event:
r"""Yield the next event that is available, while allowing other
coroutines to run if waiting.
"""
return await self._awaitEvent()
def __iter__(self) -> Session:
"""Start iterator for :class:`Session`"""
self._enable_safe_iteration()
return self
def __next__(self) -> Optional[Event]:
"""Yield next event for :class:`Session`"""
if self.__lastEventPopped.is_set():
raise StopIteration
event = self._tryNextEvent()
return event
def __del__(self) -> None:
    """Manage the deletion of the session, making sure that it is stopped.

    The "last event popped" flag is set first, then the consumer thread is
    joined when it is alive and is not the thread running this method, and
    finally the parent ``__del__`` is invoked.
    """
    # Setting the flag stops iteration of the polling thread, so no other
    # thread can obtain a strong reference to the Session while it is
    # being deleted.
    # Users that did not call stop before the session was destroyed may
    # not see the session-terminated notification.
    # This code will be further modified when tryNextEvent is changed in
    # C++.
    self.__lastEventPopped.set()

    consumer_running = (
        self.__handler is not None
        and self._consumerThread is not None
        and self._consumerThread.is_alive()
    )
    on_polling_thread = (
        len(self.__handler_thread_id) > 0
        and threading.get_ident() == self.__handler_thread_id[0]
    )

    # Wait for the consumer thread to complete, unless this is that very
    # thread (joining it from itself is not possible).
    if consumer_running and not on_polling_thread:
        consumer = self._consumerThread
        if consumer is not None:
            consumer.join()

    # Delete the handle. Calls `stop()` in C++ directly
    return super().__del__()
class SubscriptionPreprocessError:
    """Error record for a single invalid subscription.

    Instances carry the ``correlationId`` and ``subscriptionString`` of the
    offending subscription together with an ``errorCode`` and a textual
    ``description``.
    """

    class ErrorCode(Enum):
        """Codes that a :class:`SubscriptionPreprocessError` can carry."""

        SUBSCRIPTIONPREPROCESS_INVALID_SUBSCRIPTION_STRING = (
            internals.SUBSCRIPTIONPREPROCESS_INVALID_SUBSCRIPTION_STRING
        )
        """Error due to an invalid subscription string."""

        SUBSCRIPTIONPREPROCESS_CORRELATIONID_ERROR = (
            internals.SUBSCRIPTIONPREPROCESS_CORRELATIONID_ERROR
        )
        """Error due to misuse of correlation id, such as using a duplicate."""

    def __init__(
        self,
        correlationId: CorrelationId,
        subscriptionString: str,
        errorCode: SubscriptionPreprocessError.ErrorCode,
        description: str,
    ) -> None:
        """Store the four fields of the error as given."""
        self.correlationId = correlationId
        self.subscriptionString = subscriptionString
        self.errorCode = errorCode
        self.description = description

    def __str__(self) -> str:
        """Render all four fields as one brace-delimited string."""
        parts = (
            f"correlationId: {self.correlationId}",
            f"subscriptionString: {self.subscriptionString}",
            f"code: {self.errorCode}",
            f"description: {self.description}",
        )
        return "{" + ", ".join(parts) + "}"
# License notice for this file, kept in the ``__copyright__`` string.
__copyright__ = """
Copyright 2012. Bloomberg Finance L.P.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to
deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: The above
copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""