Source code for qstone.connectors.http.runner
"""Quantum executor over a HTTP channel"""
import json
import os
import secrets
import sys
from time import sleep
import requests
import waiting
from qstone.connectors import connection
from qstone.utils.utils import ComputationStep, QpuConfiguration, trace
[docs]
class HttpConnection(connection.Connection):
"""Connection running jobs over Http"""
def __init__(self):
"""Creates the empty response"""
self.response = None
self.http_timeout = int(os.environ.get("TIMEOUTS_HTTP", 10))
self.lock_timeout = int(os.environ.get("TIMEOUTS_LOCK", 200))
[docs]
@trace(
computation_type="CONNECTION",
computation_step=ComputationStep.PRE,
)
def preprocess(self, qasm_ptr: str) -> str:
"""Preprocess the data."""
# Currently passthrough.
with open(qasm_ptr, "r", encoding="utf-8") as fid:
return fid.read()
[docs]
@trace(
computation_type="CONNECTION",
computation_step=ComputationStep.POST,
)
def postprocess(self, message: str) -> str:
"""Postprocess the data"""
# If the message is None we return an empty string.
# Please note. In this model it is responsability of the gateway machine to
# return the data in the correct format as defined in assumptions.md
return json.loads(message) if message else ""
@trace(
computation_type="CONNECTION",
computation_step=ComputationStep.RUN,
label="_request_and_process",
)
def _request_and_process(self, qasm_ptr: str, reps: int, hostpath: str):
pkt_id = secrets.randbelow(2**31)
circuit = self.preprocess(qasm_ptr)
payload = {"circuit": circuit, "pkt_id": pkt_id, "reps": reps}
headers: dict = {}
r = requests.post(
f"{hostpath}/execute", timeout=10, headers=headers, json=payload
)
success = r.status_code == 200
if success:
r = requests.get(
f"{hostpath}/results",
timeout=self.http_timeout,
json={"pkt_id": pkt_id},
)
self.response = r.text
success = r.status_code == 200
if not success:
sys.stderr.write("QSTONE::ERR - Request failed")
return success
# mypy: disable-error-code="attr-defined"
[docs]
@trace(
computation_type="CONNECTION",
computation_step=ComputationStep.RUN,
)
def run(
self, qasm_ptr: str, reps: int, host: str, server_port: int, lockfile: str
) -> dict:
"""Run the connection to the server"""
hostpath = f"{host}:{server_port}" if server_port else host
# Prepending the HTTP specifier if not provided.
hostpath = hostpath if hostpath.startswith("http://") else f"http://{hostpath}"
# Active wait on lock
if lockfile is not None:
owned = False
lock = connection.FileLock(lockfile)
for _ in range(10 * self.lock_timeout):
if lock.acquire_lock():
owned = True
break
sleep(0.1)
if not owned:
sys.stderr.write("QSTONE::ERR - timeout waiting for lock")
return {}
self._request_and_process(qasm_ptr, reps, hostpath)
# releasing lock takes care of None case as well.
if lockfile is not None:
lock.release_lock()
return self.postprocess(self.response)
[docs]
@trace(
computation_type="CONNECTION",
computation_step=ComputationStep.QUERY,
)
def query_qpu_config(self, host: str, server_port: int) -> QpuConfiguration:
"""Query the Qpu configuraiton of the target"""
hostpath = f"{host}"
if server_port:
hostpath += f":{server_port}"
response = requests.get(f"{hostpath}/qpu/config", timeout=10)
qpu_config = QpuConfiguration()
if response.ok:
qpu_config.load_configuration(json.loads(response.text))
return qpu_config