Source code for qstone.connectors.backends.rigetti.runner
"""Quantum executor for Rigetti backend"""
import calendar
import time
import numpy
import waiting
from pyquil import Program, get_qc
from qstone.connectors import connection
from qstone.utils.utils import ComputationStep, QpuConfiguration, trace
[docs]
class RigettiConnection(connection.Connection):
    """Connection running jobs to Rigetti backend.

    A single call to :meth:`run` resolves the target quantum computer, polls
    until the job could be executed (or the timeout expires) and returns the
    post-processed message.
    """

    # Upper bound, in seconds, on how long ``run`` keeps retrying the request.
    TIMEOUT_SECONDS = 20

    def __init__(self):
        """Creates the empty response"""
        self.response = None
        self.qc = None
        self.result = None
        self.mode = None
        self.origin = None

    def _get_qc(self, hostpath: str):
        """Record mode/origin for ``hostpath`` and return ``get_qc(hostpath)``."""
        # A target whose name contains "qvm" is reported as simulated.
        self.mode = "simulated" if "qvm" in hostpath else "real"
        self.origin = hostpath
        return get_qc(hostpath)

    def _run(self, program: Program):
        """Execute an already compiled program on the current ``self.qc``."""
        return self.qc.run(program)

    def _compile(self, circuit: Program, reps: int):
        """Wrap ``circuit`` in a ``reps``-shot loop and compile it with ``self.qc``."""
        return self.qc.compile(circuit.wrap_in_numshots_loop(reps))

    def _get_results(self, run):
        """Extract the contents of the classical register "c" as an ndarray."""
        # "c" is the register used to store the classical value
        return run.data.result_data.to_register_map().get("c").to_ndarray()

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.PRE,
    )
    def preprocess(self, qasm_ptr: str) -> Program:
        """Preprocess the data. Transform QASM format into PyQuil

        Args:
            qasm_ptr: path of the QASM file to read (opened as UTF-8 text).

        Returns:
            The program produced by ``self.qc.compiler.transpile_qasm_2``.
        """
        with open(qasm_ptr, "r", encoding="utf-8") as fid:
            return self.qc.compiler.transpile_qasm_2(fid.read())

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.POST,
    )
    def postprocess(self, message: str) -> dict:
        """Postprocess the data

        Args:
            message: not used by this implementation; the message is built
                from the state stored on the instance.

        Returns:
            A dict with the keys ``mapping``, ``measurements``, ``mode``,
            ``timestamp`` and ``origin``. When no measurements are available
            (``self.result`` is ``None`` or empty) ``mapping`` is an empty
            list instead of raising.
        """
        # Data is returned in the format defined in assumptions.md.
        # Mapping is currently "complete" and in order.
        measurements = self.result
        # Explicit None/len checks: truthiness of an ndarray is ambiguous.
        if measurements is None or len(measurements) == 0:
            mapping = []
        else:
            mapping = list(range(len(measurements[0])))
        return {
            "mapping": mapping,
            "measurements": measurements,
            "mode": self.mode,
            "timestamp": calendar.timegm(time.gmtime()),
            "origin": self.origin,
        }

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.QUERY,
        label="_request_and_process",
    )
    def _request_and_process(
        self, qasm_ptr: str, reps: int, hostpath: str, lockfile: str
    ):
        """Try once to run the circuit; report whether it was executed.

        Args:
            qasm_ptr: path of the QASM file describing the circuit.
            reps: number of shots.
            hostpath: not used here; ``self.qc`` is already bound to the target.
            lockfile: path handed to ``connection.FileLock``.

        Returns:
            ``True`` when the lock was acquired and ``self.result`` was
            filled, ``False`` otherwise (so the caller can retry).
        """
        circuit = self.preprocess(qasm_ptr)
        self.response = None
        success = False
        lock = connection.FileLock(lockfile)
        # NOTE(review): the lock is never explicitly released in this method;
        # confirm that FileLock frees it on its own (e.g. on garbage collection).
        if lock.acquire_lock():
            run = self._run(self._compile(circuit, reps))
            self.result = self._get_results(run)
            success = True
        return success

    # mypy: disable-error-code="attr-defined"
    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.RUN,
    )
    def run(
        self, qasm_ptr: str, reps: int, host: str, server_port: int, lockfile: str
    ) -> dict:
        """Run the connection to the server

        Args:
            qasm_ptr: path of the QASM file describing the circuit.
            reps: number of shots.
            host: name of the target; used alone when ``server_port`` is falsy.
            server_port: optional port, appended as ``host:port`` when truthy.
            lockfile: path of the lock file guarding access to the target.

        Returns:
            The message built by :meth:`postprocess`. If the request did not
            succeed within ``TIMEOUT_SECONDS`` the message carries
            ``measurements`` set to ``None`` and an empty ``mapping``.
        """
        hostpath = f"{host}:{server_port}" if server_port else host
        self.qc = self._get_qc(hostpath)
        # Forget measurements from an earlier call so that a timeout cannot
        # hand back stale data as if it belonged to this request.
        self.result = None
        try:
            waiting.wait(
                lambda: self._request_and_process(qasm_ptr, reps, hostpath, lockfile),
                timeout_seconds=self.TIMEOUT_SECONDS,
            )
        except waiting.TimeoutExpired:
            # Best effort: fall through and report an empty result.
            pass
        return self.postprocess(self.response)