Design¶
Dyff consists of three major components:
dyff-operator
A Kubernetes operator that manages a set of custom resources. The Dyff Operator launches “workflows” composed of one or more “workflow steps” in response to creation of Dyff k8s resources. This is the “kernel” of the Dyff system that does the actual computational work.

dyff-api
A full-featured cloud platform built around dyff-operator functionality. It provides a JSON API, security functions, data management and backups, etc. Components include an API server, message broker, database, and various internal services that coordinate platform operations. A dyff-orchestrator service creates and manages Dyff k8s resources via the k8s API in response to Dyff API actions, and the Dyff Operator in turn does the actual work in response to these k8s resources.

dyff-frontend
A Web app that presents audit results and allows authenticated users to view and manage their Dyff workflows. It gets its data via the JSON API. The dyff-api server implements a handful of /views routes that allow the frontend to be more responsive by bundling together all of the datastore queries that are needed to build certain frontend pages.
Service diagram¶
This diagram summarizes the various services that make up Dyff and
their interactions. Gray boxes indicate a Kubernetes namespace. The white boxes inside them are Kubernetes workloads. Parallelograms with the Kafka logo are Kafka topics; they are managed by services in the kafka namespace. Depending on how Dyff is deployed, the “s3 provider” may be provided by the cloud platform or it may be a minio instance running inside the Dyff k8s cluster.
Workflow walkthrough¶
The main task of Dyff is to execute a few different types of workflows on behalf of platform users. A workflow is a sequence of computational steps that realize one stage of the overall audit pipeline.
Here is a sketch of what happens when the user wants to execute an Evaluation:
1. The user calls the client.evaluations.create() method on an instance of the Python API client.
2. The client sends a POST /evaluations request to the dyff-api server.
3. The dyff-api server checks that the user-supplied bearer token grants the required permissions to create the requested Evaluation.
4. The dyff-api server assigns an ID to the new Evaluation, resolves references to other resources by making database queries, populates a complete new Evaluation object, produces a Create event to the dyff.workflows.events topic with the new Evaluation object, and returns the Evaluation object to the client.
5. The workflows-aggregator consumes the dyff.workflows.events message and upserts the full Evaluation object into the dyff.workflows.state topic. The dyff.workflows.state topic has cleanup.policy = compact set, so it only retains the most recent version of each object.
6. The workflows-sink-mongodb service consumes the dyff.workflows.state message and puts the updated Evaluation object in the database. The workflows-sink-s3 service backs up the object state to an s3 bucket.
7. Concurrently, the dyff-orchestrator consumes the dyff.workflows.state message and sees that the entity has .status = Created. The orchestrator checks whether scheduling preconditions are met – whether all dependencies are in .status = Ready and whether the user account’s quotas can accommodate the workflow. If preconditions are satisfied, the orchestrator creates an Evaluation k8s resource in the cluster and produces a status change message to dyff.workflows.events like ID: {"command": "UpdateEntityStatus", "body": {"status": "Admitted"}}.
8. The workflows-aggregator consumes the status change event and updates the corresponding full object in dyff.workflows.state with the new status, and the workflows-sink services propagate this change to their datastores.
9. The dyff-operator sees the new Evaluation k8s resource and creates other k8s resources to execute the various steps in the Evaluation workflow. In the first step, it creates an InferenceSession k8s resource for the inference “server”, and a Job running the evaluation-client image for the inference “client”. InferenceSession is another k8s custom resource, and it triggers the dyff-operator to create a Deployment containing replicas of the inference model and a Service exposing the replicas on the internal network. The operator watches the client Job, and when it is complete, it starts the second step, where a verify-evaluation-output Job checks the output for completeness. As each step completes, the operator sets the .status.conditions of the k8s resource to record the progress of the workflow.
10. The workflows-informer service watches for changes to the status of the k8s resources through the k8s API “watch” mechanism. When the k8s resource status changes, it produces a corresponding message to dyff.workflows.events, such as ID: {"command": "UpdateEntityStatus", "body": {"status": "Complete"}}.
11. Eventually, the Evaluation workflow is finished. The user calls client.evaluations.get(ID), which sends a GET /evaluations/ID request to the dyff-api server, which queries the database and returns the Evaluation object with that ID. If the evaluation was successful, the object will have .status = Complete.
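For concreteness, here is a minimal sketch of the walkthrough from the user’s side. Only client.evaluations.create() and client.evaluations.get() come from the walkthrough above; the import path, constructor arguments, and the create() request body are assumptions and would need to be adapted to the real client.

```python
# Hedged sketch of the user's side of the walkthrough. The import path,
# constructor arguments, and create() request body are assumptions; only
# evaluations.create() and evaluations.get() appear in the walkthrough above.
import time

from dyff.client import Client  # assumed import path

client = Client(api_key="...")  # the API key becomes the bearer token

# Steps 1-4: request a new Evaluation; the server returns the populated object.
evaluation = client.evaluations.create(...)  # request body omitted here

# Steps 5-10 run asynchronously inside the platform, so poll until the
# workflow reaches a terminal status (real code would also handle failures).
while True:
    evaluation = client.evaluations.get(evaluation.id)
    if evaluation.status == "Complete":
        break
    time.sleep(30)
```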
Principles¶
Data Model¶
REST-y API and data model¶
We generally adhere to REST design, where users control the platform by creating, modifying, and querying persistent resources.
Exception: API usability is more important than 100% RESTful-ness. Some of the APIs are implemented in an imperative style because it’s simpler.
Data schemas¶
All data objects have explicit schemas and can be converted to and from JSON data automatically. Schemas are defined using Pydantic data models. JSON is the standard data interchange format used for Kafka messages and service APIs.
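As an illustration, a schema definition and JSON round-trip might look like the following sketch; the model and field names are hypothetical, not Dyff’s actual schema.

```python
# Hypothetical Pydantic model; not Dyff's actual schema definitions.
from pydantic import BaseModel


class ExampleEntity(BaseModel):
    id: str
    account: str
    status: str = "Created"


# Schema-backed objects serialize to JSON for Kafka messages and API
# responses, and parse back into validated Python objects.
payload = ExampleEntity(id="abc123", account="acct-1").model_dump_json()
restored = ExampleEntity.model_validate_json(payload)
assert restored.status == "Created"
```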
Data schema versioning¶
The schema has a version and a revision. The revision is incremented for incompatible changes that have a limited scope but nevertheless require data migration. The version is incremented for wholesale changes that would require major updates to client code.
Schema changes that are both forward- and backward-compatible may be made without a revision increment. For example, a new optional field may be added, or a required field may be given a default value if it didn’t have one previously.
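A change like the following sketch (with hypothetical field names) is both forward- and backward-compatible and would not require a revision increment:

```python
# Hypothetical example of a compatible schema change: adding an optional
# field with a default value.
from typing import Optional

from pydantic import BaseModel


class Entity(BaseModel):  # original schema
    id: str


class EntityWithDescription(BaseModel):  # updated schema
    id: str
    description: Optional[str] = None  # new optional field with a default


# Old documents (no "description") still validate against the new schema, and
# new documents can be read by code using the old schema, which ignores the
# extra field by default.
print(EntityWithDescription.model_validate_json('{"id": "abc123"}'))
```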
Immutable data¶
Data objects are mostly immutable. Each object has a unique ID, and the essential properties of an object with a given ID will never change after it has been created. Immutability is essential for reproducibility and long-term integrity of audit results. If two evaluations are run on a dataset with a given ID, we know for sure that they saw the same input examples, because the dataset can’t be changed once created.
There are two primary exceptions: status information and metadata. Every core
resource has fields like .status and .reason that describe progress of
the associated workflow. These obviously change during execution. In addition,
core objects have some metadata fields such as .labels that can be added
or removed after the object has been created. This separation of immutable
and mutable properties is strongly influenced by Kubernetes.
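A sketch of this split, with hypothetical field names rather than the actual Dyff schemas:

```python
# Hedged sketch of the immutable/mutable split; field names and the frozen
# sub-model are illustrative, not the actual Dyff schemas.
from typing import Dict, Optional

from pydantic import BaseModel, ConfigDict


class DatasetSpec(BaseModel):
    # Essential properties: fixed once the object has been created.
    model_config = ConfigDict(frozen=True)
    name: str
    artifact_digest: str


class Dataset(BaseModel):
    id: str
    spec: DatasetSpec
    # Mutable bookkeeping: workflow progress and user-managed metadata.
    status: str = "Created"
    reason: Optional[str] = None
    labels: Dict[str, str] = {}
```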
Deletion process¶
When a user deletes a resource, its status is set to Deleted and it can
no longer be consumed by new workflows. A CronJob runs once per day, and if any
resources have been in the Deleted status for at least 7 days when the
CronJob runs, it sends a "ForgetEntity" command. Backend services respond
by deleting that object from the data stores that they manage.
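A sketch of the reaction to the "ForgetEntity" command inside one such backend service; the handler signature and collection names are hypothetical.

```python
# Hedged sketch of a "ForgetEntity" handler in a service that owns a
# datastore; the collection name and handler signature are hypothetical.
from pymongo import MongoClient

collection = MongoClient()["workflows"]["entities"]


def handle_event(entity_id: str, event: dict) -> None:
    if event.get("command") == "ForgetEntity":
        # Remove the entity from the datastore this service manages; other
        # services do the same for the stores they own (s3 objects, etc.).
        collection.delete_one({"_id": entity_id})
```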
In a production Dyff environment, the s3 buckets have a “soft delete” policy
applied, so objects can be un-deleted for up to 7 days after deletion. The s3
backup of the dyff.workflows.state topic can be used to restore messages
about the object, and the associated data in s3 simply needs to be un-deleted.
All s3 data is rsync’d to a separate backup bucket once per day. The
backup bucket also has a 7-day soft-delete policy. So, 7 days after the
"ForgetEntity" command is given, the object data disappears from the
“primary” s3 buckets, which triggers a delete in the backup bucket on the
next rsync. After 7 more days, the object disappears from the backup.
References to the deleted object in other live objects will remain there. Those objects can still be queried by references to the deleted object – i.e., you can find all Evaluations that ran on a deleted Dataset – but fetching the deleted Dataset will raise a 404 error.
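In client terms, this behaves roughly like the sketch below; the query() keyword arguments and the datasets accessor are assumptions about the client API, and only the 404 behavior for deleted objects comes from the description above.

```python
# Hedged sketch of querying around a deleted object. The query() arguments
# and the datasets accessor are assumptions about the client API.
from dyff.client import Client  # assumed import path

client = Client(api_key="...")

# Evaluations that reference the deleted Dataset can still be found by that
# reference...
evaluations = client.evaluations.query(dataset="DELETED_DATASET_ID")

# ...but fetching the deleted Dataset itself fails with a 404.
try:
    client.datasets.get("DELETED_DATASET_ID")
except Exception as error:  # the client surfaces the HTTP 404 as an error
    print("dataset no longer available:", error)
```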
Data object versioning¶
Note
This is not implemented yet but is on the roadmap.
Data objects can belong to a “version group” of related objects. For example, a
dataset might be updated periodically with new instances. Each of these updates
produces a new, immutable object with a unique ID. The version group is just an
annotation that says that all of these objects are related. Following a
Docker-like model, users can annotate objects in the version group with named
tags such as latest to make them easier to query.
Distributed System Architecture¶
Reconciliation-based execution control¶
Services that control computational work make their decisions by comparing the current state of the system to a desired state and taking actions to reconcile the two.
This is how Kubernetes works, too, and it just makes a lot of distributed system problems much easier to think about. For example, restarting after an error becomes fairly trivial, because it’s impossible to “miss” any events during the downtime.
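A minimal, self-contained sketch of this control style, with in-memory stand-ins for the real state sources:

```python
# Minimal sketch of reconciliation-based control. The state sources here are
# in-memory stand-ins for the real ones (the state topic, the k8s API, ...).
import time

desired_state: dict[str, str] = {"evaluation-1": "Created"}
observed_resources: set[str] = set()


def reconcile_once() -> None:
    """Compare desired state to observed state and close the gap."""
    for entity_id, status in desired_state.items():
        if status == "Created" and entity_id not in observed_resources:
            # The real orchestrator would create a k8s resource here; this
            # sketch just records that the resource now exists.
            observed_resources.add(entity_id)
            print(f"created resources for {entity_id}")


while True:
    # Each pass looks at the full current state, so a restart simply resumes
    # on the next pass; no events can be "missed" during downtime.
    reconcile_once()
    time.sleep(10)
```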
Message ordering, at-least-once delivery, and consumer idempotence¶
Services communicate through messages sent via a Kafka broker. We design all of the Dyff services to maintain the following properties:
All message consumers see messages about a given entity in the same order
Messages are delivered at-least-once
The current system state can be reconstructed by re-playing all the messages
At-least-once delivery semantics imply that all message consumers must be idempotent with respect to duplicate message delivery.
To maintain the at-least-once invariant, message consumers have to be structured in a particular way. Specifically, they have to process messages with the following steps:
Receive new incoming message
Take all appropriate actions (including sending more messages) and make sure they succeeded
“Commit” the incoming message so that it won’t be acted on again
The “commit” step is usually acknowledging receipt of the message to Kafka,
which increments the consumer’s “index” in the topic and prevents that message
from being replayed if the consumer reconnects. It could also be a local
operation; for example, dyff-orchestrator commits some actions by updating
its local KV store. The point is that the commit happens after we’re sure that
the message has been fully processed. Any crash before the commit step will
result in all of the actions being tried again.
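A sketch of this structure, using kafka-python for illustration only; the actual Dyff services may use a different client library, and process() stands in for whatever actions a particular consumer takes.

```python
# Hedged sketch of the process-then-commit structure, using kafka-python for
# illustration only; process() stands in for a consumer's real actions.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dyff.workflows.events",
    group_id="example-consumer",
    enable_auto_commit=False,  # never commit before processing finishes
)


def process(entity_id: str, event: dict) -> None:
    # Must be idempotent: a crash before commit means this same message will
    # be delivered and processed again after a restart.
    print(entity_id, event)


for message in consumer:
    # Messages are assumed to be keyed by the entity ID.
    process(message.key.decode(), json.loads(message.value))
    # Commit only after all actions have succeeded; any failure before this
    # line causes the message to be retried rather than lost.
    consumer.commit()
```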
Data persistence, eventual consistency, command-query responsibility segregation¶
The dyff.workflows.state Kafka topic is the source of truth for the
current system state. It is replicated in the Kafka broker and backed up
regularly.
Every downstream view of the system state derives from dyff.workflows.state.
For example, we have a MongoDB instance for querying objects in the system, but
it is populated from dyff.workflows.state messages. User commands don’t
modify the MongoDB directly; they generate messages that eventually propagate to
the DB. This general pattern is called command-query responsibility segregation
(CQRS).
A consequence of this is that the global system state is inherently eventually
consistent; it takes time for messages to propagate to downstream data views.
Services that need strongly consistent datastores manage these themselves. For
example, dyff-orchestrator has a local strongly consistent KV store for
keeping track of scheduling information, so that workflows aren’t scheduled
twice if duplicate messages are generated. Likewise, the dyff-api service owns the
authorization database and communicates with it using strongly consistent
semantics.
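As a sketch, the MongoDB projection of dyff.workflows.state amounts to an idempotent upsert; the collection names here are hypothetical, and the surrounding Kafka consumer loop (see the previous section) is omitted.

```python
# Hedged sketch of projecting dyff.workflows.state into a query view. The
# collection names are hypothetical; the Kafka consumer loop is omitted.
from pymongo import MongoClient

collection = MongoClient()["workflows"]["entities"]


def sink_state_message(entity_id: str, entity: dict) -> None:
    # Replace-with-upsert is idempotent: applying the same compacted-state
    # message twice leaves the view unchanged, and the whole view can be
    # rebuilt from scratch by replaying the topic.
    collection.replace_one({"_id": entity_id}, entity, upsert=True)
```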