Source code for qstone.profiling.profile

"""Profile utilities"""

import argparse
import logging
import os

import pandas as pd
import pandera as pa

from qstone.utils.utils import ComputationStep, load_json_profile, parse_json

PROFILER_SCHEMA = pa.DataFrameSchema(
    {
        "user": pa.Column(str),
        "prog_id": pa.Column(str),
        "job_id": pa.Column(str),
        "job_type": pa.Column(str),
        "job_step": pa.Column(
            str, checks=pa.Check.isin([e.name for e in ComputationStep])
        ),
        "start": pa.Column(int),
        "success": pa.Column(bool),
        "end": pa.Column(int),
    }
)
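# For reference, a single record that would satisfy PROFILER_SCHEMA could look
# like the following (field values are purely illustrative, not taken from a
# real run; "job_type" values other than CONNECTION are assumptions):
#
#   {
#       "user": "user0",
#       "prog_id": "prog_0",
#       "job_id": "42",
#       "job_type": "CONNECTION",
#       "job_step": "RUN",        # must be a ComputationStep name, e.g. PRE/RUN/POST
#       "start": 1700000000000,   # timestamp in nanoseconds
#       "success": True,
#       "end": 1700000500000,
#   }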


def _get_stats_from_dir(folder, schema):
    """
    Get the statistics from a folder, applying the provided schema.
    """
    df = None
    for func_profile in os.listdir(folder):
        if func_profile.endswith(".json"):
            data = load_json_profile(os.path.join(folder, func_profile), schema)
            # pd.concat ignores None entries, so the first iteration works too.
            df = pd.concat([data, df], sort=False)
    if df is None:
        raise FileNotFoundError(f"No JSON profiles found in {folder}")
    logging.info("Folder: %s - found %d entries", folder, df.shape[0])
    return df
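# A minimal usage sketch (the directory path is hypothetical):
#
#   df = _get_stats_from_dir("runs/user0", PROFILER_SCHEMA)
#   print(df[["user", "job_step", "start", "end"]].head())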


def _extrapolate(stats):
    """
    Derive aggregate statistics from the raw profile data; also serves as an
    example of Pandas' capabilities.
    """
    # Add a column with the total duration of each entry
    stats["total"] = stats["end"] - stats["start"]
    # Aggregate micro-jobs that belong to the same program ID
    for pid in set(stats["prog_id"]):
        # Filter per program ID
        mask = stats.prog_id == pid
        jobs = stats[mask]
        # Aggregate all the steps associated with the same job
        for s in ["PRE", "RUN", "POST"]:
            stats.loc[mask, f"{s}_agg"] = jobs[jobs.job_step == s]["total"].sum()
    # Number of successful (job_id, user) combinations
    stats["count"] = len(stats[stats["success"]].groupby(["job_id", "user"]).groups)
    # Total time spent on connections
    stats["connection_total"] = stats.query('job_type == "CONNECTION"')["total"].sum()
    return stats
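# After _extrapolate, the frame carries these derived columns (all durations in
# nanoseconds, since "start"/"end" are nanosecond timestamps):
#
#   total                       end - start for each entry
#   PRE_agg, RUN_agg, POST_agg  per-prog_id sum of "total" for the matching job_step
#   count                       number of successful (job_id, user) groups
#   connection_total            sum of "total" over CONNECTION-type jobs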


def _store(stats, pickle):
    """Append the statistics to an existing pickle file, or create a new one."""
    if os.path.exists(pickle):
        df = pd.concat([stats, pd.read_pickle(pickle)])
        df.to_pickle(pickle)
    else:
        stats.to_pickle(pickle)
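# Note that repeated calls append to the same file, so results accumulate
# across profiling runs. A usage sketch (the path is hypothetical):
#
#   _store(stats, "qstone_profile.pkl")
#   history = pd.read_pickle("qstone_profile.pkl")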


NS_TO_MS = 1_000_000  # nanoseconds per millisecond


def _print_stats(stats: pd.DataFrame):
    """
    Print general statistics
    """
    tot_classical = (stats["PRE_agg"].iloc[0] + stats["POST_agg"].iloc[0]) / NS_TO_MS
    tot_quantum = stats["RUN_agg"].iloc[0] / NS_TO_MS
    connection_total = stats["connection_total"].iloc[0] / NS_TO_MS
    tot_runs = stats["count"].iloc[0]
    print("########### Stats ######################")
    print(f"Total classical computation   [ms]:  {tot_classical:>12.2f}")
    print(f"Total quantum computation     [ms]:  {tot_quantum:>12.2f}")
    print(f"Average classical computation [ms]:  {tot_classical/tot_runs:>12.2f}")
    print(f"Average quantum computation   [ms]:  {tot_quantum/tot_runs:>12.2f}")
    print(f"Average connection time       [ms]:  {connection_total/tot_runs:>12.2f}")


def profile(config: str, folder: list[str], pickle: str):
    """
    Profile the total execution across multiple users and store it as a
    generalised pickled object.
    """
    # Get the system configuration (parsed here; not otherwise used yet)
    config_dict = parse_json(config)
    # Merge the results from all folders
    stats = pd.concat(
        [_get_stats_from_dir(f, PROFILER_SCHEMA) for f in folder], ignore_index=True
    )
    # Example of data extrapolation
    _extrapolate(stats)
    # Store into a pickle file, if a path was provided
    if pickle:
        _store(stats, pickle)
    # Print the statistics
    _print_stats(stats)
def main():
    """Main profile routine"""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "cfg", type=str, help="Configuration file used to generate the load"
    )
    parser.add_argument(
        "folder", type=str, nargs="+", help="Folder(s) that contain the runs"
    )
    parser.add_argument(
        "--pickle", type=str, help="Optional pickle filepath to store the dataframe"
    )
    args = parser.parse_args()
    profile(args.cfg, args.folder, args.pickle)
if __name__ == "__main__":
    main()
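# Example invocation (paths are illustrative):
#
#   python -m qstone.profiling.profile config.json runs/user0 runs/user1 \
#       --pickle qstone_profile.pkl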