Source code for qstone.connectors.backends.rigetti.runner

"""Quantum executor for Rigetti backend"""

import calendar
import time

import waiting
from pyquil import Program, get_qc

# pylint: disable=import-error,no-name-in-module
from qcs_sdk.compiler.quilc import (
    QuilcClient,
)
from qcs_sdk.qvm import QVMClient

# pylint: enable=import-error,no-name-in-module
from qstone.connectors import connection
from qstone.utils.utils import ComputationStep, trace


class RigettiConnection(connection.Connection):
    """Connection running jobs to Rigetti backend"""

    def __init__(self):
        """Creates the empty response"""
        self.response = None
        self.qc = None
        self.result = None
        self.mode = None
        self.origin = None

    def _get_qc(
        self,
        mode: str,
        qpu_host: str,
        qpu_port: int,
        compiler_host: str,
        compiler_port: int,
        target: str,
    ):
        """Build the quantum computer object used to compile and run programs.

        Records ``mode`` and ``qpu_host`` on the instance (``self.mode`` and
        ``self.origin``) so they can be reported back by ``postprocess``.

        Args:
            mode: Execution mode. Any value other than ``"REAL"`` requests a
                QVM (``as_qvm=True``).
            qpu_host: Host of the QVM endpoint; also stored as the origin.
            qpu_port: Port of the QVM endpoint.
            compiler_host: Host of the quilc compiler endpoint.
            compiler_port: Port of the quilc compiler endpoint.
            target: Name of the quantum computer passed to ``get_qc``.

        Returns:
            The object returned by ``pyquil.get_qc``.
        """
        self.mode = mode
        self.origin = qpu_host
        as_qvm = self.mode != "REAL"
        # compiler (non-standard port)
        # note: this is a way to run a docker version of the compiler:
        # docker run --rm -it -p 5556:5556 rigetti/quilc -P -S -p 5556
        quilc_client = QuilcClient.new_rpcq(f"{compiler_host}:{compiler_port}")
        # qvm (non-standard port)
        # note, could run a docker version like this:
        # docker run --rm -it -p 5001:5001 rigetti/qvm -S -p 5001
        qvm_client = QVMClient.new_http(f"{qpu_host}:{qpu_port}")
        return get_qc(
            target, as_qvm=as_qvm, quilc_client=quilc_client, qvm_client=qvm_client
        )

    def _run(self, program: Program):
        """Execute an already compiled program on ``self.qc``."""
        return self.qc.run(program)

    def _compile(self, circuit: Program, reps: int):
        """Compile ``circuit`` wrapped in a loop of ``reps`` shots."""
        return self.qc.compile(circuit.wrap_in_numshots_loop(reps))

    def _get_results(self, run):
        """Extract the measured values of the classical register from ``run``."""
        # "c" is the register used to store the classical value
        return run.data.result_data.to_register_map().get("c").to_ndarray()

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.PRE,
    )
    def preprocess(self, qasm_ptr: str) -> Program:
        """Preprocess the data. Transform QASM format into PyQuil

        Args:
            qasm_ptr: Path of the UTF-8 encoded QASM file to read.

        Returns:
            The result of transpiling the file content with the compiler
            attached to ``self.qc`` (which must already be set).
        """
        with open(qasm_ptr, "r", encoding="utf-8") as fid:
            return self.qc.compiler.transpile_qasm_2(fid.read())

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.POST,
    )
    def postprocess(self, message: str) -> dict:
        """Postprocess the data

        Builds the response from the state stored on the instance
        (``self.result``, ``self.mode``, ``self.origin``). The ``message``
        argument is accepted for interface compatibility and is not read.

        Args:
            message: Unused.

        Returns:
            A dict with the keys ``mapping``, ``measurements``, ``mode``,
            ``timestamp`` (seconds since the epoch, UTC) and ``origin``.
            When no measurements are available (for example after a timed-out
            run) ``mapping`` is an empty list and ``measurements`` is whatever
            ``self.result`` holds (``None`` by default).
        """
        # Data is returned in the format defined in assumptions.md.
        measurements = self.result
        if measurements is None or len(measurements) == 0:
            # Nothing was measured: report an empty mapping rather than
            # failing while indexing the first shot.
            mapping = []
        else:
            # Mapping is currently "complete" and in order.
            mapping = list(range(len(measurements[0])))
        timestamp = calendar.timegm(time.gmtime())
        origin = self.origin
        msg = {
            "mapping": mapping,
            "measurements": measurements,
            "mode": self.mode,
            "timestamp": timestamp,
            "origin": origin,
        }
        return msg

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.QUERY,
        label="_request_and_process",
    )
    def _request_and_process(
        self, qasm_ptr: str, reps: int, hostpath: str, lockfile: str
    ):
        """Transpile, compile and run the circuit once the lock is acquired.

        Args:
            qasm_ptr: Path of the QASM file describing the circuit.
            reps: Number of shots.
            hostpath: Unused; kept for interface compatibility.
            lockfile: Path handed to ``connection.FileLock``.

        Returns:
            ``True`` when the lock was acquired and the results were stored in
            ``self.result``; ``False`` when the lock could not be acquired.
        """
        circuit = self.preprocess(qasm_ptr)
        self.response = None
        success = False
        lock = connection.FileLock(lockfile)
        # NOTE(review): the lock is never explicitly released in this method;
        # confirm whether FileLock releases on its own or needs an explicit call.
        if lock.acquire_lock():
            run = self._run(self._compile(circuit, reps))
            self.result = self._get_results(run)
            success = True
        return success

    # mypy: disable-error-code="attr-defined"
    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.RUN,
    )
    def run(
        self,
        qasm_ptr: str,
        reps: int,
        mode: str,
        qpu_host: str,
        qpu_port: int,
        compiler_host: str,
        compiler_port: int,
        target: str,
        lockfile: str,
    ) -> dict:
        """Run the connection to the server

        Creates the quantum computer object, then polls
        ``_request_and_process`` until it succeeds or 20 seconds elapse, and
        finally builds the response with ``postprocess``.

        Args:
            qasm_ptr: Path of the QASM file describing the circuit.
            reps: Number of shots.
            mode: Execution mode; ``"REAL"`` disables the QVM flag.
            qpu_host: Host of the QVM endpoint, also reported as origin.
            qpu_port: Port of the QVM endpoint.
            compiler_host: Host of the quilc compiler endpoint.
            compiler_port: Port of the quilc compiler endpoint.
            target: Name of the quantum computer passed to ``get_qc``.
            lockfile: Path of the lock file guarding the execution.

        Returns:
            The dict produced by ``postprocess``. If the run timed out it
            carries no measurements and an empty mapping.
        """
        self.qc = self._get_qc(
            mode, qpu_host, qpu_port, compiler_host, compiler_port, target
        )
        # Drop measurements left over from a previous call so that a timed-out
        # run cannot report stale data as if it were fresh.
        self.result = None
        try:
            waiting.wait(
                lambda: self._request_and_process(qasm_ptr, reps, qpu_host, lockfile),
                timeout_seconds=20,
            )
        except waiting.TimeoutExpired:
            # Best effort: fall through and return a response without
            # measurements.
            pass
        return self.postprocess(self.response)