Skip to content

Code Examples

Practical examples for common Archivus integration tasks.


Quick Examples

Upload and Process Document

const { ArchivusClient } = require('@archivus/sdk');

// Shared SDK client; the API key and tenant are read from the environment.
const { ARCHIVUS_API_KEY: apiKey, ARCHIVUS_TENANT: tenant } = process.env;
const client = new ArchivusClient({ apiKey, tenant });

async function uploadAndProcess() {
  const fs = require('fs');
  const file = fs.createReadStream('contract.pdf');

  // Upload with AI processing
  const document = await client.documents.upload({
    file,
    enableAI: true,
    folderId: 'folder_legal'
  });

  console.log('Document uploaded:', document.id);

  // Poll for AI completion
  let processed = false;
  while (!processed) {
    const doc = await client.documents.get(document.id);
    if (doc.ai_status === 'completed') {
      console.log('AI Summary:', doc.ai_summary);
      console.log('AI Tags:', doc.ai_tags);
      console.log('Entities:', doc.entities);
      processed = true;
    } else if (doc.ai_status === 'failed') {
      console.error('AI processing failed');
      break;
    }
    await new Promise(resolve => setTimeout(resolve, 2000));
  }
}

// Run the example; report a rejection instead of leaving the promise unhandled.
uploadAndProcess().catch(err => {
  console.error('Upload example failed:', err);
  process.exitCode = 1;
});
import os
import time

from archivus import ArchivusClient

# Shared SDK client, configured from environment variables. `os` must be
# imported for the lookups below; a missing variable raises KeyError here.
client = ArchivusClient(
    api_key=os.environ['ARCHIVUS_API_KEY'],
    tenant=os.environ['ARCHIVUS_TENANT']
)

def upload_and_process(poll_interval=2, max_attempts=150):
    """Upload ``contract.pdf`` with AI processing and wait for the result.

    After the upload, the document is fetched repeatedly until its
    ``ai_status`` is ``'completed'`` or ``'failed'``, or until
    ``max_attempts`` status checks have been made. Outcomes are printed.

    Args:
        poll_interval: Seconds to sleep between status checks.
        max_attempts: Maximum number of status checks before giving up
            (150 checks at 2 s is about five minutes).

    Returns:
        None.
    """
    with open('contract.pdf', 'rb') as file:
        # Upload with AI processing
        document = client.documents.upload(
            file=file,
            enable_ai=True,
            folder_id='folder_legal'
        )

    print(f'Document uploaded: {document.id}')

    # Poll for AI completion, but never wait forever
    for attempt in range(1, max_attempts + 1):
        doc = client.documents.get(document.id)
        if doc.ai_status == 'completed':
            print(f'AI Summary: {doc.ai_summary}')
            print(f'AI Tags: {doc.ai_tags}')
            print(f'Entities: {doc.entities}')
            return
        if doc.ai_status == 'failed':
            print('AI processing failed')
            return
        # Only sleep when another status check will follow
        if attempt < max_attempts:
            time.sleep(poll_interval)

    print(f'AI processing did not finish after {max_attempts} status checks')

# Run the example; this call executes as soon as the module is loaded.
upload_and_process()
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/archivus/archivus-go"
)

// main uploads contract.pdf with AI processing enabled and then polls the
// document until its AI status is "completed" or "failed", or until the
// attempt budget is exhausted. Any error from opening the file, uploading,
// or fetching the document aborts the program via panic.
func main() {
    client := archivus.NewClient(
        os.Getenv("ARCHIVUS_API_KEY"),
        os.Getenv("ARCHIVUS_TENANT"),
    )

    ctx := context.Background()

    // Stop early if the file cannot be opened instead of uploading a nil handle.
    file, err := os.Open("contract.pdf")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // Upload with AI processing
    doc, err := client.Documents.Upload(ctx, &archivus.UploadOptions{
        File: file,
        EnableAI: true,
        FolderID: "folder_legal",
    })
    if err != nil {
        panic(err)
    }

    fmt.Printf("Document uploaded: %s\n", doc.ID)

    // Poll for AI completion, but never wait forever
    // (150 checks at 2 s is about five minutes).
    const maxAttempts = 150
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        current, err := client.Documents.Get(ctx, doc.ID)
        if err != nil {
            // Without this check a failed request would leave current unusable.
            panic(err)
        }

        if current.AIStatus == "completed" {
            fmt.Printf("AI Summary: %s\n", current.AISummary)
            fmt.Printf("AI Tags: %v\n", current.AITags)
            fmt.Printf("Entities: %v\n", current.Entities)
            return
        }
        if current.AIStatus == "failed" {
            fmt.Println("AI processing failed")
            return
        }

        // Only sleep when another status check will follow.
        if attempt < maxAttempts {
            time.Sleep(2 * time.Second)
        }
    }

    fmt.Printf("AI processing did not finish after %d status checks\n", maxAttempts)
}

/**
 * Runs a semantic search for "contract renewal terms" restricted to
 * `folder_legal`, then logs the total and, for every hit, its filename,
 * score, AI summary and highlight.
 *
 * @returns {Promise<void>}
 */
async function searchDocuments() {
  const searchOptions = {
    mode: 'semantic',
    limit: 10,
    minScore: 0.7,
    folderId: 'folder_legal'
  };
  const response = await client.search.query('contract renewal terms', searchOptions);

  console.log(`Found ${response.total} documents`);

  for (const hit of response.results) {
    const { document: found, score, highlight } = hit;
    console.log(`\n${found.filename} (score: ${score})`);
    console.log(`Summary: ${found.ai_summary}`);
    console.log(`Highlight: ${highlight}`);
  }
}
def search_documents():
    """Run a semantic search and print every hit.

    Queries for 'contract renewal terms' inside ``folder_legal`` and prints
    the total followed by the filename, score, AI summary and highlight of
    each result.
    """
    query_options = {
        'mode': 'semantic',
        'limit': 10,
        'min_score': 0.7,
        'folder_id': 'folder_legal',
    }
    response = client.search.query('contract renewal terms', **query_options)

    print(f'Found {response.total} documents')

    for hit in response.results:
        found = hit.document
        print(f'\n{found.filename} (score: {hit.score})')
        print(f'Summary: {found.ai_summary}')
        print(f'Highlight: {hit.highlight}')

Chat with Document

/**
 * Opens a chat session named "Contract Analysis" for one document and asks a
 * fixed list of questions one after another, logging each answer, its
 * confidence as a whole-number percentage, and any cited sources.
 *
 * @param {string} documentId - Identifier passed to `createSession`.
 * @returns {Promise<void>}
 */
async function chatWithDocument(documentId) {
  // Create session
  const session = await client.chat.createSession({
    documentId,
    name: 'Contract Analysis'
  });

  // Ask multiple questions
  const prompts = [
    'What are the key terms?',
    'What is the contract duration?',
    'What are the payment terms?',
    'What are the termination conditions?'
  ];

  // Questions are asked sequentially, each awaited before the next is sent
  for (const prompt of prompts) {
    const reply = await client.chat.ask(session.id, prompt);

    console.log(`\nQ: ${prompt}`);

    const { content, confidence, sources } = reply;
    const percent = (confidence * 100).toFixed(0);
    console.log(`A: ${content}`);
    console.log(`Confidence: ${percent}%`);

    if (sources.length > 0) {
      console.log('Sources:');
      for (const { page, excerpt } of sources) {
        console.log(`  - Page ${page}: "${excerpt}"`);
      }
    }
  }
}
def chat_with_document(document_id):
    """Ask a fixed set of contract questions about one document.

    Creates a chat session named 'Contract Analysis' for ``document_id``,
    then asks each question in turn and prints the answer, its confidence as
    a whole-number percentage, and any cited sources.
    """
    # Create session
    chat_session = client.chat.create_session(
        document_id=document_id,
        name='Contract Analysis'
    )

    # Ask multiple questions
    prompts = (
        'What are the key terms?',
        'What is the contract duration?',
        'What are the payment terms?',
        'What are the termination conditions?',
    )

    for prompt in prompts:
        reply = client.chat.ask(chat_session.id, prompt)

        print(f'\nQ: {prompt}')
        print(f'A: {reply.content}')
        percent = reply.confidence * 100
        print(f'Confidence: {percent:.0f}%')

        # Nothing more to print when the answer cites no sources
        if not reply.sources:
            continue

        print('Sources:')
        for cited in reply.sources:
            print(f'  - Page {cited.page}: "{cited.excerpt}"')

Batch Upload

const fs = require('fs');
const path = require('path');

/**
 * Uploads every `.pdf` file found directly in `directory`, with AI
 * processing enabled. All uploads are started together; a failure for one
 * file is logged and does not abort the others.
 *
 * @param {string} directory - Directory to scan (not recursive).
 * @returns {Promise<object[]>} The documents that uploaded successfully.
 */
async function batchUpload(directory) {
  const files = fs.readdirSync(directory)
    .filter(f => f.endsWith('.pdf'));

  console.log(`Uploading ${files.length} files...`);

  // Upload one file; resolves to the document, or null after logging a failure.
  const uploadOne = async (filename) => {
    try {
      // Opening the stream inside the try keeps a bad file from failing the batch
      const file = fs.createReadStream(path.join(directory, filename));
      const doc = await client.documents.upload({
        file,
        enableAI: true
      });
      console.log(`✓ Uploaded: ${filename} → ${doc.id}`);
      return doc;
    } catch (err) {
      console.error(`✗ Failed: ${filename} → ${err.message}`);
      return null;
    }
  };

  const documents = await Promise.all(files.map(uploadOne));
  const successful = documents.filter(d => d !== null);

  console.log(`\nCompleted: ${successful.length}/${files.length} files uploaded`);
  return successful;
}
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_file(filepath):
    """Upload a single file with AI processing enabled.

    Prints one line describing the outcome. Any exception raised while
    opening or uploading the file is caught and reported.

    Args:
        filepath: Path of the file to upload.

    Returns:
        The uploaded document, or None when the upload failed.
    """
    filename = os.path.basename(filepath)
    try:
        with open(filepath, 'rb') as f:
            doc = client.documents.upload(file=f, enable_ai=True)
        # Separator keeps the filename and the document id readable
        print(f'✓ Uploaded: {filename} → {doc.id}')
        return doc
    except Exception as e:
        print(f'✗ Failed: {filename} → {e}')
        return None

def batch_upload(directory):
    """Upload every ``.pdf`` file in ``directory`` using a thread pool.

    Files are submitted to ``upload_file`` on up to five worker threads and
    results are gathered in completion order; falsy results (failed uploads)
    are left out.

    Args:
        directory: Directory to scan (not recursive).

    Returns:
        List of the documents that uploaded successfully.
    """
    pdf_names = list(filter(lambda name: name.endswith('.pdf'), os.listdir(directory)))
    targets = [os.path.join(directory, name) for name in pdf_names]
    total = len(pdf_names)

    print(f'Uploading {total} files...')

    with ThreadPoolExecutor(max_workers=5) as pool:
        pending = [pool.submit(upload_file, target) for target in targets]
        # Collect results as each upload finishes, keeping only successes
        outcomes = (task.result() for task in as_completed(pending))
        uploaded = [doc for doc in outcomes if doc]

    print(f'\nCompleted: {len(uploaded)}/{total} files uploaded')
    return uploaded

Webhook Handler

const express = require('express');
const { verifyWebhookSignature } = require('@archivus/sdk');

const app = express();

// Secret passed to verifyWebhookSignature. Fail fast at startup when it is
// not configured rather than verifying every request against `undefined`.
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;
if (!WEBHOOK_SECRET) {
  throw new Error('WEBHOOK_SECRET environment variable is required');
}

// Webhook endpoint: verifies the signature over the raw request body, hands
// the parsed event to handleEvent in the background, and answers right away.
app.post('/webhooks/archivus',
  // Keep the body unparsed so the signature is checked against the bytes received
  express.raw({ type: 'application/json' }),
  // Deliberately not async: a synchronous throw reaches Express's error
  // handling instead of becoming an unhandled promise rejection.
  (req, res) => {
    const signature = req.headers['x-archivus-signature'];

    // express.raw() only provides a Buffer when the content type matched
    if (!Buffer.isBuffer(req.body)) {
      return res.status(400).send('Expected application/json body');
    }
    const payload = req.body.toString();

    // Verify signature (a missing header is rejected without calling the verifier)
    if (!signature || !verifyWebhookSignature(payload, signature, WEBHOOK_SECRET)) {
      return res.status(401).send('Invalid signature');
    }

    let event;
    try {
      event = JSON.parse(payload);
    } catch (err) {
      return res.status(400).send('Invalid JSON payload');
    }

    // Handle event asynchronously
    handleEvent(event).catch(console.error);

    // Respond immediately
    res.json({ status: 'ok' });
  }
);

/**
 * Routes a webhook event to the handler for its `type` and waits for that
 * handler to finish. Events with any other type are ignored.
 *
 * @param {{ type: string, data: object }} event - Parsed webhook event.
 * @returns {Promise<void>}
 */
async function handleEvent(event) {
  // Each handler is wrapped in an arrow so it is only looked up when an
  // event of its type actually arrives.
  const routes = new Map([
    ['document.processed', data => onDocumentProcessed(data)],
    ['document.failed', data => onDocumentFailed(data)],
    ['dag.execution.completed', data => onWorkflowCompleted(data)]
  ]);

  const route = routes.get(event.type);
  if (route === undefined) {
    return;
  }

  await route(event.data);
}

/**
 * Handles a `document.processed` event: logs the document id and its AI
 * summary, then waits for `processDocument` to run on that document.
 *
 * @param {{ document_id: string, ai_summary: string }} data - Event payload.
 * @returns {Promise<void>}
 */
async function onDocumentProcessed(data) {
  const { document_id: documentId } = data;

  console.log(`Document ${documentId} processed`);
  console.log(`Summary: ${data.ai_summary}`);

  // Trigger downstream processing
  await processDocument(documentId);
}

// Start the webhook server on port 3000 and log once it is accepting connections.
app.listen(3000, function onListening() {
  console.log('Webhook server listening on port 3000');
});
from flask import Flask, request, jsonify
from archivus.webhooks import verify_webhook_signature
import threading

# Flask application that the webhook route below is registered on.
app = Flask(__name__)
# Secret passed to verify_webhook_signature for every request; the lookup
# raises KeyError at import time when the variable is not set.
WEBHOOK_SECRET = os.environ['WEBHOOK_SECRET']

@app.route('/webhooks/archivus', methods=['POST'])
def archivus_webhook():
    """Receive an Archivus webhook.

    Checks the ``X-Archivus-Signature`` header against the raw request body,
    answers 401 when verification fails, and otherwise starts a thread that
    runs ``handle_event`` on the parsed JSON before answering 200.
    """
    provided_signature = request.headers.get('X-Archivus-Signature')
    raw_body = request.get_data(as_text=True)

    # Verify signature
    signature_ok = verify_webhook_signature(raw_body, provided_signature, WEBHOOK_SECRET)
    if not signature_ok:
        return jsonify({'error': 'Invalid signature'}), 401

    parsed_event = request.json

    # Handle event asynchronously
    worker = threading.Thread(target=handle_event, args=(parsed_event,))
    worker.start()

    # Respond immediately
    return jsonify({'status': 'ok'}), 200

def handle_event(event):
    """Dispatch a webhook event to the handler for its ``type``.

    Events whose type is not listed are ignored. Handlers receive
    ``event['data']``.
    """
    # Lambdas defer the handler lookup until an event of that type arrives.
    routes = (
        ('document.processed', lambda payload: on_document_processed(payload)),
        ('document.failed', lambda payload: on_document_failed(payload)),
        ('dag.execution.completed', lambda payload: on_workflow_completed(payload)),
    )

    kind = event['type']
    for name, dispatch in routes:
        if kind == name:
            dispatch(event['data'])
            return

def on_document_processed(data):
    """Handle a ``document.processed`` event.

    Prints the document id and its AI summary, then passes the id to
    ``process_document``.
    """
    document_id = data['document_id']

    print(f"Document {document_id} processed")
    print(f"Summary: {data['ai_summary']}")

    # Trigger downstream processing
    process_document(document_id)

# Start the Flask server on port 3000 only when this file is run as a script,
# not when it is imported.
if __name__ == '__main__':
    app.run(port=3000)

More Examples

For more examples, visit: