Source code for qstone.generators.generator

"""
Generation of the testbench.
"""

import argparse
import os
import shutil
import tarfile
from typing import Any, List

import numpy
import pandas as pa
from jinja2 import Template

from qstone.utils.utils import QpuConfiguration, parse_json

SCHEDULERS = {
    "bare_metal": "bare_metal",
    "jsrun": "lsf/jsrun",
    "slurm": "slurm/schedmd",
}
SCHEDULER_CMDS = {"bare_metal": "bash", "jsrun": "jsrun", "slurm": "sbatch"}
SCHEDULER_EXTS = {"bare_metal": "sh", "jsrun": "bsub", "slurm": "sbatch"}

SCHEDULER_ARGS = {"walltime": "3", "nthreads": "1"}

CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
GEN_PATH = "qstone_suite"


def _get_value(job_cfg: pa.DataFrame, key: str, default: str):
    """Read a single value from the job configuration, falling back to the default."""
    val = default
    try:
        val = job_cfg[key].values[0]
    except (KeyError, IndexError):
        pass
    # An identity check against numpy.nan is unreliable across dtypes;
    # test for NaN by value instead.
    if isinstance(val, float) and numpy.isnan(val):
        val = default
    return str(val)
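
# Usage sketch (illustrative values): the helper returns the stored value
# when present, otherwise the supplied default.
#
#   cfg = pa.DataFrame({"walltime": ["5"]})
#   _get_value(cfg, "walltime", "3")  # -> "5"
#   _get_value(cfg, "nthreads", "1")  # -> "1" (missing column)
#   _get_value(pa.DataFrame({"walltime": [numpy.nan]}), "walltime", "3")  # -> "3"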


def _render_templates(
    sched: str,
    sched_path: str,
    subs: dict,
    job_types: List[str],
    jobs_cfg: pa.DataFrame,
):
    """Convert all templates and add all the files that are in the scheduler folder"""
    # Add common folder here
    search_paths = [sched_path, os.path.join(CURRENT_PATH, "common")]

    # Collect candidate files from every search path, skipping cache artifacts
    all_files = [
        os.path.join(search_path, s)
        for search_path in search_paths
        for s in os.listdir(search_path)
        if s not in ["__pycache__", ".cache"] and not s.endswith(".pyc")
    ]
    jinja_files = [s for s in all_files if s.endswith("jinja")]
    non_jinja_files = list(set(all_files) - set(jinja_files))
    # Adding templated files
    for jinja_file in jinja_files:
        with open(jinja_file, encoding="utf-8") as fid:
            source = fid.read()
        if "{app}" in jinja_file:
            for t in job_types:
                outfile = os.path.join(
                    GEN_PATH,
                    os.path.basename(
                        jinja_file.replace(".jinja", "").replace("{app}", t)
                    ),
                )
                j = jobs_cfg[jobs_cfg["type"] == t]
                args = {
                    key: _get_value(j, key, val) for key, val in SCHEDULER_ARGS.items()
                }
                sched_args = {"sched_args": _get_value(j, f"{sched}_opt", "")}
                Template(source).stream({**subs, **args, **sched_args}).dump(outfile)
        else:
            outfile = os.path.join(
                GEN_PATH, os.path.basename(jinja_file.replace(".jinja", ""))
            )
            Template(source).stream(subs).dump(outfile)
    # Copying non-template files as-is
    for non_jinja_file in non_jinja_files:
        shutil.copy(
            non_jinja_file,
            f"{os.path.join(GEN_PATH, os.path.basename(non_jinja_file))}",
        )
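
# Rendering sketch (hypothetical template names): a template containing
# "{app}" in its file name is expanded once per job type, while any other
# template is rendered a single time.
#
#   job_types = ["VQE", "GHZ"]
#   "{app}_run.sh.jinja"   -> qstone_suite/VQE_run.sh, qstone_suite/GHZ_run.sh
#   "settings.json.jinja"  -> qstone_suite/settings.json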


def _render_and_pack(
    scheduler: str,
    output_filename: str,
    subs: dict,
    job_types: List[str],
    jobs_cfg: pa.DataFrame,
):
    """
    Renders and packs all the necessary files to run as a user
    """
    sched = SCHEDULERS[scheduler]
    sched_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), sched)
    shutil.rmtree(GEN_PATH, ignore_errors=True)
    os.makedirs(GEN_PATH)
    _render_templates(sched, sched_path, subs, job_types, jobs_cfg)
    # Copy the required files.
    with tarfile.open(output_filename, "w:gz") as tar:
        # Adding necessary scripts excluding original templates
        tar.add(GEN_PATH, recursive=True)
        for job_type in job_types:
            # Adding user defined apps
            job_cfg = jobs_cfg[jobs_cfg["type"] == job_type]
            app = _get_value(job_cfg, "path", "")
            if app:
                assert os.path.exists(app), f"User app not found: {app}"
                tar.add(
                    app,
                    arcname=f"{GEN_PATH}/{os.path.basename(app)}",
                    recursive=False,
                )
    shutil.rmtree(GEN_PATH)
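
# Resulting archive sketch (hypothetical contents): the tarball holds the
# rendered qstone_suite folder plus any user-supplied app referenced by the
# "path" field of a job entry.
#
#   slurm_user0.qstone.tar.gz
#   └── qstone_suite/
#       ├── <rendered templates and copied scheduler files>
#       └── my_app.py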


def _compute_job_pdf(usr_cfg: "pa.Series[Any]") -> List[float]:
    """Computes the normalized pdf to assign to different jobs based on user
    configurations and specified qubit capacity
    """

    # Collect the per-computation weights; keys are the job names.
    pdf = list(usr_cfg["computations"].values())

    normalized = [float(p) / sum(pdf) for p in pdf]

    return normalized
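
# Worked example: computation weights are normalised into a probability
# distribution, so weights {"VQE": 2, "GHZ": 3} yield [0.4, 0.6].
#
#   _compute_job_pdf(pa.Series({"computations": {"VQE": 2, "GHZ": 3}}))
#   # -> [0.4, 0.6]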


def _randomise(vals, def_val):
    """Return randomised value from range when available"""

    if pa.isnull(vals).any():
        value = def_val
    else:
        values = vals.tolist()[0]
        if len(values) > 1:
            value = numpy.random.randint(values[0], values[1])
        else:
            value = values[0]
    return value
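
# Usage sketch: a two-element list is treated as a [low, high) range for
# numpy.random.randint (upper bound exclusive); a one-element list is taken
# verbatim; NaN falls back to the default.
#
#   _randomise(pa.Series([[2, 5]]), 2)     # -> random int in 2..4
#   _randomise(pa.Series([[7]]), 2)        # -> 7
#   _randomise(pa.Series([numpy.nan]), 2)  # -> 2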


def Convert(lst):
    """Convert a flat list of alternating keys and values into a dict."""
    res_dct = {lst[i]: lst[i + 1] for i in range(0, len(lst), 2)}
    return res_dct
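
# Example: consecutive list items become key/value pairs.
#
#   Convert(["qubits", 4, "shots", 100])  # -> {"qubits": 4, "shots": 100}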

def _generate_user_jobs(
    usr_cfg: "pa.Series[Any]",
    jobs_cfg: pa.DataFrame,
    job_pdf: List[float],
    num_calls: int,
):
    """
    Generates the different user jobs given the configuration and the number
    of calls.
    """
    runner = 'python "$EXEC_PATH"/type_exec.py'
    job_types = numpy.random.choice(
        list(usr_cfg["computations"].keys()), p=job_pdf, size=num_calls
    )
    # Check that we have generated a non-empty job list
    assert (
        len(job_types) > 0
    ), "Configuration generated zero jobs. Please check your configuration file."
    # Randomise number of qubits and shots
    num_qubits = []
    num_shots = []
    DEF_QUBITS = 2
    DEF_SHOTS = 100
    for j in job_types:
        app_cfg = jobs_cfg[jobs_cfg["type"] == j]
        if app_cfg.empty:
            num_qubits.append(DEF_QUBITS)
            num_shots.append(DEF_SHOTS)
        else:
            num_qubits.append(_randomise(app_cfg["qubits"], DEF_QUBITS))
            num_shots.append(_randomise(app_cfg["num_shots"], DEF_SHOTS))
    # Assign job id and pack
    job_ids = list(range(len(job_types)))
    return (
        list(zip([f"{runner} {s}" for s in job_types], num_qubits, job_ids, num_shots)),
        set(job_types),
    )


def _environment_variables_exports(env_vars: dict) -> List[str]:
    """
    Generates export statements for environment variables.
    """
    exports_list = [
        f'export {k.upper().replace(".", "_")}="{v}"' for k, v in env_vars.items()
    ]
    return exports_list
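
# Example: keys are upper-cased and dots replaced with underscores before
# being emitted as shell export statements.
#
#   _environment_variables_exports({"qpu.port": 9000})
#   # -> ['export QPU_PORT="9000"']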

def generate_suite(
    config: str, num_calls: int, output_folder: str, atomic: bool, scheduler: str
) -> List[str]:
    """
    Generates the suites of jobs for the required users.

    Args:
        config: Input configuration for generate; defines QPU configuration and user jobs
        num_calls: Number of jobs to generate per user
        output_folder: Scheduler tar file output location
        atomic: optional flag to create a single job out of the three phases
        scheduler: target HPC scheduler

    Returns list of output file paths
    """
    # Get configurations
    config_dict = parse_json(config)
    env_cfg = config_dict["environment"]
    users_cfg = pa.DataFrame(config_dict["users"])
    jobs_cfg = pa.DataFrame(config_dict["jobs"])
    env_exports = _environment_variables_exports(env_cfg)
    qpu_config = QpuConfiguration()
    qpu_config.load_configuration(env_cfg)
    # Generating list of jobs
    output_paths = []
    for prog_id, user_cfg in users_cfg.iterrows():
        pdf = _compute_job_pdf(user_cfg)
        jobs, job_types = _generate_user_jobs(
            user_cfg, jobs_cfg, pdf, int(user_cfg["weight"] * num_calls)
        )
        # Generate substitutions for Jinja templates
        formatted_jobs = [" ".join(map(str, job)) for job in jobs]
        user_name = user_cfg["user"]
        usr_env_exports = [
            f'export PROG_ID="{prog_id}"',
            f'export QS_USER="{user_name}"',
        ]
        subs = {
            "exports": "\n".join(env_exports + usr_env_exports),
            "jobs": "\n".join(formatted_jobs),
            "project_name": env_cfg["project_name"],
            "atomic": atomic,
            "sched_ext": SCHEDULER_EXTS[scheduler],
            "sched_cmd": SCHEDULER_CMDS[scheduler],
            "sched_aware": env_cfg["qpu_management"] == "SCHEDULER",
        }
        # Pack project files
        filename = os.path.join(output_folder, f"{scheduler}_{user_name}.qstone.tar.gz")
        # Render and pack all the files
        _render_and_pack(scheduler, filename, subs, job_types, jobs_cfg)
        output_paths.append(filename)
    return output_paths
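
# Usage sketch (hypothetical paths and user names): generate 100 jobs per
# user from a JSON configuration, producing one tarball per user.
#
#   paths = generate_suite("qstone.json", 100, "out/", False, "slurm")
#   # -> ["out/slurm_user0.qstone.tar.gz", ...]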

def main():
    """
    Runs the generator phase.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=str)
    parser.add_argument("num_calls", type=int)
    parser.add_argument("output_folder", type=str)
    parser.add_argument("scheduler", type=str, choices=SCHEDULERS)
    # store_true flags take no type and must be optional arguments
    parser.add_argument("--atomic", action="store_true")
    args = parser.parse_args()
    generate_suite(
        args.config, args.num_calls, args.output_folder, args.atomic, args.scheduler
    )

if __name__ == "__main__":
    main()