Flush & shutdown
A background worker batches events and sends them. Four methods give you control over it:
| Method | Does |
|---|---|
client.flush() | Block until the queue is drained. |
client.shutdown() | Flush and stop the worker (sync apps). |
await client.aflush() | Async equivalent of flush(). |
await client.ashutdown() | Async equivalent of shutdown(). |
client.flush() # block until the queue is drainedclient.shutdown() # flush and stop the worker (sync apps)await client.aflush() # async equivalentsawait client.ashutdown()Short scripts
A trace flushes on exit, so short scripts that use with client.trace(...) usually need nothing more. The SDK also registers a best-effort atexitflush — but don't rely on it as the primary mechanism in production.
Long-running services
For long-running or async apps, wire ashutdown()into your framework's shutdown hook — for example, an ASGI lifespan handler — so queued events are drained on deploy and restart.
from contextlib import asynccontextmanagerfrom fastapi import FastAPIfrom mv37.rollout import Rolloutclient = Rollout(agent_name="support_agent")@asynccontextmanagerasync def lifespan(app: FastAPI): yield await client.ashutdown() # drain the queue on shutdownapp = FastAPI(lifespan=lifespan)Heads up
shutdown() waits up to shutdown_timeout seconds (default 5) for the queue to drain. Tune it on the client if your batches are large or your network is slow.
Synchronous mode
sync_mode=True sends events inline in the calling thread instead of via the background worker. It is intended for tests and local debugging only, and it makes flush and shutdown no-ops.
client = Rollout(api_key="...", sync_mode=True) # tests / debugging only