Upload Document Example

Complete examples (Python, JavaScript, and cURL) of uploading a document to Archivus and waiting for it to be processed.


Overview

This example demonstrates:

  • Authenticating with Archivus
  • Uploading a document
  • Checking processing status
  • Retrieving the processed document with its AI analysis

Python Example

import requests
import time
from typing import Optional, Dict, Any

class ArchivusClient:
    """Minimal client for the Archivus document API.

    Every request carries the API key as a Bearer token and the tenant
    subdomain in the ``X-Tenant-Subdomain`` header.
    """

    def __init__(self, api_key: str, tenant: str, request_timeout: float = 30.0):
        """Store credentials and build the headers shared by all requests.

        Args:
            api_key: API key sent as a Bearer token.
            tenant: Tenant subdomain sent in ``X-Tenant-Subdomain``.
            request_timeout: Seconds ``requests`` waits for the server to
                connect/respond on each call. Without a timeout a stalled
                connection would block forever. This is a per-operation
                limit, not a cap on total transfer time.
        """
        self.api_key = api_key
        self.tenant = tenant
        self.request_timeout = request_timeout
        self.base_url = "https://api.archivus.app/api/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "X-Tenant-Subdomain": tenant,
        }

    @staticmethod
    def _build_form_fields(
        folder_id: Optional[str],
        enable_ai: bool,
        tags: Optional[list],
    ) -> Dict[str, Any]:
        """Build the non-file multipart form fields for an upload."""
        data: Dict[str, Any] = {"enable_ai": str(enable_ai).lower()}
        if folder_id:
            data["folder_id"] = folder_id
        if tags:
            # A list value makes requests emit one "tags[]" field per tag,
            # the same wire format the JavaScript and cURL examples send.
            data["tags[]"] = list(tags)
        return data

    def upload_document(
        self,
        file_path: str,
        folder_id: Optional[str] = None,
        enable_ai: bool = True,
        tags: Optional[list] = None
    ) -> Dict[str, Any]:
        """Upload a document and return the parsed JSON response.

        Args:
            file_path: Path of the file to upload; opened in binary mode.
            folder_id: Optional destination folder; omitted when falsy.
            enable_ai: Sent as the form field ``enable_ai`` ("true"/"false").
            tags: Optional tags, each sent as a repeated ``tags[]`` field.

        Returns:
            The decoded JSON body of the upload response.

        Raises:
            OSError: If ``file_path`` cannot be opened.
            requests.exceptions.HTTPError: If the server answers 4xx/5xx.
        """
        url = f"{self.base_url}/documents/upload"
        data = self._build_form_fields(folder_id, enable_ai, tags)

        # Keep the file open only for the duration of the request itself.
        with open(file_path, "rb") as f:
            response = requests.post(
                url,
                headers=self.headers,
                files={"file": f},
                data=data,
                timeout=self.request_timeout,
            )
        response.raise_for_status()
        return response.json()

    def get_document(self, document_id: str) -> Dict[str, Any]:
        """Fetch a document's details and return the parsed JSON response.

        Raises:
            requests.exceptions.HTTPError: If the server answers 4xx/5xx.
        """
        url = f"{self.base_url}/documents/{document_id}"
        response = requests.get(
            url, headers=self.headers, timeout=self.request_timeout
        )
        response.raise_for_status()
        return response.json()

    def wait_for_processing(
        self,
        document_id: str,
        timeout: int = 300,
        check_interval: int = 5
    ) -> Dict[str, Any]:
        """Poll a document until both ``status`` and ``ai_status`` are "completed".

        The document is always fetched at least once, even when ``timeout``
        is zero, and the final sleep is shortened so the call does not
        overshoot the deadline.

        Args:
            document_id: ID of the document to poll.
            timeout: Maximum number of seconds to wait.
            check_interval: Seconds to sleep between polls.

        Returns:
            The document JSON once processing has completed.

        Raises:
            RuntimeError: If ``status`` or ``ai_status`` is "failed".
            TimeoutError: If processing does not complete within ``timeout``.
        """
        # monotonic() is unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout

        while True:
            document = self.get_document(document_id)
            status = document.get("status")
            ai_status = document.get("ai_status")

            if status == "completed" and ai_status == "completed":
                return document

            if status == "failed" or ai_status == "failed":
                raise RuntimeError(
                    f"Document processing failed: {document.get('error', 'Unknown error')}"
                )

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(check_interval, remaining))

        raise TimeoutError(f"Document processing timed out after {timeout} seconds")


# Usage Example
def main():
    """Upload ``contract.pdf``, wait for processing, and print the AI results.

    Upload errors are not caught here and propagate to the caller. Only the
    wait step is guarded, so a problem while printing results is not
    misreported as a processing error.
    """
    # Initialize client
    client = ArchivusClient(
        api_key="YOUR_API_KEY",
        tenant="your-tenant"
    )

    # Upload document
    print("Uploading document...")
    result = client.upload_document(
        file_path="contract.pdf",
        folder_id="folder_legal",
        enable_ai=True,
        tags=["contract", "Q4-2025"]
    )

    document_id = result["id"]
    print(f"Document uploaded: {document_id}")
    print(f"Status: {result['status']}")

    # Wait for processing
    print("Waiting for processing...")
    try:
        document = client.wait_for_processing(document_id)
    except Exception as e:
        # TimeoutError is an Exception subclass, so timeouts and processing
        # failures are both reported here with the same message.
        print(f"Error: {e}")
        return
    print("Processing complete!")

    # Display results
    print(f"\nDocument: {document['filename']}")
    print(f"AI Summary: {document.get('ai_summary', 'N/A')}")
    # "or []" also covers an ai_tags key that is present but None.
    print(f"AI Tags: {', '.join(document.get('ai_tags') or [])}")

    entities = document.get('entities')
    if entities:
        print("\nEntities Found:")
        for label, key in (
            ("People", "people"),
            ("Organizations", "organizations"),
            ("Dates", "dates"),
        ):
            values = entities.get(key)
            if values:
                print(f"  {label}: {', '.join(values)}")


if __name__ == "__main__":
    main()

JavaScript Example

/**
 * Minimal client for the Archivus document API.
 *
 * Every request carries the API key as a Bearer token and the tenant
 * subdomain in the `X-Tenant-Subdomain` header.
 */
class ArchivusClient {
  /**
   * @param {string} apiKey - API key sent as a Bearer token.
   * @param {string} tenant - Tenant subdomain.
   */
  constructor(apiKey, tenant) {
    this.apiKey = apiKey;
    this.tenant = tenant;
    this.baseURL = 'https://api.archivus.app/api/v1';
    this.headers = {
      'Authorization': `Bearer ${apiKey}`,
      'X-Tenant-Subdomain': tenant
    };
  }

  /**
   * Return the parsed JSON body, or throw when the response is not 2xx.
   *
   * The numeric status is always included because `statusText` is empty
   * on HTTP/2 connections.
   *
   * @param {Response} response - Result of a `fetch` call.
   * @param {string} failurePrefix - Text that starts the error message.
   * @returns {Promise<any>} Parsed JSON body.
   */
  async _jsonOrThrow(response, failurePrefix) {
    if (!response.ok) {
      const reason = response.statusText ? ` ${response.statusText}` : '';
      throw new Error(`${failurePrefix}: ${response.status}${reason}`);
    }
    return response.json();
  }

  /**
   * Upload a file as multipart form data.
   *
   * @param {File|Blob} file - File to upload.
   * @param {?string} folderId - Optional destination folder.
   * @param {boolean} enableAI - Sent as the `enable_ai` form field.
   * @param {?string[]} tags - Tags, each sent as a repeated `tags[]` field.
   * @returns {Promise<any>} Parsed JSON upload response.
   * @throws {Error} When the server responds with a non-2xx status.
   */
  async uploadDocument(file, folderId = null, enableAI = true, tags = []) {
    const formData = new FormData();
    formData.append('file', file);
    formData.append('enable_ai', String(enableAI));

    if (folderId) formData.append('folder_id', folderId);
    // Default parameters only replace undefined, so guard an explicit null.
    (tags ?? []).forEach(tag => formData.append('tags[]', tag));

    // No Content-Type header is set here, so fetch generates the multipart
    // boundary itself.
    const response = await fetch(`${this.baseURL}/documents/upload`, {
      method: 'POST',
      headers: this.headers,
      body: formData
    });

    return this._jsonOrThrow(response, 'Upload failed');
  }

  /**
   * Fetch a document's details.
   *
   * @param {string} documentId - ID of the document.
   * @returns {Promise<any>} Parsed JSON document.
   * @throws {Error} When the server responds with a non-2xx status.
   */
  async getDocument(documentId) {
    const response = await fetch(`${this.baseURL}/documents/${documentId}`, {
      headers: this.headers
    });

    return this._jsonOrThrow(response, 'Failed to get document');
  }

  /**
   * Poll until both `status` and `ai_status` are 'completed'.
   *
   * The document is always fetched at least once, and the final wait is
   * shortened so the call does not overshoot the deadline.
   *
   * @param {string} documentId - ID of the document to poll.
   * @param {number} timeout - Maximum wait in milliseconds.
   * @param {number} checkInterval - Delay between polls in milliseconds.
   * @returns {Promise<any>} The document once processing has completed.
   * @throws {Error} On a 'failed' status or when the timeout elapses.
   */
  async waitForProcessing(documentId, timeout = 300000, checkInterval = 5000) {
    const deadline = Date.now() + timeout;

    while (true) {
      // Named `doc` so the DOM global `document` is not shadowed.
      const doc = await this.getDocument(documentId);

      if (doc.status === 'completed' && doc.ai_status === 'completed') {
        return doc;
      }

      if (doc.status === 'failed' || doc.ai_status === 'failed') {
        throw new Error(`Document processing failed: ${doc.error || 'Unknown error'}`);
      }

      const remaining = deadline - Date.now();
      if (remaining <= 0) break;

      const delay = Math.min(checkInterval, remaining);
      await new Promise(resolve => setTimeout(resolve, delay));
    }

    throw new Error(`Document processing timed out after ${timeout}ms`);
  }
}

// Usage Example
/**
 * Upload the file chosen in the page's file input, wait for processing,
 * and log the AI results. Errors from the upload or the wait are logged
 * rather than rethrown.
 */
async function main() {
  const client = new ArchivusClient('YOUR_API_KEY', 'your-tenant');

  // Get file from input. Optional chaining covers a page with no file
  // input at all, where querySelector returns null.
  const fileInput = document.querySelector('input[type="file"]');
  const file = fileInput?.files?.[0];

  if (!file) {
    console.error('No file selected');
    return;
  }

  try {
    // Upload document
    console.log('Uploading document...');
    const result = await client.uploadDocument(
      file,
      'folder_legal',
      true,
      ['contract', 'Q4-2025']
    );

    const documentId = result.id;
    console.log(`Document uploaded: ${documentId}`);
    console.log(`Status: ${result.status}`);

    // Wait for processing
    console.log('Waiting for processing...');
    // Named `processed` so the DOM global `document` used above is not shadowed.
    const processed = await client.waitForProcessing(documentId);
    console.log('Processing complete!');

    // Display results
    console.log(`\nDocument: ${processed.filename}`);
    console.log(`AI Summary: ${processed.ai_summary || 'N/A'}`);
    console.log(`AI Tags: ${processed.ai_tags?.join(', ') || 'None'}`);

    const entities = processed.entities;
    if (entities) {
      console.log('\nEntities Found:');
      // Empty arrays are skipped, as in the Python example.
      if (entities.people?.length) {
        console.log(`  People: ${entities.people.join(', ')}`);
      }
      if (entities.organizations?.length) {
        console.log(`  Organizations: ${entities.organizations.join(', ')}`);
      }
      if (entities.dates?.length) {
        console.log(`  Dates: ${entities.dates.join(', ')}`);
      }
    }
  } catch (error) {
    console.error('Error:', error.message);
  }
}

// Run example
main();

cURL Example

#!/bin/bash
# Upload a document to Archivus, then poll until processing finishes.
# Requires: curl, jq.
# Exit status: 0 when processing completed, 1 on upload failure,
# processing failure, or timeout.

# Configuration
API_KEY="YOUR_API_KEY"
TENANT="your-tenant"
BASE_URL="https://api.archivus.app/api/v1"
FILE_PATH="contract.pdf"

# Upload document
echo "Uploading document..."
# --fail makes curl exit non-zero on HTTP 4xx/5xx so a rejected upload is
# not mistaken for success; -S keeps error messages while -s hides progress.
if ! UPLOAD_RESPONSE=$(curl -sS --fail -X POST "${BASE_URL}/documents/upload" \
  -H "Authorization: Bearer ${API_KEY}" \
  -H "X-Tenant-Subdomain: ${TENANT}" \
  -F "file=@${FILE_PATH}" \
  -F "enable_ai=true" \
  -F "folder_id=folder_legal" \
  -F "tags[]=contract" \
  -F "tags[]=Q4-2025"); then
  echo "Upload failed!" >&2
  exit 1
fi

# Extract document ID. '// empty' turns a missing or null id into an empty
# string, so the script never polls /documents/null.
DOCUMENT_ID=$(jq -r '.id // empty' <<<"$UPLOAD_RESPONSE")
if [ -z "$DOCUMENT_ID" ]; then
  echo "Upload response did not contain a document id" >&2
  exit 1
fi
echo "Document uploaded: ${DOCUMENT_ID}"

# Wait for processing
echo "Waiting for processing..."
MAX_WAIT=300
WAITED=0
CHECK_INTERVAL=5

while [ "$WAITED" -lt "$MAX_WAIT" ]; do
  if DOCUMENT=$(curl -sS --fail "${BASE_URL}/documents/${DOCUMENT_ID}" \
    -H "Authorization: Bearer ${API_KEY}" \
    -H "X-Tenant-Subdomain: ${TENANT}"); then

    # Here-strings pass the JSON to jq verbatim; an unquoted `echo $VAR`
    # would word-split and glob-expand it first.
    STATUS=$(jq -r '.status' <<<"$DOCUMENT")
    AI_STATUS=$(jq -r '.ai_status' <<<"$DOCUMENT")

    if [ "$STATUS" = "completed" ] && [ "$AI_STATUS" = "completed" ]; then
      echo "Processing complete!"
      echo ""
      echo "Document: $(jq -r '.filename' <<<"$DOCUMENT")"
      echo "AI Summary: $(jq -r '.ai_summary // "N/A"' <<<"$DOCUMENT")"
      # '// []' keeps join from erroring when ai_tags is null or absent.
      echo "AI Tags: $(jq -r '(.ai_tags // []) | join(", ")' <<<"$DOCUMENT")"
      exit 0
    fi

    if [ "$STATUS" = "failed" ] || [ "$AI_STATUS" = "failed" ]; then
      echo "Processing failed: $(jq -r '.error // "Unknown error"' <<<"$DOCUMENT")" >&2
      exit 1
    fi
  else
    # Keep polling: a single failed status check may be transient.
    echo "Warning: status check failed; retrying..." >&2
  fi

  sleep "$CHECK_INTERVAL"
  WAITED=$((WAITED + CHECK_INTERVAL))
done

echo "Processing timed out!" >&2
exit 1

Error Handling

Handle Upload Errors

try:
    result = client.upload_document("contract.pdf")
except FileNotFoundError:
    # Raised when the local file cannot be opened, before any request is sent.
    print("File not found: contract.pdf")
except requests.exceptions.HTTPError as e:
    # The server answered with a 4xx/5xx status; map the known codes.
    status_messages = {
        413: "File too large",
        415: "Unsupported file type",
    }
    print(status_messages.get(e.response.status_code, f"Upload failed: {e}"))
except requests.exceptions.RequestException as e:
    # No usable HTTP response (DNS failure, refused connection, timeout).
    # Must come after HTTPError, which is a subclass of RequestException.
    print(f"Upload failed: {e}")

Handle Processing Errors

# Bind the name up front so later code can test "document is None" instead
# of hitting a NameError after a handled failure.
document = None
try:
    document = client.wait_for_processing(document_id)
except TimeoutError:
    # Processing did not finish within the wait_for_processing timeout.
    print("Processing took too long. Check document status manually.")
except Exception as e:
    # Processing failed, or fetching the document status raised an error.
    print(f"Processing error: {e}")

Next Steps


Questions? Check the FAQ or contact support@ubiship.com