Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
qiskit-ibm-provider / api / clients / runtime_ws.py
Size: Mime:
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Client for accessing IBM Quantum runtime service."""

import logging
from typing import Optional
from queue import Queue

from .base import BaseWebsocketClient
from ..client_parameters import ClientParameters

logger = logging.getLogger(__name__)


class RuntimeWebsocketClient(BaseWebsocketClient):
    """Client for websocket communication with the IBM Quantum runtime service."""

    def __init__(
        self,
        websocket_url: str,
        client_params: ClientParameters,
        job_id: str,
        message_queue: Optional[Queue] = None,
    ) -> None:
        """WebsocketClient constructor.

        Args:
            websocket_url: URL for websocket communication with IBM Quantum.
            client_params: Parameters used for server connection.
            job_id: Job ID.
            message_queue: Queue used to hold received messages.
        """
        super().__init__(websocket_url, client_params, job_id, message_queue)
        self._header = client_params.get_auth_handler().get_headers()

    def _handle_message(self, message: str) -> None:
        """Handle received message.

        Args:
            message: Message received.
        """
        if not self._authenticated:
            self._authenticated = True  # First message is an ACK
        else:
            self._message_queue.put_nowait(message)
            self._current_retry = 0

    def job_results(self, max_retries: int = 5, backoff_factor: float = 0.5) -> None:
        """Return the interim result of a runtime job.

        Args:
            max_retries: Max number of retries.
            backoff_factor: Backoff factor used to calculate the
                time to wait between retries.

        Raises:
            WebsocketError: If a websocket error occurred.
        """
        url = "{}/stream/jobs/{}".format(self._websocket_url, self._job_id)
        self.stream(url=url, retries=max_retries, backoff_factor=backoff_factor)

    def _handle_stream_iteration(self) -> None:
        """Handle a streaming iteration."""
        pass