wfcommons.trace

wfcommons.trace.trace

class wfcommons.trace.trace.Trace(input_trace: str, schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: object

Representation of one execution of one workflow on a set of machines.

Trace(input_trace='trace.json')
Parameters:
  • input_trace (str) – Path to the JSON trace file.
  • schema_file (str) – The path to the JSON schema that defines the trace. If no schema file is provided, the constructor looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the WorkflowHub schema GitHub repository.
  • logger (Logger) – The logger in which to log information, warnings, or errors (optional).
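The schema lookup order described for schema_file can be sketched as below. This is an illustration of the documented fallback order only, not the library's code; resolve_schema, local_copy, and remote_url are hypothetical names, and the example.org URL is a placeholder rather than the real WorkflowHub location.

```python
from pathlib import Path
from typing import Optional


def resolve_schema(schema_file: Optional[str], local_copy: str, remote_url: str) -> str:
    """Return where the schema would be read from, following the documented order."""
    # 1. An explicitly provided schema file always wins.
    if schema_file is not None:
        return schema_file
    # 2. Otherwise, prefer a local copy of the schema if it exists on disk.
    if Path(local_copy).is_file():
        return local_copy
    # 3. Fall back to fetching the latest schema from the remote repository.
    return remote_url


# Hypothetical paths and URL, for illustration only.
print(resolve_schema("schema.json", "local-schema.json", "https://example.org/schema.json"))
print(resolve_schema(None, "does-not-exist.json", "https://example.org/schema.json"))
```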
draw(output: Optional[str] = None, extension: str = 'pdf') → None

Produce an image or a PDF file representing the trace.

Parameters:
  • output (str) – Name of the output file (optional).
  • extension (str) – File format of the output (pdf, png, or svg).
leaves() → List[str]

Get the leaves of the workflow (i.e., the tasks without any successors).

Returns: List of leaves.
Return type: List[str]
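A minimal sketch of the leaf definition, not the library's implementation: it assumes the workflow is given as a hypothetical dictionary mapping each task name to the list of its parent task names. A task is a leaf when no other task lists it as a parent.

```python
from typing import Dict, List


def leaves(parents: Dict[str, List[str]]) -> List[str]:
    """Return the tasks that no other task lists as a parent (no successors)."""
    # Collect every task that appears as a parent of some other task.
    has_successor = {p for deps in parents.values() for p in deps}
    # A leaf is a task that never appears in that set.
    return [task for task in parents if task not in has_successor]


# Hypothetical diamond workflow: a -> b, a -> c, b -> d, c -> d
workflow = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
print(leaves(workflow))  # ['d']
```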
roots() → List[str]

Get the roots of the workflow (i.e., the tasks without any predecessors).

Returns: List of roots.
Return type: List[str]
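The root definition is the mirror image: a task is a root when it has no parents. A minimal sketch, again assuming a hypothetical dictionary from task name to parent names rather than the library's internal graph:

```python
from typing import Dict, List


def roots(parents: Dict[str, List[str]]) -> List[str]:
    """Return the tasks that have no parents (no predecessors)."""
    return [task for task, deps in parents.items() if not deps]


# Hypothetical diamond workflow: a -> b, a -> c, b -> d, c -> d
workflow = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
print(roots(workflow))  # ['a']
```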
write_dot(output: Optional[str] = None) → None

Write a DOT file of the trace.

Parameters: output (str) – The output DOT file name (optional).
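DOT is the plain-text graph language read by Graphviz. The sketch below shows what such a file can contain for a small workflow, assuming the same hypothetical parent map as above; the node names, graph name, and attributes that write_dot actually emits may differ.

```python
from typing import Dict, List


def to_dot(parents: Dict[str, List[str]], name: str = "workflow") -> str:
    """Render a parent map as a Graphviz DOT digraph (edges point parent -> child)."""
    lines = [f"digraph {name} {{"]
    # Declare every task as a node, so isolated tasks still appear.
    for task in parents:
        lines.append(f'    "{task}";')
    # One edge per parent/child dependency.
    for task, deps in parents.items():
        for parent in deps:
            lines.append(f'    "{parent}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines)


workflow = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
dot = to_dot(workflow)
print(dot)
```

The resulting text can be rendered with the Graphviz dot tool, for example into a PDF or PNG.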

wfcommons.trace.trace_analyzer

class wfcommons.trace.trace_analyzer.TraceAnalyzer(logger: Optional[logging.Logger] = None)

Bases: object

Set of tools for analyzing collections of traces.

Parameters: logger (Logger) – The logger in which to log information, warnings, or errors (optional).
append_trace(trace: wfcommons.trace.trace.Trace) → None

Append a workflow trace object to the trace analyzer.

from wfcommons.trace.trace import Trace
from wfcommons.trace.trace_analyzer import TraceAnalyzer
trace = Trace(input_trace='trace.json', schema_file='schema.json')
trace_analyzer = TraceAnalyzer()
trace_analyzer.append_trace(trace)
Parameters: trace (Trace) – A workflow trace object.
build_summary(tasks_list: List[str], include_raw_data: Optional[bool] = True) → Dict[str, Dict[str, Any]]

Analyze the appended traces and produce a summary of the analysis per task prefix.

workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
traces_summary = trace_analyzer.build_summary(workflow_tasks, include_raw_data=False)
Parameters:
  • tasks_list (List[str]) – List of workflow task prefixes (e.g., mProject, sol2sanger, add_replace).
  • include_raw_data (bool) – Whether to include the raw data in the trace summary.
Returns: A summary of the analysis of traces in the form of a dictionary in which keys are task prefixes.
Return type: Dict[str, Dict[str, Any]]
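To illustrate what "a summary per task prefix" means, here is a minimal sketch that groups task runtimes by name prefix and computes a few statistics. It is not the library's algorithm: the function name summarize_by_prefix and the keys "runtime", "min", "max", "mean", and "data" are hypothetical, and the real summary also covers input and output file sizes and fitted distributions.

```python
import statistics
from typing import Any, Dict, List


def summarize_by_prefix(
    runtimes: Dict[str, float],
    prefixes: List[str],
    include_raw_data: bool = True,
) -> Dict[str, Dict[str, Any]]:
    """Group task runtimes by task-name prefix and compute simple statistics."""
    summary: Dict[str, Dict[str, Any]] = {}
    for prefix in prefixes:
        # Every task whose name starts with the prefix belongs to that group.
        values = [rt for name, rt in runtimes.items() if name.startswith(prefix)]
        if not values:
            continue  # no task matched this prefix
        stats: Dict[str, Any] = {
            "min": min(values),
            "max": max(values),
            "mean": statistics.mean(values),
        }
        if include_raw_data:
            stats["data"] = values  # keep the raw samples alongside the statistics
        summary[prefix] = {"runtime": stats}
    return summary


# Hypothetical task names and runtimes (seconds).
runtimes = {"mProject_ID0001": 10.0, "mProject_ID0002": 14.0, "mAdd_ID0003": 30.0}
summary = summarize_by_prefix(runtimes, ["mProject", "mAdd"])
print(summary["mProject"]["runtime"]["mean"])  # 12.0
```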

generate_all_fit_plots(outfile_prefix: Optional[str] = None) → None

Produce fit plots as images for each entry of the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.

Parameters: outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
generate_fit_plots(trace_element: wfcommons.trace.trace_analyzer.TraceElement, outfile_prefix: Optional[str] = None) → None

Produce fit plots as images for each entry of a trace element generated by the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.

Parameters:
  • trace_element (TraceElement) – Workflow element for which the fit plots will be generated.
  • outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
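Both plotting methods skip entries whose values are constant, since there is no distribution to fit. A minimal sketch of that rule follows; it uses statistics.NormalDist purely as a stand-in for whichever distributions the library actually fits, and fit_normal and the sample data are hypothetical.

```python
import statistics
from typing import Dict, List, Optional


def fit_normal(values: List[float]) -> Optional[statistics.NormalDist]:
    """Fit a normal distribution, or return None for constant data (nothing to plot)."""
    if len(set(values)) <= 1:
        # Constant value: no distribution, so no plot would be produced.
        return None
    # from_samples needs at least two points; non-constant data guarantees that.
    return statistics.NormalDist.from_samples(values)


# Hypothetical per-element samples for one task prefix.
entries: Dict[str, List[float]] = {
    "runtime": [10.0, 12.0, 14.0],
    "input": [2048.0, 2048.0, 2048.0],
}
for element, values in entries.items():
    dist = fit_normal(values)
    if dist is None:
        print(f"{element}: constant, skipped")
    else:
        print(f"{element}: mu={dist.mean:.1f}, sigma={dist.stdev:.1f}")
```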
class wfcommons.trace.trace_analyzer.TraceElement

Bases: wfcommons.utils.NoValue

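An enumeration of the trace elements (input files, output files, and runtime) for which fit plots can be generated.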

INPUT = ('input', 'Input File Size (bytes)')
OUTPUT = ('output', 'Input File Size (bytes)')
RUNTIME = ('runtime', 'Runtime (s)')
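Each member's value is a (key, axis label) pair. The sketch below reproduces two of the members to show how the pair can be unpacked; the local NoValue class is a stand-in that assumes wfcommons.utils.NoValue follows the "NoValue" recipe from the Python enum documentation (an Enum whose repr hides the value).

```python
from enum import Enum


class NoValue(Enum):
    """Enum base whose repr omits the member value (Python enum docs recipe)."""

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}.{self.name}>"


class TraceElement(NoValue):
    # Each value is a (key, axis label) pair, as in the members listed above.
    INPUT = ("input", "Input File Size (bytes)")
    RUNTIME = ("runtime", "Runtime (s)")


key, label = TraceElement.RUNTIME.value
print(repr(TraceElement.RUNTIME), key, label)  # <TraceElement.RUNTIME> runtime Runtime (s)
```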

wfcommons.trace.logs.makeflow

class wfcommons.trace.logs.makeflow.MakeflowLogsParser(execution_dir: str, resource_monitor_logs_dir: str, description: Optional[str] = None, logger: Optional[logging.Logger] = None)

Bases: wfcommons.trace.logs.abstract_logs_parser.LogsParser

Parse a Makeflow execution directory to generate a workflow trace.

Parameters:
  • execution_dir (str) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).
  • resource_monitor_logs_dir (str) – Directory containing the Resource Monitor log files.
  • description (str) – Workflow trace description (optional).
  • logger (Logger) – The logger in which to log information, warnings, or errors (optional).
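The execution_dir is expected to hold the .mf and .makeflowlog files. A minimal sketch of locating them with pathlib follows; find_makeflow_files is a hypothetical helper, not part of the parser's API, and it simply takes the first match of each kind.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def find_makeflow_files(execution_dir: str) -> Tuple[Path, Path]:
    """Return the (.mf, .makeflowlog) files found in a Makeflow execution directory."""
    root = Path(execution_dir)
    mf_files = sorted(root.glob("*.mf"))
    log_files = sorted(root.glob("*.makeflowlog"))
    if not mf_files or not log_files:
        raise FileNotFoundError(f"{execution_dir} lacks a .mf or .makeflowlog file")
    return mf_files[0], log_files[0]


# Build a throwaway directory with hypothetical file names and look them up.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("workflow.mf", "workflow.mf.makeflowlog"):
        (Path(tmp) / name).touch()
    mf, log = find_makeflow_files(tmp)
    print(mf.name, log.name)  # workflow.mf workflow.mf.makeflowlog
```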
build_workflow(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow

Create a workflow trace based on the workflow execution logs.

Parameters: workflow_name (str) – The workflow name.
Returns: A workflow trace object.
Return type: Workflow

wfcommons.trace.logs.pegasus

class wfcommons.trace.logs.pegasus.PegasusLogsParser(submit_dir: str, description: Optional[str] = None, ignore_auxiliary: Optional[bool] = True, legacy: Optional[bool] = False, logger: Optional[logging.Logger] = None)

Bases: wfcommons.trace.logs.abstract_logs_parser.LogsParser

Parse a Pegasus submit directory to generate a workflow trace.

Parameters:
  • submit_dir (str) – Pegasus submit directory.
  • description (str) – Workflow trace description (optional).
  • ignore_auxiliary (bool) – Whether to ignore auxiliary jobs.
  • legacy (bool) – Whether the submit directory was produced by Pegasus 4.x.
  • logger (Logger) – The logger in which to log information, warnings, or errors (optional).
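To illustrate what ignore_auxiliary filters out, the sketch below drops jobs whose names start with common Pegasus auxiliary prefixes (directory creation, data staging, cleanup, registration). The prefix list and the compute_jobs helper are assumptions for illustration; the parser's actual detection rule may differ.

```python
from typing import List

# Assumed name prefixes of Pegasus auxiliary (non-compute) jobs.
AUXILIARY_PREFIXES = (
    "create_dir_",
    "stage_in_",
    "stage_out_",
    "stage_inter_",
    "cleanup_",
    "clean_up_",
    "register_",
)


def compute_jobs(job_names: List[str], ignore_auxiliary: bool = True) -> List[str]:
    """Return the job names, optionally dropping auxiliary jobs."""
    if not ignore_auxiliary:
        return list(job_names)
    # str.startswith accepts a tuple and matches any of its prefixes.
    return [j for j in job_names if not j.startswith(AUXILIARY_PREFIXES)]


# Hypothetical job names from a submit directory.
jobs = ["create_dir_montage_0_local", "mProject_ID0001", "stage_out_local_0", "mAdd_ID0002"]
print(compute_jobs(jobs))  # ['mProject_ID0001', 'mAdd_ID0002']
```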
build_workflow(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow

Create a workflow trace based on the workflow execution logs.

Parameters: workflow_name (str) – The workflow name.
Returns: A workflow trace object.
Return type: Workflow