Workflows

Multi-agent pipelines with DAG execution. Define a graph of steps once, run it many times with different inputs. Independent branches execute in parallel automatically. Steps can fan out, branch on conditions, wait for humans, call webhooks, transform data, and dynamically splice new steps into a running DAG.

Workflows require primitivesEnabled: true on your platform config. Available on Cloud, Team, and Enterprise plans.
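
To make the parallelism rule concrete, the sketch below groups steps into waves: every step in a wave has all of its dependsOn entries satisfied by earlier waves, so the steps in one wave could run at the same time. This only illustrates the idea and is not Theazo's scheduler; StepNode and executionWaves are hypothetical names.

```typescript
// Hypothetical minimal step shape: only the fields needed for ordering.
interface StepNode {
  id: string
  dependsOn?: string[]
}

// Group steps into waves. Each wave holds the steps whose dependencies
// were all completed by earlier waves, so a wave can run in parallel.
function executionWaves(steps: StepNode[]): string[][] {
  const done = new Set<string>()
  let remaining = [...steps]
  const waves: string[][] = []

  while (remaining.length > 0) {
    const ready = remaining.filter((s) =>
      (s.dependsOn ?? []).every((dep) => done.has(dep)),
    )
    if (ready.length === 0) {
      // Nothing is runnable: a cycle or a reference to an unknown step.
      throw new Error('Steps do not form a valid DAG')
    }
    for (const s of ready) done.add(s.id)
    waves.push(ready.map((s) => s.id))
    remaining = remaining.filter((s) => !done.has(s.id))
  }
  return waves
}

const waves = executionWaves([
  { id: 'scrape' },
  { id: 'news' },
  { id: 'enrich', dependsOn: ['scrape'] },
  { id: 'score', dependsOn: ['enrich', 'news'] },
])
console.log(waves) // [['scrape', 'news'], ['enrich'], ['score']]
```

Waves are a conservative approximation: a real scheduler could start a step as soon as its own dependencies finish, without waiting for the rest of the wave.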

Creating a workflow

Use the typed builder API to construct a workflow. Call .build() at the end to produce a WorkflowCreateOpts object you pass to theazo.workflows.create().

workflow.ts
import { Theazo, workflow } from 'theazo'

const theazo = new Theazo({ apiKey: 'th_live_...' })

const wf = await theazo.workflows.create(
  workflow('lead-enrichment')
    .step({
      id: 'scrape',
      type: 'agent',
      agent: 'web-researcher',
      task: 'Scrape company info from {{ input.website }}',
    })
    .step({
      id: 'enrich',
      type: 'agent',
      agent: 'data-enricher',
      dependsOn: ['scrape'],
      task: 'Enrich lead data with LinkedIn and Clearbit',
      inputMap: {
        companyData: '$.scrape.output.company',
        rawHtml:     '$.scrape.output.html',
      },
    })
    .condition({
      id: 'check-fit',
      dependsOn: ['enrich'],
      expression: '$.enrich.output.icp_score >= 70',
      // 'send-to-crm' and 'archive' are further steps, omitted here for brevity
      onTrue:  { next: 'send-to-crm' },
      onFalse: { next: 'archive' },
    })
    .build()
)

console.log(wf.id)       // 'wf_a1b2c3'
console.log(wf.version)  // 1

Input and output schemas

Attach JSON Schema validators to inputSchema and outputSchema. Theazo validates the run input at trigger time and the final output before marking the run complete. Invalid inputs are rejected with a 422 before any agents start.

schema.ts
const wf = await theazo.workflows.create(
  workflow('lead-enrichment')
    .inputSchema({
      type: 'object',
      required: ['website', 'userId'],
      properties: {
        website: { type: 'string', format: 'uri' },
        userId:  { type: 'string' },
        priority: { type: 'string', enum: ['low', 'high'] },
      },
    })
    .outputSchema({
      type: 'object',
      required: ['score', 'enrichedLead'],
      properties: {
        score:        { type: 'number', minimum: 0, maximum: 100 },
        enrichedLead: { type: 'object' },
      },
    })
    // ... steps ...
    .build()
)
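
A client can catch obvious input mistakes before making the request at all. The sketch below checks only a tiny subset of JSON Schema (required, string types, and enum); MiniSchema and precheckInput are hypothetical helpers, and they do not replace the server-side validation or a full JSON Schema validator.

```typescript
// A deliberately tiny subset of JSON Schema: `required`, string `type`, and `enum`.
interface MiniSchema {
  required?: string[]
  properties?: Record<string, { type?: string; enum?: string[] }>
}

// Returns a list of human-readable problems; an empty list means "looks valid".
function precheckInput(schema: MiniSchema, input: Record<string, unknown>): string[] {
  const problems: string[] = []
  for (const key of schema.required ?? []) {
    if (!(key in input)) problems.push(`missing required field: ${key}`)
  }
  for (const [key, rule] of Object.entries(schema.properties ?? {})) {
    const value = input[key]
    if (value === undefined) continue
    if (rule.type === 'string' && typeof value !== 'string') {
      problems.push(`${key} must be a string`)
    } else if (rule.enum && !rule.enum.includes(value as string)) {
      problems.push(`${key} must be one of: ${rule.enum.join(', ')}`)
    }
  }
  return problems
}

const miniSchema: MiniSchema = {
  required: ['website', 'userId'],
  properties: {
    website: { type: 'string' },
    userId: { type: 'string' },
    priority: { type: 'string', enum: ['low', 'high'] },
  },
}

const problems = precheckInput(miniSchema, {
  website: 'https://acme.com',
  priority: 'urgent',
})
console.log(problems)
// ['missing required field: userId', 'priority must be one of: low, high']
```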

Step types

Each step has a type that controls how it executes. All step types share id, dependsOn, inputMap, and onStepFailure. There are 10 step types.

agent (default)

Runs an agent definition against a task prompt. The agent gets its own sandbox and model loop.

{
  id: 'scrape',
  type: 'agent',
  agent: 'web-researcher',
  task: 'Scrape company info from {{ input.website }}',
  inputMap: { website: '$.input.website' },
}
parallel

Fans out to multiple sub-steps simultaneously. All sub-steps must complete (or fail per policy) before downstream dependents proceed.

{
  id: 'research-all',
  type: 'parallel',
  steps: [
    { id: 'twitter',  type: 'agent', agent: 'social-agent', task: 'Twitter presence for {{ input.company }}' },
    { id: 'linkedin', type: 'agent', agent: 'social-agent', task: 'LinkedIn profile for {{ input.company }}' },
    { id: 'news',     type: 'agent', agent: 'news-agent',   task: 'Recent press for {{ input.company }}' },
  ],
}
condition

Branches execution based on a JSONPath expression evaluated against the current run state. Resolves to the onTrue or onFalse branch.

{
  id: 'check-score',
  type: 'condition',
  dependsOn: ['score'],
  expression: '$.score.output.score >= 70',
  onTrue:  { next: 'send-to-crm' },
  onFalse: { next: 'archive' },
}
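
As a rough mental model of how such an expression resolves to a branch, the sketch below evaluates expressions of the form "path operator number" against a run state. The run-state shape (step outputs keyed by step id), the supported operators, and the names lookup and route are assumptions made for illustration, not Theazo's evaluator.

```typescript
type RunState = Record<string, unknown>

// Resolve a dotted path such as '$.score.output.score' against the run state.
function lookup(state: RunState, path: string): unknown {
  let node: unknown = state
  for (const key of path.replace(/^\$\./, '').split('.')) {
    if (node === null || typeof node !== 'object') return undefined
    node = (node as Record<string, unknown>)[key]
  }
  return node
}

const comparators: Record<string, (a: number, b: number) => boolean> = {
  '>=': (a, b) => a >= b,
  '<=': (a, b) => a <= b,
  '>': (a, b) => a > b,
  '<': (a, b) => a < b,
  '==': (a, b) => a === b,
}

// Evaluate '<path> <operator> <number>' and return the id of the next step.
function route(state: RunState, expression: string, onTrue: string, onFalse: string): string {
  const match = /^(\S+?)\s*(>=|<=|==|>|<)\s*(-?\d+(?:\.\d+)?)$/.exec(expression.trim())
  if (!match) throw new Error(`Unsupported expression: ${expression}`)
  const [, path = '', operator = '', literal = ''] = match
  const compare = comparators[operator]
  if (!compare) throw new Error(`Unsupported operator: ${operator}`)
  // An undefined value becomes NaN, which fails every comparison and routes to onFalse.
  return compare(Number(lookup(state, path)), Number(literal)) ? onTrue : onFalse
}

const nextStep = route(
  { score: { output: { score: 82 } } },
  '$.score.output.score >= 70',
  'send-to-crm',
  'archive',
)
console.log(nextStep) // 'send-to-crm'
```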
delay

Pauses execution for a fixed duration before proceeding. The agent sandbox is released during the wait — no compute cost accrues.

{
  id: 'wait-24h',
  type: 'delay',
  duration: 86400,    // seconds
  dependsOn: ['send-intro-email'],
}
approval

Pauses the run and waits for a human approval decision via the Approvals API or dashboard. On timeout, the defaultAction applies.

{
  id: 'approve-send',
  type: 'approval',
  action: 'send_email',
  timeout: '4h',
  defaultAction: 'deny',
  dependsOn: ['draft-email'],
}
webhook

POSTs a payload to an external URL, then either waits for a callback or fires and forgets, depending on waitForCallback. The body is constructed from inputMap values.

{
  id: 'notify-crm',
  type: 'webhook',
  url: 'https://hooks.yourapp.com/lead-created',
  method: 'POST',
  waitForCallback: false,
  inputMap: { lead: '$.enrich.output.lead' },
  headers: { 'X-Source': 'theazo' },
}
transform

Runs a pure JSONPath reshaping operation — no agent, no sandbox. Use to restructure data between steps without paying for compute. Transform expressions are serializable JSONB; never use JS functions here.

{
  id: 'reshape',
  type: 'transform',
  dependsOn: ['research-all'],
  output: {
    twitterHandle: '$.twitter.output.handle',
    linkedinUrl:   '$.linkedin.output.url',
    latestHeadline: '$.news.output.articles[0].title',
    company:        '$.input.company',
  },
}
map

Iterates over an array from a previous step and runs a sub-step for each element. The concurrency field caps how many sub-steps run at once. Produces an array output in the same order as the input.

{
  id: 'score-each',
  type: 'map',
  dependsOn: ['scrape'],
  over: '$.scrape.output.leads',      // JSONPath to the array
  as: 'lead',                          // binding name inside sub-step
  concurrency: 5,
  step: {
    id: 'score-lead',
    type: 'agent',
    agent: 'scorer',
    task: 'Score this lead: {{ lead.company }}',
    inputMap: { lead: '$.lead' },
  },
}
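
The two guarantees described for map (bounded concurrency and output in input order) can be sketched in plain TypeScript as follows. mapWithConcurrency is a hypothetical helper with a timer standing in for the agent call; it shows one common way to get this behavior, not how Theazo implements it.

```typescript
// Run `worker` over `items` with at most `limit` calls in flight.
// Results are written by index, so output order matches input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (limit < 1) throw new RangeError('limit must be at least 1')
  const results = new Array<R>(items.length)
  let next = 0

  // Each lane repeatedly claims the next unprocessed index until none remain.
  async function runLane(): Promise<void> {
    while (next < items.length) {
      const index = next++
      results[index] = await worker(items[index] as T, index)
    }
  }

  const lanes = Array.from({ length: Math.min(limit, items.length) }, () => runLane())
  await Promise.all(lanes)
  return results
}

const companies = ['acme', 'globex', 'initech']
mapWithConcurrency(companies, 2, async (company, index) => {
  // Stand-in for the per-lead agent call; later items finish faster.
  await new Promise<void>((resolve) => setTimeout(resolve, 30 - index * 10))
  return company.toUpperCase()
}).then((scored) => console.log(scored)) // ['ACME', 'GLOBEX', 'INITECH']
```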
planner

Dynamic DAG expansion. Runs an agent whose output is a list of new step definitions. Theazo splices those steps into the running DAG at runtime. Useful when the number or shape of tasks is not known at workflow-definition time.

{
  id: 'plan',
  type: 'planner',
  agent: 'task-planner',
  task: 'Given these {{ input.count }} leads, produce a list of research tasks',
  // Agent must output: { steps: WorkflowStep[] }
  // Theazo validates the schema and splices them into the DAG
  dependsOn: ['pre-filter'],
}
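
One way to picture the splice is as a validation pass followed by an append. The sketch below rejects duplicate ids and unknown dependencies, and attaches dependency-free planned steps to the planner step itself. That attachment rule, the PlannedStep type, and the spliceSteps function are assumptions for illustration; the actual validation Theazo performs is not specified here.

```typescript
interface PlannedStep {
  id: string
  type: string
  dependsOn?: string[]
}

// Validate planner output and append it to the existing step list.
function spliceSteps(
  existing: PlannedStep[],
  plannerId: string,
  planned: PlannedStep[],
): PlannedStep[] {
  const known = new Set(existing.map((s) => s.id))
  if (!known.has(plannerId)) throw new Error(`Unknown planner step: ${plannerId}`)

  for (const step of planned) {
    if (known.has(step.id)) throw new Error(`Duplicate step id: ${step.id}`)
    known.add(step.id)
  }

  const spliced = planned.map((step) => {
    const deps = step.dependsOn ?? []
    for (const dep of deps) {
      if (!known.has(dep)) throw new Error(`Step ${step.id} depends on unknown step ${dep}`)
    }
    // Assumption: steps with no explicit dependency run after the planner itself.
    return { ...step, dependsOn: deps.length > 0 ? deps : [plannerId] }
  })
  return [...existing, ...spliced]
}

const expanded = spliceSteps(
  [
    { id: 'pre-filter', type: 'agent' },
    { id: 'plan', type: 'planner', dependsOn: ['pre-filter'] },
  ],
  'plan',
  [
    { id: 'research-acme', type: 'agent' },
    { id: 'summarize', type: 'agent', dependsOn: ['research-acme'] },
  ],
)
console.log(expanded.map((s) => s.id))
// ['pre-filter', 'plan', 'research-acme', 'summarize']
```

This sketch does not check for cycles among the planned steps; a complete validator would also confirm that the expanded graph is still a DAG.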
wait

Suspends the run until a named external event arrives via POST /v1/workflows/runs/:runId/resume. Optionally times out and routes to a fallback step. Use for long-running async integrations.

{
  id: 'wait-payment',
  type: 'wait',
  event: 'payment.confirmed',
  timeout: '48h',
  onTimeout: { next: 'send-reminder' },
  dependsOn: ['send-invoice'],
}

Data passing with JSONPath

Steps pass data forward using inputMap. Keys become named variables available to the step. Values are JSONPath strings evaluated against the full run state. All values are serializable to JSONB — never use JavaScript functions in step definitions.

// JSONPath reference patterns
inputMap: {
  leads:       '$.scrape.output.leads',           // array from prior step
  companyName: '$.scrape.output.company.name',    // nested field
  rawInput:    '$.input.website',                 // original workflow input
  firstItem:   '$.scrape.output.leads[0]',        // array index
}
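
To show how the reference patterns above map to values, here is a small resolver for the subset of JSONPath they use (the root, dotted keys, and numeric indexes). It assumes the run state is an object holding input plus one entry per step id; resolvePath and resolveInputMap are hypothetical helpers, not part of the SDK.

```typescript
// Resolve the small JSONPath subset used above: '$', '.key' and '[index]'.
function resolvePath(state: unknown, path: string): unknown {
  if (!path.startsWith('$')) throw new Error(`Path must start with '$': ${path}`)
  // Tokenize the '.key' and '[0]' segments that follow the leading '$'.
  const tokens = path.slice(1).match(/\.[^.[\]]+|\[\d+\]/g) ?? []
  let node: unknown = state
  for (const token of tokens) {
    if (node === null || typeof node !== 'object') return undefined
    const key = token.startsWith('[') ? token.slice(1, -1) : token.slice(1)
    node = (node as Record<string, unknown>)[key]
  }
  return node
}

// Build a step's named inputs by resolving every inputMap value.
function resolveInputMap(
  state: unknown,
  inputMap: Record<string, string>,
): Record<string, unknown> {
  const resolved: Record<string, unknown> = {}
  for (const [name, path] of Object.entries(inputMap)) {
    resolved[name] = resolvePath(state, path)
  }
  return resolved
}

const runState = {
  input: { website: 'https://acme.com' },
  scrape: {
    output: {
      company: { name: 'Acme' },
      leads: [{ email: 'a@acme.com' }, { email: 'b@acme.com' }],
    },
  },
}

console.log(
  resolveInputMap(runState, {
    companyName: '$.scrape.output.company.name',
    rawInput: '$.input.website',
    firstItem: '$.scrape.output.leads[0]',
  }),
)
// { companyName: 'Acme', rawInput: 'https://acme.com', firstItem: { email: 'a@acme.com' } }
```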

Shared state

Steps can declare which keys they read from and write to a shared state bag. The state bag persists across the entire run and survives hibernation. Declare stateReads and stateWrites per step so Theazo can detect conflicts and serialize concurrent writers.

shared-state.ts
const wf = await theazo.workflows.create(
  workflow('pipeline')
    .step({
      id: 'writer',
      type: 'agent',
      agent: 'data-agent',
      task: 'Fetch and normalize the company record',
      stateWrites: ['company', 'metadata'],   // keys this step writes
    })
    .step({
      id: 'reader',
      type: 'agent',
      agent: 'scorer',
      task: 'Score this company',
      dependsOn: ['writer'],
      stateReads: ['company'],               // keys this step reads
    })
    .build()
)

// Pass initial state on run
const run = await wf.run({
  userId: 'user_123',
  input: { website: 'https://acme.com' },
  sharedState: { retryCount: 0, campaign: 'q2-outbound' },
})
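
The declarations make conflict detection a simple lookup. As an illustration, the sketch below lists every state key that more than one step declares in stateWrites. StateStep and findWriteConflicts are hypothetical, and how Theazo actually serializes the competing writers is not shown.

```typescript
interface StateStep {
  id: string
  stateReads?: string[]
  stateWrites?: string[]
}

// Map each state key to the steps that write it, keeping only contested keys.
function findWriteConflicts(steps: StateStep[]): Record<string, string[]> {
  const writers: Record<string, string[]> = {}
  for (const step of steps) {
    for (const key of step.stateWrites ?? []) {
      writers[key] = [...(writers[key] ?? []), step.id]
    }
  }
  const conflicts: Record<string, string[]> = {}
  for (const [key, ids] of Object.entries(writers)) {
    if (ids.length > 1) conflicts[key] = ids
  }
  return conflicts
}

const conflicts = findWriteConflicts([
  { id: 'writer', stateWrites: ['company', 'metadata'] },
  { id: 'enricher', stateWrites: ['company'] },
  { id: 'reader', stateReads: ['company'] },
])
console.log(conflicts) // { company: ['writer', 'enricher'] }
```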

Workflow policy

Attach a WorkflowPolicy to enforce tool restrictions, cost budgets, and approval gates across all steps. Policy applies to every agent launched by the workflow — individual step overrides are not supported.

policy.ts
const wf = await theazo.workflows.create(
  workflow('secure-pipeline')
    .policy({
      allowTools: ['web_search', 'read_file', 'http_get'],
      denyTools:  ['shell_exec', 'write_file'],
      requireApprovalFor: ['send_email', 'update_crm'],
      maxCostPerStep: { amount: 50, currency: 'usd' },  // $0.50 per step
      maxTotalCost:   { amount: 500, currency: 'usd' }, // $5.00 per run
    })
    // ... steps ...
    .build()
)

If a step exceeds maxCostPerStep, it is paused and a workflow.step.cost_exceeded webhook fires. If the run total exceeds maxTotalCost, the entire run is cancelled.
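
The two caps can be read as a check that runs after each step reports its cost. The sketch below treats amounts as integer cents, following the comments in the policy example; checkBudget, formatUsd, and the decision names are hypothetical, and the order in which the two caps are checked is an assumption.

```typescript
// Amounts are integers in minor units (cents), matching the policy comments above.
interface Money {
  amount: number
  currency: string
}

interface CostPolicy {
  maxCostPerStep: Money
  maxTotalCost: Money
}

type BudgetDecision = 'continue' | 'pause-step' | 'cancel-run'

// Decide what to do after the latest step cost has been recorded.
function checkBudget(stepCosts: number[], policy: CostPolicy): BudgetDecision {
  const total = stepCosts.reduce((sum, cost) => sum + cost, 0)
  if (total > policy.maxTotalCost.amount) return 'cancel-run'
  const latest = stepCosts[stepCosts.length - 1] ?? 0
  if (latest > policy.maxCostPerStep.amount) return 'pause-step'
  return 'continue'
}

// Render a cent amount as dollars for logs.
function formatUsd(money: Money): string {
  return `$${(money.amount / 100).toFixed(2)}`
}

const costPolicy: CostPolicy = {
  maxCostPerStep: { amount: 50, currency: 'usd' },
  maxTotalCost: { amount: 500, currency: 'usd' },
}

console.log(formatUsd(costPolicy.maxTotalCost))  // '$5.00'
console.log(checkBudget([40, 30], costPolicy))   // 'continue'
console.log(checkBudget([40, 65], costPolicy))   // 'pause-step'
console.log(checkBudget([300, 250], costPolicy)) // 'cancel-run'
```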

Per-step error routing

By default a failing step stops the run (respecting the top-level onFailure policy). Override per step with onStepFailure to route to a recovery step instead.

{
  id: 'enrich',
  type: 'agent',
  agent: 'data-enricher',
  dependsOn: ['scrape'],
  task: 'Enrich lead from external APIs',
  onStepFailure: { action: 'goto', target: 'enrich-fallback' },
},
{
  id: 'enrich-fallback',
  type: 'agent',
  agent: 'data-enricher-lite',
  task: 'Minimal enrichment using only public data',
  inputMap: { company: '$.scrape.output.company' },
}

Running a workflow

run.ts
const run = await wf.run({
  userId: 'user_123',
  input: {
    website:  'https://acme.com',
    priority: 'high',
  },
  idempotencyKey: 'lead-acme-2026-06-05',  // safe to retry — won't double-run
})

console.log(run.id)      // 'wfrun_abc123'
console.log(run.status)  // 'running'
idempotencyKey is scoped per platform. Submitting the same key within 24 hours returns the existing run rather than starting a new one. Use it to safely retry on network errors.
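
The deduplication rule can be modeled with a keyed store and a 24-hour window. The sketch below is an in-memory illustration only: IdempotencyStore, its startRun method, and the generated run ids are hypothetical and say nothing about how Theazo stores keys.

```typescript
const TTL_MS = 24 * 60 * 60 * 1000 // 24 hours

interface StoredRun {
  runId: string
  createdAt: number
}

class IdempotencyStore {
  private readonly runs = new Map<string, StoredRun>()
  private counter = 0

  // Return the existing run for a live key, otherwise start a new run.
  startRun(key: string, now: number): { runId: string; reused: boolean } {
    const existing = this.runs.get(key)
    if (existing && now - existing.createdAt < TTL_MS) {
      return { runId: existing.runId, reused: true }
    }
    this.counter += 1
    const runId = `wfrun_${this.counter}`
    this.runs.set(key, { runId, createdAt: now })
    return { runId, reused: false }
  }
}

const store = new IdempotencyStore()
const firstAttempt = store.startRun('lead-acme-2026-06-05', 0)
const retryAttempt = store.startRun('lead-acme-2026-06-05', 1_000)
const nextDay = store.startRun('lead-acme-2026-06-05', TTL_MS + 1)

console.log(firstAttempt) // { runId: 'wfrun_1', reused: false }
console.log(retryAttempt) // { runId: 'wfrun_1', reused: true }
console.log(nextDay)      // { runId: 'wfrun_2', reused: false }
```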

Checking run status

const run = await theazo.workflows.getRun('wfrun_abc123')

console.log(run.status)          // 'completed' | 'running' | 'failed' | 'partial' | 'cancelled'
console.log(run.steps)           // per-step: { id, status, cost, output, startedAt, completedAt }
console.log(run.cost)            // { amount: 2340, currency: 'usd' }
console.log(run.duration)        // 184 (seconds)
console.log(run.canRetryFrom)    // 'enrich' — resume from this step on partial failure

Partial results

When a non-critical step fails and the run continues, the run status becomes partial. The canRetryFrom field names the first failed step so you can re-run from that point without re-executing completed steps.

if (run.status === 'partial') {
  // Resume from the failed step — completed steps are not re-run
  const resumed = await theazo.workflows.retryFrom(run.id, run.canRetryFrom!)
  console.log(resumed.id)  // new run ID, shares upstream step outputs
}
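
To reason about what a retry would touch, it can help to compute the failed step plus everything downstream of it. The sketch below does that from dependsOn alone. It assumes that downstream steps are re-executed along with the failed step, which the text above does not state; DagStep and stepsToRerun are hypothetical names.

```typescript
interface DagStep {
  id: string
  dependsOn?: string[]
}

// Collect the retry step and every step that transitively depends on it.
function stepsToRerun(steps: DagStep[], fromStepId: string): string[] {
  const rerun = new Set<string>([fromStepId])
  let grew = true
  while (grew) {
    grew = false
    for (const step of steps) {
      if (rerun.has(step.id)) continue
      if ((step.dependsOn ?? []).some((dep) => rerun.has(dep))) {
        rerun.add(step.id)
        grew = true
      }
    }
  }
  // Report in definition order; untouched steps keep their earlier outputs.
  return steps.filter((s) => rerun.has(s.id)).map((s) => s.id)
}

const pipeline: DagStep[] = [
  { id: 'scrape' },
  { id: 'enrich', dependsOn: ['scrape'] },
  { id: 'score', dependsOn: ['enrich'] },
  { id: 'notify', dependsOn: ['scrape'] },
]
console.log(stepsToRerun(pipeline, 'enrich')) // ['enrich', 'score']
```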

Cancelling a run

await theazo.workflows.cancelRun('wfrun_abc123')
// In-progress agents are paused; partial results preserved

SSE streaming

Subscribe to a run's live event stream using theazo.workflows.streamRun(runId). Events are Server-Sent Events delivered as an AsyncIterable<StreamEvent>.

stream.ts
for await (const event of theazo.workflows.streamRun('wfrun_abc123')) {
  switch (event.type) {
    case 'step.started':
      console.log('Started:', event.stepId)
      break
    case 'step.completed':
      console.log('Done:', event.stepId, '— cost', event.cost)
      break
    case 'step.failed':
      console.error('Failed:', event.stepId, event.error)
      break
    case 'run.completed':
      console.log('Workflow done. Total cost:', event.totalCost)
      break
    case 'run.partial':
      console.warn('Partial result. Retry from:', event.canRetryFrom)
      break
    case 'approval.pending':
      console.log('Waiting for approval on:', event.action)
      break
  }
}
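
If you consume the stream without the SDK, you need to parse the Server-Sent Events wire format yourself. The sketch below parses complete event blocks, assuming that the event name travels in the event field and that each data payload is JSON; both are assumptions about Theazo's stream, and parseSse is a hypothetical helper.

```typescript
interface ParsedEvent {
  type: string
  data: unknown
}

// Parse complete SSE blocks ("event:" / "data:" lines separated by a blank line).
function parseSse(chunk: string): ParsedEvent[] {
  const parsedEvents: ParsedEvent[] = []
  for (const block of chunk.split(/\r?\n\r?\n/)) {
    let type = 'message' // SSE default when no event field is present
    const dataLines: string[] = []
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('event:')) type = line.slice(6).trim()
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim())
      // Lines starting with ':' are comments (often keep-alives) and are skipped.
    }
    if (dataLines.length > 0) {
      parsedEvents.push({ type, data: JSON.parse(dataLines.join('\n')) })
    }
  }
  return parsedEvents
}

const wire =
  'event: step.started\ndata: {"stepId":"scrape"}\n\n' +
  ': keep-alive\n\n' +
  'event: step.completed\ndata: {"stepId":"scrape","cost":120}\n\n'

console.log(parseSse(wire).map((e) => e.type)) // ['step.started', 'step.completed']
```

A production reader would also buffer partial blocks across network chunks, which this sketch leaves out.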

Cost estimation

Before running an expensive workflow, call theazo.workflows.estimate() with a sample input. Theazo runs a dry-pass — resolving the DAG, counting expected steps and agent calls — and returns a cost estimate without executing any agents.

estimate.ts
const estimate = await theazo.workflows.estimate('wf_a1b2c3', {
  input: { website: 'https://acme.com', priority: 'high' },
})

console.log(estimate.minCost)  // { amount: 80,  currency: 'usd' }
console.log(estimate.maxCost)  // { amount: 420, currency: 'usd' }
console.log(estimate.steps)    // per-step estimate breakdown

Versioning

Every call to theazo.workflows.update() increments the version integer. In-flight runs always execute against the version they started on. New runs pick up the latest version automatically. You can pin a run to a specific version using the version field on .run().

// Update the workflow definition — version increments atomically
const updated = await theazo.workflows.update('wf_a1b2c3', {
  steps: [ /* new step list */ ],
})
console.log(updated.version)  // 2

// Pin a run to an older version
const run = await wf.run({
  userId: 'user_123',
  input: { website: 'https://acme.com' },
  version: 1,   // use the previous definition
})
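
The versioning rules amount to an append-only list of definitions plus a lookup that defaults to the newest entry. The sketch below models that behavior in memory; VersionedWorkflow and its methods are hypothetical and simplified (steps are plain ids), not the SDK's types.

```typescript
interface Definition {
  version: number
  steps: string[]
}

class VersionedWorkflow {
  private readonly versions: Definition[] = []

  constructor(steps: string[]) {
    this.versions.push({ version: 1, steps })
  }

  // Every update appends a new definition; older ones stay available.
  update(steps: string[]): Definition {
    const next = { version: this.versions.length + 1, steps }
    this.versions.push(next)
    return next
  }

  // Resolve the definition for a run: a pinned version, or the latest.
  resolve(version?: number): Definition {
    const wanted = version ?? this.versions.length
    const found = this.versions[wanted - 1]
    if (!found) throw new Error(`Unknown version: ${wanted}`)
    return found
  }
}

const versioned = new VersionedWorkflow(['scrape'])
versioned.update(['scrape', 'enrich'])

console.log(versioned.resolve().version) // 2
console.log(versioned.resolve(1).steps)  // ['scrape']
```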

Complete example

A full lead pipeline: parallel research, a data transform, a map step that scores each news article, conditional routing, and an approval gate before the CRM push.

lead-pipeline.ts
import { Theazo, workflow } from 'theazo'

const theazo = new Theazo({ apiKey: 'th_live_...' })

const wf = await theazo.workflows.create(
  workflow('lead-pipeline')
    .inputSchema({
      type: 'object',
      required: ['company', 'website'],
      properties: {
        company: { type: 'string' },
        website: { type: 'string', format: 'uri' },
      },
    })
    .policy({
      allowTools: ['web_search', 'http_get'],
      requireApprovalFor: ['update_crm'],
      maxTotalCost: { amount: 1000, currency: 'usd' },
    })

    // Step 1 — parallel social + news research
    .step({
      id: 'research',
      type: 'parallel',
      steps: [
        { id: 'twitter',  type: 'agent', agent: 'social-agent', task: 'Twitter for {{ input.company }}' },
        { id: 'linkedin', type: 'agent', agent: 'social-agent', task: 'LinkedIn for {{ input.company }}' },
        { id: 'news',     type: 'agent', agent: 'news-agent',   task: 'Recent news for {{ input.company }}' },
      ],
    })

    // Step 2 — transform into a clean shape (no agent, no cost)
    .step({
      id: 'reshape',
      type: 'transform',
      dependsOn: ['research'],
      output: {
        twitter:  '$.twitter.output',
        linkedin: '$.linkedin.output',
        news:     '$.news.output.articles',
        company:  '$.input.company',
      },
    })

    // Step 3 — score each news article in parallel (concurrency=3)
    .step({
      id: 'score-news',
      type: 'map',
      dependsOn: ['reshape'],
      over: '$.reshape.output.news',
      as: 'article',
      concurrency: 3,
      step: {
        id: 'score-article',
        type: 'agent',
        agent: 'relevance-scorer',
        task: 'Rate relevance of this article for ICP fit: {{ article.title }}',
        inputMap: { article: '$.article' },
      },
    })

    // Step 4 — final scoring with all signals
    .step({
      id: 'score',
      type: 'agent',
      agent: 'lead-scorer',
      dependsOn: ['score-news'],
      task: 'Score this lead 0-100 for ICP fit',
      inputMap: {
        twitter:    '$.reshape.output.twitter',
        linkedin:   '$.reshape.output.linkedin',
        newsScores: '$.score-news.output',
        company:    '$.input.company',
      },
      onStepFailure: { action: 'goto', target: 'fallback-score' },
    })

    // Step 4b — fallback if scorer fails
    .step({
      id: 'fallback-score',
      type: 'agent',
      agent: 'lead-scorer-lite',
      task: 'Minimal score based only on LinkedIn data',
      inputMap: { linkedin: '$.reshape.output.linkedin' },
    })

    // Step 5 — conditional routing
    .condition({
      id: 'route',
      dependsOn: ['score'],
      expression: '$.score.output.score >= 70',
      onTrue:  { next: 'approve-crm' },
      onFalse: { next: 'archive' },
    })

    // Step 6a — high score → approval gate before CRM
    .step({
      id: 'approve-crm',
      type: 'approval',
      action: 'update_crm',
      timeout: '4h',
      defaultAction: 'deny',
    })

    // Step 6b — low score → archive
    .step({
      id: 'archive',
      type: 'agent',
      agent: 'archiver',
      task: 'Archive this lead with ICP miss reason',
      inputMap: { lead: '$.reshape.output', score: '$.score.output' },
    })

    .build()
)

// Estimate cost before running
const estimate = await theazo.workflows.estimate(wf.id, {
  input: { company: 'Stripe', website: 'https://stripe.com' },
})
console.log('Estimated cost:', estimate.maxCost)  // { amount: 420, currency: 'usd' }

// Run it
const run = await wf.run({
  userId: 'user_456',
  input: { company: 'Stripe', website: 'https://stripe.com' },
  idempotencyKey: 'lead-stripe-2026-06-05',
})

// Stream live events
for await (const event of theazo.workflows.streamRun(run.id)) {
  if (event.type === 'step.completed') {
    console.log(event.stepId, 'done — cost:', event.cost)
  }
  if (event.type === 'run.completed') {
    console.log('Total cost:', event.totalCost)
    break
  }
}

API reference

POST /v1/workflows

Create a workflow definition. Returns the workflow with version=1.

Parameters

name (string, required): Human-readable workflow name.
steps (WorkflowStep[], required): Array of step definitions. Must form a valid DAG.
inputSchema (JSONSchema): JSON Schema for the run input. Validated at trigger time.
outputSchema (JSONSchema): JSON Schema for the final output.
policy (WorkflowPolicy): Tool allow/deny lists, approval gates, cost caps.
onFailure (string): 'stop' | 'continue' | 'retry'. Default: 'stop'.
retries (number): Number of retries on failure. Default: 0.
timeout (number): Max run duration in seconds.

Response

Workflow

Example

bash
curl -X POST https://api.theazo.com/v1/workflows \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "name": "lead-enrichment",
    "steps": [{ "id": "scrape", "type": "agent", "agent": "web-researcher", "task": "..." }]
  }'
GET /v1/workflows

List all workflow definitions for this platform. Ordered by createdAt descending.

Response

Workflow[]

Example

bash
curl https://api.theazo.com/v1/workflows \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/:id

Fetch a single workflow definition by ID.

Response

Workflow

Example

bash
curl https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..."
PATCH /v1/workflows/:id

Update a workflow definition. Increments version atomically. In-flight runs are unaffected.

Parameters

steps (WorkflowStep[]): New step list. Replaces the existing steps entirely.
policy (WorkflowPolicy): Updated policy.
name (string): Rename the workflow.

Response

Workflow

Example

bash
curl -X PATCH https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "name": "lead-enrichment-v2" }'
DELETE /v1/workflows/:id

Delete a workflow definition. Does not cancel in-progress runs.

Response

void

Example

bash
curl -X DELETE https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..."
POST /v1/workflows/:id/runs

Start a new run of a workflow. Returns 202 with the run object. Execution is async.

Parameters

userId (string, required): End-user ID scoping this run.
input (object, required): Run input. Validated against inputSchema if defined.
idempotencyKey (string): Deduplicate retries. Scoped per platform, 24h TTL.
version (number): Pin run to a specific workflow version. Defaults to latest.
sharedState (object): Initial shared state bag available to all steps.

Response

WorkflowRun

Example

bash
curl -X POST https://api.theazo.com/v1/workflows/wf_a1b2c3/runs \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "userId": "user_123",
    "input": { "website": "https://acme.com" },
    "idempotencyKey": "lead-acme-2026-06-05"
  }'
GET /v1/workflows/:id/runs

List all runs for a workflow. Ordered by createdAt descending.

Parameters

status (string): Filter by status: running | completed | failed | partial | cancelled.
limit (number): Max results to return. Default 50.

Response

WorkflowRun[]

Example

bash
curl "https://api.theazo.com/v1/workflows/wf_a1b2c3/runs?status=completed&limit=20" \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/runs/:runId

Fetch current status, per-step results, and cost for a run.

Response

WorkflowRun

Example

bash
curl https://api.theazo.com/v1/workflows/runs/wfrun_abc123 \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/runs/:runId/stream

SSE stream of run events. Returns text/event-stream. Events: step.started, step.completed, step.failed, approval.pending, run.completed, run.partial.

Response

text/event-stream

Example

bash
curl https://api.theazo.com/v1/workflows/runs/wfrun_abc123/stream \
  -H "Authorization: Bearer th_live_..." \
  -H "Accept: text/event-stream"
POST /v1/workflows/runs/:runId/cancel

Cancel an in-progress run. In-flight agents are paused, partial results preserved.

Response

WorkflowRun

Example

bash
curl -X POST https://api.theazo.com/v1/workflows/runs/wfrun_abc123/cancel \
  -H "Authorization: Bearer th_live_..."
POST /v1/workflows/runs/:runId/resume

Resume a run paused by a wait step. Supply the event name and optional payload.

Parameters

event (string, required): Event name matching the wait step's event field.
payload (object): Data to inject into the run state as the event payload.

Response

WorkflowRun

Example

bash
curl -X POST https://api.theazo.com/v1/workflows/runs/wfrun_abc123/resume \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "event": "payment.confirmed", "payload": { "txId": "txn_999" } }'
POST /v1/workflows/:id/estimate

Dry-run cost estimation. No agents are executed. Returns min/max cost and per-step breakdown.

Parameters

input (object, required): Sample input used to resolve the DAG shape.

Response

WorkflowEstimate

Example

bash
curl -X POST https://api.theazo.com/v1/workflows/wf_a1b2c3/estimate \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "input": { "website": "https://acme.com" } }'

SDK method reference

workflow(name): WorkflowBuilder
Create a typed builder. Chain .step(), .condition(), .transform(), .policy(), .inputSchema(), .outputSchema(), then .build().

theazo.workflows.create(opts): Promise<Workflow>
Create a workflow definition. Accepts the output of workflow().build() or a raw WorkflowCreateOpts.

theazo.workflows.list(): Promise<Workflow[]>
List all workflow definitions for this platform.

theazo.workflows.get(workflowId): Promise<Workflow>
Fetch a workflow definition by ID.

theazo.workflows.update(workflowId, opts): Promise<Workflow>
Update steps, policy, or name. Version increments atomically.

theazo.workflows.delete(workflowId): Promise<void>
Delete a workflow definition. Does not cancel in-progress runs.

theazo.workflows.estimate(workflowId, { input }): Promise<WorkflowEstimate>
Dry-run cost estimate. No agents are executed.

workflow.run({ userId, input, idempotencyKey?, version?, sharedState? }): Promise<WorkflowRun>
Start a new run. Called on a Workflow instance, as in wf.run() in the examples, not on the workflow() builder function. Async; returns 202 immediately.

theazo.workflows.getRun(runId): Promise<WorkflowRun>
Fetch current status, per-step results, and total cost.

theazo.workflows.listRuns(workflowId, opts?): Promise<WorkflowRun[]>
List runs for a workflow, newest first. Filterable by status.

theazo.workflows.cancelRun(runId): Promise<void>
Cancel an in-progress run. In-flight agents are paused.

theazo.workflows.retryFrom(runId, stepId): Promise<WorkflowRun>
Resume a partial run from a named step. Completed upstream steps are not re-run.

theazo.workflows.streamRun(runId): AsyncIterable<StreamEvent>
SSE stream of run events: step.started, step.completed, step.failed, approval.pending, run.completed, run.partial.