Task types
The unit of work in a workflow — AI, control flow, and integration tasks, with copy-pasteable config and output for each.
A task is one node in a workflow; its task type determines
what it does. This page is about the task types themselves — the concrete config and the
output shape each one produces. The companion Workflows page covers
the DAG, triggers, and execution lifecycle.
Discover the live catalog (with config, input, and output JSON schemas) at
GET /workflows/task-types. The task types reference
has the exhaustive, synced per-type schemas — link there for the full contract; this page is the
guided tour.
Categories
| Category | Task types |
|---|---|
| AI operations | classification, extraction, llm, summarization, classify_and_extract, rag_enable, rag_query |
| Control flow | decision, wait, for_each |
| Integrations | google_drive_fetch, rest_api, email_send |
| Agentic | agent (see Agents), squad (see Squads) |
Every task type accepts {{variable}} templates in its config.
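To make the template behavior concrete, here is a minimal sketch of how {{variable}} substitution over a config could work. The helper names (render, lookup) are hypothetical. The handling of dotted names, non-string values, and undefined variables is an assumption, not the platform's documented behavior.

```python
import re
from typing import Any, Mapping

# Matches {{name}} or {{dotted.name}}, tolerating whitespace inside the braces.
_TEMPLATE = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def lookup(variables: Mapping[str, Any], dotted: str) -> Any:
    """Walk a dotted name such as 'employee.email' through nested dicts."""
    value: Any = variables
    for part in dotted.split("."):
        value = value[part]  # a KeyError here means the variable is undefined
    return value


def render(config: Any, variables: Mapping[str, Any]) -> Any:
    """Recursively substitute {{variable}} templates in a task config."""
    if isinstance(config, str):
        return _TEMPLATE.sub(lambda m: str(lookup(variables, m.group(1))), config)
    if isinstance(config, dict):
        return {key: render(value, variables) for key, value in config.items()}
    if isinstance(config, list):
        return [render(item, variables) for item in config]
    return config  # numbers, booleans, and None pass through unchanged


rendered = render(
    {
        "url": "https://api.example.com/roles/{{role_id}}",
        "to": ["{{employee.email}}"],
        "timeout": 30,
    },
    {"role_id": 7, "employee": {"email": "a@example.com"}},
)
print(rendered)
```

This sketch raises on an undefined variable rather than leaving the placeholder in place; the real processor may choose differently.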
How a task works
A task is a single node in the workflow's nodes array. The processor only executes nodes whose
type is the literal string "task"; the specific task type lives in data.task_type. Each
node carries its config and, for AI tasks, declares its I/O via input_schema / output_schema.
{
"id": "extract-resume",
"type": "task",
"position": { "x": 100, "y": 200 },
"data": {
"label": "Extract Resume Fields",
"task_type": "extraction_task",
"config": {
"output_format": "json",
"prompt_id": 42
},
"output_mapping": {
"candidate_name": "$.extraction_result.name",
"candidate_skills": "$.extraction_result.skills"
}
}
}
In the editor a node's type is the specific task type (e.g. extraction_task); before
execution it is transformed to the generic type: "task" with data.task_type set, and the
workflow definition must include a top-level input_schema. The visual builder does this for
you — you only hand-write it when posting a definition directly. See
Workflows → the workflow definition.
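If you post definitions directly, the editor-to-executable transformation described above can be sketched as follows. The function name to_executable_node and the shape of the editor node are assumptions for illustration; only the resulting type: "task" and data.task_type fields come from this page.

```python
import copy
from typing import Any, Dict


def to_executable_node(editor_node: Dict[str, Any]) -> Dict[str, Any]:
    """Move the editor's specific type into data.task_type and set type to 'task'."""
    node = copy.deepcopy(editor_node)  # leave the editor's copy untouched
    if node.get("type") == "task":
        return node  # already in executable form
    data = node.setdefault("data", {})
    data["task_type"] = node["type"]
    node["type"] = "task"
    return node


# Hypothetical editor-side node, before transformation.
editor_node = {
    "id": "extract-resume",
    "type": "extraction_task",
    "data": {"label": "Extract Resume Fields", "config": {"prompt_id": 42}},
}
executable = to_executable_node(editor_node)
print(executable["type"], executable["data"]["task_type"])
```

The sketch covers only the node; the top-level input_schema on the workflow definition still has to be supplied separately.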
Data flow: {{variable}} and output_mapping
Two complementary mechanisms move data between nodes:
- {{variable}} templates pull a value into a node's config. Variables come from the trigger payload (inputs), workflow context (user_id, execution_id), or any upstream node's output. Example: "url": "https://api.example.com/roles/{{role_id}}".
- output_mapping names values out of a node. Each entry is variable_name → JSONPath evaluated against the node's raw result, promoting a nested field to a top-level workflow variable that downstream {{variable}} templates and decision operands can reference.
trigger inputs { "role_id": 7, "resume_file_id": "1ABC" }
│
▼ uses {{role_id}} output_mapping → {{jd_file_id}}
node rest_api_task ───────────────────────────────────────────────────────┐
│ ▼
node google_drive_fetch_task uses {{jd_file_id}} → emits {{document_id}}, {{file_url}}
│
node extraction_task → output_mapping → {{candidate_name}}, {{candidate_skills}}
JSONPath in output_mapping and decision operands supports $, $.field, $.a.b,
$.items[0], and $.items[0].field.
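The supported JSONPath subset is small enough to sketch in a few lines. The resolver below handles exactly the forms listed above and applies an output_mapping to a raw result. The names resolve and apply_output_mapping are hypothetical, and error behavior for missing fields is an assumption.

```python
import re
from typing import Any, Dict

# One path segment: a field name followed by zero or more [index] accessors.
_SEGMENT = re.compile(r"([A-Za-z_]\w*)((?:\[\d+\])*)")


def resolve(path: str, document: Any) -> Any:
    """Resolve $, $.field, $.a.b, $.items[0], and $.items[0].field."""
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"unsupported JSONPath: {path!r}")
    value = document
    for segment in path[2:].split("."):
        match = _SEGMENT.fullmatch(segment)
        if match is None:
            raise ValueError(f"unsupported JSONPath segment: {segment!r}")
        name, indexes = match.groups()
        value = value[name]
        for index in re.findall(r"\[(\d+)\]", indexes):
            value = value[int(index)]
    return value


def apply_output_mapping(mapping: Dict[str, str], raw_result: Any) -> Dict[str, Any]:
    """Promote nested fields of a node's raw result to top-level variables."""
    return {name: resolve(path, raw_result) for name, path in mapping.items()}


raw = {"extraction_result": {"name": "John Doe", "skills": ["Python", "Machine Learning"]}}
variables = apply_output_mapping(
    {
        "candidate_name": "$.extraction_result.name",
        "first_skill": "$.extraction_result.skills[0]",
    },
    raw,
)
print(variables)
```

Anything outside the listed forms (wildcards, filters, slices) is rejected here, which matches the subset described above but may be stricter than the platform.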
AI operations
All AI tasks take a model_type (vertex default, plus openai, anthropic). Document tasks
read a file_url (a GCS URL, usually emitted by an upstream fetch or the trigger's file upload).
classification
Label a document (document_type, sentiment, or topic) with a confidence score.
{
"task_type": "classification_task",
"config": {
"classification_type": "document_type",
"include_confidence": true,
"return_multiple": false
}
}
Output (a classification object plus a confidence):
{
"prompt_title": "Resume Classification",
"section": "HR Documents",
"confidence": 0.95,
"classification": { "type": "resume", "subtype": "technical" }
}
extraction
Pull structured fields from a document. output_format is json or text; point at a saved
prompt with prompt_id, or set use_classification_prompt: true to reuse the prompt chosen by an
upstream classification.
{
"task_type": "extraction_task",
"config": {
"prompt_id": 42,
"model_type": "vertex",
"output_format": "json"
}
}
Output (fields land under extraction_result):
{
"extraction_result": {
"name": "John Doe",
"email": "john@example.com",
"skills": ["Python", "Machine Learning"],
"experience_years": 5
},
"confidence": 0.92,
"tokens_used": 1250
}
llm
Free-form LLM processing. Supply a saved prompt_id or inline prompt_text; feed it a document
(file_url), prior outputs (process_combined_inputs: true), or direct text_input.
{
"task_type": "llm_task",
"config": {
"prompt_text": "Compare this resume to the job description and return a match_score 0-100 and a one-line rationale.",
"model_type": "vertex",
"process_combined_inputs": true,
"output_format": "json"
}
}
Output (the model's payload under result):
{ "result": { "match_score": 87, "rationale": "Strong Python + ML overlap" }, "tokens_used": 850 }
summarization
summary_type is brief, detailed, or bullet_points, capped by max_length (words).
{
"task_type": "summarization_task",
"config": { "summary_type": "brief", "max_length": 500 }
}
Output:
{ "summary": "Senior software engineer with 5 years in Python and ML...", "word_count": 150 }
classify_and_extract
Classify, then extract with the matched prompt, in one step.
{
"task_type": "classify_and_extract",
"config": { "classification_type": "document_type", "extract_after_classify": true }
}
Output combines both stages:
{
"classification": { "type": "resume", "confidence": 0.95 },
"extraction_result": { "name": "John Doe", "skills": ["Python", "ML"] }
}
rag_query
Query the knowledge base. (rag_enable indexes a document first — see RAG.)
{
"task_type": "rag_query_task",
"config": {
"body": { "query": "{{question}}", "user_id": "{{user_id}}", "top_k": 5 }
}
}
Output (a grounded answer plus the sources it cites):
{
"answer": "According to the Remote Work Policy, employees may work remotely up to 3 days per week...",
"sources": [
{
"document_id": 45,
"document_name": "Remote Work Policy.pdf",
"snippet": "...employees may work from home up to three days...",
"score": 0.92
}
],
"session_id": "sess-abc123"
}
Choosing an AI task type
| If you need to… | Use | Why |
|---|---|---|
| Route by a single label (type / sentiment / topic) | classification | Returns one label + confidence; pair with a decision. |
| Pull known fields into a fixed shape | extraction | Schema-driven, deterministic field names under extraction_result. |
| Do both in one node | classify_and_extract | Label, then extract with the matched prompt. |
| Anything open-ended over text (compare, score, rewrite, reason) | llm | Free-form prompt; you define the output shape. |
| Answer from a corpus you've indexed | rag_query | Grounded answer with cited sources. |
| Multi-step reasoning that picks its own tools | agent | Autonomy + tool use — see Agents. |
Rule of thumb: reach for classification/extraction when the output shape is fixed, llm when
it isn't, and an agent only when the task needs to decide what to do rather than just process
its input.
Control flow
decision
Evaluate one condition and route execution down a true or false edge — deterministic branching,
no LLM call.
{
"task_type": "decision",
"config": {
"operator": "==",
"left_operand": "$.classification.type",
"right_operand": "Invoice",
"true_label": "true",
"false_label": "false"
}
}
| Operator | Meaning | Example |
|---|---|---|
| == / != | (Not) equals | $.type == "Invoice" |
| > < >= <= | Numeric comparison | $.amount > 1000 |
| in | Left value is in right (array/string) | "pdf" in $.allowed_types |
| contains | Right value is in left (array/string) | $.tags contains "urgent" |
| matches | Regex match | $.email matches ".*@company.com" |
Operands are JSONPath expressions or literals. A decision node must have exactly two outgoing
edges, labeled true and false. Nodes on the inactive path get status: "skipped"; paths can
merge back downstream. Its output records the branch taken:
{ "result": true, "branch": "true", "active_path": "true",
"condition": { "operator": "==", "left_value": "Invoice", "right_value": "Invoice" } }
Phase 1 is binary, single-comparison, and acyclic: no AND/OR/NOT and no loops. Chain
several decision nodes for compound logic.
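As a rough illustration of the single-comparison semantics, the sketch below evaluates a decision config against upstream data and produces an output in the shape shown above. evaluate_decision and resolve_operand are hypothetical names. The operand resolver handles only $ and dotted fields, and treating matches as a full-string regex match is an assumption.

```python
import operator
import re
from typing import Any, Callable, Dict

OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "in": lambda left, right: left in right,        # left value is in right
    "contains": lambda left, right: right in left,  # right value is in left
    # Assumption: the whole string must match the pattern.
    "matches": lambda left, right: re.fullmatch(right, left) is not None,
}


def resolve_operand(operand: Any, context: Dict[str, Any]) -> Any:
    """Strings starting with '$' are paths into context; anything else is a literal."""
    if not (isinstance(operand, str) and operand.startswith("$")):
        return operand
    value: Any = context
    for part in (p for p in operand.lstrip("$").split(".") if p):
        value = value[part]
    return value


def evaluate_decision(config: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Evaluate one comparison and report which edge label is active."""
    left = resolve_operand(config["left_operand"], context)
    right = resolve_operand(config["right_operand"], context)
    result = bool(OPERATORS[config["operator"]](left, right))
    branch = config.get("true_label", "true") if result else config.get("false_label", "false")
    return {
        "result": result,
        "branch": branch,
        "active_path": branch,
        "condition": {"operator": config["operator"], "left_value": left, "right_value": right},
    }


outcome = evaluate_decision(
    {"operator": "==", "left_operand": "$.classification.type", "right_operand": "Invoice"},
    {"classification": {"type": "Invoice"}},
)
print(outcome["branch"])
```

Compound logic stays out of the evaluator on purpose: chaining two such nodes expresses an AND without changing the single-comparison contract.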
wait
Poll an endpoint until a watch_field reaches success_value — the generic async wait primitive
(e.g. waiting on RAG indexing). It blocks the workflow until success, a failure_values match, or
timeout.
{
"task_type": "wait_task",
"config": {
"endpoint": "/api/internal/documents/{{document_id}}/rag-status",
"method": "GET",
"watch_field": "rag_processing_status",
"success_value": "completed",
"failure_values": ["failed", "error"],
"poll_interval": 5,
"timeout": 300
}
}
Output:
{ "status": "success", "final_value": "completed", "response": { }, "elapsed_seconds": 45 }
Use wait for async steps inside one workflow run. To watch the whole workflow from outside, poll
the execution status from your own code instead (see the end-to-end example below).
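The polling contract can be sketched as a loop with three exits: success, a failure value, or timeout. In the sketch below, wait_for is a hypothetical name, the HTTP call is replaced by an injected fetch callable, and the error text for a failure_values match is an assumption. The timeout payload follows the shape this page documents.

```python
import time
from typing import Any, Callable, Dict


def wait_for(
    config: Dict[str, Any],
    fetch: Callable[[], Dict[str, Any]],
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll fetch() until watch_field succeeds, fails, or the timeout elapses."""
    started = clock()
    while True:
        response = fetch()
        value = response.get(config["watch_field"])
        elapsed = int(clock() - started)
        if value == config["success_value"]:
            return {"status": "success", "final_value": value,
                    "response": response, "elapsed_seconds": elapsed}
        if value in config.get("failure_values", []):
            return {"status": "failure", "final_value": value,
                    "elapsed_seconds": elapsed,
                    "error": f"Watched field reached failure value: {value}"}
        if elapsed >= config["timeout"]:
            return {"status": "failure", "final_value": value,
                    "elapsed_seconds": elapsed,
                    "error": "Timeout waiting for completion"}
        sleep(config["poll_interval"])


# Demo with canned responses and a fake clock, so nothing actually sleeps.
responses = iter([
    {"rag_processing_status": "processing"},
    {"rag_processing_status": "processing"},
    {"rag_processing_status": "completed"},
])
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds


wait_config = {
    "watch_field": "rag_processing_status",
    "success_value": "completed",
    "failure_values": ["failed", "error"],
    "poll_interval": 5,
    "timeout": 300,
}
outcome = wait_for(wait_config, fetch=lambda: next(responses),
                   sleep=fake_sleep, clock=lambda: fake_now[0])
print(outcome["status"], outcome["elapsed_seconds"])
```

Injecting sleep and clock keeps the loop testable without real delays; a production caller would leave the defaults in place.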
for_each
Iterate over an array and run a sub-workflow per item. items is a {{variable}} pointing at the
array; item_variable names the current element inside sub_tasks. mode is sequential or
parallel (bounded by max_parallel, default 10). accumulate rolls per-item values back up into
workflow variables.
{
"task_type": "for_each",
"config": {
"items": "{{employees}}",
"item_variable": "employee",
"mode": "parallel",
"max_parallel": 5,
"sub_tasks": [
{
"task_type": "llm_task",
"config": { "prompt_text": "Write a one-line reminder for {{employee.name}}." }
},
{
"task_type": "email_send_task",
"config": { "to": ["{{employee.email}}"], "subject": "Reminder", "body": "{{result}}" }
}
],
"accumulate": [{ "as": "sent_summary", "type": "join", "delimiter": "\n" }]
}
}
Output (per-iteration results plus counts, with any accumulators as top-level variables):
{ "iteration_results": [ { "index": 0, "status": "success" } ],
"count": 12, "success_count": 12, "error_count": 0, "mode": "parallel" }
Integrations
Short config sketches below; the deep dives live in the REST integration guide and the event-driven guide, and the concepts in Integrations.
rest_api
Call any HTTP API with templated url / headers / body, a timeout, and output_mapping
(JSONPath → variable) to lift fields out of the response.
{
"task_type": "rest_api_task",
"config": {
"url": "https://api.example.com/roles/{{role_id}}",
"method": "GET",
"headers": { "Authorization": "Bearer {{api_token}}" },
"timeout": 30,
"output_mapping": {
"jd_file_id": "$.data.jd_file_id",
"hiring_manager_email": "$.data.hiring_manager_email"
}
}
}
The raw output exposes data, status_code, and _raw_response; the mapped names
(jd_file_id, …) become first-class workflow variables.
email_send
Send templated HTML email. to and subject are required; cc / bcc / from / html optional.
{
"task_type": "email_send_task",
"config": {
"to": ["{{hiring_manager_email}}"],
"cc": ["hr@company.com"],
"subject": "New Application: {{candidate_name}}",
"body": "<h2>Match Score: {{match_score}}%</h2><p>{{ai_summary}}</p>",
"html": true
}
}
Output: { "sent": true, "recipients": [...], "timestamp": "..." }.
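Because to and subject are required, it can help to check a config before posting it. The validator below is a hypothetical client-side sketch, not part of the platform. Treating recipient fields as lists follows the example above and is an assumption about the schema.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("to", "subject")
RECIPIENT_FIELDS = ("to", "cc", "bcc")  # assumed to be lists, as in the example config


def validate_email_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    problems: List[str] = []
    for field in REQUIRED_FIELDS:
        if not config.get(field):
            problems.append(f"missing required field: {field}")
    for field in RECIPIENT_FIELDS:
        value = config.get(field)
        if value is not None and not isinstance(value, list):
            problems.append(f"{field} should be a list of addresses")
    return problems


good = validate_email_config({"to": ["{{hiring_manager_email}}"], "subject": "New Application"})
bad = validate_email_config({"cc": "hr@company.com"})
print(good, bad)
```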
google_drive_fetch
Fetch a Drive file by file_id, or search a folder_id for a file_name; imports to storage and
returns a file_url + document_id you can feed straight into an AI task.
{
"task_type": "google_drive_fetch_task",
"config": { "file_id": "{{resume_file_id}}" }
}
Output: { "file_url": "gs://turfdms/documents/123/file.pdf", "document_id": 123, "mime_type": "application/pdf", "size": 245789 }.
A realistic call: extraction end-to-end
Trigger a workflow whose first node is an extraction_task, then read that task's output from the
execution status. The three examples (curl, Python, TypeScript) are equivalent and use the base URL https://apisandbox.turfai.in/api.
BASE="https://apisandbox.turfai.in/api"
# 1. Create an execution bound to the saved activity, then start it.
EXEC=$(curl -s -X POST "$BASE/workflow-executions" \
-H "Authorization: Bearer $TURFAI_JWT" \
-H "Content-Type: application/json" \
-d '{ "data": { "activity": 42, "inputs": { "file_url": "gs://bucket/resume.pdf" } } }' \
| python3 -c 'import sys,json;print(json.load(sys.stdin)["data"]["id"])')
curl -X POST "$BASE/workflow-executions/$EXEC/execute" \
-H "Authorization: Bearer $TURFAI_JWT" \
-H "Content-Type: application/json" \
-d '{ "inputs": { "file_url": "gs://bucket/resume.pdf" } }'
# 2. Poll status; the extraction task's output is under results.extraction_result.
curl -s "$BASE/workflow-executions/$EXEC/status" \
-H "Authorization: Bearer $TURFAI_JWT"

import os, time, requests

BASE = "https://apisandbox.turfai.in/api"
HEAD = {"Authorization": f"Bearer {os.environ['TURFAI_JWT']}", "Content-Type": "application/json"}

def run_extraction(activity_id: int, inputs: dict) -> dict:
    # 1. Create an execution bound to the saved activity, then start it.
    created = requests.post(f"{BASE}/workflow-executions", headers=HEAD,
                            json={"data": {"activity": activity_id, "inputs": inputs}})
    created.raise_for_status()
    exec_id = created.json()["data"]["id"]
    requests.post(f"{BASE}/workflow-executions/{exec_id}/execute",
                  headers=HEAD, json={"inputs": inputs}).raise_for_status()
    # 2. Poll status every 2 seconds, up to 30 times.
    for _ in range(30):
        data = requests.get(f"{BASE}/workflow-executions/{exec_id}/status",
                            headers=HEAD).json()["data"]
        if data["status"] == "completed":
            return data["results"]["extraction_result"]  # the task's output
        if data["status"] == "failed":
            raise RuntimeError(data.get("error"))
        time.sleep(2)
    raise TimeoutError("polling timed out")

print(run_extraction(42, {"file_url": "gs://bucket/resume.pdf"}))

const BASE = "https://apisandbox.turfai.in/api";
const HEAD = {
  Authorization: `Bearer ${process.env.TURFAI_JWT}`,
  "Content-Type": "application/json",
};

export async function runExtraction(activityId: number, inputs: object) {
  const created = await fetch(`${BASE}/workflow-executions`, {
    method: "POST", headers: HEAD,
    body: JSON.stringify({ data: { activity: activityId, inputs } }),
  });
  if (!created.ok) throw new Error(`create exec failed: ${created.status}`);
  const execId = (await created.json()).data.id;
  const started = await fetch(`${BASE}/workflow-executions/${execId}/execute`, {
    method: "POST", headers: HEAD, body: JSON.stringify({ inputs }),
  });
  // Fail fast if the execution could not be started, instead of polling it.
  if (!started.ok) throw new Error(`execute failed: ${started.status}`);
  for (let i = 0; i < 30; i++) {
    const res = await fetch(`${BASE}/workflow-executions/${execId}/status`, { headers: HEAD });
    const { data } = await res.json();
    if (data.status === "completed") return data.results.extraction_result; // the task's output
    if (data.status === "failed") throw new Error(data.error);
    await new Promise((r) => setTimeout(r, 2000));
  }
  throw new Error("polling timed out");
}
A completed status carries the per-node results under results, keyed by what each node produced,
so the extraction_task above surfaces as results.extraction_result.
Error handling & retries
A non-2xx from a rest_api call, an AI processing failure, or any unhandled error fails the node.
When that happens the execution status becomes failed and names the offending node:
{
"status": "failed",
"error": "REST API call failed: 404 Not Found",
"failed_node": "fetch-role-config",
"task_states": {
"fetch-role-config": { "status": "failed", "error": "404 Not Found" }
}
}
Design for failure rather than assuming every step succeeds:
- Retries. Set a per-node max_retries to retry transient/retryable errors with backoff before the node is marked failed. It resolves through a tier chain (per-node → per-workflow → tenant → defaults), so a node value overrides the workflow- and tenant-level defaults.
  { "data": { "task_type": "rest_api_task", "config": { "max_retries": 3, "url": "..." } } }
- Decision-gate on failure. Capture a status with output_mapping, then branch on it with a decision so a soft failure routes to a fallback path instead of aborting the run.
- Timeouts. Give rest_api and wait realistic timeout values. A wait that times out returns a failure with final_value and elapsed_seconds rather than hanging.
  { "status": "failure", "final_value": "processing", "elapsed_seconds": 300, "error": "Timeout waiting for completion" }
- wait vs application polling. Use a wait node when one workflow run must block on an async step. Poll the execution status from your own code (as in the end-to-end example above) to track the whole run from outside.
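The retry behavior can be sketched in two parts: resolving max_retries through the tier chain, then retrying with backoff. Everything named here (resolve_max_retries, run_with_retries, RetryableError, the default of 0, and the exponential schedule) is an assumption for illustration. This page states only that retries use backoff and that the nearest tier wins.

```python
import time
from typing import Any, Callable, Optional

DEFAULT_MAX_RETRIES = 0  # assumed default; the real platform value is not stated here


def resolve_max_retries(
    node: Optional[int] = None,
    workflow: Optional[int] = None,
    tenant: Optional[int] = None,
    default: int = DEFAULT_MAX_RETRIES,
) -> int:
    """Return the first value set along per-node, per-workflow, tenant, defaults."""
    for candidate in (node, workflow, tenant):
        if candidate is not None:
            return candidate
    return default


class RetryableError(Exception):
    """Stand-in for whatever the platform classifies as transient."""


def run_with_retries(
    task: Callable[[], Any],
    max_retries: int,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Run task, retrying retryable errors with exponential backoff."""
    attempt = 0
    while True:
        try:
            return task()
        except RetryableError:
            if attempt >= max_retries:
                raise  # retries exhausted: the node would be marked failed
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
            attempt += 1


calls = {"count": 0}


def flaky_task() -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RetryableError("temporary upstream failure")
    return "ok"


delays: list = []
max_retries = resolve_max_retries(node=3, workflow=1)  # the node value wins
result = run_with_retries(flaky_task, max_retries, sleep=delays.append)
print(result, delays)
```

Non-retryable exceptions propagate immediately in this sketch, mirroring the idea that only transient errors are worth another attempt.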
See also
- Workflows: the DAG, triggers, variables, and execution lifecycle.
- Agents and Squads: when a step should reason and pick its own tools.
- RAG: indexing with rag_enable and querying with rag_query.
- Task types reference and the API reference: the exhaustive, synced per-type schemas.