DAG Orchestration Engine

Production-Ready Workflow Engine for Complex Document Processing (Team and Enterprise Tiers)


Overview

The DAG (Directed Acyclic Graph) Orchestration Engine lets you build document workflows from 24 production-ready node types. Workflows range from simple automations to multi-step processes with human approval, combining AI analysis, human review, control flow, and external integrations in a single dependency graph.


Architecture

                    ┌─────────────┐
                    │   Trigger   │
                    │  (Manual/   │
                    │  Scheduled/ │
                    │   Event)    │
                    └──────┬──────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────▼─────┐ ┌────▼────┐ ┌─────▼─────┐
        │  AI Node  │ │Transform│ │ Condition │
        │(Summarize)│ │  Node   │ │   Node    │
        └─────┬─────┘ └────┬────┘ └─────┬─────┘
              │            │      ┌─────┴─────┐
              │            │      │           │
        ┌─────▼────────────▼──────▼───┐ ┌─────▼─────┐
        │        Human Review         │ │   Skip    │
        │  (Approval/Reject/Modify)   │ │           │
        └──────────────┬──────────────┘ └───────────┘
                       │
            ┌──────────┼──────────┐
            │          │          │
        ┌───▼────┐ ┌───▼────┐ ┌───▼────┐
        │Approved│ │Rejected│ │Modified│
        └───┬────┘ └───┬────┘ └───┬────┘
            │          │          │
        ┌───▼──────────▼──────────▼───┐
        │         MCP Action          │
        │   (External Integration)    │
        └──────────────┬──────────────┘
                       │
                 ┌─────▼─────┐
                 │    End    │
                 └───────────┘
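What makes the graph a DAG is that no node can, directly or indirectly, depend on itself. A validation step (such as the POST /dag/workflows/{id}/validate endpoint listed under API Reference) presumably rejects definitions that break this rule. The sketch below shows one way to check it over a nodes array that uses the depends_on field from the workflow definitions on this page: it reports duplicate ids, references to unknown nodes, and cycles found by depth-first search. validateDag is a hypothetical helper, not the engine's actual validator.

```javascript
// Hypothetical validator: checks that a workflow's "nodes" array forms a DAG.
// Assumes each node has a unique "id" and an optional "depends_on" array of ids.
function validateDag(nodes) {
  const errors = [];
  const byId = new Map();
  for (const node of nodes) {
    if (byId.has(node.id)) errors.push(`duplicate node id: ${node.id}`);
    byId.set(node.id, node);
  }
  for (const node of nodes) {
    for (const dep of node.depends_on ?? []) {
      if (!byId.has(dep)) errors.push(`${node.id} depends on unknown node ${dep}`);
    }
  }
  // Three-color DFS: reaching a GRAY (in-progress) node again means a cycle.
  const WHITE = 0, GRAY = 1, BLACK = 2;
  const color = new Map([...byId.keys()].map((id) => [id, WHITE]));
  const visit = (id, path) => {
    color.set(id, GRAY);
    for (const dep of byId.get(id).depends_on ?? []) {
      if (!byId.has(dep)) continue; // already reported as unknown above
      if (color.get(dep) === GRAY) {
        errors.push(`cycle: ${[...path, id, dep].join(" -> ")}`);
      } else if (color.get(dep) === WHITE) {
        visit(dep, [...path, id]);
      }
    }
    color.set(id, BLACK);
  };
  for (const id of byId.keys()) {
    if (color.get(id) === WHITE) visit(id, []);
  }
  return { valid: errors.length === 0, errors };
}

const workflow = [
  { id: "analyze" },
  { id: "check_risk", depends_on: ["analyze"] },
  { id: "notify", depends_on: ["check_risk"] },
];
console.log(validateDag(workflow)); // { valid: true, errors: [] }
```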

Node Types (24)

AI Nodes (6)

Node           Description                          Credits
ai_summarize   Generate document summary            2
ai_analyze     Deep document analysis               3
ai_extract     Extract entities/data                2
ai_classify    Categorize document                  1
ai_qa          Answer questions about the document  2
ai_custom      Custom prompt execution              Variable
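Per-run credit usage can be estimated by summing the table above over a workflow's AI nodes. The sketch below assumes one charge per AI node execution and zero credits for human, MCP, and control nodes; this page states neither rule. ai_custom nodes are listed separately because their cost is variable, and since branches can skip nodes and loops can repeat them, the total is only a rough estimate. estimateCredits is a hypothetical helper.

```javascript
// Per-execution credit costs copied from the AI Nodes table.
// ai_custom is variable, so it is tracked separately rather than guessed.
const AI_NODE_CREDITS = {
  ai_summarize: 2,
  ai_analyze: 3,
  ai_extract: 2,
  ai_classify: 1,
  ai_qa: 2,
};

// Hypothetical estimator. Assumes one charge per AI node per run and that
// non-AI node types cost nothing (an assumption, not documented behavior).
function estimateCredits(nodes) {
  let fixed = 0;
  const variable = [];
  for (const node of nodes) {
    if (node.type === "ai_custom") variable.push(node.id);
    else fixed += AI_NODE_CREDITS[node.type] ?? 0;
  }
  return { fixed, variable };
}

// One ai_analyze (3) plus one ai_summarize (2); the condition node adds nothing.
console.log(estimateCredits([
  { id: "analyze", type: "ai_analyze" },
  { id: "check_risk", type: "condition" },
  { id: "summarize_1", type: "ai_summarize" },
])); // { fixed: 5, variable: [] }
```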

Example: AI Summarize Node

{
  "id": "summarize_1",
  "type": "ai_summarize",
  "config": {
    "document_id": "",
    "summary_type": "comprehensive",
    "max_length": 500,
    "include_key_points": true
  }
}

Human Interaction Nodes (3)

Node               Description                 Features
human_approval     Binary approve/reject       Escalation, auto-reject
human_review       Review with modifications   Edit capabilities
human_assignment   Route to specific user      Role-based routing

Example: Human Approval Node

{
  "id": "approval_1",
  "type": "human_approval",
  "config": {
    "assignee_type": "role",
    "assignee_value": "manager",
    "title": "Review Contract Summary",
    "description": "Please review the AI-generated summary",
    "context": {
      "summary": "",
      "document_link": ""
    },
    "timeout_hours": 48,
    "escalation": {
      "enabled": true,
      "escalate_after_hours": 24,
      "escalate_to": "director"
    },
    "auto_action": {
      "enabled": true,
      "action": "reject",
      "after_hours": 72,
      "reason": "Approval timeout exceeded"
    }
  }
}
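The escalation and auto_action settings define a timeline for an unanswered task. Measured from task creation (an assumption), the example escalates from manager to director at 24 hours and auto-rejects at 72 hours. The sketch below reports the state at a given elapsed time. approvalState is a hypothetical helper, and timeout_hours is deliberately left out because this page does not say how it interacts with auto_action.after_hours.

```javascript
// Hypothetical helper: state of a pending human_approval task after `elapsedHours`.
// Assumes both thresholds are measured from task creation. timeout_hours is
// intentionally not modeled (its interaction with auto_action is unspecified).
function approvalState(config, elapsedHours) {
  const { escalation, auto_action: autoAction } = config;
  if (autoAction?.enabled && elapsedHours >= autoAction.after_hours) {
    return { state: "auto_" + autoAction.action, reason: autoAction.reason };
  }
  if (escalation?.enabled && elapsedHours >= escalation.escalate_after_hours) {
    return { state: "escalated", assignee: escalation.escalate_to };
  }
  return { state: "pending", assignee: config.assignee_value };
}

// Values taken from the Human Approval Node example.
const exampleConfig = {
  assignee_type: "role",
  assignee_value: "manager",
  escalation: { enabled: true, escalate_after_hours: 24, escalate_to: "director" },
  auto_action: { enabled: true, action: "reject", after_hours: 72, reason: "Approval timeout exceeded" },
};

console.log(approvalState(exampleConfig, 10).state); // pending
console.log(approvalState(exampleConfig, 30).state); // escalated
console.log(approvalState(exampleConfig, 80).state); // auto_reject
```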

MCP Integration Node (1)

Node       Description
mcp_tool   Execute a tool on an external MCP (Model Context Protocol) server

Example: MCP Tool Node

{
  "id": "github_issue_1",
  "type": "mcp_tool",
  "config": {
    "server_id": "github_integration",
    "tool_name": "create_issue",
    "parameters": {
      "repo": "company/docs",
      "title": "New contract: ",
      "body": ""
    }
  }
}
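MCP (Model Context Protocol) servers are driven over JSON-RPC 2.0, and a client invokes a tool with a tools/call request whose params carry the tool's name and arguments. Assuming the engine maps tool_name to name and parameters to arguments (plausible, but not stated here), the node above corresponds to the request built by the sketch below. toMcpToolCall is a hypothetical helper; connection setup, initialization, and transport are omitted, and server_id is used only to pick which server connection to send on.

```javascript
// Hypothetical mapping from an mcp_tool node to an MCP "tools/call" JSON-RPC request.
// Assumes tool_name -> params.name and parameters -> params.arguments.
function toMcpToolCall(node, requestId) {
  if (node.type !== "mcp_tool") {
    throw new Error(`expected an mcp_tool node, got ${node.type}`);
  }
  return {
    serverId: node.config.server_id, // selects the connection; not part of the message
    message: {
      jsonrpc: "2.0",
      id: requestId,
      method: "tools/call",
      params: {
        name: node.config.tool_name,
        arguments: node.config.parameters ?? {},
      },
    },
  };
}

const { serverId, message } = toMcpToolCall(
  {
    id: "github_issue_1",
    type: "mcp_tool",
    config: {
      server_id: "github_integration",
      tool_name: "create_issue",
      parameters: { repo: "company/docs", title: "New contract: ", body: "" },
    },
  },
  1
);
console.log(serverId); // github_integration
console.log(JSON.stringify(message));
```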

Control Nodes (14)

Node           Description
condition      Branch based on conditions
switch         Multi-way branching
parallel       Execute branches concurrently
join           Wait for parallel branches to finish
loop           Iterate over collections
delay          Wait for a duration
schedule       Wait until a specific time
transform      Transform data
filter         Filter collections
aggregate      Combine results
set_variable   Set a workflow variable
http_request   Make HTTP calls
notification   Send notifications
end            Terminate workflow

Workflow Definition

Complete Workflow Example

{
  "name": "Contract Review Workflow",
  "description": "AI analysis with human approval",
  "trigger": {
    "type": "document_upload",
    "conditions": {
      "folder_path": "/contracts/*",
      "file_types": ["pdf", "docx"]
    }
  },
  "nodes": [
    {
      "id": "analyze",
      "type": "ai_analyze",
      "config": {
        "document_id": "",
        "analysis_type": "contract"
      }
    },
    {
      "id": "check_risk",
      "type": "condition",
      "config": {
        "condition": " > 7"
      },
      "depends_on": ["analyze"]
    },
    {
      "id": "high_risk_approval",
      "type": "human_approval",
      "config": {
        "assignee_type": "role",
        "assignee_value": "legal_director",
        "title": "High-Risk Contract Review",
        "timeout_hours": 24
      },
      "depends_on": ["check_risk"],
      "run_if": " == true"
    },
    {
      "id": "standard_approval",
      "type": "human_approval",
      "config": {
        "assignee_type": "role",
        "assignee_value": "legal_team",
        "title": "Standard Contract Review",
        "timeout_hours": 72
      },
      "depends_on": ["check_risk"],
      "run_if": " == false"
    },
    {
      "id": "notify_slack",
      "type": "mcp_tool",
      "config": {
        "server_id": "slack",
        "tool_name": "post_message",
        "parameters": {
          "channel": "#contracts",
          "message": "Contract approved: "
        }
      },
      "depends_on": ["high_risk_approval", "standard_approval"],
      "run_if": "approved"
    }
  ]
}

Triggers

Trigger Types

Type              Description
manual            User-initiated
document_upload   On document upload
document_update   On document modification
schedule          Cron-based scheduling
webhook           External webhook trigger
api               API-initiated

Trigger Configuration

{
  "trigger": {
    "type": "document_upload",
    "conditions": {
      "folder_path": "/contracts/*",
      "file_types": ["pdf", "docx"],
      "metadata": {
        "contract_type": "vendor"
      }
    }
  }
}
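A document_upload trigger fires only when the uploaded document satisfies every condition. The sketch below shows one plausible reading of the example: folder_path is a glob matched against the full file path where * matches a single path segment, file_types is compared against the file extension case-insensitively, and every metadata key must match exactly. These matching rules and the matchesTrigger name are assumptions for illustration; the engine's real glob semantics may differ (for example, * might also match subfolders).

```javascript
// Hypothetical trigger matcher for document_upload conditions.
// Assumption: "*" in folder_path matches exactly one path segment (no "/").
function globToRegExp(glob) {
  const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, "\\$&"); // escape everything except "*"
  return new RegExp("^" + escaped.replace(/\*/g, "[^/]+") + "$");
}

function matchesTrigger(conditions, doc) {
  const { folder_path, file_types, metadata } = conditions;
  if (folder_path && !globToRegExp(folder_path).test(doc.path)) return false;
  if (file_types) {
    const ext = doc.path.split(".").pop().toLowerCase();
    if (!file_types.includes(ext)) return false;
  }
  // Every metadata condition must match the document's metadata exactly.
  for (const [key, value] of Object.entries(metadata ?? {})) {
    if (doc.metadata?.[key] !== value) return false;
  }
  return true;
}

const conditions = {
  folder_path: "/contracts/*",
  file_types: ["pdf", "docx"],
  metadata: { contract_type: "vendor" },
};
console.log(matchesTrigger(conditions, { path: "/contracts/acme.pdf", metadata: { contract_type: "vendor" } })); // true
console.log(matchesTrigger(conditions, { path: "/contracts/acme.xlsx", metadata: { contract_type: "vendor" } })); // false
```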

Pause/Resume Capability

Pause Workflow

POST /api/v1/dag/executions/{execution_id}/pause

Response:
{
  "execution_id": "exec_123",
  "status": "paused",
  "paused_at_node": "approval_1",
  "can_resume": true
}

Resume Workflow

POST /api/v1/dag/executions/{execution_id}/resume

Request body:
{
  "decision": "approved",
  "modified_data": {
    "summary": "Updated summary text"
  }
}
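The resume call carries the reviewer's decision and, optionally, edited data. The sketch below builds the request (URL, method, headers, JSON body) so it can be handed to fetch. buildResumeRequest is hypothetical, the base URL is a placeholder, and restricting decision to approved, rejected, or modified is an assumption drawn from the branches in the architecture diagram rather than a documented constraint. Authentication headers are omitted because this page does not describe them.

```javascript
// Hypothetical request builder for POST /api/v1/dag/executions/{execution_id}/resume.
// Assumption: decision is one of the three outcomes shown in the architecture diagram.
const DECISIONS = new Set(["approved", "rejected", "modified"]);

function buildResumeRequest(baseUrl, executionId, decision, modifiedData) {
  if (!DECISIONS.has(decision)) {
    throw new Error(`unsupported decision: ${decision}`);
  }
  const body = { decision };
  if (modifiedData !== undefined) body.modified_data = modifiedData;
  return {
    url: `${baseUrl}/api/v1/dag/executions/${encodeURIComponent(executionId)}/resume`,
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  };
}

const req = buildResumeRequest("https://example.invalid", "exec_123", "approved", {
  summary: "Updated summary text",
});
console.log(req.url); // https://example.invalid/api/v1/dag/executions/exec_123/resume
// To send it (Node 18+ or a browser): fetch(req.url, req).then((res) => res.json())
```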

Error Handling

Node-Level Error Handling

{
  "id": "analyze",
  "type": "ai_analyze",
  "config": { ... },
  "error_handling": {
    "on_error": "retry",
    "max_retries": 3,
    "retry_delay_seconds": 60,
    "fallback_node": "manual_analysis"
  }
}
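The node-level settings describe a retry loop: on error the node is re-run, waiting retry_delay_seconds between attempts, and once retries are exhausted control passes to fallback_node. The sketch below implements that reading. Whether max_retries includes the first attempt, and whether the delay is fixed or backs off, is not stated on this page, so the choices here (max_retries retries after the first attempt, fixed delay) are assumptions; runWithRetry and its injectable sleep parameter are hypothetical, and only the "retry" strategy of on_error is modeled.

```javascript
// Hypothetical executor for a node's error_handling block ("retry" strategy only).
// Assumptions: max_retries counts retries after the first attempt; the delay is fixed.
const sleepMs = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runWithRetry(runNode, errorHandling, sleep = sleepMs) {
  const { max_retries = 0, retry_delay_seconds = 0, fallback_node } = errorHandling;
  let lastError;
  for (let attempt = 0; attempt <= max_retries; attempt++) {
    try {
      return { status: "completed", output: await runNode(attempt) };
    } catch (err) {
      lastError = err;
      if (attempt < max_retries) await sleep(retry_delay_seconds * 1000);
    }
  }
  // Retries exhausted: route to the fallback node if one is configured.
  return fallback_node
    ? { status: "fallback", next: fallback_node, error: lastError.message }
    : { status: "failed", error: lastError.message };
}

// Usage: a node that fails twice, then succeeds (delay skipped with a no-op sleep).
let calls = 0;
runWithRetry(
  async () => {
    calls += 1;
    if (calls < 3) throw new Error("model timeout");
    return { risk_score: 4 };
  },
  { on_error: "retry", max_retries: 3, retry_delay_seconds: 60, fallback_node: "manual_analysis" },
  async () => {}
).then((result) => console.log(result.status, calls)); // completed 3
```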

Workflow-Level Error Handling

{
  "error_handling": {
    "on_failure": "pause",
    "notify_on_failure": ["workflow_admin"],
    "cleanup_on_failure": true
  }
}

Real-Time Notifications

WebSocket Events

// Subscribe to workflow execution events
ws.subscribe('dag.execution.updated', (event) => {
  console.log('Node updated:', event.node_id);
  console.log('Status:', event.status);
});

Event payload:

{
  "type": "dag.execution.updated",
  "execution_id": "exec_123",
  "workflow_id": "wf_456",
  "node_id": "analyze",
  "status": "completed",
  "output": { ... },
  "timestamp": "2026-01-18T10:30:00Z"
}
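Each dag.execution.updated event reports one node's status, so a client can fold the stream into a per-execution view of node states, for example to drive a progress display. The sketch below is a plain reducer over payloads shaped like the example above. applyExecutionEvent is hypothetical, and keeping only the newest timestamp per node assumes the server does not guarantee event ordering.

```javascript
// Hypothetical reducer: folds dag.execution.updated events into
// { [execution_id]: { [node_id]: { status, timestamp } } }.
// Assumes events may arrive out of order, so each node keeps its newest update.
function applyExecutionEvent(state, event) {
  if (event.type !== "dag.execution.updated") return state;
  const nodes = state[event.execution_id] ?? {};
  const previous = nodes[event.node_id];
  if (previous && Date.parse(previous.timestamp) > Date.parse(event.timestamp)) {
    return state; // stale event: keep the newer status
  }
  return {
    ...state,
    [event.execution_id]: {
      ...nodes,
      [event.node_id]: { status: event.status, timestamp: event.timestamp },
    },
  };
}

let state = {};
state = applyExecutionEvent(state, {
  type: "dag.execution.updated",
  execution_id: "exec_123",
  node_id: "analyze",
  status: "completed",
  timestamp: "2026-01-18T10:30:00Z",
});
console.log(state.exec_123.analyze.status); // completed
// With the subscription shown above:
// ws.subscribe('dag.execution.updated', (e) => { state = applyExecutionEvent(state, e); });
```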

Notification Channels

  • WebSocket - Real-time in-app updates
  • Email - Approval requests, failures
  • Slack (via MCP) - Team notifications
  • Webhook - External system integration

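API Reference

All endpoint paths below are relative to /api/v1 (for example, POST /api/v1/dag/executions/{id}/pause).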

Workflow Management

Endpoint                       Method   Description
/dag/workflows                 GET      List workflows
/dag/workflows                 POST     Create workflow
/dag/workflows/{id}            GET      Get workflow
/dag/workflows/{id}            PUT      Update workflow
/dag/workflows/{id}            DELETE   Delete workflow
/dag/workflows/{id}/validate   POST     Validate workflow

Execution Management

Endpoint                       Method   Description
/dag/executions                GET      List executions
/dag/workflows/{id}/execute    POST     Start execution
/dag/executions/{id}           GET      Get execution status
/dag/executions/{id}/pause     POST     Pause execution
/dag/executions/{id}/resume    POST     Resume execution
/dag/executions/{id}/cancel    POST     Cancel execution

Human Tasks

Endpoint                       Method   Description
/dag/tasks                     GET      List pending tasks
/dag/tasks/{id}                GET      Get task details
/dag/tasks/{id}/complete       POST     Complete task
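The endpoints above can be wrapped in a thin client. The sketch below assumes the /api/v1 prefix seen in the pause/resume examples, JSON request and response bodies, and an injectable fetch function so it can be exercised without a server. createDagClient and its method names are hypothetical; the request bodies for /execute and /complete are not documented on this page and are passed through unchanged, and authentication is omitted.

```javascript
// Hypothetical thin client over the DAG REST endpoints.
// Assumes paths live under /api/v1 and bodies are JSON.
function createDagClient(baseUrl, fetchImpl = globalThis.fetch) {
  const call = async (method, path, body) => {
    const res = await fetchImpl(`${baseUrl}/api/v1${path}`, {
      method,
      headers: { "Content-Type": "application/json" },
      body: body === undefined ? undefined : JSON.stringify(body),
    });
    if (!res.ok) throw new Error(`${method} ${path} failed with HTTP ${res.status}`);
    return res.json();
  };
  const id = (value) => encodeURIComponent(value);
  return {
    listWorkflows: () => call("GET", "/dag/workflows"),
    validateWorkflow: (workflowId) => call("POST", `/dag/workflows/${id(workflowId)}/validate`),
    startExecution: (workflowId, input) => call("POST", `/dag/workflows/${id(workflowId)}/execute`, input),
    getExecution: (executionId) => call("GET", `/dag/executions/${id(executionId)}`),
    cancelExecution: (executionId) => call("POST", `/dag/executions/${id(executionId)}/cancel`),
    listTasks: () => call("GET", "/dag/tasks"),
    completeTask: (taskId, body) => call("POST", `/dag/tasks/${id(taskId)}/complete`, body),
  };
}

// Usage with a stub fetch that records calls instead of hitting the network.
const calls = [];
const stubFetch = async (url, init) => {
  calls.push({ url, method: init.method });
  return { ok: true, status: 200, json: async () => ({}) };
};
const client = createDagClient("https://example.invalid", stubFetch);
client.listTasks().then(() => console.log(calls[0].url));
```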

Performance

Execution Model

  • Parallel Execution - Independent nodes run concurrently
  • Worker Pools - Dedicated DAG worker pool
  • Priority Queuing - Workflow priority support
  • Checkpointing - Resume from any point
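Parallel execution follows from the dependency graph: a node can start once everything in its depends_on list has finished, so nodes can be grouped into waves whose members are mutually independent. The sketch below computes those waves with a level-by-level form of Kahn's algorithm for the Complete Workflow Example above. It illustrates the scheduling idea only, not the engine's scheduler: it ignores run_if gating (in the example only one of the two approval nodes actually runs), worker pool size, and priorities. executionWaves is a hypothetical name.

```javascript
// Hypothetical wave planner (Kahn's algorithm, one level at a time).
// Nodes in the same wave do not depend on each other and could run concurrently.
function executionWaves(nodes) {
  const remaining = new Map(nodes.map((n) => [n.id, new Set(n.depends_on ?? [])]));
  const waves = [];
  while (remaining.size > 0) {
    const ready = [...remaining].filter(([, deps]) => deps.size === 0).map(([id]) => id);
    if (ready.length === 0) {
      throw new Error("no node is ready to run (cycle or unknown dependency)");
    }
    for (const id of ready) remaining.delete(id);
    // Finished nodes no longer block anything still waiting.
    for (const deps of remaining.values()) {
      for (const id of ready) deps.delete(id);
    }
    waves.push(ready);
  }
  return waves;
}

const contractWorkflow = [
  { id: "analyze" },
  { id: "check_risk", depends_on: ["analyze"] },
  { id: "high_risk_approval", depends_on: ["check_risk"] },
  { id: "standard_approval", depends_on: ["check_risk"] },
  { id: "notify_slack", depends_on: ["high_risk_approval", "standard_approval"] },
];
console.log(JSON.stringify(executionWaves(contractWorkflow)));
// [["analyze"],["check_risk"],["high_risk_approval","standard_approval"],["notify_slack"]]
```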

Limits

Tier         Concurrent Executions   Nodes per Workflow   Executions per Day
Team         10                      50                   1,000
Enterprise   Unlimited               200                  Unlimited
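Before deploying, a planned workflow can be checked against the tier limits in the table above. The sketch below encodes the table and returns the limits a planned usage would exceed. checkTierLimits and its input field names are hypothetical, and "Unlimited" is represented as Infinity.

```javascript
// Limits from the table above; "Unlimited" is represented as Infinity.
const TIER_LIMITS = {
  team: { concurrentExecutions: 10, nodesPerWorkflow: 50, executionsPerDay: 1000 },
  enterprise: { concurrentExecutions: Infinity, nodesPerWorkflow: 200, executionsPerDay: Infinity },
};

// Hypothetical pre-flight check: lists every limit the planned usage would exceed.
function checkTierLimits(tier, { nodeCount, concurrent, perDay }) {
  const limits = TIER_LIMITS[tier];
  if (!limits) throw new Error(`unknown tier: ${tier}`);
  const violations = [];
  if (nodeCount > limits.nodesPerWorkflow) {
    violations.push(`nodes per workflow: ${nodeCount} > ${limits.nodesPerWorkflow}`);
  }
  if (concurrent > limits.concurrentExecutions) {
    violations.push(`concurrent executions: ${concurrent} > ${limits.concurrentExecutions}`);
  }
  if (perDay > limits.executionsPerDay) {
    violations.push(`executions per day: ${perDay} > ${limits.executionsPerDay}`);
  }
  return violations;
}

console.log(checkTierLimits("team", { nodeCount: 5, concurrent: 3, perDay: 200 })); // []
console.log(checkTierLimits("team", { nodeCount: 80, concurrent: 3, perDay: 200 }).length); // 1
```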

Best Practices

Design

  1. Keep Workflows Focused - One workflow per business process
  2. Use Conditions Wisely - Branch early to avoid unnecessary processing
  3. Handle Errors - Configure retry and fallback strategies
  4. Document Workflows - Clear names and descriptions

Human Tasks

  1. Set Appropriate Timeouts - Balance urgency vs. availability
  2. Enable Escalation - Prevent workflow stalls
  3. Provide Context - Include relevant information for reviewers
  4. Auto-Actions - Configure fallback for unattended tasks

Performance

  1. Parallelize When Possible - Use parallel nodes for independent operations
  2. Minimize AI Calls - Combine related AI operations
  3. Cache Results - Store values reused across nodes with set_variable instead of recomputing them

