Source code for psrdb.tables.pipeline_run

import json

from psrdb.graphql_table import GraphQLTable


def get_parsers():
    """Returns the default parser for this model"""
    parser = GraphQLTable.get_default_parser("The following options will allow you to interact with the PipelineRun database object on the command line in different ways based on the sub-commands.")
    PipelineRun.configure_parsers(parser)
    return parser


class PipelineRun(GraphQLTable):
    """Class for interacting with the PipelineRun database object.

    Parameters
    ----------
    client : GraphQLClient
        GraphQLClient class instance with the URL and Token already set.
    """

    def __init__(self, client):
        GraphQLTable.__init__(self, client)
        self.table_name = "pipeline_run"
        self.field_names = [
            "id",
            "observation { id }",
            "template { id }",
            "ephemeris { id }",
            "pipelineName",
            "pipelineDescription",
            "pipelineVersion",
            "jobState",
            "location",
            "dm",
            "dmErr",
            "dmEpoch",
            "dmChi2r",
            "dmTres",
            "sn",
            "flux",
            "rm",
            "percentRfiZapped",
            "configuration",
        ]
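    # Usage sketch (added for illustration, not part of the upstream module):
    # given a GraphQLClient instance `client` that already holds the service
    # URL and token, the table wrapper is constructed directly from it.
    #
    #     pipeline_run = PipelineRun(client)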
    def list(
        self,
        id=None,
        observation_id=None,
        ephemeris_id=None,
        template_id=None,
        pipelineName=None,
        pipelineDescription=None,
        pipelineVersion=None,
        jobState=None,
        location=None,
        dm=None,
        dmErr=None,
        dmEpoch=None,
        dmChi2r=None,
        dmTres=None,
        sn=None,
        flux=None,
        rm=None,
        percentRfiZapped=None,
    ):
        """Return a list of PipelineRun information based on the `self.field_names` and filtered by the parameters.

        Parameters
        ----------
        id : int, optional
            Filter by the database ID, by default None
        observation_id : int, optional
            Filter by the Observation database ID, by default None
        ephemeris_id : int, optional
            Filter by the Ephemeris database ID, by default None
        template_id : int, optional
            Filter by the Template database ID, by default None
        pipelineName : str, optional
            Filter by the pipeline name, by default None
        pipelineDescription : str, optional
            Filter by the pipeline description, by default None
        pipelineVersion : str, optional
            Filter by the pipeline version, by default None
        jobState : str, optional
            Filter by the job state, by default None
        location : str, optional
            Filter by the location, by default None
        dm : float, optional
            Filter by the dm, by default None
        dmErr : float, optional
            Filter by the dmErr, by default None
        dmEpoch : float, optional
            Filter by the dmEpoch, by default None
        dmChi2r : float, optional
            Filter by the dmChi2r, by default None
        dmTres : float, optional
            Filter by the dmTres, by default None
        sn : float, optional
            Filter by the sn, by default None
        flux : float, optional
            Filter by the flux, by default None
        rm : float, optional
            Filter by the rm, by default None
        percentRfiZapped : float, optional
            Filter by the percentRfiZapped, by default None

        Returns
        -------
        list of dicts
            If `self.get_dicts` is `True`, a list of dictionaries containing the results.
        client_response:
            Else a client response object.
        """
        filters = [
            {"field": "id", "value": id},
            {"field": "observation_Id", "value": observation_id},
            {"field": "ephemeris_Id", "value": ephemeris_id},
            {"field": "template_Id", "value": template_id},
            {"field": "pipelineName", "value": pipelineName},
            {"field": "pipelineDescription", "value": pipelineDescription},
            {"field": "pipelineVersion", "value": pipelineVersion},
            {"field": "jobState", "value": jobState},
            {"field": "location", "value": location},
            {"field": "dm", "value": dm},
            {"field": "dmErr", "value": dmErr},
            {"field": "dmEpoch", "value": dmEpoch},
            {"field": "dmChi2r", "value": dmChi2r},
            {"field": "dmTres", "value": dmTres},
            {"field": "sn", "value": sn},
            {"field": "flux", "value": flux},
            {"field": "rm", "value": rm},
            {"field": "percentRfiZapped", "value": percentRfiZapped},
        ]
        return GraphQLTable.list_graphql(self, self.table_name, filters, [], self.field_names)
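    # Usage sketch for list(): all filters are optional keyword arguments; the
    # ID and values below are hypothetical. If `get_dicts` is True (the
    # attribute referenced in the docstring above, assumed here to be settable
    # on the instance), the call returns a list of dictionaries keyed by the
    # field names.
    #
    #     pipeline_run.get_dicts = True
    #     completed = pipeline_run.list(observation_id=42, jobState="Completed")
    #     for run in completed:
    #         print(run)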
    def create(
        self,
        observationId,
        ephemerisId,
        templateId,
        pipelineName,
        pipelineDescription,
        pipelineVersion,
        jobState,
        location,
        configuration,
        results_dict=None,
    ):
        """Create a new PipelineRun database object.

        Parameters
        ----------
        observationId : int
            The ID of the Observation database object of this PipelineRun.
        ephemerisId : int
            The ID of the Ephemeris database object of this PipelineRun.
        templateId : int
            The ID of the Template database object of this PipelineRun.
        pipelineName : str
            The name of the pipeline used for this PipelineRun.
        pipelineDescription : str
            The description of the pipeline used for this PipelineRun.
        pipelineVersion : str
            The version of the pipeline used for this PipelineRun.
        jobState : str
            The state of the job from ("Pending", "Running", "Completed", "Failed", "Cancelled").
        location : str
            The location of the job outputs.
        configuration : dict
            The input parameters of the pipeline used for this PipelineRun.
        results_dict : dict, optional
            The results of the pipeline, which are only uploaded when the run is completed, by default None

        Returns
        -------
        client_response:
            A client response object.
        """
        self.mutation_name = "createPipelineRun"
        self.mutation = """
        mutation (
            $observationId: Int!,
            $ephemerisId: Int!,
            $templateId: Int!,
            $pipelineName: String!,
            $pipelineDescription: String!,
            $pipelineVersion: String!,
            $jobState: String!,
            $location: String!,
            $configuration: String!,
            $dm: Float,
            $dm_err: Float,
            $dm_epoch: Float,
            $dm_chi2r: Float,
            $dm_tres: Float,
            $sn: Float,
            $flux: Float,
            $rm: Float,
            $rm_err: Float,
            $percent_rfi_zapped: Float,
        ) {
            createPipelineRun(input: {
                observationId: $observationId,
                ephemerisId: $ephemerisId,
                templateId: $templateId,
                pipelineName: $pipelineName,
                pipelineDescription: $pipelineDescription,
                pipelineVersion: $pipelineVersion,
                jobState: $jobState,
                location: $location,
                configuration: $configuration,
                dm: $dm,
                dmErr: $dm_err,
                dmEpoch: $dm_epoch,
                dmChi2r: $dm_chi2r,
                dmTres: $dm_tres,
                sn: $sn,
                flux: $flux,
                rm: $rm,
                rmErr: $rm_err,
                percentRfiZapped: $percent_rfi_zapped,
            }) {
                pipelineRun {
                    id
                }
            }
        }
        """
        if results_dict is None:
            results_dict = {
                "dm": None,
                "dm_err": None,
                "dm_epoch": None,
                "dm_chi2r": None,
                "dm_tres": None,
                "sn": None,
                "flux": None,
                "rm": None,
                "rm_err": None,
                "percent_rfi_zapped": None,
            }
        if results_dict["rm"] == "None":
            results_dict["rm"] = None
        if results_dict["rm_err"] == "None":
            results_dict["rm_err"] = None
        self.variables = {
            "observationId": observationId,
            "ephemerisId": ephemerisId,
            "templateId": templateId,
            "pipelineName": pipelineName,
            "pipelineDescription": pipelineDescription,
            "pipelineVersion": pipelineVersion,
            "jobState": jobState,
            "location": location,
            "configuration": json.dumps(configuration),
            "dm": results_dict["dm"],
            "dm_err": results_dict["dm_err"],
            "dm_epoch": results_dict["dm_epoch"],
            "dm_chi2r": results_dict["dm_chi2r"],
            "dm_tres": results_dict["dm_tres"],
            "sn": results_dict["sn"],
            "flux": results_dict["flux"],
            "rm": results_dict["rm"],
            "rm_err": results_dict["rm_err"],
            "percent_rfi_zapped": results_dict["percent_rfi_zapped"],
        }
        return self.mutation_graphql()
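    # Usage sketch for create(): every value below is made up for illustration.
    # `configuration` is passed as a dict and serialised with json.dumps()
    # inside the method; `results_dict` is omitted because results are normally
    # attached later via update().
    #
    #     response = pipeline_run.create(
    #         observationId=1234,
    #         ephemerisId=56,
    #         templateId=78,
    #         pipelineName="my_pipeline",
    #         pipelineDescription="Example reduction pipeline",
    #         pipelineVersion="1.0.0",
    #         jobState="Pending",
    #         location="/path/to/job/outputs",
    #         configuration={"nchan": 16, "npol": 1},
    #     )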
    def update(
        self,
        id,
        jobState,
        results_dict=None,
    ):
        """Update a PipelineRun database object.

        Parameters
        ----------
        id : int
            The database ID
        jobState : str
            The state of the job from ("Pending", "Running", "Completed", "Failed", "Cancelled").
        results_dict : dict, optional
            The results of the pipeline, which are only uploaded when the run is completed, by default None

        Returns
        -------
        client_response:
            A client response object.
        """
        self.mutation_name = "updatePipelineRun"
        self.mutation = """
        mutation (
            $id: Int!,
            $jobState: String!,
            $dm: Float,
            $dm_err: Float,
            $dm_epoch: Float,
            $dm_chi2r: Float,
            $dm_tres: Float,
            $sn: Float,
            $flux: Float,
            $rm: Float,
            $rm_err: Float,
            $percent_rfi_zapped: Float,
        ) {
            updatePipelineRun(id: $id, input: {
                jobState: $jobState,
                dm: $dm,
                dmErr: $dm_err,
                dmEpoch: $dm_epoch,
                dmChi2r: $dm_chi2r,
                dmTres: $dm_tres,
                sn: $sn,
                flux: $flux,
                rm: $rm,
                rmErr: $rm_err,
                percentRfiZapped: $percent_rfi_zapped,
            }) {
                pipelineRun {
                    id
                }
            }
        }
        """
        if results_dict is None:
            results_dict = {
                "dm": None,
                "dm_err": None,
                "dm_epoch": None,
                "dm_chi2r": None,
                "dm_tres": None,
                "sn": None,
                "flux": None,
                "rm": None,
                "rm_err": None,
                "percent_rfi_zapped": None,
            }
        if results_dict["rm"] == "None":
            results_dict["rm"] = None
        if results_dict["rm_err"] == "None":
            results_dict["rm_err"] = None
        self.variables = {
            "id": id,
            "jobState": jobState,
            "dm": results_dict["dm"],
            "dm_err": results_dict["dm_err"],
            "dm_epoch": results_dict["dm_epoch"],
            "dm_chi2r": results_dict["dm_chi2r"],
            "dm_tres": results_dict["dm_tres"],
            "sn": results_dict["sn"],
            "flux": results_dict["flux"],
            "rm": results_dict["rm"],
            "rm_err": results_dict["rm_err"],
            "percent_rfi_zapped": results_dict["percent_rfi_zapped"],
        }
        return self.mutation_graphql()
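    # Usage sketch for update(): when a run completes, the measured results can
    # be attached. Note that update() indexes every key of `results_dict`
    # directly, so all ten keys must be present; the numbers below are invented.
    #
    #     results = {
    #         "dm": 23.456, "dm_err": 0.001, "dm_epoch": 59000.0,
    #         "dm_chi2r": 1.1, "dm_tres": 0.5,
    #         "sn": 150.0, "flux": 1.2,
    #         "rm": 10.3, "rm_err": 0.4, "percent_rfi_zapped": 2.5,
    #     }
    #     pipeline_run.update(id=99, jobState="Completed", results_dict=results)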
    def delete(
        self,
        id,
    ):
        """Delete a PipelineRun database object.

        Parameters
        ----------
        id : int
            The database ID

        Returns
        -------
        client_response:
            A client response object.
        """
        self.mutation_name = "deletePipelineRun"
        self.mutation = """
        mutation ($id: Int!) {
            deletePipelineRun(id: $id) {
                ok
            }
        }
        """
        self.variables = {
            "id": id,
        }
        return self.mutation_graphql()
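    # Usage sketch for delete(): removes the record with the given ID
    # (the ID below is hypothetical).
    #
    #     pipeline_run.delete(id=99)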
    def process(self, args):
        """Parse the arguments collected by the CLI."""
        self.print_stdout = True
        if args.subcommand == "create":
            # Assumes CLI arguments that mirror the create() signature; note
            # that configure_parsers() below only defines a "list" sub-parser.
            return self.create(
                args.observation_id,
                args.ephemeris_id,
                args.template_id,
                args.pipelineName,
                args.pipelineDescription,
                args.pipelineVersion,
                args.jobState,
                args.location,
                args.configuration,
            )
        elif args.subcommand == "update":
            with open(args.results_json, "r") as json_file:
                results_dict = json.load(json_file)
            return self.update(
                args.id,
                args.job_state,
                results_dict,
            )
        elif args.subcommand == "list":
            return self.list(
                args.id,
                args.observation_id,
                args.ephemeris_id,
                args.template_id,
                args.pipelineName,
                args.pipelineDescription,
                args.pipelineVersion,
                args.jobState,
                args.location,
                args.dm,
                args.dmErr,
                args.dmEpoch,
                args.dmChi2r,
                args.dmTres,
                args.sn,
                args.flux,
                args.rm,
                args.percentRfiZapped,
            )
        elif args.subcommand == "delete":
            return self.delete(args.id)
        else:
            raise RuntimeError(f"{args.subcommand} command is not implemented")

    @classmethod
    def get_name(cls):
        return "pipeline_run"

    @classmethod
    def get_description(cls):
        return "PipelineRun details."

    @classmethod
    def get_parsers(cls):
        """Returns the default parser for this model"""
        parser = GraphQLTable.get_default_parser("PipelineRun model parser")
        cls.configure_parsers(parser)
        return parser

    @classmethod
    def configure_parsers(cls, parser):
        """Add sub-parsers for each of the valid commands."""
        # create the parser for the "list" command
        parser.set_defaults(command=cls.get_name())
        subs = parser.add_subparsers(dest="subcommand")
        subs.required = True

        parser_list = subs.add_parser("list", help="list existing PipelineRun")
        parser_list.add_argument("--id", metavar="ID", type=int, help="List pipeline run with matching pipeline_run_id [int]")
        parser_list.add_argument("--observation_id", metavar="OBS", type=int, help="List pipeline run with matching observation_id [int]")
        parser_list.add_argument("--ephemeris_id", metavar="EPHEM", type=int, help="List pipeline run with matching ephemeris_id [int]")
        parser_list.add_argument("--template_id", metavar="TEMP", type=int, help="List pipeline run with matching template_id [int]")
        parser_list.add_argument("--pipelineName", metavar="NAME", type=str, help="List pipeline run with matching pipelineName [str]")
        parser_list.add_argument("--pipelineDescription", metavar="DESC", type=str, help="List pipeline run with matching pipelineDescription [str]")
        parser_list.add_argument("--pipelineVersion", metavar="VER", type=str, help="List pipeline run with matching pipelineVersion [str]")
        parser_list.add_argument("--jobState", metavar="STATE", type=str, help="List pipeline run with matching jobState [str]")
        parser_list.add_argument("--location", metavar="LOC", type=str, help="List pipeline run with matching location [str]")
        parser_list.add_argument("--dm", metavar="DM", type=float, help="List pipeline run with matching dm [float]")
        parser_list.add_argument("--dmErr", metavar="DMERR", type=float, help="List pipeline run with matching dmErr [float]")
        parser_list.add_argument("--dmEpoch", metavar="DMEPOCH", type=float, help="List pipeline run with matching dmEpoch [float]")
        parser_list.add_argument("--dmChi2r", metavar="DMCHI2R", type=float, help="List pipeline run with matching dmChi2r [float]")
        parser_list.add_argument("--dmTres", metavar="DMTRES", type=float, help="List pipeline run with matching dmTres [float]")
        parser_list.add_argument("--sn", metavar="SN", type=float, help="List pipeline run with matching sn [float]")
        parser_list.add_argument("--flux", metavar="FLUX", type=float, help="List pipeline run with matching flux [float]")
        parser_list.add_argument("--rm", metavar="RM", type=float, help="List pipeline run with matching rm [float]")
        parser_list.add_argument("--percentRfiZapped", metavar="RFI", type=float, help="List pipeline run with matching percentRfiZapped [float]")