TurfAI API Communication Standards v1.0

Synced from the source repositories. Do not edit by hand.

Date: 2025-10-23
Status: Proposed
Scope: All communication between DMS, Router, Processor, and LLM Service

Overview

This document defines the standardized communication formats for the TurfAI distributed architecture. All components MUST adhere to these standards to ensure interoperability and maintainability.

Architecture Layers

┌─────────┐         ┌────────┐         ┌───────────┐         ┌─────────────┐
│   DMS   │────────>│ Router │────────>│ Processor │────────>│ LLM Service │
│ (Strapi)│<────────│(FastAPI)│<────────│  (Python) │<────────│  (FastAPI)  │
└─────────┘         └────────┘         └───────────┘         └─────────────┘
     │                   │                    │                       │
     └───────────────────┴────────────────────┴───────────────────────┘
                    Redis (queues + status)

1. DMS → Router Communication

1.1 Job Submission Format

Endpoint: POST /api/v1/jobs
Authentication: X-API-Key header
Content-Type: application/json

Request Schema

{
  "job_id": "string (required, unique)",
  "job_type": "enum (required)",
  "payload": {
    "...": "job-type-specific data"
  },
  "token": "string (optional, JWT for DMS authentication)"
}

Job Types

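job_type             | Target Queue     | payload.workflow_definition | payload.inputs | payload.documents
---------------------|------------------|-----------------------------|----------------|------------------
task_based_workflow  | workflow_queue   | ✅ Required                 | ✅ Required    | ❌ N/A
document_extraction  | extraction_queue | ❌ N/A                      | ❌ N/A         | ✅ Required
classification       | extraction_queue | ❌ N/A                      | ❌ N/A         | ✅ Required
OCR                  | extraction_queue | ❌ N/A                      | ❌ N/A         | ✅ Required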
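
The job type table can be read as a routing rule plus a required-field check. The following is a minimal sketch of that reading, not the Router's actual code: `JOB_ROUTES` and `route_job` are hypothetical names, and rejecting a job before it is enqueued is an assumption.

```python
from typing import Any, Dict, Tuple

# job_type -> (target queue, required payload keys), transcribed from the table.
JOB_ROUTES: Dict[str, Tuple[str, Tuple[str, ...]]] = {
    "task_based_workflow": ("workflow_queue", ("workflow_definition", "inputs")),
    "document_extraction": ("extraction_queue", ("documents",)),
    "classification": ("extraction_queue", ("documents",)),
    "OCR": ("extraction_queue", ("documents",)),
}


def route_job(job: Dict[str, Any]) -> str:
    """Return the target queue for a job, or raise ValueError if it is malformed."""
    job_type = job.get("job_type")
    if job_type not in JOB_ROUTES:
        raise ValueError(f"unknown job_type: {job_type!r}")
    queue, required = JOB_ROUTES[job_type]
    payload = job.get("payload") or {}
    missing = [key for key in required if key not in payload]
    if missing:
        raise ValueError(f"payload missing required keys: {missing}")
    return queue
```

A workflow job carrying both `workflow_definition` and `inputs` resolves to `workflow_queue`; an extraction job without `documents` is rejected.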

Workflow Job Payload Schema

{
  "job_id": "wf_123",
  "job_type": "task_based_workflow",
  "payload": {
    "workflow_definition": {
      "nodes": [
        {
          "id": "string (unique)",
          "type": "task | input | output",
          "data": {
            "label": "string",
            "task_type": "string",
            "config": {}
          },
          "position": {"x": 0, "y": 0}
        }
      ],
      "edges": [
        {
          "id": "string",
          "source": "node_id",
          "target": "node_id"
        }
      ],
      "input_schema": {
        "type": "object",
        "properties": {},
        "required": []
      }
    },
    "inputs": {
      "...": "workflow input data matching input_schema"
    }
  },
  "token": "eyJhbGciOi..."
}

Response Schema

{
  "job_id": "string",
  "success": true,
  "message": "Job enqueued on 'queue_name'"
}

1.2 Job Status Query

Endpoint: GET /api/v1/jobs/{job_id}
Authentication: X-API-Key header

Response Schema

{
  "job_id": "string",
  "success": true,
  "message": "OK",
  "status": "queued | processing | completed | failed | awaiting_user_input",
  "metadata": {
    "status": "string",
    "created_at": "ISO8601 timestamp",
    "...": "additional fields"
  }
}
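
A caller typically polls this endpoint until the job leaves the queued or processing state. The sketch below shows one way to do that. `fetch_status` is a hypothetical callable that performs the GET request and returns the parsed response body, and treating `awaiting_user_input` as a stop condition is an assumption.

```python
import time
from typing import Any, Callable, Dict

# States after which polling stops. awaiting_user_input is included on the
# assumption that the job cannot progress without user action.
STOP_STATES = {"completed", "failed", "awaiting_user_input"}


def wait_for_job(
    fetch_status: Callable[[str], Dict[str, Any]],
    job_id: str,
    interval_s: float = 1.0,
    max_polls: int = 60,
) -> Dict[str, Any]:
    """Poll fetch_status(job_id) until the job leaves queued/processing."""
    for attempt in range(max_polls):
        body = fetch_status(job_id)
        if body.get("status") in STOP_STATES:
            return body
        if attempt < max_polls - 1:
            time.sleep(interval_s)  # wait before the next poll
    raise TimeoutError(f"job {job_id} still running after {max_polls} polls")
```

Passing the fetch function in keeps the loop independent of any particular HTTP client and makes it easy to test with canned responses.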

2. Router → Processor Communication

2.1 Message Format (via Redis)

Queue Names:

  • workflow_queue - Workflow jobs
  • extraction_queue - Extraction, classification, OCR jobs

Message Format: JSON string (same as DMS → Router request body)

{
  "job_id": "string",
  "job_type": "string",
  "payload": {},
  "token": "string"
}

IMPORTANT: Router forwards the exact payload it received from DMS without modification.
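
The forwarding rule can be sketched as a plain serialize-and-push step. This is an illustration only: `forward_job` and the `push` callable are hypothetical, and the Redis call the Router actually uses is not specified in this document.

```python
import json
from typing import Any, Callable, Dict


def forward_job(push: Callable[[str, str], None], queue_name: str, job: Dict[str, Any]) -> str:
    """Serialize the job exactly as received and push it to the named queue."""
    message = json.dumps(job)  # no fields added, removed, or renamed
    push(queue_name, message)
    return message
```

With redis-py, `push` might wrap `client.rpush(queue_name, message)`; whether the Router pushes with `rpush` or `lpush` is an assumption either way. The useful property to test is that decoding the queued string yields the original request body.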

2.2 Results Publishing (Processor → Router)

Queue Name: results_queue

Message Format:

{
  "job_id": "string",
  "status": "completed | failed | awaiting_user_input",
  "outputs": {
    "...": "results data"
  },
  "error": "string (optional, only if status=failed)",
  "task_states": {
    "node_id": {
      "status": "completed | failed | pending",
      "result": {}
    }
  },
  "awaiting_node": "string (optional, if status=awaiting_user_input)",
  "awaiting_input_schema": {},
  "correlation_token": "string (optional)",
  "awaiting_since": "ISO8601 timestamp (optional)"
}
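
The conditional fields in this message are easy to get wrong, so a small builder can help. The sketch below is one possible shape, with `build_result_message` as a hypothetical helper. It assumes that `awaiting_node` is mandatory when the status is `awaiting_user_input` and that the `awaiting_*` fields are only emitted in that state.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

RESULT_STATUSES = {"completed", "failed", "awaiting_user_input"}


def build_result_message(
    job_id: str,
    status: str,
    outputs: Optional[Dict[str, Any]] = None,
    task_states: Optional[Dict[str, Any]] = None,
    error: Optional[str] = None,
    awaiting_node: Optional[str] = None,
    awaiting_input_schema: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a results_queue message with only the fields valid for the status."""
    if status not in RESULT_STATUSES:
        raise ValueError(f"invalid result status: {status!r}")
    message: Dict[str, Any] = {
        "job_id": job_id,
        "status": status,
        "outputs": outputs or {},
        "task_states": task_states or {},
    }
    if status == "failed" and error is not None:
        message["error"] = error  # only present when status=failed
    if status == "awaiting_user_input":
        if awaiting_node is None:
            raise ValueError("awaiting_node is required when awaiting user input")
        message["awaiting_node"] = awaiting_node
        message["awaiting_input_schema"] = awaiting_input_schema or {}
        message["awaiting_since"] = datetime.now(timezone.utc).isoformat()
    return message
```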

3. Processor → LLM Service Communication

3.1 Current Issue

Problem: The Processor sends requests in the messages format, but the LLM API requires the content + prompt_template format.

Processor sends:

{
  "model_type": "vertex",
  "messages": [
    {"role": "system", "content": "You are an AI assistant..."},
    {"role": "user", "content": "Extract information..."}
  ],
  "file_urls": [],
  "legacy_format": true
}

LLM API expects:

{
  "content": "string (required)",
  "prompt_template": "string (required)",
  "model_type": "vertex",
  "redact_pii": false,
  "legacy_format": true
}

3.2 Standardization Options

Option A: Update LLM API to Support Both Formats

Change: Make the LLM API support BOTH formats:

  1. content + prompt_template (current, for backward compatibility)
  2. messages array (OpenAI-style, modern standard)

Benefits:

  • Aligns with industry standards (OpenAI, Anthropic, etc.)
  • Processors don't need complex conversion logic
  • More flexible for future integrations

Implementation:

# llm-api/app/api/v1/extract.py

from typing import Any, Dict, List, Optional, Union

# Pydantic v1-style API, matching the original @validator usage
from pydantic import BaseModel, Field, root_validator


class ExtractRequest(BaseModel):
    # Legacy format: both fields are optional at the field level
    content: Optional[Union[str, Dict[str, Any]]] = Field(
        None, description="Content to process (legacy format)"
    )
    prompt_template: Optional[str] = Field(
        None, description="Prompt template (legacy format)"
    )
    # Modern format: OpenAI-style messages array
    messages: Optional[List[Dict[str, str]]] = Field(
        None, description="Messages array (modern format)"
    )
    model_type: str = Field(default="vertex")
    redact_pii: bool = Field(default=False)
    legacy_format: bool = Field(default=True)

    # A root validator sees all fields at once. A per-field @validator only
    # receives the fields declared before it in `values`, so it cannot
    # reliably check one format against the other.
    @root_validator(skip_on_failure=True)
    def validate_input_format(cls, values):
        # Require either messages OR (content + prompt_template)
        has_messages = values.get('messages') is not None
        has_legacy = (values.get('content') is not None and
                      values.get('prompt_template') is not None)

        if not has_messages and not has_legacy:
            raise ValueError(
                "Must provide either 'messages' array OR "
                "both 'content' and 'prompt_template' fields"
            )
        return values

# In extract() function (request is the parsed ExtractRequest;
# format_messages is the existing legacy helper):
if request.messages:
    # Modern format - use messages directly
    messages = request.messages
else:
    # Legacy format - convert to messages
    messages = format_messages(request.prompt_template, request.content)
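
The format selection at the end of the implementation can be exercised without FastAPI or Pydantic. The sketch below is a standard-library illustration only: `resolve_messages` is a hypothetical name, and this `format_messages` is a stand-in that assumes the template becomes the system message and the content becomes the user message. The real helper is not shown in this document.

```python
import json
from typing import Any, Dict, List, Union


def format_messages(prompt_template: str, content: Union[str, Dict[str, Any]]) -> List[Dict[str, str]]:
    """Hypothetical stand-in for the legacy helper: template -> system, content -> user."""
    user_text = content if isinstance(content, str) else json.dumps(content)
    return [
        {"role": "system", "content": prompt_template},
        {"role": "user", "content": user_text},
    ]


def resolve_messages(body: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return a messages array from either the modern or the legacy request body."""
    if body.get("messages"):
        return body["messages"]  # modern format, used as-is
    content, template = body.get("content"), body.get("prompt_template")
    if content is None or template is None:
        raise ValueError("provide 'messages' or both 'content' and 'prompt_template'")
    return format_messages(template, content)  # legacy format, converted
```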

Option B: Update Processor Client (CURRENT APPROACH)

Change: Convert messages to content + prompt_template in the TurfAI client.

Already implemented in processors/utils/turfai_client.py:237-261

Issues:

  • Still failing with HTTP 422
  • Conversion logic may lose message context
  • Doesn't follow industry standards
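
To see why a conversion of this kind can lose context, consider a naive flattening. This is not the code in turfai_client.py: `messages_to_legacy` is a hypothetical function that assumes system messages become the prompt template and everything else is concatenated into content.

```python
from typing import Dict, List


def messages_to_legacy(messages: List[Dict[str, str]]) -> Dict[str, str]:
    """Flatten a messages array into content + prompt_template (lossy)."""
    system_parts = [m["content"] for m in messages if m.get("role") == "system"]
    other_parts = [m["content"] for m in messages if m.get("role") != "system"]
    return {
        "prompt_template": "\n".join(system_parts),
        # Role labels and turn boundaries are discarded here.
        "content": "\n".join(other_parts),
    }
```

After flattening a multi-turn exchange, the LLM API can no longer tell which lines came from the user and which from the assistant. Accepting the messages array directly avoids that loss.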

4. Standardized Response Formats

4.1 LLM Service Response

Format:

{
  "content": "string | dict | list",
  "model": "string (actual model name)",
  "provider": "string (vertex | openai | anthropic)",
  "cost": 0.0123
}
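
On the Processor side, this response could be parsed into a typed structure before use. The sketch below is one possible shape: `LLMResponse` and `parse_llm_response` are hypothetical names, and treating all four fields as required is an assumption the format above does not state.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Union

Content = Union[str, Dict[str, Any], List[Any]]


@dataclass(frozen=True)
class LLMResponse:
    content: Content
    model: str
    provider: str
    cost: float


def parse_llm_response(body: Dict[str, Any]) -> LLMResponse:
    """Validate the response body and convert it to an LLMResponse."""
    missing = [key for key in ("content", "model", "provider", "cost") if key not in body]
    if missing:
        raise ValueError(f"LLM response missing fields: {missing}")
    if not isinstance(body["content"], (str, dict, list)):
        raise TypeError("content must be a string, dict, or list")
    return LLMResponse(
        content=body["content"],
        model=str(body["model"]),
        provider=str(body["provider"]),
        cost=float(body["cost"]),
    )
```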

4.2 Processor Task Results

Format:

{
  "extraction_result": {},
  "metadata": {
    "document_name": "string",
    "section": "string",
    "output_format": "json | text",
    "batch_status": "completed | failed"
  },
  "processing_steps": ["step1", "step2"],
  "model_used": "string",
  "error": "string (optional)"
}

4.3 Workflow Execution Results

Format:

{
  "status": "completed | failed | awaiting_user_input",
  "results": {
    "node_id": {
      "extraction_result": {},
      "metadata": {},
      "error": "string (optional)"
    }
  },
  "task_states": {
    "node_id": {
      "status": "completed | failed | pending",
      "result": {}
    }
  }
}

5. Error Handling Standards

5.1 HTTP Status Codes

Code | Meaning               | When to Use
-----|-----------------------|----------------------------------
200  | Success               | Request completed successfully
400  | Bad Request           | Invalid input, validation failure
401  | Unauthorized          | Missing or invalid API key/token
404  | Not Found             | Job/resource doesn't exist
422  | Unprocessable Entity  | Valid format but semantic error
500  | Internal Server Error | Unexpected server error
503  | Service Unavailable   | Dependency unavailable (retry)

5.2 Error Response Format

ALL services MUST return errors in this format:

{
  "error": "string (short error code)",
  "message": "string (human-readable description)",
  "details": {
    "...": "optional additional context"
  }
}

Examples:

// 400 Bad Request
{
  "error": "validation_error",
  "message": "Invalid job payload",
  "details": {
    "field": "workflow_definition.nodes",
    "reason": "nodes array cannot be empty"
  }
}

// 422 Unprocessable Entity
{
  "error": "missing_field",
  "message": "Required field 'content' or 'prompt_template' missing",
  "details": {
    "received_fields": ["model_type", "messages", "legacy_format"]
  }
}

// 503 Service Unavailable
{
  "error": "llm_service_unavailable",
  "message": "LLM service is temporarily unavailable",
  "details": {
    "service": "vertex-ai",
    "retry_after": 5
  }
}
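
A shared helper keeps error bodies consistent across services. The sketch below shows one possible pair of helpers, both with hypothetical names. It assumes that only 503 is retryable, following the status code table, and that `retry_after` is expressed in seconds, which this document does not state.

```python
from typing import Any, Dict, Optional


def error_body(code: str, message: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build an error response in the standard error/message/details shape."""
    body: Dict[str, Any] = {"error": code, "message": message}
    if details:
        body["details"] = details  # optional additional context
    return body


def retry_delay(status_code: int, body: Dict[str, Any], default_s: float = 1.0) -> Optional[float]:
    """Return the delay before retrying, or None if the error is not retryable."""
    if status_code != 503:
        return None
    retry_after = (body.get("details") or {}).get("retry_after")
    return float(retry_after) if retry_after is not None else default_s
```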

6. Authentication Standards

6.1 Router Authentication

Method: API Key in header
Header: X-API-Key: {api_key}
Configuration: API_KEY environment variable

6.2 DMS Authentication

Method: JWT Bearer token
Header: Authorization: Bearer {jwt_token}
Token Source: DMS user authentication

6.3 Token Flow

User authenticates with DMS
     ├─> DMS generates JWT token
     ├─> Token passed in job submission
     ├─> Router stores token in Redis
     ├─> Processor retrieves token from Redis
     └─> Processor uses token for DMS callbacks
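
The store-and-retrieve part of this flow can be sketched with a plain dict standing in for Redis. Everything named here is hypothetical, including the key layout: the real Redis key and any expiry policy are not given in this document.

```python
from typing import Any, Dict, MutableMapping


def token_key(job_id: str) -> str:
    """Hypothetical key layout; the real Redis key name is not specified."""
    return f"job:{job_id}:token"


def store_token(store: MutableMapping[str, str], job: Dict[str, Any]) -> None:
    """Router side: keep the submitted token so the Processor can use it later."""
    token = job.get("token")
    if token:  # token is optional in the job submission format
        store[token_key(job["job_id"])] = token


def callback_headers(store: MutableMapping[str, str], job_id: str) -> Dict[str, str]:
    """Processor side: build the Authorization header for DMS callbacks."""
    token = store.get(token_key(job_id))
    if token is None:
        raise KeyError(f"no token stored for job {job_id}")
    return {"Authorization": f"Bearer {token}"}
```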

7. Implementation Checklist

Phase 1: LLM API Update

  • Update /llm-api/app/api/v1/extract.py to accept both formats
  • Add validation for either messages OR content+prompt_template
  • Update internal message formatting logic
  • Add comprehensive tests for both formats
  • Update API documentation

Phase 2: Processor Client Cleanup

  • Remove message-to-content conversion from turfai_client.py
  • Update all processor callers to use messages format
  • Simplify extraction.py to use messages directly
  • Remove legacy conversion functions

Phase 3: Testing & Validation

  • Create E2E test for standardized communication
  • Test workflow execution with new format
  • Test extraction jobs with new format
  • Test error scenarios (missing fields, invalid tokens)
  • Performance testing

Phase 4: Documentation

  • Update CLAUDE.md with new standards
  • Create migration guide for existing workflows
  • Update API documentation (Swagger/OpenAPI)
  • Create troubleshooting guide

8. Migration Strategy

For Existing Systems

Timeline: 2-week migration window

  1. Week 1: Update LLM API with backward compatibility
    • Deploy updated LLM API supporting both formats
    • Monitor logs for format usage
    • No breaking changes
  2. Week 2: Update processors and clients
    • Update TurfAI client to use messages format
    • Test all workflows
    • Mark the content+prompt_template format as deprecated (it remains supported)
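
For the Week 1 monitoring step, format usage can be tallied from request bodies. The sketch below assumes request bodies are available as parsed dicts (for example, from structured logs); `detect_format` and `count_formats` are hypothetical helpers.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def detect_format(body: Dict[str, Any]) -> str:
    """Classify a request body as 'messages', 'legacy', or 'invalid'."""
    if body.get("messages") is not None:
        return "messages"
    if body.get("content") is not None and body.get("prompt_template") is not None:
        return "legacy"
    return "invalid"


def count_formats(bodies: Iterable[Dict[str, Any]]) -> Counter:
    """Tally how many requests used each format."""
    return Counter(detect_format(body) for body in bodies)
```

A steadily falling `legacy` count during Week 2 would indicate that processors and clients have moved to the messages format.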

Breaking Changes

None - Both formats will be supported indefinitely for backward compatibility.


9. Appendix: Complete Example Flow

Workflow Submission

  1. DMS → Router:
POST /api/v1/jobs
{
  "job_id": "wf_123",
  "job_type": "task_based_workflow",
  "payload": {
    "workflow_definition": {
      "nodes": [
        {
          "id": "extract-1",
          "type": "task",
          "data": {
            "task_type": "document_extraction",
            "config": {"output_format": "json"}
          }
        }
      ],
      "edges": []
    },
    "inputs": {"file_url": "gs://bucket/file.pdf"}
  },
  "token": "eyJhbGc..."
}
  2. Router → workflow_queue (Redis):
{
  "job_id": "wf_123",
  "job_type": "task_based_workflow",
  "payload": { /* same as above */ },
  "token": "eyJhbGc..."
}
  3. Processor → LLM Service:
POST /api/v1/extract
{
  "messages": [
    {"role": "system", "content": "You are an AI assistant..."},
    {"role": "user", "content": "Extract information from: ..."}
  ],
  "model_type": "vertex",
  "file_urls": [{"url": "gs://bucket/file.pdf", "mime_type": "application/pdf"}]
}
  4. LLM Service Response:
{
  "content": {"name": "John Doe", "age": 30},
  "model": "gemini-2.0-flash-exp",
  "provider": "vertex",
  "cost": 0.0012
}
  5. Processor → results_queue (Redis):
{
  "job_id": "wf_123",
  "status": "completed",
  "outputs": {
    "extract-1": {
      "extraction_result": {"name": "John Doe", "age": 30},
      "metadata": {
        "section": "general",
        "output_format": "json",
        "batch_status": "completed"
      },
      "model_used": "vertex-ai"
    }
  }
}
  6. Router → DMS:
PUT /api/workflow-executions/wf_123
Authorization: Bearer eyJhbGc...

{
  "data": {
    "status": "completed",
    "outputs": { /* same as above */ }
  }
}

10. Contact & Feedback

Document Owner: TurfAI Architecture Team
Last Updated: 2025-10-23
Version: 1.0

For questions or suggestions, contact the development team.
