Code Examples¶
Practical examples for common Archivus integration tasks.
Quick Examples¶
Upload and Process Document¶
JavaScript:

const fs = require('fs');
const { ArchivusClient } = require('@archivus/sdk');

const client = new ArchivusClient({
  apiKey: process.env.ARCHIVUS_API_KEY,
  tenant: process.env.ARCHIVUS_TENANT
});

async function uploadAndProcess() {
  const file = fs.createReadStream('contract.pdf');

  // Upload with AI processing
  const document = await client.documents.upload({
    file,
    enableAI: true,
    folderId: 'folder_legal'
  });
  console.log('Document uploaded:', document.id);

  // Poll until AI processing completes or fails
  while (true) {
    const doc = await client.documents.get(document.id);
    if (doc.ai_status === 'completed') {
      console.log('AI Summary:', doc.ai_summary);
      console.log('AI Tags:', doc.ai_tags);
      console.log('Entities:', doc.entities);
      break;
    } else if (doc.ai_status === 'failed') {
      console.error('AI processing failed');
      break;
    }
    await new Promise(resolve => setTimeout(resolve, 2000));
  }
}

uploadAndProcess().catch(console.error);
Python:

import os
import time

from archivus import ArchivusClient

client = ArchivusClient(
    api_key=os.environ['ARCHIVUS_API_KEY'],
    tenant=os.environ['ARCHIVUS_TENANT']
)

def upload_and_process():
    with open('contract.pdf', 'rb') as file:
        # Upload with AI processing
        document = client.documents.upload(
            file=file,
            enable_ai=True,
            folder_id='folder_legal'
        )
    print(f'Document uploaded: {document.id}')

    # Poll until AI processing completes or fails
    while True:
        doc = client.documents.get(document.id)
        if doc.ai_status == 'completed':
            print(f'AI Summary: {doc.ai_summary}')
            print(f'AI Tags: {doc.ai_tags}')
            print(f'Entities: {doc.entities}')
            break
        elif doc.ai_status == 'failed':
            print('AI processing failed')
            break
        time.sleep(2)

upload_and_process()
Go:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/archivus/archivus-go"
)

func main() {
	client := archivus.NewClient(
		os.Getenv("ARCHIVUS_API_KEY"),
		os.Getenv("ARCHIVUS_TENANT"),
	)
	ctx := context.Background()

	file, err := os.Open("contract.pdf")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Upload with AI processing
	doc, err := client.Documents.Upload(ctx, &archivus.UploadOptions{
		File:     file,
		EnableAI: true,
		FolderID: "folder_legal",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Document uploaded: %s\n", doc.ID)

	// Poll until AI processing completes or fails
	for {
		current, err := client.Documents.Get(ctx, doc.ID)
		if err != nil {
			panic(err)
		}
		if current.AIStatus == "completed" {
			fmt.Printf("AI Summary: %s\n", current.AISummary)
			fmt.Printf("AI Tags: %v\n", current.AITags)
			fmt.Printf("Entities: %v\n", current.Entities)
			break
		} else if current.AIStatus == "failed" {
			fmt.Println("AI processing failed")
			break
		}
		time.Sleep(2 * time.Second)
	}
}
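The loops above poll every two seconds and only stop on success or failure. For production use you typically want a deadline and backoff so a stuck document cannot block the caller indefinitely. Below is a minimal Go sketch of that pattern around the same Documents.Get call; the waitForAI helper name and the *archivus.Document return type are illustrative assumptions, not confirmed SDK API.

Go:

// waitForAI wraps the polling loop above with a context deadline and
// exponential backoff. The *archivus.Client and *archivus.Document types
// are assumed from the example above.
func waitForAI(ctx context.Context, client *archivus.Client, id string) (*archivus.Document, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	delay := 2 * time.Second
	for {
		doc, err := client.Documents.Get(ctx, id)
		if err != nil {
			return nil, err
		}
		switch doc.AIStatus {
		case "completed":
			return doc, nil
		case "failed":
			return nil, fmt.Errorf("AI processing failed for document %s", id)
		}

		// Sleep until the next attempt, or give up at the deadline.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2 // 2s, 4s, 8s, ... capped at 30s
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
}

If you have webhooks configured, subscribing to document.processed (see Webhook Handler below) avoids polling entirely.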
Semantic Search¶
JavaScript:

async function searchDocuments() {
  const results = await client.search.query('contract renewal terms', {
    mode: 'semantic',
    limit: 10,
    minScore: 0.7,
    folderId: 'folder_legal'
  });

  console.log(`Found ${results.total} documents`);
  results.results.forEach(result => {
    console.log(`\n${result.document.filename} (score: ${result.score})`);
    console.log(`Summary: ${result.document.ai_summary}`);
    console.log(`Highlight: ${result.highlight}`);
  });
}
Python:

def search_documents():
    results = client.search.query(
        'contract renewal terms',
        mode='semantic',
        limit=10,
        min_score=0.7,
        folder_id='folder_legal'
    )

    print(f'Found {results.total} documents')
    for result in results.results:
        print(f'\n{result.document.filename} (score: {result.score})')
        print(f'Summary: {result.document.ai_summary}')
        print(f'Highlight: {result.highlight}')
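The upload example above includes a Go version; for parity, here is a hedged Go sketch of the same search call. The Search.Query method, SearchOptions struct, and result field names are assumed to mirror the JS/Python SDKs and should be checked against the Go SDK.

Go:

// Hypothetical Go equivalent of the search call above, reusing the
// client and ctx from the upload example. Method and field names are
// assumed, not confirmed Go API.
func searchDocuments(ctx context.Context, client *archivus.Client) error {
	results, err := client.Search.Query(ctx, "contract renewal terms", &archivus.SearchOptions{
		Mode:     "semantic",
		Limit:    10,
		MinScore: 0.7,
		FolderID: "folder_legal",
	})
	if err != nil {
		return err
	}

	fmt.Printf("Found %d documents\n", results.Total)
	for _, r := range results.Results {
		fmt.Printf("\n%s (score: %.2f)\n", r.Document.Filename, r.Score)
		fmt.Printf("Summary: %s\n", r.Document.AISummary)
		fmt.Printf("Highlight: %s\n", r.Highlight)
	}
	return nil
}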
Chat with Document¶
JavaScript:

async function chatWithDocument(documentId) {
  // Create session
  const session = await client.chat.createSession({
    documentId,
    name: 'Contract Analysis'
  });

  // Ask multiple questions
  const questions = [
    'What are the key terms?',
    'What is the contract duration?',
    'What are the payment terms?',
    'What are the termination conditions?'
  ];

  for (const question of questions) {
    const answer = await client.chat.ask(session.id, question);
    console.log(`\nQ: ${question}`);
    console.log(`A: ${answer.content}`);
    console.log(`Confidence: ${(answer.confidence * 100).toFixed(0)}%`);

    if (answer.sources.length > 0) {
      console.log('Sources:');
      answer.sources.forEach(source => {
        console.log(`  - Page ${source.page}: "${source.excerpt}"`);
      });
    }
  }
}
Python:

def chat_with_document(document_id):
    # Create session
    session = client.chat.create_session(
        document_id=document_id,
        name='Contract Analysis'
    )

    # Ask multiple questions
    questions = [
        'What are the key terms?',
        'What is the contract duration?',
        'What are the payment terms?',
        'What are the termination conditions?'
    ]

    for question in questions:
        answer = client.chat.ask(session.id, question)
        print(f'\nQ: {question}')
        print(f'A: {answer.content}')
        print(f'Confidence: {answer.confidence * 100:.0f}%')

        if answer.sources:
            print('Sources:')
            for source in answer.sources:
                print(f'  - Page {source.page}: "{source.excerpt}"')
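A Go sketch of the same flow follows, under the same caveat: Chat.CreateSession, SessionOptions, and Chat.Ask are assumed to mirror the JS/Python methods above and are not confirmed Go API.

Go:

// Hypothetical Go equivalent of the chat flow above, reusing the client
// and ctx from the upload example. All chat method and field names are
// assumptions mirroring the JS/Python SDKs.
func chatWithDocument(ctx context.Context, client *archivus.Client, documentID string) error {
	// Create session
	session, err := client.Chat.CreateSession(ctx, &archivus.SessionOptions{
		DocumentID: documentID,
		Name:       "Contract Analysis",
	})
	if err != nil {
		return err
	}

	// Ask multiple questions
	questions := []string{
		"What are the key terms?",
		"What is the contract duration?",
		"What are the payment terms?",
		"What are the termination conditions?",
	}
	for _, q := range questions {
		answer, err := client.Chat.Ask(ctx, session.ID, q)
		if err != nil {
			return err
		}
		fmt.Printf("\nQ: %s\nA: %s\n", q, answer.Content)
		fmt.Printf("Confidence: %.0f%%\n", answer.Confidence*100)
		for _, src := range answer.Sources {
			fmt.Printf("  - Page %d: %q\n", src.Page, src.Excerpt)
		}
	}
	return nil
}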
Batch Upload¶
JavaScript:

const fs = require('fs');
const path = require('path');

async function batchUpload(directory) {
  const files = fs.readdirSync(directory)
    .filter(f => f.endsWith('.pdf'));

  console.log(`Uploading ${files.length} files...`);

  const uploadPromises = files.map(filename => {
    const filepath = path.join(directory, filename);
    const file = fs.createReadStream(filepath);

    return client.documents.upload({
      file,
      enableAI: true
    }).then(doc => {
      console.log(`✓ Uploaded: ${filename} → ${doc.id}`);
      return doc;
    }).catch(err => {
      console.error(`✗ Failed: ${filename} → ${err.message}`);
      return null;
    });
  });

  const documents = await Promise.all(uploadPromises);
  const successful = documents.filter(d => d !== null);

  console.log(`\nCompleted: ${successful.length}/${files.length} files uploaded`);
  return successful;
}
Python:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_file(filepath):
    try:
        with open(filepath, 'rb') as f:
            doc = client.documents.upload(file=f, enable_ai=True)
            print(f'✓ Uploaded: {os.path.basename(filepath)} → {doc.id}')
            return doc
    except Exception as e:
        print(f'✗ Failed: {os.path.basename(filepath)} → {e}')
        return None

def batch_upload(directory):
    files = [f for f in os.listdir(directory) if f.endswith('.pdf')]
    filepaths = [os.path.join(directory, f) for f in files]

    print(f'Uploading {len(files)} files...')

    documents = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(upload_file, fp) for fp in filepaths]
        for future in as_completed(futures):
            doc = future.result()
            if doc:
                documents.append(doc)

    print(f'\nCompleted: {len(documents)}/{len(files)} files uploaded')
    return documents
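A Go sketch of the batch upload follows, reusing the client, ctx, and UploadOptions from the first example and capping concurrency at 5 to match max_workers above. It additionally needs the path/filepath and sync imports, and the *archivus.Document slice type is an assumption.

Go:

// Hypothetical Go version of the batch upload. A buffered channel caps
// concurrency at 5 simultaneous uploads.
func batchUpload(ctx context.Context, client *archivus.Client, dir string) ([]*archivus.Document, error) {
	paths, err := filepath.Glob(filepath.Join(dir, "*.pdf"))
	if err != nil {
		return nil, err
	}
	fmt.Printf("Uploading %d files...\n", len(paths))

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		docs []*archivus.Document
	)
	sem := make(chan struct{}, 5) // at most 5 concurrent uploads

	for _, p := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			sem <- struct{}{} // acquire a slot
			defer func() { <-sem }()

			f, err := os.Open(p)
			if err != nil {
				fmt.Printf("✗ Failed: %s → %v\n", filepath.Base(p), err)
				return
			}
			defer f.Close()

			doc, err := client.Documents.Upload(ctx, &archivus.UploadOptions{File: f, EnableAI: true})
			if err != nil {
				fmt.Printf("✗ Failed: %s → %v\n", filepath.Base(p), err)
				return
			}
			fmt.Printf("✓ Uploaded: %s → %s\n", filepath.Base(p), doc.ID)

			mu.Lock()
			docs = append(docs, doc)
			mu.Unlock()
		}(p)
	}
	wg.Wait()

	fmt.Printf("\nCompleted: %d/%d files uploaded\n", len(docs), len(paths))
	return docs, nil
}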
Webhook Handler¶
JavaScript:

const express = require('express');
const { verifyWebhookSignature } = require('@archivus/sdk');

const app = express();
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;

app.post('/webhooks/archivus',
  express.raw({ type: 'application/json' }),
  async (req, res) => {
    const signature = req.headers['x-archivus-signature'];
    const payload = req.body.toString();

    // Verify signature
    if (!verifyWebhookSignature(payload, signature, WEBHOOK_SECRET)) {
      return res.status(401).send('Invalid signature');
    }

    const event = JSON.parse(payload);

    // Handle event asynchronously
    handleEvent(event).catch(console.error);

    // Respond immediately
    res.json({ status: 'ok' });
  }
);

async function handleEvent(event) {
  switch (event.type) {
    case 'document.processed':
      await onDocumentProcessed(event.data);
      break;
    case 'document.failed':
      await onDocumentFailed(event.data);
      break;
    case 'dag.execution.completed':
      await onWorkflowCompleted(event.data);
      break;
  }
}

async function onDocumentProcessed(data) {
  console.log(`Document ${data.document_id} processed`);
  console.log(`Summary: ${data.ai_summary}`);
  // Trigger downstream processing
  await processDocument(data.document_id);
}

app.listen(3000, () => {
  console.log('Webhook server listening on port 3000');
});
Python:

import os
import threading

from flask import Flask, request, jsonify
from archivus.webhooks import verify_webhook_signature

app = Flask(__name__)
WEBHOOK_SECRET = os.environ['WEBHOOK_SECRET']

@app.route('/webhooks/archivus', methods=['POST'])
def archivus_webhook():
    signature = request.headers.get('X-Archivus-Signature')
    payload = request.get_data(as_text=True)

    # Verify signature
    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        return jsonify({'error': 'Invalid signature'}), 401

    event = request.json

    # Handle event asynchronously
    thread = threading.Thread(target=handle_event, args=(event,))
    thread.start()

    # Respond immediately
    return jsonify({'status': 'ok'}), 200

def handle_event(event):
    if event['type'] == 'document.processed':
        on_document_processed(event['data'])
    elif event['type'] == 'document.failed':
        on_document_failed(event['data'])
    elif event['type'] == 'dag.execution.completed':
        on_workflow_completed(event['data'])

def on_document_processed(data):
    print(f"Document {data['document_id']} processed")
    print(f"Summary: {data['ai_summary']}")
    # Trigger downstream processing
    process_document(data['document_id'])

if __name__ == '__main__':
    app.run(port=3000)
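A Go handler can be written with only the standard library. Since this page does not document a Go verification helper, the sketch below checks the signature manually, assuming X-Archivus-Signature is a hex-encoded HMAC-SHA256 of the raw request body; confirm the scheme for your webhook configuration before relying on it.

Go:

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"os"
)

type event struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

func main() {
	secret := []byte(os.Getenv("WEBHOOK_SECRET"))

	http.HandleFunc("/webhooks/archivus", func(w http.ResponseWriter, r *http.Request) {
		payload, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "read error", http.StatusBadRequest)
			return
		}

		// Verify signature. Assumed scheme: hex-encoded HMAC-SHA256
		// of the raw body; hmac.Equal gives a constant-time compare.
		mac := hmac.New(sha256.New, secret)
		mac.Write(payload)
		expected := hex.EncodeToString(mac.Sum(nil))
		if !hmac.Equal([]byte(expected), []byte(r.Header.Get("X-Archivus-Signature"))) {
			http.Error(w, "Invalid signature", http.StatusUnauthorized)
			return
		}

		var ev event
		if err := json.Unmarshal(payload, &ev); err != nil {
			http.Error(w, "bad payload", http.StatusBadRequest)
			return
		}

		// Handle event asynchronously and respond immediately,
		// as in the JS/Python handlers above.
		go handleEvent(ev)

		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"status":"ok"}`))
	})

	log.Println("Webhook server listening on port 3000")
	log.Fatal(http.ListenAndServe(":3000", nil))
}

func handleEvent(ev event) {
	switch ev.Type {
	case "document.processed":
		log.Printf("document processed: %s", ev.Data)
	case "document.failed":
		log.Printf("document failed: %s", ev.Data)
	case "dag.execution.completed":
		log.Printf("workflow completed: %s", ev.Data)
	}
}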
More Examples¶
For more examples, visit: