wfcommons.trace
wfcommons.trace.schema
-
class
wfcommons.trace.schema.
SchemaValidator
(schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None) Bases:
object
Validate JSON files against the WorkflowHub schema. If no schema file path is provided, the validator looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the WorkflowHub schema GitHub repository.
Parameters: - schema_file (str) – JSON schema file path.
- logger (Logger) – The logger where to log information/warning or errors.
-
_load_schema
(schema_file: Optional[str] = None) Load the schema file. If no schema file path is provided, the method looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the GitHub repository.
Parameters: schema_file (str) – JSON schema file path. Returns: The JSON schema. Return type: json
-
_semantic_validation
(data: Dict[str, Any]) Validate the semantics of the JSON workflow execution trace.
Parameters: data (Dict[str, Any]) – Workflow trace in JSON format.
-
_syntax_validation
(data: Dict[str, Any]) Validate the JSON workflow execution trace against the schema.
Parameters: data (Dict[str, Any]) – Workflow trace in JSON format.
-
validate_trace
(data: Dict[str, Any]) Perform syntax validation against the schema and semantic validation.
Parameters: data (Dict[str, Any]) – Workflow trace in JSON format.
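The difference between the two validation phases can be pictured with a small stand-alone sketch. It does not use the WorkflowHub schema or the internals of `SchemaValidator`: the field names (`name`, `tasks`, `parents`) and all three helper functions are hypothetical, and running the semantic phase only when the syntax phase passes is a choice made for this sketch.

```python
from typing import Any, Dict, List


def syntax_errors(data: Dict[str, Any]) -> List[str]:
    """Check the shape of the document (hypothetical required fields)."""
    errors = []
    if not isinstance(data.get("name"), str):
        errors.append("'name' must be a string")
    tasks = data.get("tasks")
    if not isinstance(tasks, list):
        errors.append("'tasks' must be a list")
        return errors
    for i, task in enumerate(tasks):
        if not isinstance(task, dict) or not isinstance(task.get("name"), str):
            errors.append(f"tasks[{i}] must be an object with a string 'name'")
    return errors


def semantic_errors(data: Dict[str, Any]) -> List[str]:
    """Check consistency: every referenced parent must be a declared task."""
    declared = {task["name"] for task in data["tasks"]}
    errors = []
    for task in data["tasks"]:
        for parent in task.get("parents", []):
            if parent not in declared:
                errors.append(
                    f"task '{task['name']}' references unknown parent '{parent}'"
                )
    return errors


def validate(data: Dict[str, Any]) -> List[str]:
    """Run the syntax phase first; run the semantic phase only if it passes."""
    errors = syntax_errors(data)
    return errors if errors else semantic_errors(data)


good = {"name": "demo", "tasks": [{"name": "a"}, {"name": "b", "parents": ["a"]}]}
bad = {"name": "demo", "tasks": [{"name": "b", "parents": ["missing"]}]}
print(validate(good))
print(validate(bad))
```

A document can be well-formed and still fail the second phase, which is why the two checks are kept separate here.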
wfcommons.trace.trace
-
class
wfcommons.trace.trace.
Trace
(input_trace: str, schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None) Bases:
object
Representation of one execution of one workflow on a set of machines.
Trace(input_trace = 'trace.json')
Parameters: - input_trace (str) – Path to the JSON trace file.
- schema_file (str) – The path to the JSON schema that defines the trace. If no schema file is provided, the class looks for a local copy of the WorkflowHub schema and, if none is available, fetches the latest schema from the WorkflowHub schema GitHub repository.
- logger (Logger) – The logger where to log information/warning or errors.
-
draw
(output: Optional[str] = None, extension: str = 'pdf') → None Produce an image or a PDF file representing the trace.
Parameters: - output (str) – Name of the output file.
- extension (str) – Type of the file extension (pdf, png, or svg).
-
leaves
() → List[str] Get the leaves of the workflow (i.e., the tasks without any successors).
Returns: List of leaves Return type: List[str]
-
roots
() → List[str] Get the roots of the workflow (i.e., the tasks without any predecessors).
Returns: List of roots Return type: List[str]
-
write_dot
(output: Optional[str] = None) → None Write a dot file of the trace.
Parameters: output (str) – The output dot file name (optional).
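The notions of roots, leaves, and a dot export can be illustrated on a plain dictionary. This is a minimal sketch, not the `Trace` implementation: the task names, the `successors` mapping, and the three helper functions are hypothetical.

```python
from typing import Dict, List

# Hypothetical workflow: each task maps to the list of its direct successors.
successors: Dict[str, List[str]] = {
    "split": ["map_1", "map_2"],
    "map_1": ["merge"],
    "map_2": ["merge"],
    "merge": [],
}


def roots(graph: Dict[str, List[str]]) -> List[str]:
    """Tasks that never appear as a successor of another task."""
    has_predecessor = {child for children in graph.values() for child in children}
    return [task for task in graph if task not in has_predecessor]


def leaves(graph: Dict[str, List[str]]) -> List[str]:
    """Tasks with an empty successor list."""
    return [task for task, children in graph.items() if not children]


def to_dot(graph: Dict[str, List[str]]) -> str:
    """Render the graph as DOT text: one node line per task, one line per edge."""
    lines = ["digraph workflow {"]
    for task, children in graph.items():
        lines.append(f'  "{task}";')
        for child in children:
            lines.append(f'  "{task}" -> "{child}";')
    lines.append("}")
    return "\n".join(lines)


print(roots(successors))
print(leaves(successors))
print(to_dot(successors))
```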
wfcommons.trace.trace_analyzer
-
class
wfcommons.trace.trace_analyzer.
TraceAnalyzer
(logger: Optional[logging.Logger] = None) Bases:
object
Set of tools for analyzing collections of traces.
Parameters: logger (Logger) – The logger where to log information/warning or errors (optional).
-
append_trace
(trace: wfcommons.trace.trace.Trace) → None Append a workflow trace object to the trace analyzer.
trace = Trace(input_trace='trace.json', schema_file='schema.json')
trace_analyzer = TraceAnalyzer()
trace_analyzer.append_trace(trace)
Parameters: trace (Trace) – A workflow trace object.
-
build_summary
(tasks_list: List[str], include_raw_data: Optional[bool] = True) → Dict[str, Dict[str, Any]] Analyze appended traces and produce a summary of the analysis per task prefix.
workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
traces_summary = trace_analyzer.build_summary(workflow_tasks, include_raw_data=False)
Parameters: - tasks_list (List[str]) – List of workflow task prefixes (e.g., mProject, sol2sanger, add_replace).
- include_raw_data (bool) – Whether to include the raw data in the trace summary.
Returns: A summary of the analysis of traces in the form of a dictionary in which keys are task prefixes.
Return type: Dict[str, Dict[str, Any]]
-
generate_all_fit_plots
(outfile_prefix: Optional[str] = None) → None Produce fit plots as images for each entry of the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.
Parameters: outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
-
generate_fit_plots
(trace_element: wfcommons.trace.trace_analyzer.TraceElement, outfile_prefix: Optional[str] = None) → None Produce fit plots as images for each entry of a trace element generated by the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.
Parameters: - trace_element (TraceElement) – Workflow element for which the fit plots will be generated.
- outfile_prefix (str) – Prefix to be attached to each generated plot file name (optional).
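The idea of a summary keyed by task prefix can be sketched without the library. The example below is not how `build_summary` works internally: it replaces distribution fitting with simple statistics, and the function name, the input format (pairs of task name and runtime), and the output keys are all assumptions made for illustration.

```python
from statistics import mean
from typing import Any, Dict, List, Tuple


def summarize_by_prefix(
    tasks: List[Tuple[str, float]],
    prefixes: List[str],
    include_raw_data: bool = True,
) -> Dict[str, Dict[str, Any]]:
    """Group (task name, runtime) pairs by task prefix and summarize runtimes."""
    summary: Dict[str, Dict[str, Any]] = {}
    for prefix in prefixes:
        runtimes = [runtime for name, runtime in tasks if name.startswith(prefix)]
        if not runtimes:
            continue  # no task matches this prefix
        entry: Dict[str, Any] = {
            "count": len(runtimes),
            "min": min(runtimes),
            "max": max(runtimes),
            "mean": mean(runtimes),
        }
        if include_raw_data:
            entry["data"] = runtimes  # keep the individual observations
        summary[prefix] = entry
    return summary


# Hypothetical task names and runtimes (seconds).
observed = [("mProject_001", 12.0), ("mProject_002", 14.0), ("mAdd_001", 30.0)]
result = summarize_by_prefix(observed, ["mProject", "mAdd"], include_raw_data=False)
print(result)
```

Dropping the raw observations keeps the summary small when only the aggregate values are needed.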
-
-
class
wfcommons.trace.trace_analyzer.
TraceElement
Bases:
wfcommons.utils.NoValue
An enumeration.
-
INPUT
= ('input', 'Input File Size (bytes)')
-
OUTPUT
= ('output', 'Input File Size (bytes)')
-
RUNTIME
= ('runtime', 'Runtime (s)')
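Each member carries a two-item tuple. The stand-in below mirrors the documented members with a plain `enum.Enum` (the behavior of `wfcommons.utils.NoValue` is not described here, so it is not reproduced) and shows one way to unpack a member's value into a key and a label.

```python
from enum import Enum


class TraceElement(Enum):
    """Stand-in mirroring the documented members; values are (key, label)."""

    INPUT = ("input", "Input File Size (bytes)")
    OUTPUT = ("output", "Input File Size (bytes)")  # label as documented above
    RUNTIME = ("runtime", "Runtime (s)")


key, label = TraceElement.RUNTIME.value
print(key, label)
```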
-
-
wfcommons.trace.trace_analyzer.
_append_file_to_dict
(extension: str, dict_obj: Dict[str, Any], file_size: int) → None Add a file size to a file type extension dictionary.
Parameters: - extension (str) – File type extension.
- dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
- file_size (int) – File size in bytes.
-
wfcommons.trace.trace_analyzer.
_best_fit_distribution_for_file
(dict_obj, include_raw_data) → None Find the best fit distribution for a file.
Parameters: - dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
- include_raw_data (bool) – Whether to include the raw data.
-
wfcommons.trace.trace_analyzer.
_generate_fit_plots
(el: Dict[KT, VT], title: str, xlabel: str, outfile: str, font_size: Optional[int] = None, logger: Optional[logging.Logger] = None) → None Produce a fit plot as an image for an entry of a trace element generated by the summary analysis.
Parameters: - el (Dict) – Entry of a trace element generated by the summary analysis.
- title (str) – Plot title.
- xlabel (str) – X-axis label.
- outfile (str) – Plot file name.
- font_size (Optional[int]) – Size of the font.
- logger (Logger) – The logger where to log information/warning or errors.
-
wfcommons.trace.trace_analyzer.
_json_format_distribution_fit
(dist_tuple: Tuple) → Dict[str, Any] Format the best fit distribution data into a dictionary.
Parameters: dist_tuple (Tuple) – Tuple containing best fit distribution name and parameters. Returns: The best fit distribution data as a dictionary. Return type: Dict[str, Any]
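The two dictionary-oriented helpers above can be pictured as follows. This is a sketch under assumptions: the function names drop the leading underscore to mark them as stand-ins, and the dictionary keys (`data`, `name`, `params`) are hypothetical rather than taken from the library.

```python
from typing import Any, Dict, Tuple


def append_file_to_dict(extension: str, dict_obj: Dict[str, Any], file_size: int) -> None:
    """Record a file size under its extension, creating the entry on first use."""
    dict_obj.setdefault(extension, {"data": []})["data"].append(file_size)


def format_distribution_fit(dist_tuple: Tuple) -> Dict[str, Any]:
    """Turn a (name, parameters) tuple into a JSON-friendly dictionary."""
    name, params = dist_tuple
    return {"name": name, "params": list(params)}


sizes: Dict[str, Any] = {}
append_file_to_dict(".fits", sizes, 1024)
append_file_to_dict(".fits", sizes, 2048)
append_file_to_dict(".txt", sizes, 10)
print(sizes)
print(format_distribution_fit(("norm", (5.0, 1.5))))
```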
wfcommons.trace.logs.abstract_logs_parser
-
class
wfcommons.trace.logs.abstract_logs_parser.
LogsParser
(wms_name: str, wms_url: Optional[str] = None, description: Optional[str] = None, logger: Optional[logging.Logger] = None) Bases:
abc.ABC
An abstract logs parser class for creating workflow traces.
Parameters: - wms_name (str) – Name of the workflow system.
- wms_url (str) – URL for the workflow system.
- description (str) – Workflow trace description.
- logger (Logger) – The logger where to log information/warning or errors (optional).
-
_abc_impl
= <_abc_data object>
-
build_workflow
(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow Create a workflow trace based on the workflow execution logs.
Parameters: workflow_name (str) – The workflow name. Returns: A workflow trace object. Return type: Workflow
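The abstract-base-class pattern behind a logs parser can be sketched with the standard library alone. The classes below are hypothetical stand-ins, not the wfcommons classes: they return a plain dictionary instead of a `Workflow` object, and the toy log format (one `task runtime` pair per line) is invented for the example.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class SketchLogsParser(ABC):
    """Hypothetical stand-in for an abstract logs parser."""

    def __init__(self, wms_name: str, description: Optional[str] = None) -> None:
        self.wms_name = wms_name
        self.description = description

    @abstractmethod
    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, object]:
        """Subclasses turn their system's logs into a workflow description."""


class InMemoryLogsParser(SketchLogsParser):
    """Toy parser that reads 'task runtime' lines from a list of strings."""

    def __init__(self, lines: List[str]) -> None:
        super().__init__(wms_name="in-memory")
        self.lines = lines

    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, object]:
        tasks = {}
        for line in self.lines:
            name, runtime = line.split()
            tasks[name] = float(runtime)
        return {"name": workflow_name or "unnamed", "wms": self.wms_name, "tasks": tasks}


workflow = InMemoryLogsParser(["a 1.5", "b 2.0"]).build_workflow("demo")
print(workflow)
```

Because `build_workflow` is abstract, the base class cannot be instantiated directly; each workflow system supplies its own subclass.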
wfcommons.trace.logs.makeflow
-
class
wfcommons.trace.logs.makeflow.
MakeflowLogsParser
(execution_dir: str, resource_monitor_logs_dir: str, description: Optional[str] = None, logger: Optional[logging.Logger] = None) Bases:
wfcommons.trace.logs.abstract_logs_parser.LogsParser
Parse a Makeflow execution directory to generate a workflow trace.
Parameters: - execution_dir (str) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).
- resource_monitor_logs_dir (str) – Resource Monitor log files directory.
- description (str) – Workflow trace description.
- logger (Logger) – The logger where to log information/warning or errors (optional).
-
_abc_impl
= <_abc_data object>
-
_create_files
(files_list: List[str], link: wfcommons.common.file.FileLink, task_name: str) Create a list of file objects.
Parameters: - files_list (List[str]) – List of file names.
- link (FileLink) – Link type for the files in the list.
- task_name (str) – Task name.
Returns: List of file objects.
Return type: List[File]
-
_parse_makeflow_log_file
() Parse the Makeflow log file and update workflow task information.
-
_parse_resource_monitor_logs
() Parse the log files produced by the resource monitor.
-
_parse_workflow_file
() Parse the Makeflow workflow file and build the workflow structure.
-
build_workflow
(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow Create a workflow trace based on the workflow execution logs.
Parameters: workflow_name (str) – The workflow name. Returns: A workflow trace object. Return type: Workflow
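Putting the constructor and `build_workflow` together, a usage sketch might look like the following. It relies only on the signatures documented above; the wrapper function `build_makeflow_trace` is a hypothetical helper, and the import is done lazily so the snippet can be loaded on a machine where wfcommons is not installed.

```python
def build_makeflow_trace(
    execution_dir: str, resource_monitor_logs_dir: str, workflow_name: str
):
    """Build a workflow trace object from Makeflow logs (requires wfcommons)."""
    # Imported inside the function so this module loads without wfcommons.
    from wfcommons.trace.logs.makeflow import MakeflowLogsParser

    parser = MakeflowLogsParser(
        execution_dir=execution_dir,
        resource_monitor_logs_dir=resource_monitor_logs_dir,
    )
    return parser.build_workflow(workflow_name=workflow_name)
```

The caller supplies the directory that holds the .mf and .makeflowlog files and the directory with the Resource Monitor logs; the returned value is the workflow trace object described above.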
wfcommons.trace.logs.pegasus
-
class
wfcommons.trace.logs.pegasus.
PegasusLogsParser
(submit_dir: str, description: Optional[str] = None, ignore_auxiliary: Optional[bool] = True, legacy: Optional[bool] = False, logger: Optional[logging.Logger] = None) Bases:
wfcommons.trace.logs.abstract_logs_parser.LogsParser
Parse a Pegasus submit directory to generate a workflow trace.
Parameters: - submit_dir (str) – Pegasus submit directory.
- legacy (bool) – Whether the submit directory is from a Pegasus 4.x version.
- description (str) – Workflow trace description.
- ignore_auxiliary (bool) – Ignore auxiliary jobs.
- logger (Logger) – The logger where to log information/warning or errors (optional).
-
_abc_impl
= <_abc_data object>
-
_fetch_all_files
(extension: str, file_name: str = '') Fetch all files from the directory and its hierarchy.
Parameters: - extension (str) – File extension to search for.
- file_name (str) – File name to search for.
Returns: List of file names that match.
Return type: List[str]
-
_parse_braindump
() Parse the Pegasus braindump.txt file.
-
_parse_dag
() Parse the DAG file.
-
_parse_dax
() Parse the DAX file.
-
_parse_job_output
(task) Parse the kickstart job output file (e.g., .out.000).
Parameters: task (Task) – Task object.
-
_parse_job_output_latest
(task, output_file) Parse the kickstart job output file in YAML format (e.g., .out.000).
Parameters: - task (Task) – Task object.
- output_file (str) – Output file name.
-
_parse_job_output_legacy
(task, output_file) Parse the kickstart job output file in XML format (e.g., .out.000).
Parameters: - task (Task) – Task object.
- output_file (str) – Output file name.
-
_parse_meta_file
(task_name) Parse the Pegasus meta file (generated from pegasus-integrity).
Parameters: task_name (str) – Task file name.
-
_parse_workflow
() Parse the Workflow file.
-
build_workflow
(workflow_name: Optional[str] = None) → wfcommons.common.workflow.Workflow Create a workflow trace based on the workflow execution logs.
Parameters: workflow_name (str) – The workflow name. Returns: A workflow trace object. Return type: Workflow
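Searching a submit directory and its hierarchy for files with a given extension, as `_fetch_all_files` is described as doing, can be sketched with `os.walk`. The matching rule used here (the name ends with the extension and contains `file_name`) is an assumption, and the stand-in function takes an explicit `root` argument, which the documented method does not.

```python
import os
import tempfile
from typing import List


def fetch_all_files(root: str, extension: str, file_name: str = "") -> List[str]:
    """Walk root recursively and collect paths ending in '.<extension>' whose
    base name contains file_name (an empty file_name matches everything)."""
    matches = []
    for directory, _subdirs, files in os.walk(root):
        for name in files:
            if name.endswith("." + extension) and file_name in name:
                matches.append(os.path.join(directory, name))
    return sorted(matches)


# Build a throwaway directory tree to exercise the search.
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "nested"))
    for relative in ("workflow.dag", os.path.join("nested", "sub.dag"), "notes.txt"):
        open(os.path.join(root, relative), "w").close()
    found = [os.path.relpath(path, root) for path in fetch_all_files(root, "dag")]
    print(found)
```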