Repository URL to install this package:
Version:
1.1.1 ▾
|
async-promises
/
async_promises.py
|
---|
from functools import partial
from threading import Thread, Event, RLock
from sys import version_info
from .compat import Future, iscoroutine, ensure_future, iterate_promise # type: ignore
from typing import Callable, Optional, Iterator, Any, Dict # flake8: noqa
class CountdownLatch(object):
__slots__ = ('_lock', 'count')
def __init__(self, count):
# type: (CountdownLatch, int) -> None
assert count >= 0, "count needs to be greater or equals to 0. Got: %s" % count
self._lock = RLock()
self.count = count
def dec(self):
# type: (CountdownLatch) -> int
with self._lock:
assert self.count > 0, "count needs to be greater or equals to 0. Got: %s" % self.count
self.count -= 1
# Return inside lock to return the correct value,
# otherwise an other thread could already have
# decremented again.
return self.count
class Promise(object):
"""
This is the Promise class that complies
Promises/A+ specification and test suite:
http://promises-aplus.github.io/promises-spec/
"""
__slots__ = ('state', 'value', 'reason', '_cb_lock', '_callbacks', '_errbacks', '_event', '_future', '_thread')
# These are the potential states of a promise
PENDING = -1
REJECTED = 0
FULFILLED = 1
def __init__(self, fn=None):
# type: (Promise, Callable) -> None
"""
Initialize the Promise into a pending state.
"""
self.state = self.PENDING # type: int
self.value = None # type: Any
self.reason = None # type: Optional[Exception]
self._cb_lock = RLock()
self._callbacks = [] # type: List[Callable]
self._errbacks = [] # type: List[Callable]
self._event = Event()
self._future = None # type: Optional[Future]
if fn:
self._thread = Thread(target=self.do_resolve, args=(fn,))
self._thread.start()
def __iter__(self):
# type: (Promise) -> Iterator
return iterate_promise(self)
__await__ = __iter__
@property
def future(self):
# type: (Promise) -> Future
if not self._future:
self._future = Future()
self.add_callback(self._future.set_result)
self.add_errback(self._future.set_exception)
return self._future
def do_resolve(self, fn):
try:
fn(self.fulfill, self.reject)
except Exception as e:
self.reject(e)
@classmethod
def fulfilled(cls, x):
# type: (Any) -> Promise
p = cls()
p.fulfill(x)
return p
@classmethod
def rejected(cls, reason):
# type: (Any) -> Promise
p = cls()
p.reject(reason)
return p
def fulfill(self, x):
# type: (Promise, Any) -> None
"""
Fulfill the promise with a given value.
"""
if self is x:
raise TypeError("Cannot resolve promise with itself.")
elif is_thenable(x):
try:
self.promisify(x).done(self.fulfill, self.reject)
except Exception as e:
self.reject(e)
else:
self._fulfill(x)
resolve = fulfilled
def _fulfill(self, value):
# type: (Promise, Any) -> None
with self._cb_lock:
if self.state != self.PENDING:
return
self.value = value
self.state = self.FULFILLED
callbacks = self._callbacks
# We will never call these callbacks again, so allow
# them to be garbage collected. This is important since
# they probably include closures which are binding variables
# that might otherwise be garbage collected.
#
# Prevent future appending
self._callbacks = None
# Notify all waiting
self._event.set()
for callback in callbacks:
try:
callback(value)
except Exception:
# Ignore errors in callbacks
pass
def reject(self, reason):
# type: (Promise, Exception) -> None
"""
Reject this promise for a given reason.
"""
assert isinstance(reason, Exception), ("The reject function needs to be called with an Exception. "
"Got {}".format(reason))
with self._cb_lock:
if self.state != self.PENDING:
return
self.reason = reason
self.state = self.REJECTED
errbacks = self._errbacks
# We will never call these errbacks again, so allow
# them to be garbage collected. This is important since
# they probably include closures which are binding variables
# that might otherwise be garbage collected.
#
# Prevent future appending
self._errbacks = None
# Notify all waiting
self._event.set()
for errback in errbacks:
try:
errback(reason)
except Exception:
# Ignore errors in errback
pass
@property
def is_pending(self):
# type: (Promise) -> bool
"""Indicate whether the Promise is still pending. Could be wrong the moment the function returns."""
return self.state == self.PENDING
@property
def is_fulfilled(self):
# type: (Promise) -> bool
"""Indicate whether the Promise has been fulfilled. Could be wrong the moment the function returns."""
return self.state == self.FULFILLED
@property
def is_rejected(self):
# type: (Promise) -> bool
"""Indicate whether the Promise has been rejected. Could be wrong the moment the function returns."""
return self.state == self.REJECTED
def get(self, timeout=None):
# type: (Promise, float) -> Any
"""Get the value of the promise, waiting if necessary."""
if timeout is None and version_info[0] == 2:
self.wait(float("Inf"))
else:
self.wait(timeout)
if self.state == self.PENDING:
raise ValueError("Value not available, promise is still pending")
elif self.state == self.FULFILLED:
return self.value
raise self.reason
def wait(self, timeout=None):
# type: (Promise, float) -> None
"""
An implementation of the wait method which doesn't involve
polling but instead utilizes a "real" synchronization
scheme.
"""
self._event.wait(timeout)
def add_callback(self, f):
# type: (Promise, Callable) -> None
"""
Add a callback for when this promis is fulfilled. Note that
if you intend to use the value of the promise somehow in
the callback, it is more convenient to use the 'then' method.
"""
assert callable(f), "A function needs to be passed into add_callback. Got: {}".format(f)
with self._cb_lock:
if self.state == self.PENDING:
self._callbacks.append(f)
return
# This is a correct performance optimization in case of concurrency.
# State can never change once it is not PENDING anymore and is thus safe to read
# without acquiring the lock.
if self.state == self.FULFILLED:
f(self.value)
def add_errback(self, f):
# type: (Promise, Callable) -> None
"""
Add a callback for when this promis is rejected. Note that
if you intend to use the rejection reason of the promise
somehow in the callback, it is more convenient to use
the 'then' method.
"""
assert callable(f), "A function needs to be passed into add_errback. Got: {}".format(f)
with self._cb_lock:
if self.state == self.PENDING:
self._errbacks.append(f)
return
# This is a correct performance optimization in case of concurrency.
# State can never change once it is not PENDING anymore and is thus safe to read
# without acquiring the lock.
if self.state == self.REJECTED:
f(self.reason)
def catch(self, on_rejection):
# type: (Promise, Callable) -> Promise
"""
This method returns a Promise and deals with rejected cases only.
It behaves the same as calling Promise.then(None, on_rejection).
"""
return self.then(None, on_rejection)
def done(self, success=None, failure=None):
# type: (Promise, Callable, Callable) -> None
"""
This method takes two optional arguments. The first argument
is used if the "self promise" is fulfilled and the other is
used if the "self promise" is rejected. In contrast to then,
the return value of these callback is ignored and nothing is
returned.
"""
with self._cb_lock:
if success is not None:
self.add_callback(success)
if failure is not None:
self.add_errback(failure)
def done_all(self, handlers=None):
# type: (Promise, List[Callable]) -> List[Promise]
"""
:type handlers: list[(Any) -> object] | list[((Any) -> object, (Any) -> object)]
"""
if not handlers:
return []
for handler in handlers:
if isinstance(handler, tuple):
s, f = handler
self.done(s, f)
elif isinstance(handler, dict):
s = handler.get('success')
f = handler.get('failure')
self.done(s, f)
else:
self.done(success=handler)
def then(self, success=None, failure=None):
# type: (Promise, Callable, Callable) -> Promise
"""
This method takes two optional arguments. The first argument
is used if the "self promise" is fulfilled and the other is
used if the "self promise" is rejected. In either case, this
method returns another promise that effectively represents
the result of either the first of the second argument (in the
case that the "self promise" is fulfilled or rejected,
respectively).
Each argument can be either:
* None - Meaning no action is taken
* A function - which will be called with either the value
of the "self promise" or the reason for rejection of
the "self promise". The function may return:
* A value - which will be used to fulfill the promise
returned by this method.
* A promise - which, when fulfilled or rejected, will
cascade its value or reason to the promise returned
by this method.
* A value - which will be assigned as either the value
or the reason for the promise returned by this method
when the "self promise" is either fulfilled or rejected,
respectively.
:type success: (Any) -> object
:type failure: (Any) -> object
:rtype : Promise
"""
ret = self.__class__()
def call_and_fulfill(v):
"""
A callback to be invoked if the "self promise"
is fulfilled.
"""
try:
if callable(success):
ret.fulfill(success(v))
else:
ret.fulfill(v)
except Exception as e:
ret.reject(e)
def call_and_reject(r):
"""
A callback to be invoked if the "self promise"
is rejected.
"""
try:
if callable(failure):
ret.fulfill(failure(r))
else:
ret.reject(r)
except Exception as e:
ret.reject(e)
self.done(call_and_fulfill, call_and_reject)
return ret
def then_all(self, handlers=None):
# type: (Promise, List[Callable]) -> List[Promise]
"""
Utility function which calls 'then' for each handler provided. Handler can either
be a function in which case it is used as success handler, or a tuple containing
the success and the failure handler, where each of them could be None.
:type handlers: list[(Any) -> object] | list[((Any) -> object, (Any) -> object)]
:param handlers
:rtype : list[Promise]
"""
if not handlers:
return []
promises = [] # type: List[Promise]
for handler in handlers:
if isinstance(handler, tuple):
s, f = handler
promises.append(self.then(s, f))
elif isinstance(handler, dict):
s = handler.get('success')
f = handler.get('failure')
promises.append(self.then(s, f))
else:
promises.append(self.then(success=handler))
return promises
@classmethod
def all(cls, values_or_promises):
# Type: (Iterable[Promise, Any]) -> Promise
"""
A special function that takes a bunch of promises
and turns them into a promise for a vector of values.
In other words, this turns an list of promises for values
into a promise for a list of values.
"""
_len = len(values_or_promises)
if _len == 0:
return cls.fulfilled(values_or_promises)
promises = (cls.promisify(v_or_p) if is_thenable(v_or_p) else cls.resolve(v_or_p) for
v_or_p in values_or_promises) # type: Iterator[Promise]
all_promise = cls() # type: Promise
counter = CountdownLatch(_len)
values = [None] * _len # type: List[Any]
def handle_success(original_position, value):
# type: (int, Any) -> None
values[original_position] = value
if counter.dec() == 0:
all_promise.fulfill(values)
for i, p in enumerate(promises):
p.done(partial(handle_success, i), all_promise.reject) # type: ignore
return all_promise
@classmethod
def promisify(cls, obj):
# type: (Any) -> Promise
if isinstance(obj, cls):
return obj
add_done_callback = get_done_callback(obj) # type: Optional[Callable]
if callable(add_done_callback):
promise = cls()
add_done_callback(_process_future_result(promise))
return promise
done = getattr(obj, "done", None) # type: Optional[Callable]
if callable(done):
p = cls()
done(p.fulfill, p.reject)
return p
then = getattr(obj, "then", None) # type: Optional[Callable]
if callable(then):
p = cls()
then(p.fulfill, p.reject)
return p
if iscoroutine(obj):
return cls.promisify(ensure_future(obj))
raise TypeError("Object is not a Promise like object.")
@classmethod
def for_dict(cls, m):
# type: (Dict[Any, Promise]) -> Promise
"""
A special function that takes a dictionary of promises
and turns them into a promise for a dictionary of values.
In other words, this turns an dictionary of promises for values
into a promise for a dictionary of values.
"""
if not m:
return cls.fulfilled({})
dict_type = type(m)
def handle_success(resolved_values):
return dict_type(zip(m.keys(), resolved_values))
return cls.all(m.values()).then(handle_success)
promisify = Promise.promisify
promise_for_dict = Promise.for_dict
def _process_future_result(promise):
def handle_future_result(future):
exception = future.exception()
if exception:
promise.reject(exception)
else:
promise.fulfill(future.result())
return handle_future_result
def is_future(obj):
# type: (Any) -> bool
return callable(get_done_callback(obj))
def get_done_callback(obj):
# type: (Any) -> Callable
return getattr(obj, "add_done_callback", None)
def is_thenable(obj):
# type: (Any) -> bool
"""
A utility function to determine if the specified
object is a promise using "duck typing".
"""
return isinstance(obj, Promise) or is_future(obj) or (
hasattr(obj, "done") and callable(getattr(obj, "done"))) or (
hasattr(obj, "then") and callable(getattr(obj, "then"))) or (
iscoroutine(obj))