Design

Dyff consists of three major components:

dyff-operator

A Kubernetes operator that manages a set of custom resources. The Dyff Operator launches “workflows” composed of one or more “workflow steps” in response to creation of Dyff k8s resources. This is the “kernel” of the Dyff system that does the actual computational work.

dyff-api

A full-featured cloud platform built around dyff-operator functionality. It provides a JSON API, security functions, data management and backups, etc. Components include an API server, message broker, database, and various internal services that coordinate platform operations. A dyff-orchestrator service creates and manages Dyff k8s resources via the k8s API in response to Dyff API actions, and the Dyff Operator in turn does the actual work in response to these k8s resources.

dyff-frontend

A Web app that presents audit results and allows authenticated users to view and manage their Dyff workflows. It gets its data via the JSON API. The dyff-api server implements a handful of /views routes that allow the frontend to be more responsive by bundling together all of the datastore queries that are needed to build certain frontend pages.

Service diagram

This diagram summarizes the various services that make up Dyff and their interactions. Gray boxes indicate a Kubernetes namespace. The white boxes inside them are Kubernetes workloads. Parallelograms with the Kafka logo are Kafka topics; they are managed by services in the kafka namespace. Depending on how Dyff is deployed, the “s3 provider” may be provided by the cloud platform or it may be a minio instance running inside the Dyff k8s cluster.

[Figure: Dyff service diagram (../_images/dyff-service-diagram.svg)]

Workflow walkthrough

The main task of Dyff is to execute a few different types of workflows on behalf of platform users. A workflow is a sequence of computational steps that realize one stage of the overall audit pipeline.

Here is a sketch of what happens when the user wants to execute an Evaluation:

  1. The user calls the client.evaluations.create() method on an instance of the Python API client.

  2. The client sends a POST /evaluations request to the dyff-api server.

  3. The dyff-api server checks that the user-supplied bearer token grants the required permissions to create the requested Evaluation.

  4. The dyff-api server assigns an ID to the new Evaluation, resolves references to other resources by making database queries, populates a complete new Evaluation object, produces a Create event to the dyff.workflows.events topic with the new Evaluation object, and returns the Evaluation object to the client.

  5. The workflows-aggregator consumes the dyff.workflows.events message and upserts the full Evaluation object into the dyff.workflows.state topic. The dyff.workflows.state topic has cleanup.policy = compact set, so it only retains the most recent version of each object.

  6. The workflows-sink-mongodb service consumes the dyff.workflows.state message and puts the updated Evaluation object in the database. The workflows-sink-s3 service backs up the object state to an s3 bucket.

  7. Concurrently, the dyff-orchestrator consumes the dyff.workflows.state message and sees that the entity has .status = Created. The orchestrator checks whether scheduling preconditions are met – whether all dependencies are in .status = Ready and whether the user account’s quotas can accommodate the workflow. If preconditions are satisfied, the orchestrator creates an Evaluation k8s resource in the cluster and produces a status change message to dyff.workflows.events like ID: {"command": "UpdateEntityStatus", "body": {"status": "Admitted"}}.

  8. The workflows-aggregator consumes the status change event and updates the corresponding full object in dyff.workflows.state with the new status, and the workflows-sink services propagate this change to their datastores.

  9. The dyff-operator sees the new Evaluation k8s resource and creates other k8s resources to execute the various steps in the Evaluation workflow. In the first step, it creates an InferenceSession k8s resource for the inference “server”, and a Job running the evaluation-client image for the inference “client”. InferenceSession is another k8s custom resource, and it triggers the dyff-operator to create a Deployment containing replicas of the inference model and a Service exposing the replicas on the internal network. The operator watches the client Job, and when it is complete, it starts the second step, where a verify-evaluation-output Job checks the output for completeness. As each step completes, the operator sets the .status.conditions of the k8s resource to record the progress of the workflow.

  10. The workflows-informer service watches for changes to the status of the k8s resources through the k8s API “watch” mechanism. When the k8s resource status changes, it produces a corresponding message to dyff.workflows.events, such as ID: {"command": "UpdateEntityStatus", "body": {"status": "Complete"}}

  11. Eventually, the Evaluation workflow is finished. The user calls client.evaluations.get(ID), which sends a GET /evaluations/ID request to the dyff-api server, which queries the database and returns the Evaluation object with that ID. If the evaluation was successful, the object will have .status = Complete.
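Steps 5 and 8 of the walkthrough can be illustrated with a small in-memory sketch of the aggregation logic. It is not the actual workflows-aggregator code: the "CreateEntity" command name, the entity ID "eval-1", and the fields of the Evaluation object are assumptions made for the example; only "UpdateEntityStatus" and its message shape come from the text above. A dict keyed by entity ID stands in for the compacted dyff.workflows.state topic, since both keep only the latest value per key.

```python
import json


def aggregate(state, key, event):
    """Fold one event into the per-entity state table and return the full object."""
    command, body = event["command"], event["body"]
    if command == "CreateEntity":  # hypothetical name for the "Create" event
        state[key] = dict(body)
    elif command == "UpdateEntityStatus":
        # Merge the status fields into the existing full object.
        state[key] = {**state[key], **body}
    else:
        raise ValueError(f"unknown command: {command}")
    return state[key]


# Stand-in for the compacted state topic: latest full object per entity ID.
state = {}

# Events as (key, value) pairs, in the order they appear on the events topic.
events = [
    ("eval-1", {"command": "CreateEntity",
                "body": {"id": "eval-1", "kind": "Evaluation", "status": "Created"}}),
    ("eval-1", {"command": "UpdateEntityStatus", "body": {"status": "Admitted"}}),
    ("eval-1", {"command": "UpdateEntityStatus", "body": {"status": "Complete"}}),
]

for key, event in events:
    aggregate(state, key, event)

print(json.dumps(state["eval-1"]))
```

After all three events are folded in, the state table holds a single full Evaluation object whose status is Complete, which is what a sink service would write to its datastore.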

Principles

Data Model

REST-y API and data model

We generally adhere to REST design, where users control the platform by creating, modifying, and querying persistent resources.

Exception: API usability is more important than 100% RESTful-ness. Some of the APIs are implemented in an imperative style because it’s simpler.

Data schemas

All data objects have explicit schemas and can be converted to and from JSON data automatically. Schemas are defined using Pydantic data models. JSON is the standard data interchange format used for Kafka messages and service APIs.

Data schema versioning

The schema has a version and a revision. The revision is incremented for incompatible changes that have a limited scope but nevertheless require data migration. The version is incremented for wholesale changes that would require major updates to client code.

Schema changes that are both forward- and backward-compatible may be made without a revision increment. For example, a new optional field may be added, or a required field may be given a default value if it didn’t have one previously.
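The compatibility rule can be made concrete with a minimal sketch. Dyff defines its schemas with Pydantic; to keep the example dependency-free, standard-library dataclasses are used here as a stand-in. The DatasetV1 and DatasetV2 classes, their fields, and the parse helper are all hypothetical. The sketch also assumes that readers ignore unknown fields, which is what makes the change forward-compatible.

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class DatasetV1:
    """Hypothetical schema before the change."""
    id: str
    name: str


@dataclass(frozen=True)
class DatasetV2:
    """Same schema with one new optional field; no revision increment needed."""
    id: str
    name: str
    description: Optional[str] = None


def parse(cls, text):
    """Build `cls` from JSON text, ignoring keys the schema does not know (assumption)."""
    data = json.loads(text)
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})


old_json = json.dumps({"id": "d1", "name": "demo"})
new_json = json.dumps({"id": "d1", "name": "demo", "description": "added later"})

# Backward-compatible: a new reader accepts data written under the old schema.
new_reader_old_data = parse(DatasetV2, old_json)

# Forward-compatible: an old reader accepts data written under the new schema.
old_reader_new_data = parse(DatasetV1, new_json)
```

Making `description` a required field without a default would break the first case, because old data could no longer be parsed by the new schema; that kind of change would need a revision increment and a data migration.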

Immutable data

Data objects are mostly immutable. Each object has a unique ID, and the essential properties of an object with a given ID will never change after it has been created. Immutability is essential for reproducibility and long-term integrity of audit results. If two evaluations are run on a dataset with a given ID, we know for sure that they saw the same input examples, because the dataset can’t be changed once created.

There are two primary exceptions: status information and metadata. Every core resource has fields like .status and .reason that describe progress of the associated workflow. These obviously change during execution. In addition, core objects have some metadata fields such as .labels that can be added or removed after the object has been created. This separation of immutable and mutable properties is strongly influenced by Kubernetes.

Deletion process

When a user deletes a resource, its status is set to Deleted and it can no longer be consumed by new workflows. A CronJob runs once per day, and if any resources have been in the Deleted status for at least 7 days when the CronJob runs, it sends a "ForgetEntity" command. Backend services respond by deleting that object from the data stores that they manage.

In a production Dyff environment, the s3 buckets have a “soft delete” policy applied, so objects can be un-deleted for up to 7 days after deletion. The s3 backup of the dyff.workflows.state topic can be used to restore messages about the object, and the associated data in s3 simply needs to be un-deleted.

All s3 data is rsync’d to a separate backup bucket once per day. The backup bucket also has a 7-day soft-delete policy. So, 7 days after the "ForgetEntity" command is given, the object data disappears from the “primary” s3 buckets, which triggers a delete in the backup bucket on the next rsync. After 7 more days, the object disappears from the backup.
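The retention arithmetic described above can be worked through in a short sketch. The 7-day and once-per-day intervals come from the text; the function names and the specific timestamps (including when the CronJob happens to run) are assumptions for the example.

```python
from datetime import datetime, timedelta

DELETED_GRACE = timedelta(days=7)   # time in Deleted status before "ForgetEntity"
SOFT_DELETE = timedelta(days=7)     # soft-delete window on primary and backup buckets
RSYNC_INTERVAL = timedelta(days=1)  # the backup rsync runs once per day


def due_for_forget(deleted_at, now):
    """True if a daily CronJob run at `now` should send "ForgetEntity"."""
    return now - deleted_at >= DELETED_GRACE


def latest_backup_expiry(forget_at):
    """Latest time at which the object's data is gone from the backup bucket."""
    primary_gone = forget_at + SOFT_DELETE
    # Worst case: the primary data disappears just after an rsync has run.
    backup_delete = primary_gone + RSYNC_INTERVAL
    return backup_delete + SOFT_DELETE


deleted_at = datetime(2024, 1, 1, 12, 0)       # user deletes the resource
first_run = datetime(2024, 1, 8, 0, 0)         # 6.5 days later: too early
second_run = first_run + timedelta(days=1)     # 7.5 days later: due

print(due_for_forget(deleted_at, first_run))   # False
print(due_for_forget(deleted_at, second_run))  # True
print(latest_backup_expiry(second_run))        # 2024-01-24 00:00:00
```

Under these assumptions, the data can take a little over three weeks from the user's delete request to vanish from every bucket: up to 8 days until "ForgetEntity", 7 days of soft delete in the primary buckets, up to 1 day until the next rsync, and 7 more days of soft delete in the backup.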

References to the deleted object in other live objects will remain there. Those objects can still be queried by references to the deleted object – i.e., you can find all Evaluations that ran on a deleted Dataset – but fetching the deleted Dataset will raise a 404 error.

Data object versioning

Note

This is not implemented yet but is on the roadmap.

Data objects can belong to a “version group” of related objects. For example, a dataset might be updated periodically with new instances. Each of these updates produces a new, immutable object with a unique ID. The version group is just an annotation that says that all of these objects are related. Following a Docker-like model, users can annotate objects in the version group with named tags such as latest to make them easier to query.

Distributed System Architecture

Reconciliation-based execution control

Services that control computational work make their decisions by comparing the current state of the system to a desired state and taking actions to reconcile the two.

This is also how Kubernetes works, and it makes many distributed-system problems easier to reason about. For example, restarting after an error becomes fairly trivial: a restarted service simply compares the current state to the desired state again, so it cannot “miss” any events that occurred during the downtime.
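A minimal sketch of a reconciliation step, assuming the state can be summarized as a set of resource names. The names used here ("inference-session", "evaluation-client", "stale-job") and both functions are illustrative, not the dyff-operator implementation.

```python
def reconcile(desired, actual):
    """Return the actions needed to move `actual` toward `desired`."""
    to_create = sorted(desired - actual)
    to_delete = sorted(actual - desired)
    return [("create", name) for name in to_create] + \
           [("delete", name) for name in to_delete]


def apply_actions(actions, actual):
    """Apply the actions to the (simulated) current state in place."""
    for verb, name in actions:
        if verb == "create":
            actual.add(name)
        else:
            actual.discard(name)


desired = {"inference-session", "evaluation-client"}
actual = {"stale-job"}

actions = reconcile(desired, actual)
apply_actions(actions, actual)

# A second pass finds nothing to do, which is why re-running after a crash is safe.
second_pass = reconcile(desired, actual)
print(actions)
print(second_pass)
```

Because each pass is computed from the states themselves rather than from a stream of change notifications, a controller that was down for a while needs no special recovery logic: it runs the same comparison when it comes back.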

Message ordering, at-least-once delivery, and consumer idempotence

Services communicate through messages sent via a Kafka broker. We design all of the Dyff services to maintain the following properties:

  1. All message consumers see messages about a given entity in the same order

  2. Messages are delivered at-least-once

  3. The current system state can be reconstructed by re-playing all the messages

At-least-once delivery semantics imply that all message consumers must be idempotent with respect to duplicate message delivery.

To maintain the at-least-once invariant, message consumers have to be structured in a particular way. Specifically, they have to process messages with the following steps:

  1. Receive new incoming message

  2. Take all appropriate actions (including sending more messages) and make sure they succeeded

  3. “Commit” the incoming message so that it won’t be acted on again

The “commit” step is usually acknowledging receipt of the message to Kafka, which advances the consumer’s committed offset in the topic and prevents that message from being replayed if the consumer reconnects. It could also be a local operation; for example, dyff-orchestrator commits some actions by updating its local KV store. The point is that the commit happens only after we’re sure that the message has been fully processed. Any crash before the commit step will result in all of the actions being tried again.
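The receive, act, commit ordering can be simulated without a broker. In this sketch a Python list stands in for a topic partition and a dict stands in for the durably stored committed offset; the Crash exception, the crash_at parameter, and the message fields are assumptions made to demonstrate redelivery. The handler is an upsert, so processing the same message twice leaves the same result.

```python
class Crash(Exception):
    """Simulated consumer failure between acting and committing."""


def run_consumer(log, store, handle, crash_at=None):
    """Process messages from the committed offset onward, committing after each."""
    while store["offset"] < len(log):
        offset = store["offset"]
        message = log[offset]         # 1. receive
        handle(message)               # 2. act, and make sure it succeeded
        if offset == crash_at:
            raise Crash(f"crashed before committing offset {offset}")
        store["offset"] = offset + 1  # 3. commit


applied = {}      # entity ID -> latest status (idempotent upsert target)
deliveries = []   # every delivery, including duplicates


def handle(message):
    deliveries.append(message)
    applied[message["key"]] = message["status"]


log = [
    {"key": "eval-1", "status": "Admitted"},
    {"key": "eval-1", "status": "Complete"},
]
store = {"offset": 0}

try:
    run_consumer(log, store, handle, crash_at=0)
except Crash:
    pass  # the offset was never committed, so message 0 will be redelivered

run_consumer(log, store, handle)  # restart from the last committed offset
print(len(deliveries), applied, store)
```

The first message is delivered twice, yet the final state is the same as if it had been delivered once. Committing before acting would instead give at-most-once behavior: a crash after the commit would silently drop the message.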

Data persistence, eventual consistency, command-query responsibility segregation

The dyff.workflows.state Kafka topic is the source of truth for the current system state. It is replicated in the Kafka broker and backed up regularly.

Every downstream view of the system state derives from dyff.workflows.state. For example, we have a MongoDB instance for querying objects in the system, but it is populated from dyff.workflows.state messages. User commands don’t modify the MongoDB directly; they generate messages that eventually propagate to the DB. This general pattern is called command-query responsibility segregation (CQRS).

A consequence of this is that the global system state is inherently eventually consistent; it takes time for messages to propagate to downstream data views. Services that need strongly consistent datastores manage these themselves. For example, dyff-orchestrator has a local strongly consistent KV store for keeping track of scheduling information, so that workflows aren’t scheduled twice if duplicate messages are generated. Likewise, the dyff-api service owns the authorization database and communicates with it using strongly consistent semantics.
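The relationship between the source-of-truth topic and a downstream query view can be sketched in a few lines. This is an in-memory illustration only: the list stands in for dyff.workflows.state, the QueryView class stands in for a datastore such as the MongoDB instance, and handle_command and the entity ID are hypothetical.

```python
class QueryView:
    """A read-only projection that is populated only from the state log."""

    def __init__(self):
        self.objects = {}
        self.position = 0  # how far into the log this view has consumed

    def catch_up(self, log):
        for key, obj in log[self.position:]:
            self.objects[key] = obj  # upsert, so replays are harmless
        self.position = len(log)


def handle_command(log, key, obj):
    """Command side: append to the log; never write to the view directly."""
    log.append((key, obj))


log = []
view = QueryView()

handle_command(log, "eval-1", {"id": "eval-1", "status": "Created"})
stale_read = view.objects.get("eval-1")  # the view has not consumed the message yet

view.catch_up(log)
fresh_read = view.objects.get("eval-1")

# A brand-new view rebuilt by replaying the log converges to the same contents.
rebuilt = QueryView()
rebuilt.catch_up(log)
print(stale_read, fresh_read, rebuilt.objects == view.objects)
```

The window between the command and the view catching up is the eventual-consistency gap described above. Anything that cannot tolerate that gap, such as duplicate-scheduling checks, needs its own strongly consistent store.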