Design

Dyff consists of two major components:

dyff-api

A web API built around dyff-operator functionality. Components include an API server, message broker, database, and various internal services that coordinate platform operations. An orchestrator service creates and manages Dyff k8s resources via the k8s API in response to Dyff API actions, and the Dyff Operator in turn does the actual work in response to these k8s resources.

dyff-operator

A Kubernetes operator that manages a set of custom resources. The Dyff Operator launches “workflows” composed of one or more “workflow steps” in response to the creation of Dyff k8s resources.

Service diagram

This diagram summarizes the various services that make up Dyff and their interactions.

[Service diagram: ../_images/dyff-service-diagram.svg]

Workflow walkthrough

The main task of Dyff is to execute a few different types of workflows on behalf of platform users. A workflow is a sequence of computational steps that realize one stage of the overall audit pipeline.

Here is a sketch of what happens when the user wants to execute an Evaluation:

  1. The user calls the client.evaluations.create() method on an instance of the Python API client.

  2. The client sends a POST /evaluations request to the api-server.

  3. The api-server checks that the user-supplied bearer token grants the required permissions to create the requested Evaluation.

  4. The api-server assigns an ID to the new Evaluation, resolves references to other resources by making database queries, populates a complete new Evaluation object, produces a Create event to the dyff.workflows.events topic with the new Evaluation object, and returns the Evaluation object to the client.

  5. The workflows-aggregator consumes the dyff.workflows.events message and upserts the corresponding Evaluation object in the dyff.workflows.state topic. The dyff.workflows.state topic has cleanup.policy = compact set, so it only retains the most recent version of each object.

  6. The workflows-sink service consumes the dyff.workflows.state message and puts the updated Evaluation object in the database.

  7. Concurrently, the orchestrator consumes the dyff.workflows.state message and sees that the entity has .status = Created. The orchestrator checks whether scheduling preconditions are met – whether all dependencies are in .status = Ready and whether the user account’s quotas can accommodate the workflow. If preconditions are satisfied, the orchestrator creates an Evaluation k8s resource in the cluster and produces a status change message to dyff.workflows.events like ID: {"status": "Admitted"}.

  8. The workflows-aggregator updates the corresponding full object in dyff.workflows.state with the new status, and the workflows-sink propagates this change to the database.

  9. The dyff-operator sees the new Evaluation k8s resource and creates other k8s resources to execute the various steps in the Evaluation workflow. In the first step, it creates an InferenceSession k8s resource for the inference “server”, and a Job running the evaluation_client image for the inference “client”. InferenceSession is another k8s custom resource, and it triggers the dyff-operator to create a Deployment containing replicas of the inference model and a Service exposing the replicas on the internal network. The operator watches the client Job, and when it is complete, it starts the second step, where a verify_evaluation_output Job checks the output for completeness. As each step completes, the operator sets the .status.conditions of the k8s resource to record the progress of the workflow.

  10. The workflows-informer service watches for changes to the status of the k8s resources through the k8s API “watch” mechanism. When the k8s resource status changes, it produces a corresponding message to dyff.workflows.events, such as ID: {"status": "Admitted", "reason": "Unverified"} or ID: {"status": "Complete"}.

  11. Eventually, the Evaluation workflow is finished. The user calls client.evaluations.get(ID), which sends a GET /evaluations/ID request to api-server, which queries the database and returns the Evaluation object with that ID. If the evaluation was successful, the object will have .status = Complete.
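The status-change events in the walkthrough above fold into a single latest view of each entity. A minimal sketch of the workflows-aggregator's upsert semantics, using plain dicts (illustrative only; real messages carry full schema objects, and the compacted topic plays the role of the `state` dict here):

```python
# Illustrative sketch: partial status updates on dyff.workflows.events are
# merged into the latest full object, as the workflows-aggregator does for
# the compacted dyff.workflows.state topic.

def aggregate(events):
    """Upsert each partial update into the latest view of each entity."""
    state = {}
    for entity_id, delta in events:
        state[entity_id] = {**state.get(entity_id, {}), **delta}
    return state

# Simplified event payloads; field names follow the walkthrough above.
events = [
    ("eval-123", {"kind": "Evaluation", "status": "Created"}),  # api-server
    ("eval-123", {"status": "Admitted"}),                       # orchestrator
    ("eval-123", {"status": "Complete"}),                       # informer
]

print(aggregate(events)["eval-123"])
# {'kind': 'Evaluation', 'status': 'Complete'}
```

Because the topic is compacted, replaying it from the beginning reconstructs exactly this merged view for every entity.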

Principles

Data Model

REST-y API and data model

We generally adhere to REST design, where users control the platform by creating, modifying, and querying persistent resources.

Exception: API usability takes precedence over strict RESTfulness.

Data schemas

All data objects have explicit schemas and can be converted to and from JSON data automatically. Schemas are defined using Pydantic data models. JSON is the standard data interchange format used for Kafka messages and service APIs.
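The schema/JSON round-trip idea can be sketched with just the standard library; note that the actual Dyff schemas are Pydantic models, which add field validation on top of what a plain dataclass provides, and the field names below are illustrative:

```python
import json
from dataclasses import asdict, dataclass, field

# Sketch of an explicitly-schema'd object with lossless JSON conversion.
# Dyff's real schemas are Pydantic models; this stdlib stand-in shows only
# the round-trip property, not validation.

@dataclass
class Evaluation:
    id: str
    dataset: str
    status: str = "Created"
    labels: dict = field(default_factory=dict)

def to_json(obj) -> str:
    return json.dumps(asdict(obj))

def from_json(data: str) -> Evaluation:
    return Evaluation(**json.loads(data))

ev = Evaluation(id="eval-123", dataset="dataset-456")
assert from_json(to_json(ev)) == ev  # lossless round-trip
```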

Data schema versioning

The schema has a version and a revision. The revision is incremented for incompatible changes that have a limited scope but nevertheless require data migration. The version is incremented for wholesale changes that would require major updates to client code. We hope that version will never have to be > 1.

Schema changes that are both forward- and backward-compatible may be made without a revision increment. For example, a new optional field may be added.
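A sketch of why such a change is compatible in both directions, assuming readers tolerate unknown fields (as Pydantic models can be configured to do); the field names are illustrative, not actual Dyff schema fields:

```python
import json
from dataclasses import dataclass, fields

# Sketch: adding an optional field with a default requires no revision
# increment, because old and new code can each read the other's JSON.

def load(cls, data: dict):
    # Tolerant reader: ignore unknown fields so old code can read newer JSON.
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})

@dataclass
class OldEvaluation:
    id: str

@dataclass
class NewEvaluation:  # same revision: adds an optional field with a default
    id: str
    description: str = ""

old_json = '{"id": "eval-123"}'
new_json = '{"id": "eval-123", "description": "demo"}'

# New code reads old data (the field falls back to its default) ...
assert load(NewEvaluation, json.loads(old_json)).description == ""
# ... and old code reads new data (the unknown field is ignored).
assert load(OldEvaluation, json.loads(new_json)).id == "eval-123"
```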

Immutable data

Data objects are mostly immutable. Each object has a unique ID, and the essential properties of an object with a given ID will never change after it has been created. Immutability is essential for reproducibility and long-term integrity of audit results. If two evaluations are run on a dataset with a given ID, we know for sure that they saw the same input examples, because the dataset can’t be changed once created.

There are two primary exceptions: status information and tags. Every core resource has fields like .status and .reason that describe progress of the associated workflow. These obviously change during execution. In addition, core objects have some “tag” fields such as .labels that can be used to add metadata to an object after it has been created. This separation of immutable and mutable properties is strongly influenced by Kubernetes.

Data objects never totally disappear. Instead, we set their status to Deleted, possibly delete the actual data associated with the object (such as a dataset or model weights), and forbid using them in new workflows. This way, all references to those objects remain valid.

Data object versioning

Note

This is not implemented yet but is on the roadmap.

Data objects can belong to a “version group” of related objects. For example, a dataset might be updated periodically with new instances. Each of these updates produces a new, immutable object with a unique ID. The version group is just an annotation that says that all of these objects are related. Following a Docker-like model, users can annotate objects in the version group with named tags such as latest to make them easier to query.

Distributed System Architecture

Reconciliation-based execution control

Services that control computational work make their decisions by comparing the current state of the system to a desired state and taking actions to reconcile the two.

This is how Kubernetes works, too, and it just makes a lot of distributed system problems much easier to think about. For example, restarting after an error becomes fairly trivial, because it’s impossible to “miss” any events during the downtime.
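The core of a reconciliation loop can be sketched in a few lines (illustrative only; the real orchestrator derives its desired state from dyff.workflows.state messages and acts through the k8s API):

```python
# Sketch of reconciliation-based control: decide actions by diffing desired
# state against actual state, rather than by reacting to individual events.

def reconcile(desired: dict, actual: dict) -> list:
    """Return the actions needed to make `actual` match `desired`."""
    actions = []
    for entity_id, want in desired.items():
        if actual.get(entity_id) != want:
            actions.append(("create_or_update", entity_id, want))
    for entity_id in actual:
        if entity_id not in desired:
            actions.append(("delete", entity_id))
    return actions

desired = {"eval-123": {"status": "Admitted"}}
actual = {}  # e.g. just after a restart: we re-derive actions from state
print(reconcile(desired, actual))
# [('create_or_update', 'eval-123', {'status': 'Admitted'})]
```

Because the loop compares states rather than consuming a stream of events, a restarted controller simply runs the comparison again; nothing is “missed” during downtime.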

Message ordering, at-least-once delivery, and consumer idempotence

Services communicate through messages sent via a Kafka broker. We design all of the Dyff services to maintain the following properties:

  1. All message consumers see messages about a given entity in the same order

  2. Messages are delivered at-least-once

  3. The current system state can be reconstructed by re-playing all the messages

At-least-once delivery semantics imply that all message consumers must be idempotent under duplicate message delivery.

To maintain the at-least-once invariant, message consumers must be structured in a particular way. Specifically, they must process each message with the following steps:

  1. Receive new incoming message

  2. Take all appropriate actions (including sending more messages) and make sure they succeeded

  3. “Commit” the incoming message so that it won’t be acted on again

The “commit” step is usually acknowledging receipt of the message to Kafka, which increments the consumer’s “index” in the topic and prevents that message from being replayed if the consumer reconnects. It could also be a local operation; for example, the orchestrator commits some actions by updating its local KV store. The point is that the commit happens after we’re sure that the message has been fully processed. Any crash before the commit step will result in all of the actions being tried again.
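The receive-act-commit pattern, together with a dedup record (like the orchestrator's local KV store), can be sketched as follows; Kafka details are elided, and the `seen` set stands in for the commit mechanism:

```python
# Sketch of an idempotent at-least-once consumer: act first, commit last.
# If we crash between acting and committing, the message is redelivered and
# the dedup check makes the retry safe.

def process(message, side_effects, seen):
    if message["id"] in seen:
        return                                      # duplicate: already handled
    side_effects.append(("acted_on", message["id"]))  # step 2: take actions
    seen.add(message["id"])                           # step 3: commit last

side_effects, seen = [], set()
msg = {"id": "eval-123", "status": "Created"}
process(msg, side_effects, seen)
process(msg, side_effects, seen)  # at-least-once redelivery: no double action
assert side_effects == [("acted_on", "eval-123")]
```

Note that reversing steps 2 and 3 would break the invariant: a crash after committing but before acting would silently drop the message.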

Data persistence, eventual consistency, command-query responsibility segregation

The dyff.workflows.state Kafka topic is the source of truth for the current system state. It is replicated in the Kafka broker and backed up regularly.

Every downstream view of the system state derives from dyff.workflows.state. For example, we have a MongoDB instance for querying objects in the system, but it is populated from dyff.workflows.state messages. User commands don’t modify the MongoDB directly; they generate messages that eventually propagate to the DB. This general pattern is called command-query responsibility segregation (CQRS).

A consequence of this is that the global system state is inherently eventually consistent; it takes time for messages to propagate to downstream data views. Services that need strongly consistent datastores manage these themselves. For example, the orchestrator has a local strongly consistent KV store for keeping track of scheduling information, so that workflows aren’t scheduled twice if duplicate messages are generated. Likewise, the api-server owns the authorization database and communicates with it using strongly consistent semantics.
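The CQRS flow can be sketched with an event log standing in for dyff.workflows.state and a dict standing in for the MongoDB view (illustrative only; the real sink is the workflows-sink service):

```python
# Sketch of CQRS: commands append to the log (the source of truth); the
# query store is a projection built from the log, so it lags the log and
# is only eventually consistent with it.

event_log = []    # stands in for dyff.workflows.state
query_store = {}  # stands in for the MongoDB view

def command_create(entity_id, obj):
    # Commands never write the query store directly.
    event_log.append((entity_id, obj))

def sink():
    # workflows-sink: propagate (or fully replay) events into the view.
    for entity_id, obj in event_log:
        query_store[entity_id] = obj

command_create("eval-123", {"status": "Created"})
assert "eval-123" not in query_store  # not yet propagated: eventual consistency
sink()
assert query_store["eval-123"]["status"] == "Created"
```

Rebuilding the view is just replaying the log, which is why the compacted dyff.workflows.state topic can serve as the single source of truth.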