Design
Dyff consists of three major components:
dyff-operator
A Kubernetes operator that manages a set of custom resources. The Dyff Operator launches “workflows” composed of one or more “workflow steps” in response to the creation of Dyff k8s resources. This is the “kernel” of the Dyff system, which does the actual computational work.
dyff-api
A full-featured cloud platform built around dyff-operator functionality. It provides a JSON API, security functions, data management and backups, etc. Components include an API server, a message broker, a database, and various internal services that coordinate platform operations. A dyff-orchestrator service creates and manages Dyff k8s resources via the k8s API in response to Dyff API actions, and the Dyff Operator in turn does the actual work in response to these k8s resources.
dyff-frontend
A web app that presents audit results and allows authenticated users to view and manage their Dyff workflows. It gets its data via the JSON API. The dyff-api server implements a handful of /views routes that make the frontend more responsive by bundling together all of the datastore queries needed to build certain frontend pages.
Service diagram
This diagram summarizes the various services that make up Dyff and
their interactions. Gray boxes indicate a Kubernetes namespace. The white boxes inside them are Kubernetes workloads. Parallelograms with the Kafka logo are Kafka topics; they are managed by services in the kafka
namespace. Depending on how Dyff is deployed, the “s3 provider” may be provided by the cloud platform or it may be a minio
instance running inside the Dyff k8s cluster.
Workflow walkthrough
The main task of Dyff is to execute a few different types of workflows on behalf of platform users. A workflow is a sequence of computational steps that realize one stage of the overall audit pipeline.
Here is a sketch of what happens when the user wants to execute an Evaluation:
1. The user calls the client.evaluations.create() method on an instance of the Python API client.
2. The client sends a POST /evaluations request to the dyff-api server.
3. The dyff-api server checks that the user-supplied bearer token grants the required permissions to create the requested Evaluation.
4. The dyff-api server assigns an ID to the new Evaluation, resolves references to other resources by making database queries, and populates a complete new Evaluation object. It then produces a Create event with the new Evaluation object to the dyff.workflows.events topic and returns the Evaluation object to the client.
5. The workflows-aggregator consumes the dyff.workflows.events message and upserts the full Evaluation object into the dyff.workflows.state topic. The dyff.workflows.state topic has cleanup.policy = compact set, so it only retains the most recent version of each object.
6. The workflows-sink-mongodb service consumes the dyff.workflows.state message and puts the updated Evaluation object in the database. The workflows-sink-s3 service backs up the object state to an s3 bucket.
7. Concurrently, the dyff-orchestrator consumes the dyff.workflows.state message and sees that the entity has .status = Created. The orchestrator checks whether the scheduling preconditions are met: whether all dependencies are in .status = Ready, and whether the user account’s quotas can accommodate the workflow. If the preconditions are satisfied, the orchestrator creates an Evaluation k8s resource in the cluster and produces a status change message to dyff.workflows.events like ID: {"command": "UpdateEntityStatus", "body": {"status": "Admitted"}}.
8. The workflows-aggregator consumes the status change event and updates the corresponding full object in dyff.workflows.state with the new status. The workflows-sink services then propagate this change to their datastores.
9. The dyff-operator sees the new Evaluation k8s resource and creates other k8s resources to execute the various steps in the Evaluation workflow. In the first step, it creates an InferenceSession k8s resource for the inference “server” and a Job running the evaluation-client image for the inference “client”. InferenceSession is another k8s custom resource, and it triggers the dyff-operator to create a Deployment containing replicas of the inference model and a Service exposing the replicas on the internal network. The operator watches the client Job. When that Job is complete, the operator starts the second step, in which a verify-evaluation-output Job checks the output for completeness. As each step completes, the operator sets the .status.conditions of the k8s resource to record the progress of the workflow.
10. The workflows-informer service watches for changes to the status of the k8s resources through the k8s API “watch” mechanism. When the k8s resource status changes, it produces a corresponding message to dyff.workflows.events, such as ID: {"command": "UpdateEntityStatus", "body": {"status": "Complete"}}.
11. Eventually, the Evaluation workflow finishes. The user calls client.evaluations.get(ID), which sends a GET /evaluations/ID request to the dyff-api server. The server queries the database and returns the Evaluation object with that ID. If the evaluation was successful, the object will have .status = Complete.
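The central mechanism in this walkthrough is that dyff.workflows.events carries Create and UpdateEntityStatus commands, and the workflows-aggregator folds them into full objects in the compacted dyff.workflows.state topic. The sketch below models that fold under some assumptions. Both topics are in-memory stand-ins, and the assumption that a Create message carries the whole object in its "body" is hypothetical. Only the UpdateEntityStatus shape is taken from the examples above.

```python
from dataclasses import dataclass, field
from typing import Any

Message = dict[str, Any]


@dataclass
class CompactedTopic:
    """Toy stand-in for a Kafka topic with cleanup.policy = compact:
    only the most recent value for each key is retained."""
    latest: dict[str, Message] = field(default_factory=dict)

    def produce(self, key: str, value: Message) -> None:
        self.latest[key] = value


def aggregate(events: list[tuple[str, Message]], state: CompactedTopic) -> None:
    """Fold (ID, command) messages into full objects, in the style of workflows-aggregator."""
    for entity_id, message in events:
        command = message["command"]
        if command == "Create":
            # Assumption: a Create event carries the complete new object as its body.
            obj = dict(message["body"])
        elif command == "UpdateEntityStatus":
            current = state.latest.get(entity_id)
            if current is None:
                continue  # unknown entity; a real service would need a policy here
            obj = {**current, **message["body"]}  # overwrite only the changed fields
        else:
            raise ValueError(f"unknown command: {command}")
        state.produce(entity_id, obj)


events = [
    ("eval-1", {"command": "Create", "body": {"id": "eval-1", "status": "Created"}}),
    ("eval-1", {"command": "UpdateEntityStatus", "body": {"status": "Admitted"}}),
    ("eval-1", {"command": "UpdateEntityStatus", "body": {"status": "Complete"}}),
]
state = CompactedTopic()
aggregate(events, state)
print(state.latest["eval-1"])  # {'id': 'eval-1', 'status': 'Complete'}
```

Replaying the same events into a fresh CompactedTopic produces the same final object. This property is what allows downstream views to be rebuilt from the message log.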
Principles
Data Model
REST-y API and data model
We generally adhere to REST design, where users control the platform by creating, modifying, and querying persistent resources.
Exception: API usability is more important than 100% RESTful-ness. Some of the APIs are implemented in an imperative style because it’s simpler.
Data schemas
All data objects have explicit schemas and can be converted to and from JSON data automatically. Schemas are defined using Pydantic data models. JSON is the standard data interchange format used for Kafka messages and service APIs.
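The property this paragraph relies on is that an object serializes to JSON and parses back to an equal object, with the schema checked on the way in. In Dyff the schemas are Pydantic models. This sketch uses a stdlib dataclass instead so that it has no dependencies. The DatasetSketch class and its fields are hypothetical.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass(frozen=True)
class DatasetSketch:
    """Hypothetical, heavily simplified schema for illustration only."""
    id: str
    name: str
    num_instances: int


def to_json(obj: DatasetSketch) -> str:
    """Serialize an object to JSON text."""
    return json.dumps(asdict(obj), sort_keys=True)


def from_json(text: str) -> DatasetSketch:
    """Parse JSON text, validating required fields and their types against the schema."""
    data = json.loads(text)
    for f in fields(DatasetSketch):
        if f.name not in data:
            raise ValueError(f"missing required field: {f.name}")
        if not isinstance(data[f.name], f.type):
            raise TypeError(f"field {f.name} must be {f.type.__name__}")
    return DatasetSketch(**{f.name: data[f.name] for f in fields(DatasetSketch)})


ds = DatasetSketch(id="ds-123", name="toy", num_instances=3)
text = to_json(ds)
print(text)  # {"id": "ds-123", "name": "toy", "num_instances": 3}
```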
Data schema versioning
The schema has a version and a revision. The revision is incremented for incompatible changes that have a limited scope but nevertheless require data migration. The version is incremented for wholesale changes that would require major updates to client code.
Schema changes that are both forward- and backward-compatible may be made without a revision increment. For example, a new optional field may be added, or a required field may be given a default value if it didn’t have one previously.
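To see why adding an optional field with a default is both forward- and backward-compatible, consider a reader that ignores unknown keys and fills in defaults for missing optional keys. With such a reader, old code can read new data and new code can read old data. The lenient parsing rule and the Report classes below are assumptions for illustration. Whether Dyff's models are configured this way is not stated here.

```python
import json
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class ReportOld:
    """Hypothetical schema before the change."""
    id: str
    evaluation: str


@dataclass
class ReportNew:
    """The same schema after a compatible change: one new optional field."""
    id: str
    evaluation: str
    notes: Optional[str] = None  # new field with a default value


def parse(cls: type, text: str) -> Any:
    """Lenient parse (an assumption): ignore unknown keys, let missing optional
    keys fall back to their defaults. Missing required keys still fail,
    because cls(**...) raises TypeError."""
    data: dict[str, Any] = json.loads(text)
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})


old_text = '{"id": "r-1", "evaluation": "e-1"}'
new_text = '{"id": "r-2", "evaluation": "e-2", "notes": "rerun"}'

# Backward compatible: new code reads data written before the change.
print(parse(ReportNew, old_text))  # ReportNew(id='r-1', evaluation='e-1', notes=None)
# Forward compatible: old code reads data written after the change.
print(parse(ReportOld, new_text))  # ReportOld(id='r-2', evaluation='e-2')
```

By contrast, making an existing optional field required, or renaming a field, would break one of these two directions. That is the kind of change that calls for a revision or version increment.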
Immutable data
Data objects are mostly immutable. Each object has a unique ID, and the essential properties of an object with a given ID will never change after it has been created. Immutability is essential for reproducibility and long-term integrity of audit results. If two evaluations are run on a dataset with a given ID, we know for sure that they saw the same input examples, because the dataset can’t be changed once created.
There are two primary exceptions: status information and metadata. Every core resource has fields such as .status and .reason that describe the progress of the associated workflow. These obviously change during execution. In addition, core objects have some metadata fields, such as .labels, that can be added or removed after the object has been created. This separation of immutable and mutable properties is strongly influenced by Kubernetes.
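The split between an immutable core and mutable status and metadata can be enforced at the type level. The sketch below freezes the essential properties and allows assignment only to status, reason, and labels. The Spec and Resource classes and their fields are hypothetical. They mirror the Kubernetes-style split described above, not Dyff's actual models.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Optional


@dataclass(frozen=True)
class Spec:
    """Essential properties, fixed once the object is created (hypothetical fields)."""
    id: str
    dataset: str
    model: str


class Resource:
    """A core resource: an immutable spec plus mutable status and metadata."""
    MUTABLE = frozenset({"status", "reason", "labels"})

    def __init__(self, spec: Spec) -> None:
        object.__setattr__(self, "spec", spec)  # bypass the guard exactly once
        self.status = "Created"
        self.reason: Optional[str] = None
        self.labels: dict[str, str] = {}

    def __setattr__(self, name: str, value: object) -> None:
        if name not in self.MUTABLE:
            raise AttributeError(f"{name!r} cannot change after creation")
        object.__setattr__(self, name, value)


r = Resource(Spec(id="eval-1", dataset="ds-1", model="m-1"))
r.status = "Admitted"        # allowed: workflow progress
r.labels["team"] = "safety"  # allowed: metadata
```

Attempting r.spec = ... raises AttributeError, and r.spec.dataset = ... raises FrozenInstanceError. As a result, anything that saw dataset "ds-1" for this ID can rely on it never changing.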
Deletion process
When a user deletes a resource, its status is set to Deleted and it can no longer be consumed by new workflows. A CronJob runs once per day. For each resource that has been in the Deleted status for at least 7 days when the CronJob runs, it sends a "ForgetEntity" command. Backend services respond by deleting that object from the data stores that they manage.
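A single CronJob run reduces to a filter over resources plus a message per match. In this sketch, the Entity record, its deleted_at timestamp, and the (ID, {"command": "ForgetEntity"}) message shape are assumptions modeled on the ID: {...} examples in the walkthrough. Only the 7-day threshold and the ForgetEntity command name come from the text.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

RETENTION = timedelta(days=7)  # from the text: at least 7 days in Deleted


@dataclass
class Entity:
    id: str
    status: str
    deleted_at: Optional[datetime] = None  # hypothetical: when status became Deleted


def forget_commands(entities: list[Entity], now: datetime) -> list[tuple[str, dict[str, Any]]]:
    """One daily CronJob run: emit ForgetEntity for every resource that has been
    Deleted for at least RETENTION. The message shape is an assumption."""
    return [
        (e.id, {"command": "ForgetEntity"})
        for e in entities
        if e.status == "Deleted"
        and e.deleted_at is not None
        and now - e.deleted_at >= RETENTION
    ]


now = datetime(2024, 6, 15, tzinfo=timezone.utc)
entities = [
    Entity("ds-old", "Deleted", now - timedelta(days=8)),
    Entity("ds-recent", "Deleted", now - timedelta(days=6)),
    Entity("ds-live", "Ready"),
]
print(forget_commands(entities, now))  # [('ds-old', {'command': 'ForgetEntity'})]
```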
In a production Dyff environment, the s3 buckets have a “soft delete” policy applied, so objects can be un-deleted for up to 7 days after deletion. The s3 backup of the dyff.workflows.state topic can be used to restore messages about the object, and the associated data in s3 simply needs to be un-deleted.
All s3 data is rsync’d to a separate backup bucket once per day. The backup bucket also has a 7-day soft-delete policy. So, 7 days after the "ForgetEntity" command is given, the object data disappears from the “primary” s3 buckets, which triggers a delete in the backup bucket on the next rsync. After 7 more days, the object disappears from the backup.
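Putting the three 7-day windows and the two daily jobs together gives the full timeline from user deletion to permanent loss. The sketch below computes that timeline. The times of day at which the CronJob and the rsync run are hypothetical parameters. The sketch also assumes that soft-deleted data becomes unrecoverable exactly 7 days later and that the next rsync propagates the deletion to the backup bucket.

```python
from datetime import datetime, time, timedelta

RETENTION = timedelta(days=7)    # time in Deleted before ForgetEntity is sent
SOFT_DELETE = timedelta(days=7)  # s3 soft-delete window (primary and backup)


def next_daily_run(earliest: datetime, run_at: time) -> datetime:
    """First run of a once-per-day job (at wall-clock time run_at) at or after earliest."""
    candidate = datetime.combine(earliest.date(), run_at)
    if candidate < earliest:
        candidate += timedelta(days=1)
    return candidate


def deletion_timeline(deleted_at: datetime, cron_at: time, rsync_at: time) -> dict[str, datetime]:
    forget_at = next_daily_run(deleted_at + RETENTION, cron_at)  # CronJob sends ForgetEntity
    primary_gone = forget_at + SOFT_DELETE                       # primary soft delete expires
    backup_deleted = next_daily_run(primary_gone, rsync_at)      # next rsync deletes in backup
    backup_gone = backup_deleted + SOFT_DELETE                   # backup soft delete expires
    return {
        "forget_entity": forget_at,
        "gone_from_primary": primary_gone,
        "deleted_in_backup": backup_deleted,
        "gone_from_backup": backup_gone,
    }


t = deletion_timeline(datetime(2024, 1, 1, 12, 0), cron_at=time(3, 0), rsync_at=time(4, 0))
for step, when in t.items():
    print(f"{step:>18}: {when:%Y-%m-%d %H:%M}")
```

With these made-up schedule times, consider an object deleted at noon on 1 January. ForgetEntity is sent at 03:00 on 9 January, and the data can no longer be recovered from the backup after 04:00 on 23 January, roughly three weeks after the user's delete.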
References to the deleted object in other live objects remain in place. Those objects can still be queried by their references to the deleted object (for example, you can find all Evaluations that ran on a deleted Dataset), but fetching the deleted Dataset itself will raise a 404 error.
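The asymmetry between querying by reference and fetching directly falls out naturally when ForgetEntity removes only the object itself. The Store class below is a hypothetical in-memory stand-in for a backend datastore, and NotFound stands in for the API's 404.

```python
from typing import Any

Doc = dict[str, Any]


class NotFound(Exception):
    """Stand-in for the API's 404 response."""


class Store:
    """Toy datastore, e.g. a sink's database (hypothetical interface)."""

    def __init__(self) -> None:
        self.docs: dict[str, Doc] = {}

    def put(self, doc: Doc) -> None:
        self.docs[doc["id"]] = doc

    def forget(self, entity_id: str) -> None:
        # Delete only the object itself; references held by other objects stay.
        self.docs.pop(entity_id, None)  # no error if it is already gone

    def get(self, entity_id: str) -> Doc:
        if entity_id not in self.docs:
            raise NotFound(entity_id)
        return self.docs[entity_id]

    def query(self, **criteria: Any) -> list[Doc]:
        return [d for d in self.docs.values()
                if all(d.get(k) == v for k, v in criteria.items())]


store = Store()
store.put({"id": "ds-1", "kind": "Dataset"})
store.put({"id": "eval-1", "kind": "Evaluation", "dataset": "ds-1"})
store.forget("ds-1")
print([d["id"] for d in store.query(kind="Evaluation", dataset="ds-1")])  # ['eval-1']
```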
Data object versioning
Note
This is not implemented yet but is on the roadmap.
Data objects can belong to a “version group” of related objects. For example, a
dataset might be updated periodically with new instances. Each of these updates
produces a new, immutable object with a unique ID. The version group is just an
annotation that says that all of these objects are related. Following a
Docker-like model, users can annotate objects in the version group with named
tags such as latest
to help users query them.
Distributed System Architecture
Reconciliation-based execution control
Services that control computational work make their decisions by comparing the current state of the system to a desired state and taking actions to reconcile the two.
This is how Kubernetes works, too, and it makes many distributed-system problems much easier to reason about. For example, restarting after an error becomes fairly trivial: after a restart, a service simply compares the current and desired states again, so there are no events it could have “missed” during the downtime.
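A reconciler is a pure diff between desired and current state, followed by actions that close the gap. In the sketch below, representing resources as a map from name to spec string is a hypothetical simplification. The important property is that running reconcile again after the actions succeed, or after a crash, yields no further work rather than duplicated work.

```python
def reconcile(desired: dict[str, str], current: dict[str, str]) -> list[tuple[str, str]]:
    """Diff desired vs current state and return the actions that would close the gap.
    Both maps are name -> spec, a hypothetical simplification of real resources."""
    actions = []
    for name in sorted(desired):
        if name not in current:
            actions.append(("create", name))
        elif current[name] != desired[name]:
            actions.append(("update", name))
    for name in sorted(current):
        if name not in desired:
            actions.append(("delete", name))
    return actions


def apply(actions: list[tuple[str, str]], desired: dict[str, str], current: dict[str, str]) -> None:
    """Carry out the actions against the (toy) current state."""
    for verb, name in actions:
        if verb == "delete":
            del current[name]
        else:
            current[name] = desired[name]


desired = {"eval-1": "v1", "eval-2": "v1"}
current = {"eval-2": "v0", "stale": "v1"}
print(reconcile(desired, current))
# [('create', 'eval-1'), ('update', 'eval-2'), ('delete', 'stale')]
apply(reconcile(desired, current), desired, current)
print(reconcile(desired, current))  # []
```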
Message ordering, at-least-once delivery, and consumer idempotence
Services communicate through messages sent via a Kafka broker. We design all of the Dyff services to maintain the following properties:
All message consumers see messages about a given entity in the same order
Messages are delivered at-least-once
The current system state can be reconstructed by re-playing all the messages
At-least-once delivery semantics imply that all message consumers must be idempotent with respect to duplicate message delivery: processing the same message twice must have the same effect as processing it once.
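The difference between idempotent and non-idempotent handlers shows up as soon as a message is delivered twice. Setting a status is naturally idempotent, while incrementing a counter is not unless the handler remembers which messages it has already applied. The per-message IDs used for deduplication here are a hypothetical addition. In a real service, the record of seen IDs would have to be stored atomically with the counts.

```python
def set_status(view: dict[str, str], entity_id: str, status: str) -> None:
    """Idempotent: applying the same message twice leaves the same result."""
    view[entity_id] = status


def count_admitted_naive(counts: dict[str, int], entity_id: str, status: str) -> None:
    """NOT idempotent: a duplicate delivery is counted twice."""
    if status == "Admitted":
        counts[entity_id] = counts.get(entity_id, 0) + 1


def count_admitted_dedup(counts: dict[str, int], seen: set[str],
                         message_id: str, entity_id: str, status: str) -> None:
    """Made idempotent by remembering the IDs of messages already applied."""
    if message_id in seen:
        return
    seen.add(message_id)
    count_admitted_naive(counts, entity_id, status)


# (message_id, entity_id, status); "m1" is delivered twice, as at-least-once allows.
deliveries = [("m1", "eval-1", "Admitted"), ("m1", "eval-1", "Admitted"),
              ("m2", "eval-1", "Complete")]

view: dict[str, str] = {}
naive: dict[str, int] = {}
deduped: dict[str, int] = {}
seen: set[str] = set()
for message_id, entity_id, status in deliveries:
    set_status(view, entity_id, status)
    count_admitted_naive(naive, entity_id, status)
    count_admitted_dedup(deduped, seen, message_id, entity_id, status)

print(view, naive, deduped)  # {'eval-1': 'Complete'} {'eval-1': 2} {'eval-1': 1}
```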
To maintain the at-least-once invariant, message consumers have to be structured in a particular way. Specifically, they have to process messages with the following steps:
Receive new incoming message
Take all appropriate actions (including sending more messages) and make sure they succeeded
“Commit” the incoming message so that it won’t be acted on again
The “commit” step is usually acknowledging receipt of the message to Kafka, which advances the consumer’s committed offset (its “index” in the topic) and prevents that message from being replayed if the consumer reconnects. It could also be a local operation; for example, dyff-orchestrator commits some actions by updating its local KV store. The point is that the commit happens only after we’re sure that the message has been fully processed. Any crash before the commit step will result in all of the actions being tried again.
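The receive, act, commit ordering can be simulated with a toy partition whose committed offset only advances after a message has been fully handled. The Partition class and the injected crash are hypothetical. The sketch shows that a crash between handling and committing leads to redelivery, which is why the handler must be idempotent.

```python
from typing import Callable, Optional


class Partition:
    """Toy Kafka-like partition: messages plus the consumer's committed offset."""

    def __init__(self, messages: list[str]) -> None:
        self.messages = messages
        self.committed = 0  # where consumption resumes after a (re)connect


def consume(part: Partition, handle: Callable[[str], None],
            crash_after_handling: Optional[int] = None) -> None:
    """Receive -> act -> commit, one message at a time."""
    offset = part.committed
    while offset < len(part.messages):
        handle(part.messages[offset])       # receive and fully process
        if offset == crash_after_handling:  # simulated crash before the commit
            raise RuntimeError(f"crashed at offset {offset}")
        offset += 1
        part.committed = offset             # commit only after success


part = Partition(["a", "b", "c"])
effects: list[str] = []
try:
    consume(part, effects.append, crash_after_handling=1)
except RuntimeError:
    pass  # the process "restarts" and reconnects
consume(part, effects.append)
print(effects, part.committed)  # ['a', 'b', 'b', 'c'] 3
```

Message "b" was handled twice, but nothing was lost. If the commit were moved before handle(), a crash could instead skip "b" entirely, violating at-least-once delivery.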
Data persistence, eventual consistency, command-query responsibility segregation
The dyff.workflows.state Kafka topic is the source of truth for the current system state. It is replicated in the Kafka broker and backed up regularly. Every downstream view of the system state derives from dyff.workflows.state.
For example, we have a MongoDB instance for querying objects in the system, but it is populated from dyff.workflows.state messages. User commands don’t modify the MongoDB directly; they generate messages that eventually propagate to the DB. This general pattern is called command-query responsibility segregation (CQRS).
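In CQRS, the command side only appends to the log, and the query side is a projection that catches up with the log on its own schedule. The sketch below shows both halves and the resulting eventual consistency, using in-memory stand-ins for the Kafka topic and for MongoDB. The class and function names are hypothetical.

```python
from typing import Any

Obj = dict[str, Any]


class StateTopic:
    """Stand-in for dyff.workflows.state, the source of truth (a plain list here)."""

    def __init__(self) -> None:
        self.log: list[tuple[str, Obj]] = []


def handle_create(topic: StateTopic, obj: Obj) -> None:
    """Command side: emits a message and never writes to any query view."""
    topic.log.append((obj["id"], obj))


class QueryView:
    """Query side: a MongoDB-like view populated only from the topic."""

    def __init__(self, topic: StateTopic) -> None:
        self.topic = topic
        self.position = 0  # how far into the log this view has read
        self.docs: dict[str, Obj] = {}

    def catch_up(self) -> None:
        while self.position < len(self.topic.log):
            key, obj = self.topic.log[self.position]
            self.docs[key] = obj  # upsert keyed by ID
            self.position += 1

    def find(self, **criteria: Any) -> list[Obj]:
        return [d for d in self.docs.values()
                if all(d.get(k) == v for k, v in criteria.items())]


topic = StateTopic()
view = QueryView(topic)
handle_create(topic, {"id": "eval-1", "status": "Created"})
print(view.find(id="eval-1"))  # [] because the message has not propagated yet
view.catch_up()
print(view.find(id="eval-1"))  # [{'id': 'eval-1', 'status': 'Created'}]
```

A brand-new QueryView that calls catch_up() arrives at the same documents. This is the sense in which every downstream view derives from, and can be rebuilt from, the state topic.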
A consequence of this is that the global system state is inherently eventually
consistent; it takes time for messages to propagate to downstream data views.
Services that need strongly consistent datastores manage these themselves. For
example, dyff-orchestrator
has a local strongly consistent KV store for
keeping track of scheduling information, so that workflows aren’t scheduled
twice if duplicate messages are generated. Likewise, the dyff-api
service owns the
authorization database and communicates with it using strongly consistent
semantics.
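To illustrate why the orchestrator needs a strongly consistent local store, here is a sketch in which a primary-key insert and the resource creation happen in one transaction. A duplicate trigger message then cannot schedule the same workflow twice, and a failed creation rolls back so that it can be retried. The text does not say which KV store dyff-orchestrator uses. SQLite and the function names here are stand-ins.

```python
import sqlite3
from typing import Callable


def open_store() -> sqlite3.Connection:
    """Local, strongly consistent store (SQLite as a stand-in for the sketch)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE scheduled (entity_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def try_schedule(conn: sqlite3.Connection, entity_id: str,
                 create_resource: Callable[[str], None]) -> bool:
    """Schedule an entity at most once, even if its trigger message is duplicated."""
    try:
        with conn:  # commits on success; rolls back if anything inside raises
            conn.execute("INSERT INTO scheduled (entity_id) VALUES (?)", (entity_id,))
            create_resource(entity_id)  # e.g. create the k8s resource (hypothetical)
    except sqlite3.IntegrityError:
        return False  # already scheduled by an earlier copy of the message
    return True


created: list[str] = []
conn = open_store()
print([try_schedule(conn, "eval-1", created.append) for _ in range(2)])  # [True, False]
print(created)  # ['eval-1']
```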