Inference

An InferenceService is the “system under test”. Dyff requires that the system be packaged as a web service that runs in a Docker container and exposes an HTTP API for making inferences on input data.
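
For illustration, the platform communicates with such a service over plain HTTP. The snippet below is a hypothetical sketch of that interaction, assuming a container listening locally on port 8000 and exposing a generate endpoint (as the vLLM runner used later on this page does); the host, port, and payload fields are assumptions for illustration, not part of the Dyff API.

import requests

# Hypothetical request to a locally running inference container.
# The URL and payload shape are illustrative; the real endpoint and
# schema depend on the runner (see the InferenceInterface example below).
response = requests.post(
    "http://localhost:8000/generate",
    json={"prompt": "Open the pod bay doors, Hal!", "max_tokens": 100},
    timeout=30,
)
response.raise_for_status()
print(response.json())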

A Model is the “raw” form of an inference model, from which one or more InferenceServices may be built.

An InferenceSession is a running instance of an InferenceService. Multiple replicas of the service can be run in a single session to increase throughput. Dyff automatically orchestrates the computational resources required, including GPU accelerators for neural network models.

Create an InferenceService

from dyff.client import Client
from dyff.schema.platform import (
    Accelerator,
    AcceleratorGPU,
    DataSchema,
    DyffDataSchema,
    InferenceInterface,
    InferenceServiceRunner,
    InferenceServiceRunnerKind,
    ModelResources,
    SchemaAdapter,
)
from dyff.schema.requests import InferenceServiceCreateRequest

API_KEY: str = ...
ACCOUNT: str = ...

client = Client(api_key=API_KEY)

service_request = InferenceServiceCreateRequest(
    account=ACCOUNT,
    # ID of the databricks/dolly-v2-3b Model
    model="3be8292c1296402bae1981499f31c635",
    name="databricks/dolly-v2-3b",
    runner=InferenceServiceRunner(
        kind=InferenceServiceRunnerKind.VLLM,
        # T4 GPUs don't support the bfloat16 format, so force standard float16
        args=["--dtype", "float16"],
        accelerator=Accelerator(
            kind="GPU",
            gpu=AcceleratorGPU(
                hardwareTypes=["nvidia.com/gpu-t4"],
                memory="10Gi",
            ),
        ),
        resources=ModelResources(
            storage="10Gi",
            memory="16Gi",
        ),
    ),
    interface=InferenceInterface(
        # This is the inference endpoint for the vLLM runner
        endpoint="generate",
        # The output records should look like: {"text": "To be, or not to be"}
        outputSchema=DataSchema.make_output_schema(
            DyffDataSchema(components=["text.Text"]),
        ),
        # How to convert the input dataset into the format the runner expects
        inputPipeline=[
            # {"text": "The question"} -> {"prompt": "The question"}
            SchemaAdapter(
                kind="TransformJSON",
                configuration={
                    # Map 'text' in the input data to 'prompt' in the request
                    # sent to the model
                    "prompt": "$.text",
                    # Use the constant '100' for 'max_tokens' in the request
                    "max_tokens": 100,
                },
            ),
        ],
        # How to convert the runner output to match outputSchema
        outputPipeline=[
            # {"text": ["The answer"]} -> {"text": "The answer"}
            SchemaAdapter(
                kind="ExplodeCollections",
                configuration={"collections": ["text"]},
            ),
        ],
    ),
)

service = client.inferenceservices.create(service_request)
print(f"created inferenceservice:\n{service}")
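
To make the interface concrete, here is a plain-Python sketch of the record transformations that the input and output pipelines above describe. These functions are illustrative stand-ins, not Dyff's SchemaAdapter implementations; they only show the expected shape of the data before and after each adapter.

# Input pipeline (TransformJSON): dataset record -> runner request
def adapt_input(record: dict) -> dict:
    # {"text": "The question"} -> {"prompt": "The question", "max_tokens": 100}
    return {"prompt": record["text"], "max_tokens": 100}


# Output pipeline (ExplodeCollections): runner response -> output records
def adapt_output(response: dict) -> list[dict]:
    # {"text": ["The answer"]} -> [{"text": "The answer"}]
    return [{"text": item} for item in response["text"]]


assert adapt_input({"text": "To be, or not to be?"}) == {
    "prompt": "To be, or not to be?",
    "max_tokens": 100,
}
assert adapt_output({"text": ["That is the question."]}) == [
    {"text": "That is the question."}
]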

Create and use an interactive InferenceSession

import datetime
import time

from dyff.client import Client
from dyff.schema.requests import InferenceSessionCreateRequest

API_KEY: str = ...
ACCOUNT: str = ...

client = Client(api_key=API_KEY)

session_request = InferenceSessionCreateRequest(
    account=ACCOUNT,
    # databricks/dolly-v2-3b
    inferenceService="ba4ba5c26c9246ee88e127d37cdf548d",
    expires=datetime.datetime.utcnow() + datetime.timedelta(days=1),
    replicas=1,
    useSpotPods=True,
)

session_and_token = client.inferencesessions.create(session_request)
session = session_and_token.inferencesession
session_id = session.id
session_token = session_and_token.token
print(f"created session:\n{session_and_token.inferencesession}")

# Starting the session can take some time, especially if you requested a GPU
# You can poll the inferencesessions.ready() endpoint to find out if the
# session is ready to accept requests. It will return status 200 if the
# session is ready, and will raise an HttpResponseError with status 503
# (ServiceUnavailable) if the session is not ready. (It may also return 404
# if the session was created recently.)
while not client.inferencesessions.ready(session_id):
    print(f"[{datetime.datetime.utcnow()}]: not ready")
    # Always use a short sleep when polling in a loop. ready() will usually
    # block for some time as well, but it depends on the runner implementation
    time.sleep(1)
print("Ready")

# If you already have a running session:
# session = client.inferencesessions.get(session_id)

# Create an inference client using the default interface specified for the
# InferenceService that's being run in the session
interface = session.inferenceService.interface
inference_client = client.inferencesessions.client(
    session_id,
    session_token,
    # If you don't specify 'interface', the client will use the native JSON
    # interface of the model.
    interface=interface,
    # You can also set any of these separately; they will override the
    # corresponding setting in 'interface' if you specify both.
    # endpoint=interface.endpoint,
    # input_adapter=create_pipeline(interface.inputPipeline),
    # output_adapter=create_pipeline(interface.outputPipeline),
)

# The input is {"text": ...} because the default interface for the dolly-v2-3b
# service maps {"text": ...} -> {"prompt": ...}
y = inference_client.infer({"text": "Open the pod bay doors, Hal!"})
print(y)
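
The adapted response should match outputSchema, that is, records of the form {"text": ...}. Assuming infer() returns a list of such records (an assumption about the client's return shape, one record per generated completion), you could read out the generated text like this:

# Assumes 'y' is a list of records matching outputSchema,
# e.g. [{"text": "..."}]
for record in y:
    print(record["text"])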