Pipelines¶
A Pipeline is a set of Dyff workflows that can be executed as a group.
The pipeline is a directed acyclic graph representing data dependencies between workflows. For example, a simple pipeline might run an Evaluation and then create a SafetyCase from the evaluation output. This pipeline would have a graph structure like evaluation -> safetycase.
Each node in the pipeline contains the specification of a Dyff API request. The request specifications may contain placeholders that reference other nodes in the pipeline graph. When the pipeline is run, the nodes execute in an order that respects their dependencies, and the placeholders are replaced with concrete values once they are known.
Creating a Pipeline¶
Use pipelines.create() to create a new pipeline:
from dyff.schema.requests import PipelineCreateRequest
from dyff.schema.platform import PipelineNode, PipelineNodeRequest
pipeline = dyffapi.pipelines.create(
PipelineCreateRequest(
account=account,
name="my-pipeline",
nodes={
"evaluation": PipelineNode(
name="evaluation",
request=PipelineNodeRequest(
# ... evaluation request spec
),
),
"safetycase": PipelineNode(
name="safetycase",
request=PipelineNodeRequest(
# ... safetycase request spec with $(evaluation) placeholder
),
),
},
)
)
Pipeline Nodes¶
A PipelineNode represents a node in the pipeline graph. Each node contains:
name- Unique name within the pipelinerequest- The request template that will be executed
You can use the syntax $(node_name) in request fields to reference another entity in the pipeline. The placeholder will be substituted with the ID of the created entity once it is known. Dyff infers the dependency graph structure from these placeholders.
Running a Pipeline¶
Use pipelines.run() to execute a pipeline:
from dyff.schema.requests import PipelineRunRequest
pipeline_run = dyffapi.pipelines.run(
pipeline.id,
PipelineRunRequest(
account=account,
pipeline=pipeline.id,
arguments={
# Pipeline parameter values
},
),
)
Pipeline Runs¶
A PipelineRun represents an execution of a pipeline. It tracks:
pipeline- ID of the pipeline being runworkflows- Dictionary mapping node names to the IDs of created entities
Checking Run Status¶
Use pipelines.get_run_status() to check the status of a pipeline run:
status = dyffapi.pipelines.get_run_status(pipeline_run.id)
print(status.status) # Overall status
print(status.nodes) # Status of each node
Getting Run Workflows¶
Use pipelines.get_run_workflows() to retrieve the entities created by a pipeline run:
workflows = dyffapi.pipelines.get_run_workflows(pipeline_run.id)
# workflows is a dict mapping node names to entity objects
Pipeline Parameters¶
Pipelines can define input parameters that customize their behavior. Parameters are specified when creating the pipeline and values are provided when running it.