wfcommons.trace

wfcommons.trace.schema

class wfcommons.trace.schema.SchemaValidator(schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: object

Validate JSON files against the WorkflowHub schema. If no schema file path is provided, the validator looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the WorkflowHub schema GitHub repository.

Parameters:
  • schema_file (str) – JSON schema file path.
  • logger (Logger) – Logger used to record information, warnings, and errors.
_load_schema(schema_file: Optional[str] = None)

Load the schema file. If no schema file path is provided, the method looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the GitHub repository.

Parameters:schema_file (str) – JSON schema file path.
Returns:The JSON schema.
Return type:json
_semantic_validation(data: Dict[str, Any])

Validate the semantics of the JSON workflow execution trace.

Parameters:data (Dict[str, Any]) – Workflow trace in JSON format.
_syntax_validation(data: Dict[str, Any])

Validate the JSON workflow execution trace against the schema.

Parameters:data (Dict[str, Any]) – Workflow trace in JSON format.
validate_trace(data: Dict[str, Any])

Perform syntax validation against the schema, and semantic validation.

Parameters:data (Dict[str, Any]) – Workflow trace in JSON format.
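
The split between syntax and semantic validation described above can be sketched without any external dependency. This is a minimal illustration, not the library's implementation: the trace layout (`name`, `workflow.jobs`, `parents`) is a simplified, hypothetical one, and the function names are invented for the example. The syntax pass checks structure only, while the semantic pass checks that references inside a structurally valid trace are consistent.

```python
from typing import Any, Dict, List

# Hypothetical, heavily simplified trace layout used only for this sketch.
REQUIRED_TOP_LEVEL_KEYS = ("name", "workflow")


def syntax_errors(data: Dict[str, Any]) -> List[str]:
    """Structural check: required keys exist and jobs is a list."""
    errors = [f"missing key: {key}" for key in REQUIRED_TOP_LEVEL_KEYS if key not in data]
    jobs = data.get("workflow", {}).get("jobs")
    if not isinstance(jobs, list):
        errors.append("workflow.jobs must be a list")
    return errors


def semantic_errors(data: Dict[str, Any]) -> List[str]:
    """Consistency check: every parent reference must name an existing job."""
    jobs = data["workflow"]["jobs"]
    names = {job["name"] for job in jobs}
    return [
        f"{job['name']}: unknown parent {parent}"
        for job in jobs
        for parent in job.get("parents", [])
        if parent not in names
    ]


def validate(data: Dict[str, Any]) -> List[str]:
    """Run the syntax pass first; run the semantic pass only if it succeeds."""
    errors = syntax_errors(data)
    return errors if errors else semantic_errors(data)


trace = {
    "name": "demo",
    "workflow": {
        "jobs": [
            {"name": "a", "parents": []},
            {"name": "b", "parents": ["a", "missing"]},
        ]
    },
}
print(validate(trace))
```

Running the semantic pass only after the syntax pass succeeds means the semantic code can assume the keys it reads are present.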

wfcommons.trace.trace

class wfcommons.trace.trace.Trace(input_trace: str, schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: object

Representation of one execution of one workflow on a set of machines.

Trace(input_trace='trace.json')
Parameters:
  • input_trace (str) – The JSON trace.
  • schema_file (str) – The path to the JSON schema that defines the trace. If no schema file is provided, the class looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the WorkflowHub schema GitHub repository.
  • logger (Logger) – Logger used to record information, warnings, and errors.
draw(output: Optional[str] = None, extension: str = 'pdf') → None

Produce an image or a PDF file representing the trace.

Parameters:
  • output (str) – Name of the output file.
  • extension (str) – Output file extension (pdf, png, or svg).
leaves() → List[str]

Get the leaves of the workflow (i.e., the tasks without any successors).

Returns:List of leaves
Return type:List[str]
roots() → List[str]

Get the roots of the workflow (i.e., the tasks without any predecessors).

Returns:List of roots
Return type:List[str]
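
The definitions of roots and leaves above can be illustrated on a plain adjacency map. This is a sketch under stated assumptions: the `successors` dictionary and both helper functions are hypothetical stand-ins, not the internals of `Trace`.

```python
from typing import Dict, List

# Hypothetical adjacency map: task name -> names of its direct successors.
successors: Dict[str, List[str]] = {
    "split": ["align_1", "align_2"],
    "align_1": ["merge"],
    "align_2": ["merge"],
    "merge": [],
}


def roots(graph: Dict[str, List[str]]) -> List[str]:
    """Tasks that never appear as a successor, i.e. have no predecessors."""
    has_predecessor = {child for children in graph.values() for child in children}
    return [task for task in graph if task not in has_predecessor]


def leaves(graph: Dict[str, List[str]]) -> List[str]:
    """Tasks whose successor list is empty."""
    return [task for task, children in graph.items() if not children]


print(roots(successors), leaves(successors))
```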
write_dot(output: Optional[str] = None) → None

Write a dot file of the trace.

Parameters:output (str) – The output dot file name (optional).
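
For readers unfamiliar with the dot format, the following sketch shows what a dot description of a small workflow graph looks like. It is an assumption-laden illustration: the `graph` dictionary and the `to_dot` helper are hypothetical, and the text that `write_dot` actually emits may differ in naming and attributes.

```python
from typing import Dict, List

# Hypothetical adjacency map: task name -> names of its direct successors.
graph: Dict[str, List[str]] = {"a": ["b", "c"], "b": [], "c": []}


def to_dot(graph: Dict[str, List[str]], name: str = "workflow") -> str:
    """Render the graph as Graphviz dot text, one node and edge per line."""
    lines = [f"digraph {name} {{"]
    for task, children in graph.items():
        lines.append(f'  "{task}";')  # declare the node even if it has no edges
        for child in children:
            lines.append(f'  "{task}" -> "{child}";')
    lines.append("}")
    return "\n".join(lines)


dot_text = to_dot(graph)
print(dot_text)
```

The resulting text can be saved to a file and rendered with the Graphviz `dot` command-line tool.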

wfcommons.trace.trace_analyzer

class wfcommons.trace.trace_analyzer.TraceAnalyzer(logger: Optional[logging.Logger] = None)

Bases: object

Set of tools for analyzing collections of traces.

Parameters:logger (Logger) – Logger used to record information, warnings, and errors (optional).
append_trace(trace: wfcommons.trace.trace.Trace) → None

Append a workflow trace object to the trace analyzer.

trace = Trace(input_trace='trace.json', schema_file='schema.json')
trace_analyzer = TraceAnalyzer()
trace_analyzer.append_trace(trace)
Parameters:trace (Trace) – A workflow trace object.
build_summary(tasks_list: List[str], include_raw_data: Optional[bool] = True) → Dict[str, Dict[str, Any]]

Analyze the appended traces and produce a summary of the analysis per task prefix.

workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
traces_summary = trace_analyzer.build_summary(workflow_tasks, include_raw_data=False)
Parameters:
  • tasks_list (List[str]) – List of workflow task prefixes (e.g., mProject, sol2sanger, add_replace).
  • include_raw_data (bool) – Whether to include the raw data in the trace summary.
Returns:

A summary of the analysis of traces in the form of a dictionary in which keys are task prefixes.

Return type:

Dict[str, Dict[str, Any]]
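
The idea of a summary keyed by task prefix can be sketched with the standard library alone. This is not what `build_summary` computes: the real method analyzes trace objects, whereas this hypothetical `summarize` helper only groups made-up `(task name, runtime)` records by prefix and reports a count and a mean. The entry keys (`count`, `mean_runtime`, `data`) are assumptions chosen for the example.

```python
from statistics import mean
from typing import Any, Dict, List, Tuple

# Hypothetical (task name, runtime in seconds) records.
records: List[Tuple[str, float]] = [
    ("mProject_001", 12.0),
    ("mProject_002", 14.0),
    ("mAdd_001", 3.0),
]


def summarize(
    records: List[Tuple[str, float]],
    prefixes: List[str],
    include_raw_data: bool = True,
) -> Dict[str, Dict[str, Any]]:
    """Group runtimes by task prefix and summarize each group."""
    summary: Dict[str, Dict[str, Any]] = {}
    for prefix in prefixes:
        runtimes = [runtime for name, runtime in records if name.startswith(prefix)]
        if not runtimes:
            continue  # no task matched this prefix
        entry: Dict[str, Any] = {"count": len(runtimes), "mean_runtime": mean(runtimes)}
        if include_raw_data:
            entry["data"] = runtimes  # keep the raw values next to the statistics
        summary[prefix] = entry
    return summary


print(summarize(records, ["mProject", "mAdd"], include_raw_data=False))
```

Leaving out the raw data keeps the summary small when only the aggregate values are needed.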

generate_all_fit_plots(outfile_prefix: Optional[str] = None) → None

Produce fit plots as images for each entry of the summary analysis. For entries in which there is no distribution (i.e., a constant value), no plot will be generated.

Parameters:outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
generate_fit_plots(trace_element: wfcommons.trace.trace_analyzer.TraceElement, outfile_prefix: Optional[str] = None) → None

Produce fit plots as images for each entry of a trace element generated by the summary analysis. For entries in which there is no distribution (i.e., a constant value), no plot will be generated.

Parameters:
  • trace_element (TraceElement) – Workflow element for which the fit plots will be generated.
  • outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
class wfcommons.trace.trace_analyzer.TraceElement

Bases: wfcommons.utils.NoValue

An enumeration.

INPUT = ('input', 'Input File Size (bytes)')
OUTPUT = ('output', 'Input File Size (bytes)')
RUNTIME = ('runtime', 'Runtime (s)')
wfcommons.trace.trace_analyzer._append_file_to_dict(extension: str, dict_obj: Dict[str, Any], file_size: int) → None

Add a file size to a file type extension dictionary.

Parameters:
  • extension (str) – File type extension.
  • dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
  • file_size (int) – File size in bytes.
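
A helper with this signature can be sketched as follows. The nested `{"data": [...]}` layout is an assumption made for the example, and `append_file_size` is a hypothetical name; the private function documented above may store sizes differently.

```python
from typing import Any, Dict


def append_file_size(extension: str, dict_obj: Dict[str, Any], file_size: int) -> None:
    """Record file_size under extension, creating the entry on first use."""
    # Assumed layout: one {"data": [sizes...]} entry per extension.
    dict_obj.setdefault(extension, {"data": []})["data"].append(file_size)


sizes: Dict[str, Any] = {}
append_file_size(".fits", sizes, 1024)
append_file_size(".fits", sizes, 2048)
append_file_size(".txt", sizes, 10)
print(sizes)
```

The function mutates `dict_obj` in place and returns nothing, which matches the `→ None` annotation in the signature.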
wfcommons.trace.trace_analyzer._best_fit_distribution_for_file(dict_obj, include_raw_data) → None

Find the best fit distribution for a file.

Parameters:
  • dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
  • include_raw_data (bool) – Whether to include the raw data in the trace summary.
wfcommons.trace.trace_analyzer._generate_fit_plots(el: Dict[KT, VT], title: str, xlabel: str, outfile: str, font_size: Optional[int] = None, logger: Optional[logging.Logger] = None) → None

Produce a fit plot as an image for an entry of a trace element generated by the summary analysis.

Parameters:
  • el (Dict) – Entry of a trace element generated by the summary analysis.
  • title (str) – Plot title.
  • xlabel (str) – X-axis label.
  • outfile (str) – Plot file name.
  • font_size (Optional[int]) – Size of the font.
  • logger (Logger) – Logger used to record information, warnings, and errors.
wfcommons.trace.trace_analyzer._json_format_distribution_fit(dist_tuple: Tuple) → Dict[str, Any]

Format the best fit distribution data into a dictionary.

Parameters:dist_tuple (Tuple) – Tuple containing best fit distribution name and parameters.
Returns:Dictionary containing the best fit distribution name and parameters.
Return type:Dict[str, Any]

wfcommons.trace.logs.abstract_logs_parser

class wfcommons.trace.logs.abstract_logs_parser.LogsParser(wms_name: str, wms_url: Optional[str] = None, description: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: abc.ABC

An abstract class of logs parser for creating workflow traces.

Parameters:
  • wms_name (str) – Name of the workflow system.
  • wms_url (str) – URL for the workflow system.
  • description (str) – Workflow trace description.
  • logger (Logger) – Logger used to record information, warnings, and errors (optional).
build_workflow(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow

Create workflow trace based on the workflow execution logs.

Parameters:workflow_name (str) – The workflow name.
Returns:A workflow trace object.
Return type:Workflow
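
The abstract-parser pattern described above can be sketched with the standard `abc` module. Everything here is a local stand-in: `SketchLogsParser` and `ListLogsParser` are hypothetical classes written for this example, and the dictionary returned by `build_workflow` is a placeholder for the `Workflow` object the real parsers return.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class SketchLogsParser(ABC):
    """Local stand-in for an abstract logs parser (not the wfcommons class)."""

    def __init__(self, wms_name: str, wms_url: Optional[str] = None,
                 description: Optional[str] = None) -> None:
        self.wms_name = wms_name
        self.wms_url = wms_url
        self.description = description

    @abstractmethod
    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, object]:
        """Subclasses turn their logs into a workflow description."""


class ListLogsParser(SketchLogsParser):
    """Toy parser whose 'logs' are an in-memory list of task names."""

    def __init__(self, task_names: List[str]) -> None:
        super().__init__(wms_name="toy-wms")
        self.task_names = task_names

    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, object]:
        return {
            "name": workflow_name or "unnamed",
            "wms": self.wms_name,
            "tasks": list(self.task_names),
        }


workflow = ListLogsParser(["a", "b"]).build_workflow("demo")
print(workflow)
```

Because `build_workflow` is abstract, instantiating the base class directly raises `TypeError`; only concrete subclasses that implement it can be created.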

wfcommons.trace.logs.makeflow

class wfcommons.trace.logs.makeflow.MakeflowLogsParser(execution_dir: str, resource_monitor_logs_dir: str, description: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: wfcommons.trace.logs.abstract_logs_parser.LogsParser

Parse a Makeflow submit directory to generate a workflow trace.

Parameters:
  • execution_dir (str) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).
  • resource_monitor_logs_dir (str) – Resource Monitor log files directory.
  • description (str) – Workflow trace description.
  • logger (Logger) – Logger used to record information, warnings, and errors (optional).
_create_files(files_list: List[str], link: wfcommons.common.file.FileLink, task_name: str)

Create a list of file objects.

Parameters:
  • files_list (List[str]) – List of file names.
  • link (FileLink) – Link type for the files in the list.
  • task_name (str) – Task name.
Returns:

List of file objects.

Return type:

List[File]

_parse_makeflow_log_file()

Parse the makeflow log file and update workflow task information.

_parse_resource_monitor_logs()

Parse the log files produced by the resource monitor.

_parse_workflow_file()

Parse the makeflow workflow file and build the workflow structure.

build_workflow(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow

Create workflow trace based on the workflow execution logs.

Parameters:workflow_name (str) – The workflow name.
Returns:A workflow trace object.
Return type:Workflow
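
A possible way to use this parser, based only on the constructor and `build_workflow` signatures documented above, is sketched below. The wrapper function `build_makeflow_trace` is hypothetical, and the import is done inside the function as an assumption that wfcommons may not be installed where the snippet is loaded.

```python
from typing import Any, Optional


def build_makeflow_trace(execution_dir: str, resource_monitor_logs_dir: str,
                         workflow_name: Optional[str] = None) -> Any:
    """Parse a Makeflow execution and return the resulting Workflow object."""
    # Imported lazily so this sketch loads even where wfcommons is not installed.
    from wfcommons.trace.logs.makeflow import MakeflowLogsParser

    parser = MakeflowLogsParser(
        execution_dir=execution_dir,  # holds the .mf and .makeflowlog files
        resource_monitor_logs_dir=resource_monitor_logs_dir,
    )
    return parser.build_workflow(workflow_name)
```

Calling the wrapper requires wfcommons and real Makeflow output directories; both paths are supplied by the caller.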

wfcommons.trace.logs.pegasus

class wfcommons.trace.logs.pegasus.PegasusLogsParser(submit_dir: str, description: Optional[str] = None, ignore_auxiliary: Optional[bool] = True, legacy: Optional[bool] = False, logger: Optional[logging.Logger] = None)

Bases: wfcommons.trace.logs.abstract_logs_parser.LogsParser

Parse a Pegasus submit directory to generate a workflow trace.

Parameters:
  • submit_dir (str) – Pegasus submit directory.
  • legacy (bool) – Whether the submit directory is from a Pegasus 4.x version.
  • description (str) – Workflow trace description.
  • ignore_auxiliary (bool) – Ignore auxiliary jobs.
  • logger (Logger) – Logger used to record information, warnings, and errors (optional).
_fetch_all_files(extension: str, file_name: str = '')

Fetch all files from the directory and its hierarchy.

Parameters:
  • extension (str) – File extension to search for.
  • file_name (str) – File name to search for.
Returns:

List of file names that match

Return type:

List[str]

_parse_braindump()

Parse the Pegasus braindump.txt file.

_parse_dag()

Parse the DAG file.

_parse_dax()

Parse the DAX file.

_parse_job_output(task)

Parse the kickstart job output file (e.g., .out.000).

Parameters:task (Task) – Task object.
_parse_job_output_latest(task, output_file)

Parse the kickstart job output file in YAML format (e.g., .out.000).

Parameters:
  • task (Task) – Task object.
  • output_file (str) – Output file name.
_parse_job_output_legacy(task, output_file)

Parse the kickstart job output file in XML format (e.g., .out.000).

Parameters:
  • task (Task) – Task object.
  • output_file (str) – Output file name.
_parse_meta_file(task_name)

Parse the Pegasus meta file (generated from pegasus-integrity).

Parameters:task_name (str) – Task file name.
_parse_workflow()

Parse the Workflow file.

build_workflow(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow

Create workflow trace based on the workflow execution logs.

Parameters:workflow_name (str) – The workflow name.
Returns:A workflow trace object.
Return type:Workflow
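
A possible way to drive this parser, using only the constructor parameters and the `build_workflow` method documented above, is sketched below. The wrapper `build_pegasus_trace` and the description string are hypothetical, and the import is placed inside the function as an assumption that wfcommons may not be installed where the snippet is loaded.

```python
from typing import Any, Optional


def build_pegasus_trace(submit_dir: str, workflow_name: Optional[str] = None,
                        legacy: bool = False) -> Any:
    """Parse a Pegasus submit directory and return the resulting Workflow object."""
    # Imported lazily so this sketch loads even where wfcommons is not installed.
    from wfcommons.trace.logs.pegasus import PegasusLogsParser

    parser = PegasusLogsParser(
        submit_dir=submit_dir,
        description="Trace built from a Pegasus submit directory",
        ignore_auxiliary=True,  # documented default: ignore auxiliary jobs
        legacy=legacy,          # True for submit directories from Pegasus 4.x
    )
    return parser.build_workflow(workflow_name)
```

Passing `legacy=True` is only appropriate for submit directories produced by a Pegasus 4.x version, as noted in the parameter list.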