Webhook Handler Example

Complete example of handling Archivus webhook events.


Overview

This example demonstrates:

  • Setting up a webhook endpoint
  • Verifying webhook signatures
  • Handling different event types
  • Processing webhook events

Python Flask Example

from flask import Flask, request, jsonify
import hmac
import hashlib
import json

app = Flask(__name__)
WEBHOOK_SECRET = 'your-webhook-secret'

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook signature."""
    expected_signature = hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected_signature}", signature)

@app.route('/webhooks/archivus', methods=['POST'])
def archivus_webhook():
    # Get signature from headers
    signature = request.headers.get('X-Archivus-Signature')
    if not signature:
        return jsonify({"error": "Missing signature"}), 401
    
    # Get raw payload
    payload = request.get_data(as_text=True)
    
    # Verify signature
    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        return jsonify({"error": "Invalid signature"}), 401
    
    # Parse event
    try:
        event = json.loads(payload)
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid JSON"}), 400
    
    # Process event
    event_type = event.get('type')
    event_data = event.get('data', {})
    
    if event_type == 'document.processed':
        handle_document_processed(event_data)
    elif event_type == 'document.failed':
        handle_document_failed(event_data)
    elif event_type == 'document.uploaded':
        handle_document_uploaded(event_data)
    elif event_type == 'chat.message.created':
        handle_chat_message(event_data)
    else:
        print(f"Unknown event type: {event_type}")
    
    # Always return 200 quickly
    return jsonify({"status": "ok"}), 200

def handle_document_processed(data):
    """Handle document processed event."""
    document_id = data.get('document_id')
    document = data.get('document', {})
    
    print(f"Document {document_id} processed successfully")
    print(f"Filename: {document.get('filename')}")
    print(f"AI Summary: {document.get('ai_summary', 'N/A')}")
    
    # Your business logic here
    # - Send notification
    # - Update database
    # - Trigger workflow
    # etc.

def handle_document_failed(data):
    """Handle document processing failure."""
    document_id = data.get('document_id')
    error = data.get('error', 'Unknown error')
    
    print(f"Document {document_id} processing failed: {error}")
    
    # Your error handling logic here
    # - Send alert
    # - Log error
    # - Retry processing
    # etc.

def handle_document_uploaded(data):
    """Handle document uploaded event."""
    document_id = data.get('document_id')
    
    print(f"Document {document_id} uploaded")
    
    # Your logic here
    # - Send confirmation
    # - Start processing workflow
    # etc.

def handle_chat_message(data):
    """Handle chat message created event."""
    session_id = data.get('session_id')
    message_id = data.get('message_id')
    role = data.get('role')
    
    print(f"Chat message created in session {session_id}")
    print(f"Role: {role}, Message ID: {message_id}")
    
    # Your logic here
    # - Send notification
    # - Update UI
    # - Log conversation
    # etc.

if __name__ == '__main__':
    app.run(port=5000)

Node.js Express Example

const express = require('express');
const crypto = require('crypto');
const app = express();

const WEBHOOK_SECRET = 'your-webhook-secret';

function verifyWebhookSignature(payload, signature, secret) {
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex');
  
  return crypto.timingSafeEqual(
    Buffer.from(`sha256=${expectedSignature}`),
    Buffer.from(signature)
  );
}

// Middleware to verify webhook signature
app.use('/webhooks/archivus', express.raw({ type: 'application/json' }), (req, res, next) => {
  const signature = req.headers['x-archivus-signature'];
  
  if (!signature) {
    return res.status(401).json({ error: 'Missing signature' });
  }
  
  const payload = req.body.toString();
  
  if (!verifyWebhookSignature(payload, signature, WEBHOOK_SECRET)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }
  
  req.webhookPayload = JSON.parse(payload);
  next();
});

app.post('/webhooks/archivus', (req, res) => {
  const event = req.webhookPayload;
  const eventType = event.type;
  const eventData = event.data || {};
  
  // Process event
  switch (eventType) {
    case 'document.processed':
      handleDocumentProcessed(eventData);
      break;
    case 'document.failed':
      handleDocumentFailed(eventData);
      break;
    case 'document.uploaded':
      handleDocumentUploaded(eventData);
      break;
    case 'chat.message.created':
      handleChatMessage(eventData);
      break;
    default:
      console.log(`Unknown event type: ${eventType}`);
  }
  
  // Always return 200 quickly
  res.json({ status: 'ok' });
});

function handleDocumentProcessed(data) {
  const { document_id, document } = data;
  
  console.log(`Document ${document_id} processed successfully`);
  console.log(`Filename: ${document?.filename}`);
  console.log(`AI Summary: ${document?.ai_summary || 'N/A'}`);
  
  // Your business logic here
}

function handleDocumentFailed(data) {
  const { document_id, error } = data;
  
  console.log(`Document ${document_id} processing failed: ${error || 'Unknown error'}`);
  
  // Your error handling logic here
}

function handleDocumentUploaded(data) {
  const { document_id } = data;
  
  console.log(`Document ${document_id} uploaded`);
  
  // Your logic here
}

function handleChatMessage(data) {
  const { session_id, message_id, role } = data;
  
  console.log(`Chat message created in session ${session_id}`);
  console.log(`Role: ${role}, Message ID: ${message_id}`);
  
  // Your logic here
}

app.listen(5000, () => {
  console.log('Webhook server listening on port 5000');
});

Event Types

Document Processed

{
  "id": "event_abc123",
  "type": "document.processed",
  "timestamp": "2025-12-16T10:30:00Z",
  "data": {
    "document_id": "doc_abc123",
    "document": {
      "id": "doc_abc123",
      "filename": "contract.pdf",
      "status": "completed",
      "ai_status": "completed",
      "ai_summary": "...",
      "ai_tags": ["contract", "legal"]
    }
  }
}

Document Failed

{
  "id": "event_def456",
  "type": "document.failed",
  "timestamp": "2025-12-16T10:30:00Z",
  "data": {
    "document_id": "doc_def456",
    "error": "Processing failed: Invalid file format"
  }
}

Chat Message Created

{
  "id": "event_ghi789",
  "type": "chat.message.created",
  "timestamp": "2025-12-16T10:30:00Z",
  "data": {
    "session_id": "session_xyz",
    "message_id": "msg_123",
    "role": "assistant",
    "document_id": "doc_abc123"
  }
}

Best Practices

Security

  1. Always verify signatures - Never process unverified webhooks
  2. Use HTTPS - Webhook URLs must use HTTPS
  3. Rotate secrets - Regularly rotate webhook secrets
  4. Validate events - Verify event types before processing

Reliability

  1. Return 200 quickly - Respond within 5 seconds
  2. Process asynchronously - Don’t block on webhook processing
  3. Handle failures - Implement retry logic for failed processing
  4. Idempotency - Handle duplicate events gracefully

Performance

  1. Use queues - Queue webhook events for async processing
  2. Batch processing - Process multiple events together
  3. Rate limiting - Implement rate limiting on your endpoint

Advanced: Queue-Based Processing

from celery import Celery
import redis

app = Celery('webhook_processor')
app.conf.broker_url = 'redis://localhost:6379/0'

@app.task
def process_webhook_event(event_type, event_data):
    """Process webhook event asynchronously."""
    if event_type == 'document.processed':
        handle_document_processed(event_data)
    elif event_type == 'document.failed':
        handle_document_failed(event_data)
    # ... other event types

@app.route('/webhooks/archivus', methods=['POST'])
def archivus_webhook():
    # Verify signature (same as before)
    signature = request.headers.get('X-Archivus-Signature')
    payload = request.get_data(as_text=True)
    
    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        return jsonify({"error": "Invalid signature"}), 401
    
    # Parse and queue event
    event = json.loads(payload)
    process_webhook_event.delay(event['type'], event.get('data', {}))
    
    # Return immediately
    return jsonify({"status": "ok"}), 200

Testing Webhooks

Using ngrok

# Install ngrok
brew install ngrok  # macOS
# or download from https://ngrok.com

# Start your webhook server
python app.py

# In another terminal, expose your local server
ngrok http 5000

# Use the ngrok URL for webhook configuration
# Example: https://abc123.ngrok.io/webhooks/archivus

Testing Locally

import requests

# Test webhook endpoint
test_event = {
    "id": "test_event",
    "type": "document.processed",
    "timestamp": "2025-12-16T10:30:00Z",
    "data": {
        "document_id": "doc_test",
        "document": {
            "id": "doc_test",
            "filename": "test.pdf",
            "status": "completed"
        }
    }
}

# Sign the payload
import hmac
import hashlib
payload = json.dumps(test_event)
signature = hmac.new(
    WEBHOOK_SECRET.encode(),
    payload.encode(),
    hashlib.sha256
).hexdigest()

# Send test webhook
response = requests.post(
    'http://localhost:5000/webhooks/archivus',
    data=payload,
    headers={
        'Content-Type': 'application/json',
        'X-Archivus-Signature': f'sha256={signature}'
    }
)

print(response.status_code)
print(response.json())

Next Steps


Questions? Check the FAQ or contact support@ubiship.com