Design¶
Dyff consists of two major components:
dyff-api
A web API built around dyff-operator functionality. Components include an API server, message broker, database, and various internal services that coordinate platform operations. An orchestrator service creates and manages Dyff k8s resources via the k8s API in response to Dyff API actions, and the Dyff Operator in turn does the actual work in response to these k8s resources.
dyff-operator
A Kubernetes operator that manages a set of custom resources. The Dyff Operator launches “workflows” composed of one or more “workflow steps” in response to creation of Dyff k8s resources.
Service diagram¶
This diagram summarizes the various services that make up Dyff and their interactions.
Workflow walkthrough¶
The main task of Dyff is to execute a few different types of workflows on behalf of platform users. A workflow is a sequence of computational steps that realize one stage of the overall audit pipeline.
Here is a sketch of what happens when the user wants to execute an Evaluation:
The user calls the client.evaluations.create() method on an instance of the Python API client.
The client sends a POST /evaluations request to the api-server.
The api-server checks that the user-supplied bearer token grants the required permissions to create the requested Evaluation.
The api-server assigns an ID to the new Evaluation, resolves references to other resources by making database queries, populates a complete new Evaluation object, produces a Create event to the dyff.workflows.events topic with the new Evaluation object, and returns the Evaluation object to the client.
The workflows-aggregator consumes the dyff.workflows.events message and upserts the corresponding Evaluation object in the dyff.workflows.state topic. The dyff.workflows.state topic has cleanup.policy = compact set, so it only retains the most recent version of each object.
The workflows-sink service consumes the dyff.workflows.state message and puts the updated Evaluation object in the database.
Concurrently, the orchestrator consumes the dyff.workflows.state message and sees that the entity has .status = Created. The orchestrator checks whether scheduling preconditions are met – whether all dependencies are in .status = Ready and whether the user account’s quotas can accommodate the workflow. If preconditions are satisfied, the orchestrator creates an Evaluation k8s resource in the cluster and produces a status change message to dyff.workflows.events like ID: {"status": "Admitted"}.
The workflows-aggregator updates the corresponding full object in dyff.workflows.state with the new status, and the workflows-sink propagates this change to the database.
The dyff-operator sees the new Evaluation k8s resource and creates other k8s resources to execute the various steps in the Evaluation workflow. In the first step, it creates an InferenceSession k8s resource for the inference “server”, and a Job running the evaluation_client image for the inference “client”. InferenceSession is another k8s custom resource, and it triggers the dyff-operator to create a Deployment containing replicas of the inference model and a Service exposing the replicas on the internal network. The operator watches the client Job, and when it is complete, it starts the second step, where a verify_evaluation_output Job checks the output for completeness. As each step completes, the operator sets the .status.conditions of the k8s resource to record the progress of the workflow.
The workflows-informer service watches for changes to the status of the k8s resources through the k8s API “watch” mechanism. When the k8s resource status changes, it produces a corresponding message to dyff.workflows.events, such as ID: {"status": "Admitted", "reason": "Unverified"} or ID: {"status": "Complete"}.
Eventually, the Evaluation workflow is finished. The user calls client.evaluations.get(ID), which sends a GET /evaluations/ID request to api-server, which queries the database and returns the Evaluation object with that ID. If the evaluation was successful, the object will have .status = Complete.
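The aggregation step in the walkthrough can be illustrated with a small in-memory sketch. It is not the actual workflows-aggregator code: the event list, the aggregate and compact helpers, and the choice to model a Create event as a full object and later events as partial status changes are all assumptions made for illustration. Plain Python lists and dicts stand in for the Kafka topics.

```python
# Hypothetical stand-in for messages on dyff.workflows.events: (key, value) pairs.
# The first value is the full object from a Create event; the rest are status changes.
events = [
    ("eval-1", {"id": "eval-1", "status": "Created"}),
    ("eval-1", {"status": "Admitted"}),
    ("eval-1", {"status": "Complete"}),
]


def aggregate(state, key, change):
    """Upsert: merge a (possibly partial) change into the full object for key."""
    merged = {**state.get(key, {}), **change}
    state[key] = merged
    return merged


def compact(log):
    """Mimic cleanup.policy = compact: keep only the latest value per key."""
    latest = {}
    for key, value in log:
        latest[key] = value
    return latest


state = {}       # the aggregator's current view of each object
state_log = []   # stand-in for dyff.workflows.state before compaction
for key, change in events:
    state_log.append((key, aggregate(state, key, change)))

compacted = compact(state_log)
print(compacted["eval-1"])
```

Every message written to the state log carries the full object, which is why a compacted topic that keeps only the newest message per key is still enough to recover the current state.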
Principles¶
Data Model¶
REST-y API and data model¶
We generally adhere to REST design, where users control the platform by creating, modifying, and querying persistent resources.
Exception: API usability is more important than 100% RESTful-ness.
Data schemas¶
All data objects have explicit schemas and can be converted to and from JSON data automatically. Schemas are defined using Pydantic data models. JSON is the standard data interchange format used for Kafka messages and service APIs.
Data schema versioning¶
The schema has a version and a revision. The revision is incremented for
incompatible changes that have a limited scope but nevertheless require data
migration. The version is incremented for wholesale changes that would require
major updates to client code. We hope that version will never have to be > 1.
Schema changes that are both forward- and backward-compatible may be made without a revision increment. For example, a new optional field may be added.
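As a rough illustration of why adding an optional field is compatible in both directions, the sketch below parses old and new JSON with old and new schema definitions. Dyff defines its schemas with Pydantic; here standard-library dataclasses stand in so the example runs without dependencies. The DatasetBefore and DatasetAfter classes, their fields, and the parse helper are hypothetical, and the sketch assumes readers ignore fields they do not recognize.

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class DatasetBefore:
    """Hypothetical schema before the change."""
    id: str
    name: str


@dataclass(frozen=True)
class DatasetAfter:
    """Same hypothetical schema with one new optional field."""
    id: str
    name: str
    description: Optional[str] = None


def parse(cls, text):
    """Build cls from JSON, ignoring fields this schema does not know about."""
    data = json.loads(text)
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})


old_json = '{"id": "d1", "name": "toy"}'
new_json = '{"id": "d1", "name": "toy", "description": "demo"}'

new_reader_old_data = parse(DatasetAfter, old_json)   # backward-compatible read
old_reader_new_data = parse(DatasetBefore, new_json)  # forward-compatible read
print(new_reader_old_data, old_reader_new_data)
```

Neither direction needs a data migration, which is the property that lets such a change skip the revision increment.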
Immutable data¶
Data objects are mostly immutable. Each object has a unique ID, and the essential properties of an object with a given ID will never change after it has been created. Immutability is essential for reproducibility and long-term integrity of audit results. If two evaluations are run on a dataset with a given ID, we know for sure that they saw the same input examples, because the dataset can’t be changed once created.
There are two primary exceptions: status information and tags. Every core
resource has fields like .status and .reason that describe progress of the
associated workflow. These obviously change during execution. In addition,
core objects have some “tag” fields such as .labels that can be used to add
metadata to an object after it has been created. This separation of immutable
and mutable properties is strongly influenced by Kubernetes.
Data objects never totally disappear. Instead, we set their status to Deleted,
possibly delete the actual data associated with the object (such as a dataset or
model weights), and forbid using them in new workflows. This way, all references
to those objects remain valid.
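The split between immutable and mutable properties, together with deletion as a status change, might look like the following minimal sketch. The Resource class, its field set, and the delete and can_use_in_new_workflow helpers are hypothetical names chosen for illustration and do not reflect the actual Dyff data model classes.

```python
class Resource:
    """Hypothetical core object: essential properties are fixed at creation."""

    # Only status information and tag-like fields may change later.
    _MUTABLE = {"status", "reason", "labels"}

    def __init__(self, id, name):
        # Bypass the guard once, at creation time.
        super().__setattr__("id", id)
        super().__setattr__("name", name)
        super().__setattr__("status", "Created")
        super().__setattr__("reason", None)
        super().__setattr__("labels", {})

    def __setattr__(self, key, value):
        if key not in self._MUTABLE:
            raise AttributeError(f"{key!r} cannot change after creation")
        super().__setattr__(key, value)


def delete(resource):
    """Soft delete: the object and its ID remain, only the status changes."""
    resource.status = "Deleted"


def can_use_in_new_workflow(resource):
    return resource.status != "Deleted"


ds = Resource("dataset-1", "toy")
ds.labels["team"] = "audit"   # tags may be added after creation
delete(ds)

try:
    ds.name = "renamed"       # essential properties may not change
    renamed = True
except AttributeError:
    renamed = False

print(ds.id, ds.status, renamed)
```

Because the object is never removed, anything that stored the ID dataset-1 can still look it up and see that it was deleted.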
Data object versioning¶
Note
This is not implemented yet but is on the roadmap.
Data objects can belong to a “version group” of related objects. For example, a
dataset might be updated periodically with new instances. Each of these updates
produces a new, immutable object with a unique ID. The version group is just an
annotation that says that all of these objects are related. Following a
Docker-like model, users can annotate objects in the version group with named
tags such as latest to make them easier to query.
Distributed System Architecture¶
Reconciliation-based execution control¶
Services that control computational work make their decisions by comparing the current state of the system to a desired state and taking actions to reconcile the two.
This is how Kubernetes works, too, and it makes many distributed system problems much easier to think about. For example, restarting after an error becomes fairly trivial: when the service comes back up, it re-derives its actions from the current state, so it’s impossible to “miss” any events during the downtime.
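A reconciliation step can be sketched in a few lines. This is a generic illustration of the pattern rather than Dyff or Kubernetes code: the reconcile and apply_actions functions and the dict-based state are assumptions made for the example.

```python
def reconcile(desired, actual):
    """Return the actions that would move the actual state toward the desired state."""
    actions = []
    for name in sorted(desired.keys() - actual.keys()):
        actions.append(("create", name))
    for name in sorted(actual.keys() - desired.keys()):
        actions.append(("delete", name))
    for name in sorted(desired.keys() & actual.keys()):
        if desired[name] != actual[name]:
            actions.append(("update", name))
    return actions


def apply_actions(actions, desired, actual):
    """Carry out the actions against the (in-memory) actual state."""
    for verb, name in actions:
        if verb == "delete":
            del actual[name]
        else:
            actual[name] = desired[name]


desired = {"eval-1": {"replicas": 2}, "eval-2": {"replicas": 1}}
actual = {"eval-1": {"replicas": 1}, "stale": {"replicas": 1}}

first = reconcile(desired, actual)
apply_actions(first, desired, actual)
second = reconcile(desired, actual)   # nothing left to do
print(first, second)
```

The function depends only on the two states it is given, not on a history of events, so running it again after a crash produces whatever work is still outstanding.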
Message ordering, at-least-once delivery, and consumer idempotence¶
Services communicate through messages sent via a Kafka broker. We design all of the Dyff services to maintain the following properties:
All message consumers see messages about a given entity in the same order
Messages are delivered at-least-once
The current system state can be reconstructed by re-playing all the messages
At-least-once delivery semantics imply that all message consumers must be idempotent with respect to duplicate message delivery.
To maintain the at-least-once invariant, message consumers have to be structured in a particular way. Specifically, they have to process messages with the following steps:
Receive new incoming message
Take all appropriate actions (including sending more messages) and make sure they succeeded
“Commit” the incoming message so that it won’t be acted on again
The “commit” step is usually acknowledging receipt of the message to Kafka,
which advances the consumer’s committed offset (its “index” in the topic) and
prevents that message from being replayed if the consumer reconnects. It could
also be a local operation; for example, the orchestrator commits some actions
by updating its local KV store. The point is that the commit happens after
we’re sure that the message has been fully processed. Any crash before the
commit step will result in all of the actions being tried again.
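The receive, act, commit ordering can be simulated without a broker. In the sketch below, the Topic class, the consume function, and the crash_before_commit_at parameter are hypothetical stand-ins for a Kafka topic partition and a consumer loop; no Kafka client library is involved.

```python
class Topic:
    """Hypothetical in-memory stand-in for one Kafka topic partition."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0   # offset of the next message to deliver


def consume(topic, handler, crash_before_commit_at=None):
    """Process from the committed offset; commit only after the handler succeeds."""
    offset = topic.committed
    while offset < len(topic.messages):
        handler(topic.messages[offset])        # receive and take all actions
        if offset == crash_before_commit_at:
            raise RuntimeError("simulated crash before commit")
        offset += 1
        topic.committed = offset               # commit


calls = []   # every delivery, including duplicates
store = {}   # result of the handler's actions


def handler(message):
    calls.append(message)
    key, status = message
    store[key] = status   # an upsert: repeating it leaves the same result


topic = Topic([("eval-1", "Created"), ("eval-1", "Admitted"), ("eval-1", "Complete")])
try:
    consume(topic, handler, crash_before_commit_at=1)
except RuntimeError:
    pass
consume(topic, handler)   # restart resumes from the last committed offset

print(len(calls), store)
```

The second message is delivered twice because the crash happened after it was acted on but before it was committed. The final state is still correct because the handler is idempotent.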
Data persistence, eventual consistency, command-query responsibility segregation¶
The dyff.workflows.state Kafka topic is the source of truth for the current
system state. It is replicated in the Kafka broker and backed up regularly.
Every downstream view of the system state derives from dyff.workflows.state.
For example, we have a MongoDB instance for querying objects in the system, but
it is populated from dyff.workflows.state messages. User commands don’t modify
the MongoDB directly; they generate messages that eventually propagate to the
DB. This general pattern is called command-query responsibility segregation
(CQRS).
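A much-simplified sketch of this command/query split follows. A deque stands in for the message path and a dict stands in for the MongoDB view; handle_command, query, and sink_step are hypothetical names, and the real path through the events topic and aggregator is collapsed into a single queue.

```python
from collections import deque

state_topic = deque()   # hypothetical stand-in for the message path
read_view = {}          # hypothetical stand-in for the MongoDB query view


def handle_command(entity_id, body):
    """Command side: emit a message; never write to the read view directly."""
    state_topic.append((entity_id, body))


def query(entity_id):
    """Query side: read only from the derived view."""
    return read_view.get(entity_id)


def sink_step():
    """Propagate one pending message into the read view, if there is one."""
    if state_topic:
        entity_id, body = state_topic.popleft()
        read_view[entity_id] = body


handle_command("eval-1", {"id": "eval-1", "status": "Created"})
before = query("eval-1")   # the message has not propagated yet
sink_step()
after = query("eval-1")
print(before, after)
```

The query issued before the sink runs returns nothing, which is the window during which the downstream view lags the source of truth.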
A consequence of this is that the global system state is inherently eventually
consistent; it takes time for messages to propagate to downstream data views.
Services that need strongly consistent datastores manage these themselves. For
example, the orchestrator has a local strongly consistent KV store for keeping
track of scheduling information, so that workflows aren’t scheduled twice if
duplicate messages are generated. Likewise, the api-server owns the
authorization database and communicates with it using strongly consistent
semantics.
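The role of the orchestrator's local store in suppressing duplicate scheduling can be sketched as follows. The Orchestrator class here is a toy: the kv dict, the created list, and the on_state_message method are assumptions for illustration, not the real service's interface.

```python
class Orchestrator:
    """Toy model of scheduling decisions guarded by a local KV store."""

    def __init__(self):
        self.kv = {}        # hypothetical local store: workflow ID -> decision
        self.created = []   # stand-in for k8s resources that were created

    def on_state_message(self, workflow_id, status):
        """Schedule a workflow at most once, even if the message is repeated."""
        if status != "Created" or workflow_id in self.kv:
            return False
        self.created.append(workflow_id)
        self.kv[workflow_id] = "Admitted"   # record the decision locally
        return True


orch = Orchestrator()
# The same message is delivered twice, as at-least-once delivery allows.
results = [orch.on_state_message("eval-1", "Created") for _ in range(2)]
print(results, orch.created)
```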