Source code for qstone.connectors.grpc.runner

"""Quantum executor over a gRPC channel"""

import json
import secrets

import grpc

import qstone.connectors.grpc.qpu_pb2 as pb2
import qstone.connectors.grpc.qpu_pb2_grpc as pb2_grpc
from qstone.connectors import connection
from qstone.utils.utils import ComputationStep, QpuConfiguration, trace


class GRPCConnecction(connection.Connection):
    """Connection running jobs over gRPC"""

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.PRE,
    )
    def preprocess(self, qasm_ptr: str) -> str:
        """Read the circuit file and return its text unchanged.

        Args:
            qasm_ptr: Path of the file holding the circuit; read as UTF-8.

        Returns:
            The full contents of the file.
        """
        # Currently passthrough.
        with open(qasm_ptr, "r", encoding="utf-8") as fid:
            return fid.read()

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.POST,
    )
    def postprocess(self, message: str) -> str:
        """Print the raw reply and return it decoded from JSON.

        NOTE(review): the return annotation says ``str`` but the value is
        whatever ``json.loads`` produces, and ``run`` forwards it as a
        ``dict``. Confirm against the base class before changing it.

        Args:
            message: JSON text to decode.

        Returns:
            The object decoded by ``json.loads``.

        Raises:
            json.JSONDecodeError: If ``message`` is not valid JSON.
        """
        # Currently passthrough.
        print(f"postprocess: {message}")
        return json.loads(message)

    # mypy: disable-error-code="attr-defined"
    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.RUN,
    )
    def run(
        self, qasm_ptr: str, reps: int, host: str, server_port: int, lockfile: str
    ) -> dict:
        """Send the circuit in ``qasm_ptr`` to the QPU server and return its reply.

        Args:
            qasm_ptr: Path of the circuit file, loaded through ``preprocess``.
            reps: Not used by this implementation.
            host: Host name or address of the gRPC server.
            server_port: Port of the gRPC server.
            lockfile: Not used by this implementation.

        Returns:
            The ``result`` field of the server reply, decoded by
            ``postprocess``.
        """
        # No explicit compression requested (cf. grpc.Compression.NoCompression).
        compression = None
        # Use the channel as a context manager so it is closed even when the
        # file read or the RPC raises; previously it was never closed.
        with grpc.insecure_channel(
            f"{host}:{server_port}", compression=compression
        ) as channel:
            stub = pb2_grpc.QPUStub(channel)
            # Random non-negative identifier below 2**31 for this request.
            pkt_id = secrets.randbelow(2**31)
            circuit = self.preprocess(qasm_ptr)
            request = pb2.Circuit(circuit=circuit, pkt_id=pkt_id)  # type: ignore[attr-defined]
            m = stub.RunQuantumCircuit(request)
        return self.postprocess(m.result)

    @trace(
        computation_type="CONNECTION",
        computation_step=ComputationStep.QUERY,
    )
    def query_qpu_config(self, host: str, server_port: int) -> QpuConfiguration:
        """Query the QPU configuration of the target.

        Not implemented yet: prints a reminder and returns a
        default-constructed ``QpuConfiguration``. ``host`` and
        ``server_port`` are currently ignored.
        """
        print("implement me!")
        return QpuConfiguration()