# Copyright 2015-2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC's Python API."""
import abc
import contextlib
import enum
import logging
import sys
import six
from grpc._cython import cygrpc as _cygrpc
from grpc import _compression
logging.getLogger(__name__).addHandler(logging.NullHandler())
try:
from grpc._grpcio_metadata import __version__
except ImportError:
__version__ = "dev0"
############################## Future Interface ###############################
class FutureTimeoutError(Exception):
"""Indicates that a method call on a Future timed out."""
class FutureCancelledError(Exception):
"""Indicates that the computation underlying a Future was cancelled."""
class Future(six.with_metaclass(abc.ABCMeta)):
"""A representation of a computation in another control flow.
Computations represented by a Future may be yet to be begun,
may be ongoing, or may have already completed.
"""
@abc.abstractmethod
def cancel(self):
"""Attempts to cancel the computation.
This method does not block.
Returns:
bool:
Returns True if the computation was canceled.
Returns False under all other circumstances, for example:
1. computation has begun and could not be canceled.
2. computation has finished
3. computation is scheduled for execution and it is impossible
to determine its state without blocking.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Describes whether the computation was cancelled.
This method does not block.
Returns:
bool:
Returns True if the computation was cancelled before its result became
available.
Returns False under all other circumstances, for example:
1. computation was not cancelled.
2. computation's result is available.
"""
raise NotImplementedError()
@abc.abstractmethod
def running(self):
"""Describes whether the computation is taking place.
This method does not block.
Returns:
Returns True if the computation is scheduled for execution or
currently executing.
Returns False if the computation already executed or was cancelled.
"""
raise NotImplementedError()
@abc.abstractmethod
def done(self):
"""Describes whether the computation has taken place.
This method does not block.
Returns:
bool:
Returns True if the computation already executed or was cancelled.
Returns False if the computation is scheduled for execution or
currently executing.
This is exactly opposite of the running() method's result.
"""
raise NotImplementedError()
@abc.abstractmethod
def result(self, timeout=None):
"""Returns the result of the computation or raises its exception.
This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation to
finish or be cancelled. If None, the call will block until the
computations's termination.
Returns:
The return value of the computation.
Raises:
FutureTimeoutError: If a timeout value is passed and the computation
does not terminate within the allotted time.
FutureCancelledError: If the computation was cancelled.
Exception: If the computation raised an exception, this call will
raise the same exception.
"""
raise NotImplementedError()
@abc.abstractmethod
def exception(self, timeout=None):
"""Return the exception raised by the computation.
This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation to
terminate or be cancelled. If None, the call will block until the
computations's termination.
Returns:
The exception raised by the computation, or None if the computation
did not raise an exception.
Raises:
FutureTimeoutError: If a timeout value is passed and the computation
does not terminate within the allotted time.
FutureCancelledError: If the computation was cancelled.
"""
raise NotImplementedError()
@abc.abstractmethod
def traceback(self, timeout=None):
"""Access the traceback of the exception raised by the computation.
This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation
to terminate or be cancelled. If None, the call will block until
the computation's termination.
Returns:
The traceback of the exception raised by the computation, or None
if the computation did not raise an exception.
Raises:
FutureTimeoutError: If a timeout value is passed and the computation
does not terminate within the allotted time.
FutureCancelledError: If the computation was cancelled.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_done_callback(self, fn):
"""Adds a function to be called at completion of the computation.
The callback will be passed this Future object describing the outcome
of the computation. Callbacks will be invoked after the future is
terimated, whether successfully or not.
If the computation has already completed, the callback will be called
immediately.
Exceptions raised in the callback will be logged at ERROR level, but
will not terminate any threads of execution.
Args:
fn: A callable taking this Future object as its single parameter.
"""
raise NotImplementedError()
################################ gRPC Enums ##################################
@enum.unique
class ChannelConnectivity(enum.Enum):
"""Mirrors grpc_connectivity_state in the gRPC Core.
Attributes:
IDLE: The channel is idle.
CONNECTING: The channel is connecting.
READY: The channel is ready to conduct RPCs.
TRANSIENT_FAILURE: The channel has seen a failure from which it expects
to recover.
SHUTDOWN: The channel has seen a failure from which it cannot recover.
"""
IDLE = (_cygrpc.ConnectivityState.idle, 'idle')
CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting')
READY = (_cygrpc.ConnectivityState.ready, 'ready')
TRANSIENT_FAILURE = (_cygrpc.ConnectivityState.transient_failure,
'transient failure')
SHUTDOWN = (_cygrpc.ConnectivityState.shutdown, 'shutdown')
@enum.unique
class StatusCode(enum.Enum):
"""Mirrors grpc_status_code in the gRPC Core.
Attributes:
OK: Not an error; returned on success
CANCELLED: The operation was cancelled (typically by the caller).
UNKNOWN: Unknown error.
INVALID_ARGUMENT: Client specified an invalid argument.
DEADLINE_EXCEEDED: Deadline expired before operation could complete.
NOT_FOUND: Some requested entity (e.g., file or directory) was not found.
ALREADY_EXISTS: Some entity that we attempted to create (e.g., file or directory)
already exists.
PERMISSION_DENIED: The caller does not have permission to execute the specified
operation.
UNAUTHENTICATED: The request does not have valid authentication credentials for the
operation.
RESOURCE_EXHAUSTED: Some resource has been exhausted, perhaps a per-user quota, or
perhaps the entire file system is out of space.
FAILED_PRECONDITION: Operation was rejected because the system is not in a state
required for the operation's execution.
ABORTED: The operation was aborted, typically due to a concurrency issue
like sequencer check failures, transaction aborts, etc.
UNIMPLEMENTED: Operation is not implemented or not supported/enabled in this service.
INTERNAL: Internal errors. Means some invariants expected by underlying
system has been broken.
UNAVAILABLE: The service is currently unavailable.
DATA_LOSS: Unrecoverable data loss or corruption.
"""
OK = (_cygrpc.StatusCode.ok, 'ok')
CANCELLED = (_cygrpc.StatusCode.cancelled, 'cancelled')
UNKNOWN = (_cygrpc.StatusCode.unknown, 'unknown')
INVALID_ARGUMENT = (_cygrpc.StatusCode.invalid_argument, 'invalid argument')
DEADLINE_EXCEEDED = (_cygrpc.StatusCode.deadline_exceeded,
'deadline exceeded')
NOT_FOUND = (_cygrpc.StatusCode.not_found, 'not found')
ALREADY_EXISTS = (_cygrpc.StatusCode.already_exists, 'already exists')
PERMISSION_DENIED = (_cygrpc.StatusCode.permission_denied,
'permission denied')
RESOURCE_EXHAUSTED = (_cygrpc.StatusCode.resource_exhausted,
'resource exhausted')
FAILED_PRECONDITION = (_cygrpc.StatusCode.failed_precondition,
'failed precondition')
ABORTED = (_cygrpc.StatusCode.aborted, 'aborted')
OUT_OF_RANGE = (_cygrpc.StatusCode.out_of_range, 'out of range')
UNIMPLEMENTED = (_cygrpc.StatusCode.unimplemented, 'unimplemented')
INTERNAL = (_cygrpc.StatusCode.internal, 'internal')
UNAVAILABLE = (_cygrpc.StatusCode.unavailable, 'unavailable')
DATA_LOSS = (_cygrpc.StatusCode.data_loss, 'data loss')
UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated')
############################# gRPC Status ################################
class Status(six.with_metaclass(abc.ABCMeta)):
"""Describes the status of an RPC.
This is an EXPERIMENTAL API.
Attributes:
code: A StatusCode object to be sent to the client.
details: A UTF-8-encodable string to be sent to the client upon
termination of the RPC.
trailing_metadata: The trailing :term:`metadata` in the RPC.
"""
############################# gRPC Exceptions ################################
class RpcError(Exception):
"""Raised by the gRPC library to indicate non-OK-status RPC termination."""
############################## Shared Context ################################
class RpcContext(six.with_metaclass(abc.ABCMeta)):
"""Provides RPC-related information and action."""
@abc.abstractmethod
def is_active(self):
"""Describes whether the RPC is active or has terminated.
Returns:
bool:
True if RPC is active, False otherwise.
"""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have
timed out, or None if no deadline was specified for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
"""Cancels the RPC.
Idempotent and has no effect if the RPC has already terminated.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_callback(self, callback):
"""Registers a callback to be called on RPC termination.
Args:
callback: A no-parameter callable to be called on RPC termination.
Returns:
True if the callback was added and will be called later; False if
Loading ...