Pipelines

A Pipeline is a set of Dyff workflows that can be executed as a group.

The pipeline is a directed acyclic graph representing data dependencies between workflows. For example, a simple pipeline might run an Evaluation and then create a SafetyCase from the evaluation output. This pipeline would have a graph structure like evaluation -> safetycase.

Each node in the pipeline contains the specification of a Dyff API request. The request specifications may contain placeholders that reference other nodes in the pipeline graph. When the pipeline is run, the nodes execute in an order that respects their dependencies, and the placeholders are replaced with concrete values once they are known.
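The ordering rule above can be illustrated with a small, self-contained sketch. It is not Dyff's implementation: the request specs are plain dictionaries with hypothetical field names, and find_placeholders and execution_order are helper names invented for this example. The sketch scans each spec for $(node_name) placeholders and uses the standard-library graphlib module (Python 3.9+) to produce an order in which every node follows the nodes it references.

```python
import re
from graphlib import TopologicalSorter

# Matches placeholders of the form $(node_name).
PLACEHOLDER = re.compile(r"\$\(([^)]+)\)")


def find_placeholders(value):
    """Return the set of node names referenced anywhere in a nested spec."""
    if isinstance(value, str):
        return set(PLACEHOLDER.findall(value))
    if isinstance(value, dict):
        return set().union(*(find_placeholders(v) for v in value.values()))
    if isinstance(value, (list, tuple)):
        return set().union(*(find_placeholders(v) for v in value))
    return set()


def execution_order(nodes):
    """Order node names so each node comes after the nodes it references."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {name: find_placeholders(spec) for name, spec in nodes.items()}
    return list(TopologicalSorter(graph).static_order())


# Hypothetical request specs; the field names are illustrative only.
nodes = {
    "safetycase": {"evaluation": "$(evaluation)"},
    "evaluation": {"dataset": "hypothetical-dataset-id"},
}

order = execution_order(nodes)
print(order)
```

In this sketch a circular reference between nodes makes graphlib raise CycleError, which mirrors the requirement that the pipeline graph be acyclic.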

Creating a Pipeline

Use pipelines.create() to create a new pipeline:

from dyff.schema.requests import PipelineCreateRequest
from dyff.schema.platform import PipelineNode, PipelineNodeRequest

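# Assumes dyffapi is an already-configured Dyff client and account
# identifies the account that will own the pipeline.
pipeline = dyffapi.pipelines.create(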
    PipelineCreateRequest(
        account=account,
        name="my-pipeline",
        nodes={
            "evaluation": PipelineNode(
                name="evaluation",
                request=PipelineNodeRequest(
                    # ... evaluation request spec
                ),
            ),
            "safetycase": PipelineNode(
                name="safetycase",
                request=PipelineNodeRequest(
                    # ... safetycase request spec with $(evaluation) placeholder
                ),
            ),
        },
    )
)

Pipeline Nodes

A PipelineNode represents a node in the pipeline graph. Each node contains:

  • name - Unique name within the pipeline

  • request - The request template that will be executed

Use the syntax $(node_name) in a request field to reference another node in the pipeline. When the pipeline runs, the placeholder is replaced with the ID of the entity that node created, as soon as that ID is known. Dyff infers the dependency graph from these placeholders.
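As an illustration of the substitution step, the sketch below replaces placeholders in a nested request template once the referenced IDs are known. It is written under assumptions: substitute is a hypothetical helper, the field names and the ID eval-123 are made up, and the way Dyff performs substitution internally is not specified here.

```python
import re

# Matches placeholders of the form $(node_name).
PLACEHOLDER = re.compile(r"\$\(([^)]+)\)")


def substitute(value, ids):
    """Replace every $(node_name) in a nested template with ids[node_name]."""
    if isinstance(value, str):
        # An unknown node name raises KeyError instead of passing silently.
        return PLACEHOLDER.sub(lambda m: ids[m.group(1)], value)
    if isinstance(value, dict):
        return {k: substitute(v, ids) for k, v in value.items()}
    if isinstance(value, list):
        return [substitute(v, ids) for v in value]
    return value


# Hypothetical safetycase template that refers to the "evaluation" node.
template = {"evaluation": "$(evaluation)", "name": "my-safetycase"}

# IDs of entities created so far, keyed by node name (made-up value).
resolved = substitute(template, {"evaluation": "eval-123"})
print(resolved)
```

Fields that contain no placeholder pass through unchanged, so the same function can be applied to every node's template.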

Running a Pipeline

Use pipelines.run() to execute a pipeline:

from dyff.schema.requests import PipelineRunRequest

pipeline_run = dyffapi.pipelines.run(
    pipeline.id,
    PipelineRunRequest(
        account=account,
        pipeline=pipeline.id,
        arguments={
            # Pipeline parameter values
        },
    ),
)

Pipeline Runs

A PipelineRun represents an execution of a pipeline. It tracks:

  • pipeline - ID of the pipeline being run

  • workflows - Dictionary mapping node names to the IDs of created entities

Checking Run Status

Use pipelines.get_run_status() to check the status of a pipeline run:

status = dyffapi.pipelines.get_run_status(pipeline_run.id)
print(status.status)  # Overall status
print(status.nodes)   # Status of each node
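
To wait for a run to finish, one option is to poll get_run_status() in a loop. The following is a minimal sketch of a generic polling helper, not part of the Dyff client: wait_for is a hypothetical name, and the status strings are stand-ins because the values Dyff reports are not listed here. In real use, fetch_status would wrap dyffapi.pipelines.get_run_status(pipeline_run.id) and is_finished would test for whatever terminal statuses apply.

```python
import time


def wait_for(fetch_status, is_finished, interval_seconds=5.0, timeout_seconds=600.0):
    """Call fetch_status() until is_finished(status) is true or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status()
        if is_finished(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("pipeline run did not finish in time")
        time.sleep(interval_seconds)


# Stand-in for the real status call; it reports "done" on the third poll.
responses = iter(["running", "running", "done"])

final = wait_for(
    fetch_status=lambda: next(responses),
    is_finished=lambda s: s == "done",
    interval_seconds=0.01,
)
print(final)
```

Passing the fetch and completion checks as callables keeps the helper independent of the actual status schema.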

Getting Run Workflows

Use pipelines.get_run_workflows() to retrieve the entities created by a pipeline run:

workflows = dyffapi.pipelines.get_run_workflows(pipeline_run.id)
# workflows is a dict mapping node names to entity objects

Pipeline Parameters

Pipelines can define input parameters that customize their behavior. Parameters are declared when the pipeline is created, and their values are supplied through the arguments field of PipelineRunRequest when the pipeline is run.