Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
qiskit-aer / pulse / controllers / pulse_controller.py
Size: Mime:
# -*- coding: utf-8 -*-

# This code is part of Qiskit.
#
# (C) Copyright IBM 2018, 2019, 2020.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
# pylint: disable=invalid-name, no-name-in-module, import-error

"""
Entry/exit point for pulse simulation specified through PulseSimulator backend
"""

from warnings import warn
from copy import copy
from typing import Callable
import numpy as np
from qiskit.quantum_info.operators.operator import Operator
from ..system_models.string_model_parser.string_model_parser import NoiseParser
from ..system_models.string_model_parser import operator_generators as op_gen
from .digest_pulse_qobj import digest_pulse_qobj
from .pulse_sim_options import PulseSimOptions
from .unitary_controller import run_unitary_experiments
from .mc_controller import run_monte_carlo_experiments
from .pulse_utils import get_ode_rhs_functor


def pulse_controller(qobj):
    """Interprets PulseQobj input, runs simulations, and returns results

    Parameters:
        qobj (PulseQobj): pulse qobj containing a list of pulse schedules

    Returns:
        list: simulation results

    Raises:
        ValueError: if input is of incorrect format
        Exception: for invalid ODE options
    """

    pulse_sim_desc = PulseSimDescription()
    pulse_de_model = PulseInternalDEModel()

    config = qobj.config

    # ###############################
    # ### Extract model parameters
    # ###############################

    system_model = config.system_model

    # Get qubit list and number
    qubit_list = system_model.subsystem_list
    if qubit_list is None:
        raise ValueError("Model must have a qubit list to simulate.")
    n_qubits = len(qubit_list)

    # get Hamiltonian
    if system_model.hamiltonian is None:
        raise ValueError("Model must have a Hamiltonian to simulate.")
    ham_model = system_model.hamiltonian

    # Extract DE model information
    pulse_de_model.system = ham_model._system
    pulse_de_model.variables = ham_model._variables
    pulse_de_model.channels = ham_model._channels
    pulse_de_model.h_diag = ham_model._h_diag
    pulse_de_model.evals = ham_model._evals
    pulse_de_model.estates = ham_model._estates
    dim_qub = ham_model._subsystem_dims
    dim_osc = {}
    # convert estates into a Qutip qobj
    estates = [op_gen.state(state) for state in ham_model._estates.T[:]]

    # initial state set here
    if getattr(config, "initial_state", None) is not None:
        pulse_sim_desc.initial_state = op_gen.state(config.initial_state)
    else:
        pulse_sim_desc.initial_state = estates[0]

    # Get dt
    if system_model.dt is None:
        raise ValueError("System model must have a dt value to simulate.")

    pulse_de_model.dt = system_model.dt

    # Parse noise
    noise_model = getattr(config, "noise_model", None)

    # post warnings for unsupported features
    _unsupported_warnings(noise_model)

    if noise_model:
        noise = NoiseParser(noise_dict=noise_model, dim_osc=dim_osc, dim_qub=dim_qub)
        noise.parse()

        pulse_de_model.noise = noise.compiled
        if any(pulse_de_model.noise):
            pulse_sim_desc.can_sample = False

    # ###############################
    # ### Parse qobj_config settings
    # ###############################
    digested_qobj = digest_pulse_qobj(qobj, pulse_de_model.channels, system_model.dt, qubit_list)

    # extract simulation-description level qobj content
    pulse_sim_desc.shots = digested_qobj.shots
    pulse_sim_desc.meas_level = digested_qobj.meas_level
    pulse_sim_desc.meas_return = digested_qobj.meas_return
    pulse_sim_desc.memory_slots = digested_qobj.memory_slots
    pulse_sim_desc.memory = digested_qobj.memory

    # extract model-relevant information
    pulse_de_model.n_registers = digested_qobj.n_registers
    pulse_de_model.pulse_array = digested_qobj.pulse_array
    pulse_de_model.pulse_indices = digested_qobj.pulse_indices
    pulse_de_model.pulse_to_int = digested_qobj.pulse_to_int

    pulse_sim_desc.experiments = digested_qobj.experiments

    # Handle qubit_lo_freq
    qubit_lo_freq = digested_qobj.qubit_lo_freq

    # if it wasn't specified in the PulseQobj, draw from system_model
    if qubit_lo_freq is None:
        default_freq = getattr(config, "qubit_freq_est", [np.inf])
        if default_freq != [np.inf]:
            qubit_lo_freq = default_freq

    # if still None, or is the placeholder value draw from the Hamiltonian
    if qubit_lo_freq is None:
        qubit_lo_freq = system_model.hamiltonian.get_qubit_lo_from_drift()
        if getattr(qobj.config, "schedule_los", None) is None:
            warn(
                "Warning: qubit_lo_freq was not specified in PulseQobj and there is no default, "
                "so it is being automatically determined from the drift Hamiltonian."
            )

    pulse_de_model.freqs = system_model.calculate_channel_frequencies(qubit_lo_freq=qubit_lo_freq)
    pulse_de_model.calculate_channel_frequencies = system_model.calculate_channel_frequencies

    # ###############################
    # ### Parse backend_options
    # # solver-specific information should be extracted in the solver
    # ###############################

    pulse_sim_desc.seed = int(config.seed) if hasattr(config, "seed") else None
    pulse_sim_desc.q_level_meas = int(getattr(config, "q_level_meas", 1))

    # solver options
    allowed_solver_options = [
        "atol",
        "rtol",
        "nsteps",
        "max_step",
        "num_cpus",
        "norm_tol",
        "norm_steps",
        "method",
    ]
    solver_options = getattr(config, "solver_options", {})
    for key in solver_options:
        if key not in allowed_solver_options:
            raise Exception("Invalid solver_option: {}".format(key))
    solver_options = PulseSimOptions(**solver_options)

    # Set the ODE solver max step to be the half the
    # width of the smallest pulse
    min_width = np.iinfo(np.int32).max
    for key, val in pulse_de_model.pulse_to_int.items():
        if key != "pv":
            stop = pulse_de_model.pulse_indices[val + 1]
            start = pulse_de_model.pulse_indices[val]
            min_width = min(min_width, stop - start)
    solver_options.de_options.max_step = min_width / 2 * pulse_de_model.dt

    # ########################################
    # Determination of measurement operators.
    # ########################################
    pulse_sim_desc.measurement_ops = [None] * n_qubits

    for exp in pulse_sim_desc.experiments:
        # Add in measurement operators
        # Not sure if this will work for multiple measurements
        # Note: the extraction of multiple measurements works, but the simulation routines
        # themselves implicitly assume there is only one measurement at the end
        if any(exp["acquire"]):
            for acq in exp["acquire"]:
                for jj in acq[1]:
                    if jj > qubit_list[-1]:
                        continue
                    if not pulse_sim_desc.measurement_ops[qubit_list.index(jj)]:
                        q_level_meas = pulse_sim_desc.q_level_meas
                        pulse_sim_desc.measurement_ops[
                            qubit_list.index(jj)
                        ] = op_gen.qubit_occ_oper_dressed(
                            jj, estates, h_osc=dim_osc, h_qub=dim_qub, level=q_level_meas
                        )

        if not exp["can_sample"]:
            pulse_sim_desc.can_sample = False

    # trim measurement operators to relevant qubits once constructed
    meas_ops_reduced = []
    for op in pulse_sim_desc.measurement_ops:
        if op is not None:
            meas_ops_reduced.append(op)
    pulse_sim_desc.measurement_ops = meas_ops_reduced

    run_experiments = (
        run_unitary_experiments if pulse_sim_desc.can_sample else run_monte_carlo_experiments
    )
    exp_results, exp_times = run_experiments(pulse_sim_desc, pulse_de_model, solver_options)

    output = {
        "results": format_exp_results(exp_results, exp_times, pulse_sim_desc),
        "success": True,
        "qobj_id": qobj.qobj_id,
    }
    return output


def format_exp_results(exp_results, exp_times, pulse_sim_desc):
    """format simulation results

    Parameters:
        exp_results (list): simulation results
        exp_times (list): simulation times
        pulse_sim_desc (PulseSimDescription): object containing all simulation information

    Returns:
        list: formatted simulation results
    """

    # format the data into the proper output
    all_results = []
    for idx_exp, exp in enumerate(pulse_sim_desc.experiments):
        m_lev = pulse_sim_desc.meas_level
        m_ret = pulse_sim_desc.meas_return

        # populate the results dictionary
        results = {
            "seed_simulator": exp["seed"],
            "shots": pulse_sim_desc.shots,
            "status": "DONE",
            "success": True,
            "time_taken": exp_times[idx_exp],
            "header": exp["header"],
            "meas_level": m_lev,
            "meas_return": m_ret,
            "data": {},
        }

        if pulse_sim_desc.can_sample:
            memory = exp_results[idx_exp][0]
            results["data"]["statevector"] = []
            for coef in exp_results[idx_exp][1]:
                results["data"]["statevector"].append([np.real(coef), np.imag(coef)])
            results["header"]["ode_t"] = exp_results[idx_exp][2]
        else:
            memory = exp_results[idx_exp]

        # meas_level 2 return the shots
        if m_lev == 2:
            # convert the memory **array** into a n
            # integer
            # e.g. [1,0] -> 2
            int_mem = memory.dot(np.power(2.0, np.arange(memory.shape[1]))).astype(int)

            # if the memory flag is set return each shot
            if pulse_sim_desc.memory:
                hex_mem = [hex(val) for val in int_mem]
                results["data"]["memory"] = hex_mem

            # Get hex counts dict
            unique = np.unique(int_mem, return_counts=True)
            hex_dict = {}
            for kk in range(unique[0].shape[0]):
                key = hex(unique[0][kk])
                hex_dict[key] = unique[1][kk]
            results["data"]["counts"] = hex_dict

        # meas_level 1 returns the <n>
        elif m_lev == 1:
            if m_ret == "avg":
                memory = [np.mean(memory, 0)]

            # convert into the right [real, complex] pair form for json
            results["data"]["memory"] = []
            for mem_shot in memory:
                results["data"]["memory"].append([])
                for mem_slot in mem_shot:
                    results["data"]["memory"][-1].append([np.real(mem_slot), np.imag(mem_slot)])

            if m_ret == "avg":
                results["data"]["memory"] = results["data"]["memory"][0]

        all_results.append(results)
    return all_results


def _unsupported_warnings(noise_model):
    """Warns the user about untested/unsupported features.

    Parameters:
        noise_model (dict): backend_options for simulation
    Returns:
    Raises:
        AerError: for unsupported features
    """

    # Warnings that don't stop execution
    warning_str = "{} are an untested feature, and therefore may not behave as expected."
    if noise_model is not None:
        warn(warning_str.format("Noise models"))


class PulseInternalDEModel:
    """Container of information required for de RHS construction"""

    def __init__(self):
        # The system Hamiltonian in numerical format
        self.system = None
        # The noise (if any) in numerical format
        self.noise = None
        # System variables
        self.variables = None
        # Channels in the Hamiltonian string
        # these tell the order in which the channels
        # are evaluated in the RHS solver.
        self.channels = None
        # Array containing all pulse samples
        self.pulse_array = None
        # Array of indices indicating where a pulse starts in the self.pulse_array
        self.pulse_indices = None
        # A dict that translates pulse names to integers for use in self.pulse_indices
        self.pulse_to_int = None
        # dt for pulse schedules
        self.dt = None
        # holds default frequencies for the channels
        self.freqs = {}
        # frequency calculation function for overriding defaults
        self.calculate_channel_frequencies = None
        # diagonal elements of the hamiltonian
        self.h_diag = None
        # eigenvalues of the time-independent hamiltonian
        self.evals = None
        # eigenstates of the time-independent hamiltonian
        self.estates = None

        self.n_registers = None

        # attributes used in RHS function
        self.vars = None
        self.vars_names = None
        self.num_h_terms = None
        self.c_num = None
        self.c_ops_data = None
        self.n_ops_data = None
        self.h_diag_elems = None

        self.h_ops_data = None

        self._rhs_dict = None

    def _config_internal_data(self):
        """Preps internal data into format required by RHS function."""

        self.vars = list(self.variables.values())
        # Need this info for evaluating the hamiltonian vars in the c++ solver
        self.vars_names = list(self.variables.keys())

        num_h_terms = len(self.system)
        H = [hpart[0] for hpart in self.system]
        self.num_h_terms = num_h_terms

        self.c_ops_data = []
        self.n_ops_data = []

        self.h_diag_elems = self.h_diag

        # if there are any collapse operators
        self.c_num = 0

        if self.noise:
            self.c_num = len(self.noise)
            self.num_h_terms += 1
            H_noise = Operator(np.zeros(self.noise[0].data.shape))
            for kk in range(self.c_num):
                c_op = self.noise[kk]
                n_op = c_op.adjoint() & c_op
                # collapse ops
                self.c_ops_data.append(c_op.data)
                # norm ops
                self.n_ops_data.append(n_op.data)
                # Norm ops added to time-independent part of
                # Hamiltonian to decrease norm
                H_noise = Operator(H_noise.data - 0.5j * n_op.data)

            H = H + [H_noise]

        # construct data sets
        self.h_ops_data = [-1.0j * hpart.data for hpart in H]

        self._rhs_dict = {
            "freqs": list(self.freqs.values()),
            "pulse_array": self.pulse_array,
            "pulse_indices": self.pulse_indices,
            "vars": self.vars,
            "vars_names": self.vars_names,
            "num_h_terms": self.num_h_terms,
            "h_ops_data": self.h_ops_data,
            "h_diag_elems": self.h_diag_elems,
        }

    def init_rhs(self, exp):
        """Set up and return rhs function corresponding to this model for a given
        experiment exp
        """

        # if _rhs_dict has not been set up, config the internal data
        if self._rhs_dict is None:
            self._config_internal_data()

        channels = dict(self.channels)

        # Init register
        register = np.ones(self.n_registers, dtype=np.uint8)

        rhs_dict = setup_rhs_dict_freqs(self._rhs_dict, exp, self.calculate_channel_frequencies)
        ode_rhs_obj = get_ode_rhs_functor(rhs_dict, exp, self.system, channels, register)

        def rhs(t, y):
            return ode_rhs_obj(t, y)

        return rhs


def setup_rhs_dict_freqs(
    default_rhs_dict: dict, exp: dict, calculate_channel_frequencies: Callable
):
    """Standalone function for overriding channel frequencies in a given experiment.

    Args:
        default_rhs_dict: Dictionary containing default RHS data.
        exp: Dictionary containing experiment data.
        calculate_channel_frequencies: Function for computing all channel frequencies from
                                       a list of DriveChannel frequencies.

    Returns:
        dict: Dictionary with frequencies potentially overriden by those in exp.
    """

    if "qubit_lo_freq" in exp and exp["qubit_lo_freq"] is not None:
        # copy to not overwrite defaults
        default_rhs_dict = copy(default_rhs_dict)
        freqs_dict = calculate_channel_frequencies(exp["qubit_lo_freq"])
        default_rhs_dict["freqs"] = list(freqs_dict.values())

    return default_rhs_dict


class PulseSimDescription:
    """Object for holding any/all information required for simulation.
    Needs to be refactored into different pieces.
    """

    def __init__(self):
        self.initial_state = None
        # Channels in the Hamiltonian string
        # these tell the order in which the channels
        # are evaluated in the RHS solver.
        self.experiments = []
        # Can experiments be simulated once then sampled
        self.can_sample = True

        self.shots = None
        self.meas_level = None
        self.meas_return = None
        self.memory_slots = None
        self.memory = None

        self.seed = None
        self.q_level_meas = None
        self.measurement_ops = None