Task types

The unit of work in a workflow — AI, control flow, and integration tasks, with copy-pasteable config and output for each.

A task is one node in a workflow; its task type determines what it does. This page is about the task types themselves — the concrete config and the output shape each one produces. The companion Workflows page covers the DAG, triggers, and execution lifecycle.

Discover the live catalog (with config, input, and output JSON schemas) at GET /workflows/task-types. The task types reference holds the exhaustive, synced per-type schemas and is the place to check the full contract; this page is the guided tour.
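
As a rough sketch, the catalog request could be built like this in Python. It assumes the catalog sits under the same sandbox base URL and accepts the same Bearer JWT as the execution endpoints shown later on this page; neither is confirmed here. The helper names build_catalog_request and fetch_task_types are hypothetical, and no response shape is assumed.

```python
import json
import urllib.request

# Assumption: same sandbox base URL as the execution examples on this page.
BASE = "https://apisandbox.turfai.in/api"


def build_catalog_request(token: str) -> urllib.request.Request:
    """Build (but do not send) the GET /workflows/task-types request."""
    return urllib.request.Request(
        f"{BASE}/workflows/task-types",
        headers={"Authorization": f"Bearer {token}"},  # assumed auth scheme
        method="GET",
    )


def fetch_task_types(token: str) -> object:
    """Send the request and return the decoded JSON body, whatever its shape."""
    with urllib.request.urlopen(build_catalog_request(token), timeout=30) as response:
        return json.load(response)


request = build_catalog_request("example-token")
print(request.get_method(), request.full_url)
# To call the API for real: fetch_task_types(os.environ["TURFAI_JWT"])
```

Building the request separately from sending it keeps the URL and headers easy to inspect before any network traffic happens.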

Categories

| Category | Task types |
| --- | --- |
| AI operations | classification, extraction, llm, summarization, classify_and_extract, rag_enable, rag_query |
| Control flow | decision, wait, for_each |
| Integrations | google_drive_fetch, rest_api, email_send |
| Agentic | agent (see Agents), squad (see Squads) |

Every task type accepts {{variable}} templates in its config.
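
To make the template mechanism concrete, here is a minimal sketch of how {{variable}} placeholders in a config might be resolved. It is not the platform's renderer: the render and lookup helpers are hypothetical, dotted names such as {{employee.name}} are assumed to walk nested objects, and raising on a missing variable is an assumption about error behavior.

```python
import re
from typing import Any

# Matches {{name}} or {{name.path}}; tolerating inner whitespace is an assumption.
_TEMPLATE = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def lookup(variables: dict, dotted: str) -> Any:
    """Resolve 'employee.name' against nested dicts; raises KeyError if missing."""
    value: Any = variables
    for part in dotted.split("."):
        value = value[part]
    return value


def render(config: Any, variables: dict) -> Any:
    """Recursively substitute templates in strings, lists, and dicts."""
    if isinstance(config, str):
        return _TEMPLATE.sub(lambda m: str(lookup(variables, m.group(1))), config)
    if isinstance(config, list):
        return [render(item, variables) for item in config]
    if isinstance(config, dict):
        return {key: render(value, variables) for key, value in config.items()}
    return config  # numbers, booleans, and None pass through unchanged


rendered = render(
    {"url": "https://api.example.com/roles/{{role_id}}", "timeout": 30},
    {"role_id": 7},
)
print(rendered["url"])  # https://api.example.com/roles/7
```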

How a task works

A task is a single node in the workflow's nodes array. The processor only executes nodes whose type is the literal string "task"; the specific task type lives in data.task_type. Each node carries its config and, for AI tasks, declares its I/O via input_schema / output_schema.

{
  "id": "extract-resume",
  "type": "task",
  "position": { "x": 100, "y": 200 },
  "data": {
    "label": "Extract Resume Fields",
    "task_type": "extraction_task",
    "config": {
      "output_format": "json",
      "prompt_id": 42
    },
    "output_mapping": {
      "candidate_name": "$.extraction_result.name",
      "candidate_skills": "$.extraction_result.skills"
    }
  }
}

In the editor, a node's type is the specific task type (e.g. extraction_task). Before execution, the node is transformed to the generic type "task" with data.task_type set to the specific type, and the workflow definition must include a top-level input_schema. The visual builder does this for you; you only hand-write it when posting a definition directly. See Workflows → the workflow definition.
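
If you do post a definition directly, the transformation described above can be sketched as follows. The editor-side node shape, the helper names, and the JSON-Schema-style input_schema are assumptions for illustration; edges and other definition fields are omitted.

```python
import copy


def to_executable_node(editor_node: dict) -> dict:
    """Return a copy of an editor node in the generic executable form."""
    node = copy.deepcopy(editor_node)
    if node.get("type") == "task":
        return node  # already generic; nothing to do
    data = node.setdefault("data", {})
    data["task_type"] = node["type"]  # move the specific type under data
    node["type"] = "task"             # the literal string the processor looks for
    return node


def to_executable_definition(editor_nodes: list, input_schema: dict) -> dict:
    """Assemble a definition with the required top-level input_schema."""
    return {
        "input_schema": input_schema,
        "nodes": [to_executable_node(node) for node in editor_nodes],
    }


# Hypothetical editor-side node: type holds the specific task type.
editor_node = {
    "id": "extract-resume",
    "type": "extraction_task",
    "data": {"label": "Extract Resume Fields",
             "config": {"output_format": "json", "prompt_id": 42}},
}
definition = to_executable_definition(
    [editor_node],
    input_schema={"type": "object", "properties": {"file_url": {"type": "string"}}},
)
node = definition["nodes"][0]
print(node["type"], node["data"]["task_type"])  # task extraction_task
```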

Data flow: {{variable}} and output_mapping

Two complementary mechanisms move data between nodes:

  • {{variable}} templates pull a value into a node's config. Variables come from the trigger payload (inputs), workflow context (user_id, execution_id), or any upstream node's output. Example: "url": "https://api.example.com/roles/{{role_id}}".
  • output_mapping lifts named values out of a node. Each entry maps a variable_name to a JSONPath evaluated against the node's raw result, promoting a nested field to a top-level workflow variable that downstream {{variable}} templates and decision operands can reference.

trigger inputs { "role_id": 7, "resume_file_id": "1ABC" }
   │
   ▼  uses {{role_id}}
node rest_api_task ── output_mapping → {{jd_file_id}}
   │
   ▼  uses {{jd_file_id}}
node google_drive_fetch_task → emits {{document_id}}, {{file_url}}
   │
   ▼
node extraction_task ── output_mapping → {{candidate_name}}, {{candidate_skills}}

JSONPath in output_mapping and decision operands supports $, $.field, $.a.b, $.items[0], and $.items[0].field.
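
The following sketch shows one way the supported JSONPath subset and output_mapping could behave together. It is an illustration under assumptions, not the platform's evaluator: resolve and apply_output_mapping are hypothetical names, and raising on a missing field is an assumed behavior.

```python
import re
from typing import Any

# One path segment: .field optionally followed by [index], e.g. .items[0]
_SEGMENT = re.compile(r"\.([A-Za-z_]\w*)((?:\[\d+\])*)")


def resolve(path: str, document: Any) -> Any:
    """Evaluate $, $.field, $.a.b, $.items[0], and $.items[0].field."""
    if not path.startswith("$"):
        raise ValueError(f"path must start with '$': {path!r}")
    rest, value, pos = path[1:], document, 0
    while pos < len(rest):
        match = _SEGMENT.match(rest, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax: {path!r}")
        value = value[match.group(1)]  # KeyError if the field is absent
        for index in re.findall(r"\[(\d+)\]", match.group(2)):
            value = value[int(index)]
        pos = match.end()
    return value


def apply_output_mapping(mapping: dict, raw_result: Any) -> dict:
    """Promote mapped fields of a node's raw result to top-level variables."""
    return {name: resolve(path, raw_result) for name, path in mapping.items()}


raw_result = {"extraction_result": {"name": "John Doe",
                                    "skills": ["Python", "Machine Learning"]}}
variables = apply_output_mapping(
    {"candidate_name": "$.extraction_result.name",
     "candidate_skills": "$.extraction_result.skills"},
    raw_result,
)
print(variables["candidate_name"])  # John Doe
```

The resulting variables dictionary is what downstream {{candidate_name}} templates would read from in this model.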

AI operations

All AI tasks take a model_type (vertex default, plus openai, anthropic). Document tasks read a file_url (a GCS URL, usually emitted by an upstream fetch or the trigger's file upload).

classification

Label a document (document_type, sentiment, or topic) with a confidence score.

{
  "task_type": "classification_task",
  "config": {
    "classification_type": "document_type",
    "include_confidence": true,
    "return_multiple": false
  }
}

Output — produces a classification object plus a confidence:

{
  "prompt_title": "Resume Classification",
  "section": "HR Documents",
  "confidence": 0.95,
  "classification": { "type": "resume", "subtype": "technical" }
}

extraction

Pull structured fields from a document. output_format is json or text; point at a saved prompt with prompt_id, or set use_classification_prompt: true to reuse the prompt chosen by an upstream classification.

{
  "task_type": "extraction_task",
  "config": {
    "prompt_id": 42,
    "model_type": "vertex",
    "output_format": "json"
  }
}

Output — fields land under extraction_result:

{
  "extraction_result": {
    "name": "John Doe",
    "email": "john@example.com",
    "skills": ["Python", "Machine Learning"],
    "experience_years": 5
  },
  "confidence": 0.92,
  "tokens_used": 1250
}

llm

Free-form LLM processing. Supply a saved prompt_id or inline prompt_text; feed it a document (file_url), prior outputs (process_combined_inputs: true), or direct text_input.

{
  "task_type": "llm_task",
  "config": {
    "prompt_text": "Compare this resume to the job description and return a match_score 0-100 and a one-line rationale.",
    "model_type": "vertex",
    "process_combined_inputs": true,
    "output_format": "json"
  }
}

Output — the model's payload under result:

{ "result": { "match_score": 87, "rationale": "Strong Python + ML overlap" }, "tokens_used": 850 }

summarization

Condense a document into a summary. summary_type is brief, detailed, or bullet_points; max_length caps the summary length in words.

{
  "task_type": "summarization_task",
  "config": { "summary_type": "brief", "max_length": 500 }
}

Output:

{ "summary": "Senior software engineer with 5 years in Python and ML...", "word_count": 150 }

classify_and_extract

Classify, then extract with the matched prompt, in one step.

{
  "task_type": "classify_and_extract",
  "config": { "classification_type": "document_type", "extract_after_classify": true }
}

Output combines both stages:

{
  "classification": { "type": "resume", "confidence": 0.95 },
  "extraction_result": { "name": "John Doe", "skills": ["Python", "ML"] }
}

rag_query

Query the knowledge base. (rag_enable indexes a document first — see RAG.)

{
  "task_type": "rag_query_task",
  "config": {
    "body": { "query": "{{question}}", "user_id": "{{user_id}}", "top_k": 5 }
  }
}

Output — a grounded answer plus the sources it cites:

{
  "answer": "According to the Remote Work Policy, employees may work remotely up to 3 days per week...",
  "sources": [
    {
      "document_id": 45,
      "document_name": "Remote Work Policy.pdf",
      "snippet": "...employees may work from home up to three days...",
      "score": 0.92
    }
  ],
  "session_id": "sess-abc123"
}

Choosing an AI task type

| If you need to… | Use | Why |
| --- | --- | --- |
| Route by a single label (type / sentiment / topic) | classification | Returns one label + confidence; pair with a decision. |
| Pull known fields into a fixed shape | extraction | Schema-driven, deterministic field names under extraction_result. |
| Do both in one node | classify_and_extract | Label, then extract with the matched prompt. |
| Anything open-ended over text (compare, score, rewrite, reason) | llm | Free-form prompt; you define the output shape. |
| Answer from a corpus you've indexed | rag_query | Grounded answer with cited sources. |
| Multi-step reasoning that picks its own tools | agent | Autonomy + tool use; see Agents. |

Rule of thumb: reach for classification/extraction when the output shape is fixed, llm when it isn't, and an agent only when the task needs to decide what to do rather than just process its input.

Control flow

decision

Evaluate one condition and route execution down a true or false edge — deterministic branching, no LLM call.

{
  "task_type": "decision",
  "config": {
    "operator": "==",
    "left_operand": "$.classification.type",
    "right_operand": "Invoice",
    "true_label": "true",
    "false_label": "false"
  }
}

| Operator | Meaning | Example |
| --- | --- | --- |
| == / != | (Not) equals | $.type == "Invoice" |
| > < >= <= | Numeric comparison | $.amount > 1000 |
| in | Left value is in right (array/string) | "pdf" in $.allowed_types |
| contains | Right value is in left (array/string) | $.tags contains "urgent" |
| matches | Regex match | $.email matches ".*@company.com" |

Operands are JSONPath expressions or literals. A decision node must have exactly two outgoing edges, labeled true and false. Nodes on the inactive path get status: "skipped"; paths can merge back downstream. Its output records the branch taken:

{ "result": true, "branch": "true", "active_path": "true",
  "condition": { "operator": "==", "left_value": "Invoice", "right_value": "Invoice" } }

Phase 1 is binary, single-comparison, and acyclic — no AND/OR/NOT and no loops. Chain several decision nodes for compound logic.
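
A minimal sketch of how a single decision could be evaluated, and how two decisions combine into an AND. This is an illustration only: evaluate_decision and resolve_operand are hypothetical, the operand resolver handles only dotted $.a.b paths, and treating matches as a regex search (rather than a full match) is an assumption.

```python
import operator
import re
from typing import Any

OPERATORS = {
    "==": operator.eq, "!=": operator.ne,
    ">": operator.gt, "<": operator.lt,
    ">=": operator.ge, "<=": operator.le,
    "in": lambda left, right: left in right,
    "contains": lambda left, right: right in left,
    # Assumption: search semantics; the platform may anchor the pattern.
    "matches": lambda left, right: re.search(right, left) is not None,
}


def resolve_operand(operand: Any, variables: dict) -> Any:
    """Treat '$.a.b' strings as paths into variables; anything else is a literal."""
    if isinstance(operand, str) and operand.startswith("$."):
        value: Any = variables
        for part in operand[2:].split("."):
            value = value[part]
        return value
    return operand


def evaluate_decision(config: dict, variables: dict) -> dict:
    """Evaluate one comparison and report the branch taken."""
    left = resolve_operand(config["left_operand"], variables)
    right = resolve_operand(config["right_operand"], variables)
    result = bool(OPERATORS[config["operator"]](left, right))
    branch = config.get("true_label", "true") if result else config.get("false_label", "false")
    return {"result": result, "branch": branch, "active_path": branch,
            "condition": {"operator": config["operator"],
                          "left_value": left, "right_value": right}}


variables = {"classification": {"type": "Invoice"}, "amount": 1500}
is_invoice = evaluate_decision(
    {"operator": "==", "left_operand": "$.classification.type",
     "right_operand": "Invoice", "true_label": "true", "false_label": "false"},
    variables,
)
# Compound logic: a second decision placed on the first one's true edge acts as AND.
is_large = evaluate_decision(
    {"operator": ">", "left_operand": "$.amount", "right_operand": 1000}, variables)
needs_review = is_invoice["result"] and is_large["result"]
print(is_invoice["branch"], needs_review)  # true True
```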

wait

Poll an endpoint until a watch_field reaches success_value — the generic async wait primitive (e.g. waiting on RAG indexing). It blocks the workflow until success, a failure_values match, or timeout.

{
  "task_type": "wait_task",
  "config": {
    "endpoint": "/api/internal/documents/{{document_id}}/rag-status",
    "method": "GET",
    "watch_field": "rag_processing_status",
    "success_value": "completed",
    "failure_values": ["failed", "error"],
    "poll_interval": 5,
    "timeout": 300
  }
}

Output:

{ "status": "success", "final_value": "completed", "response": { }, "elapsed_seconds": 45 }

Use wait for async steps inside one workflow run. To watch the whole workflow from outside, poll the execution status from your own code instead (see the end-to-end example later on this page).
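
The polling semantics described above can be sketched as a small loop. This is a model under assumptions, not the platform's implementation: wait_for is a hypothetical helper, the HTTP call is replaced by an injected fetch function, and the shape returned on a failure_values match is a guess (only the success and timeout shapes appear on this page).

```python
import time
from typing import Callable


def wait_for(fetch: Callable[[], dict], config: dict,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> dict:
    """Poll fetch() until watch_field succeeds, fails, or the timeout elapses."""
    started = clock()
    while True:
        response = fetch()
        value = response.get(config["watch_field"])
        elapsed = int(clock() - started)
        if value == config["success_value"]:
            return {"status": "success", "final_value": value,
                    "response": response, "elapsed_seconds": elapsed}
        if value in config.get("failure_values", []):
            # Assumed shape for a failure_values match.
            return {"status": "failure", "final_value": value,
                    "elapsed_seconds": elapsed, "error": f"Reached failure value: {value}"}
        if elapsed >= config["timeout"]:
            return {"status": "failure", "final_value": value,
                    "elapsed_seconds": elapsed, "error": "Timeout waiting for completion"}
        sleep(config["poll_interval"])


# Simulated endpoint and clock so the example runs instantly.
responses = iter([{"rag_processing_status": "processing"},
                  {"rag_processing_status": "completed"}])
now = [0.0]


def fake_sleep(seconds: float) -> None:
    now[0] += seconds  # advance the simulated clock instead of sleeping


result = wait_for(
    fetch=lambda: next(responses),
    config={"watch_field": "rag_processing_status", "success_value": "completed",
            "failure_values": ["failed", "error"], "poll_interval": 5, "timeout": 300},
    sleep=fake_sleep,
    clock=lambda: now[0],
)
print(result["status"], result["elapsed_seconds"])  # success 5
```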

for_each

Iterate over an array and run a sub-workflow per item. items is a {{variable}} pointing at the array; item_variable names the current element inside sub_tasks. mode is sequential or parallel (bounded by max_parallel, default 10). accumulate rolls per-item values back up into workflow variables.

{
  "task_type": "for_each",
  "config": {
    "items": "{{employees}}",
    "item_variable": "employee",
    "mode": "parallel",
    "max_parallel": 5,
    "sub_tasks": [
      {
        "task_type": "llm_task",
        "config": { "prompt_text": "Write a one-line reminder for {{employee.name}}." }
      },
      {
        "task_type": "email_send_task",
        "config": { "to": ["{{employee.email}}"], "subject": "Reminder", "body": "{{result}}" }
      }
    ],
    "accumulate": [{ "as": "sent_summary", "type": "join", "delimiter": "\n" }]
  }
}

Output — per-iteration results plus counts (and any accumulators as top-level variables):

{ "iteration_results": [ { "index": 0, "status": "success" } ],
  "count": 12, "success_count": 12, "error_count": 0, "mode": "parallel" }

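To illustrate the iteration model, here is a rough sketch of sequential versus bounded-parallel execution that produces counts in the shape shown above. It is not the platform's scheduler: run_for_each is hypothetical, the sub_tasks are collapsed into a single run_item callable, the per-iteration "output" key is an assumption, and "joined" stands in for a join-type accumulator whose exact input is not specified on this page.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_for_each(items: list, run_item: Callable[[Any], Any],
                 mode: str = "sequential", max_parallel: int = 10,
                 delimiter: str = "\n") -> dict:
    """Run run_item per element and summarize the outcomes."""

    def attempt(pair: tuple) -> dict:
        index, item = pair
        try:
            return {"index": index, "status": "success", "output": run_item(item)}
        except Exception as exc:  # one failing item does not stop the others
            return {"index": index, "status": "error", "error": str(exc)}

    indexed = list(enumerate(items))
    if mode == "parallel":
        # At most max_parallel items run at once; map() preserves input order.
        with ThreadPoolExecutor(max_workers=max_parallel) as pool:
            results = list(pool.map(attempt, indexed))
    else:
        results = [attempt(pair) for pair in indexed]

    succeeded = [r for r in results if r["status"] == "success"]
    return {
        "iteration_results": results,
        "count": len(results),
        "success_count": len(succeeded),
        "error_count": len(results) - len(succeeded),
        "mode": mode,
        # Assumed accumulator: join each successful item's output.
        "joined": delimiter.join(str(r["output"]) for r in succeeded),
    }


employees = [{"name": "Ada"}, {"name": "Grace"}, {}]  # last item lacks "name" and fails
summary = run_for_each(employees, lambda e: f"Reminder for {e['name']}",
                       mode="parallel", max_parallel=5)
print(summary["count"], summary["success_count"], summary["error_count"])  # 3 2 1
```
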
Integrations

Short config sketches below; the deep dives live in the REST integration guide and the event-driven guide, and the concepts in Integrations.

rest_api

Call any HTTP API with templated url / headers / body, a timeout, and output_mapping (JSONPath → variable) to lift fields out of the response.

{
  "task_type": "rest_api_task",
  "config": {
    "url": "https://api.example.com/roles/{{role_id}}",
    "method": "GET",
    "headers": { "Authorization": "Bearer {{api_token}}" },
    "timeout": 30,
    "output_mapping": {
      "jd_file_id": "$.data.jd_file_id",
      "hiring_manager_email": "$.data.hiring_manager_email"
    }
  }
}

The raw output exposes data, status_code, and _raw_response; the mapped names (jd_file_id, …) become first-class workflow variables.

email_send

Send templated HTML email. to and subject are required; cc / bcc / from / html optional.

{
  "task_type": "email_send_task",
  "config": {
    "to": ["{{hiring_manager_email}}"],
    "cc": ["hr@company.com"],
    "subject": "New Application: {{candidate_name}}",
    "body": "<h2>Match Score: {{match_score}}%</h2><p>{{ai_summary}}</p>",
    "html": true
  }
}

Output: { "sent": true, "recipients": [...], "timestamp": "..." }.

google_drive_fetch

Fetch a Drive file by file_id, or search a folder_id for a file_name; imports to storage and returns a file_url + document_id you can feed straight into an AI task.

{
  "task_type": "google_drive_fetch_task",
  "config": { "file_id": "{{resume_file_id}}" }
}

Output: { "file_url": "gs://turfdms/documents/123/file.pdf", "document_id": 123, "mime_type": "application/pdf", "size": 245789 }.

A realistic call: extraction end-to-end

Trigger a workflow whose first node is an extraction_task, then read that task's output from the execution status. The three examples (cURL, Python, TypeScript) are equivalent and use the base URL https://apisandbox.turfai.in/api.

cURL

BASE="https://apisandbox.turfai.in/api"

# 1. Create an execution bound to the saved activity, then start it.
EXEC=$(curl -s -X POST "$BASE/workflow-executions" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{ "data": { "activity": 42, "inputs": { "file_url": "gs://bucket/resume.pdf" } } }' \
  | python3 -c 'import sys,json;print(json.load(sys.stdin)["data"]["id"])')

curl -X POST "$BASE/workflow-executions/$EXEC/execute" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{ "inputs": { "file_url": "gs://bucket/resume.pdf" } }'

# 2. Poll status; the extraction task's output is under results.extraction_result.
curl -s "$BASE/workflow-executions/$EXEC/status" \
  -H "Authorization: Bearer $TURFAI_JWT"

Python

import os
import time

import requests

BASE = "https://apisandbox.turfai.in/api"
HEADERS = {
    "Authorization": f"Bearer {os.environ['TURFAI_JWT']}",
    "Content-Type": "application/json",
}

def run_extraction(activity_id: int, inputs: dict) -> dict:
    # 1. Create an execution bound to the saved activity, then start it.
    created = requests.post(f"{BASE}/workflow-executions", headers=HEADERS,
                            json={"data": {"activity": activity_id, "inputs": inputs}},
                            timeout=30)
    created.raise_for_status()
    exec_id = created.json()["data"]["id"]

    requests.post(f"{BASE}/workflow-executions/{exec_id}/execute",
                  headers=HEADERS, json={"inputs": inputs}, timeout=30).raise_for_status()

    # 2. Poll status (30 attempts, 2 s apart) until the run completes or fails.
    for _ in range(30):
        status = requests.get(f"{BASE}/workflow-executions/{exec_id}/status",
                              headers=HEADERS, timeout=30)
        status.raise_for_status()  # surface HTTP errors instead of parsing an error body
        data = status.json()["data"]
        if data["status"] == "completed":
            return data["results"]["extraction_result"]   # the task's output
        if data["status"] == "failed":
            raise RuntimeError(data.get("error"))
        time.sleep(2)
    raise TimeoutError("polling timed out")

print(run_extraction(42, {"file_url": "gs://bucket/resume.pdf"}))

TypeScript

const BASE = "https://apisandbox.turfai.in/api";
const HEADERS = {
  Authorization: `Bearer ${process.env.TURFAI_JWT}`,
  "Content-Type": "application/json",
};

export async function runExtraction(activityId: number, inputs: Record<string, unknown>) {
  // 1. Create an execution bound to the saved activity, then start it.
  const created = await fetch(`${BASE}/workflow-executions`, {
    method: "POST", headers: HEADERS,
    body: JSON.stringify({ data: { activity: activityId, inputs } }),
  });
  if (!created.ok) throw new Error(`create exec failed: ${created.status}`);
  const execId = (await created.json()).data.id;

  const started = await fetch(`${BASE}/workflow-executions/${execId}/execute`, {
    method: "POST", headers: HEADERS, body: JSON.stringify({ inputs }),
  });
  if (!started.ok) throw new Error(`execute failed: ${started.status}`);

  // 2. Poll status (30 attempts, 2 s apart) until the run completes or fails.
  for (let i = 0; i < 30; i++) {
    const res = await fetch(`${BASE}/workflow-executions/${execId}/status`, { headers: HEADERS });
    if (!res.ok) throw new Error(`status check failed: ${res.status}`);
    const { data } = await res.json();
    if (data.status === "completed") return data.results.extraction_result; // the task's output
    if (data.status === "failed") throw new Error(data.error);
    await new Promise((r) => setTimeout(r, 2000));
  }
  throw new Error("polling timed out");
}

A completed status carries the per-node results under results, keyed by what each node produced — so the extraction_task above surfaces as results.extraction_result.

Error handling & retries

A non-2xx from a rest_api call, an AI processing failure, or any unhandled error fails the node. When that happens the execution status becomes failed and names the offending node:

{
  "status": "failed",
  "error": "REST API call failed: 404 Not Found",
  "failed_node": "fetch-role-config",
  "task_states": {
    "fetch-role-config": { "status": "failed", "error": "404 Not Found" }
  }
}
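
A small sketch of how client code might turn such a failed status into a readable message. The describe_failure helper is hypothetical, and preferring the node-level error over the top-level one is a design choice made for this example, not documented behavior.

```python
from typing import Optional


def describe_failure(status: dict) -> Optional[str]:
    """Return a one-line summary for a failed execution, or None otherwise."""
    if status.get("status") != "failed":
        return None
    node_id = status.get("failed_node", "<unknown node>")
    node_state = status.get("task_states", {}).get(node_id, {})
    # Prefer the node-level error, fall back to the execution-level one.
    detail = node_state.get("error") or status.get("error") or "no error message"
    return f"node {node_id!r} failed: {detail}"


failed_status = {
    "status": "failed",
    "error": "REST API call failed: 404 Not Found",
    "failed_node": "fetch-role-config",
    "task_states": {"fetch-role-config": {"status": "failed", "error": "404 Not Found"}},
}
print(describe_failure(failed_status))  # node 'fetch-role-config' failed: 404 Not Found
```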

Design for failure rather than assuming every step succeeds:

  • Retries. Set a per-node max_retries to retry transient/retryable errors with backoff before the node is marked failed. It resolves through a tier chain (per-node → per-workflow → tenant → defaults), so a node value overrides the workflow- and tenant-level defaults.

    { "data": { "task_type": "rest_api_task", "config": { "max_retries": 3, "url": "..." } } }
  • Decision-gate on failure. Capture a status with output_mapping, then branch on it with a decision so a soft failure routes to a fallback path instead of aborting the run.

  • Timeouts. Give rest_api and wait realistic timeouts. A wait that times out returns a failure with final_value and elapsed_seconds rather than hanging.

    { "status": "failure", "final_value": "processing", "elapsed_seconds": 300,
      "error": "Timeout waiting for completion" }
  • wait vs application polling. Use a wait node when one workflow run must block on an async step. Poll the execution status from your own code (as in the end-to-end example above) to track the whole run from outside.
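
The retry behavior in the first bullet can be modeled roughly as below. This is a sketch under assumptions: the tier chain is reduced to "first value that is set wins", the default of 0 retries and the exponential backoff schedule are invented for illustration (the page only says "with backoff"), and RetryableError, resolve_max_retries, and run_with_retries are hypothetical names.

```python
import time
from typing import Callable, Optional

DEFAULT_MAX_RETRIES = 0  # hypothetical platform default


class RetryableError(Exception):
    """Stands in for a transient failure such as a 503 or a network timeout."""


def resolve_max_retries(node: Optional[int] = None, workflow: Optional[int] = None,
                        tenant: Optional[int] = None,
                        default: int = DEFAULT_MAX_RETRIES) -> int:
    """Walk per-node → per-workflow → tenant → defaults; the first set value wins."""
    for value in (node, workflow, tenant):
        if value is not None:
            return value
    return default


def run_with_retries(task: Callable[[], object], max_retries: int,
                     base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> object:
    """Retry retryable errors with exponential backoff, then re-raise."""
    attempt = 0
    while True:
        try:
            return task()
        except RetryableError:
            if attempt >= max_retries:
                raise  # retries exhausted: the node would be marked failed
            sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ... (assumed schedule)
            attempt += 1


delays: list = []
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RetryableError("503 Service Unavailable")
    return "ok"


# Record the delays instead of sleeping so the example runs instantly.
result = run_with_retries(flaky, resolve_max_retries(node=3, workflow=1), sleep=delays.append)
print(result, delays)  # ok [1.0, 2.0]
```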

See also

  • Workflows — the DAG, triggers, variables, and execution lifecycle.
  • Agents and Squads — when a step should reason and pick its own tools.
  • RAG — indexing with rag_enable and querying with rag_query.
  • Task types reference and the API reference — the exhaustive, synced per-type schemas.
