Event-driven automation

Trigger a workflow automatically when a file lands in a Google Drive folder.

What you'll build

A pipeline that fires every time a file is added to a watched Google Drive folder: TurfAI detects the new file, threads its details into a workflow, and the workflow fetches the file, extracts structured data, and emails the result — no manual trigger, no polling on your side.

You'll create the Drive event source, subscribe a workflow to it, see the exact event variables that flow into your nodes, and monitor and debug the whole thing. Read Event Bus first for the source → event → subscription → dispatch model.

Prerequisites

  • Connected Google access (delegated OAuth) for the user who owns the source. Confirm with GET /google/oauth/status → { "connected": true }. See Authentication.
  • The Drive folder ID you want to watch (the last path segment of the folder URL).
  • A workflow (activity) that processes a file — created in the builder or via API. See Workflows & activities. We build a complete one below.
  • A TurfAI JWT and the base URL.
export TURFAI_JWT="eyJhbGci…"
export BASE="https://apisandbox.turfai.in/api"
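If you only have the folder's browser URL, the ID can be pulled out locally. The sketch below assumes the usual https://drive.google.com/drive/folders/<id> shape; the helper name folder_id_from_url is hypothetical and not part of TurfAI.

```python
from urllib.parse import urlparse


def folder_id_from_url(folder_url: str) -> str:
    """Return the last non-empty path segment of a Drive folder URL."""
    path = urlparse(folder_url).path  # drops ?usp=sharing and any #fragment
    segments = [s for s in path.split("/") if s]
    if "folders" not in segments or segments[-1] == "folders":
        raise ValueError(f"not a Drive folder URL: {folder_url!r}")
    return segments[-1]


print(folder_id_from_url("https://drive.google.com/drive/folders/1AbCfolderId?usp=sharing"))
# prints: 1AbCfolderId
```

The result is the value to pass as folder_id in step 1.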

The flow

1. Create the Drive event source

TurfAI reads the owner's Google tokens from their connected integration, so you pass the folder — not credentials. The source records a Drive changes watch (channel expires ≤24h, renewed automatically).

curl -X POST "$BASE/event-bus/sources" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "source_type": "google_drive",
    "folder_id": "1AbCfolderId",
    "folder_name": "Invoices"
  }'

import os
import requests

base = os.environ["BASE"]
headers = {"Authorization": f"Bearer {os.environ['TURFAI_JWT']}"}

r = requests.post(
    f"{base}/event-bus/sources",
    headers=headers,
    json={
        "source_type": "google_drive",
        "folder_id": "1AbCfolderId",
        "folder_name": "Invoices",
    },
)
r.raise_for_status()  # fail fast on a 4xx/5xx instead of a confusing KeyError below
source = r.json()["data"]
print(source["id"], source["is_active"])  # save the source id

const base = process.env.BASE!;
const headers = {
  Authorization: `Bearer ${process.env.TURFAI_JWT}`,
  "Content-Type": "application/json",
};

const res = await fetch(`${base}/event-bus/sources`, {
  method: "POST",
  headers,
  body: JSON.stringify({
    source_type: "google_drive",
    folder_id: "1AbCfolderId",
    folder_name: "Invoices",
  }),
});
const { data: source } = await res.json();
console.log(source.id, source.is_active); // save the source id

Response:

{
  "success": true,
  "data": {
    "id": 12,
    "name": "Invoices",
    "source_type": "google_drive",
    "drive_item_id": "1AbCfolderId",
    "drive_item_name": "Invoices",
    "is_active": true,
    "watch_expiry": "2026-06-21T10:00:00.000Z"
  }
}

2. Activate the source (if pending)

If the server's public HTTPS URL wasn't configured at creation time, the source is created with is_active: false. Activate it to register the real Drive push channel. (If step 1 already returned is_active: true, skip this.)

curl -X POST "$BASE/event-bus/sources/12/activate" \
  -H "Authorization: Bearer $TURFAI_JWT"

r = requests.post(f"{base}/event-bus/sources/{source['id']}/activate", headers=headers)
print(r.json()["data"]["is_active"])  # True once the watch is live

const act = await fetch(`${base}/event-bus/sources/${source.id}/activate`, {
  method: "POST",
  headers,
});
console.log((await act.json()).data.is_active); // true once the watch is live
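Steps 1 and 2 can be folded into one helper that activates only when the source comes back pending. This is a minimal sketch, not TurfAI client code: ensure_active_source and the injected post callable are hypothetical, and the response shape is taken from the examples in this guide.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical transport: post(url, json_body_or_None) -> parsed JSON response.
# With requests it could be:
#   lambda url, body: requests.post(url, headers=headers, json=body).json()
Post = Callable[[str, Optional[Dict[str, Any]]], Dict[str, Any]]


def ensure_active_source(post: Post, base: str, folder_id: str, folder_name: str) -> Dict[str, Any]:
    """Create a Drive source, then activate it only if it came back pending."""
    created = post(
        f"{base}/event-bus/sources",
        {"source_type": "google_drive", "folder_id": folder_id, "folder_name": folder_name},
    )["data"]
    if created["is_active"]:
        return created  # step 2 is not needed
    activated = post(f"{base}/event-bus/sources/{created['id']}/activate", None)["data"]
    if not activated["is_active"]:
        raise RuntimeError(f"source {created['id']} is still pending after activate")
    return activated
```

Injecting the transport keeps the helper free of network code, so you can exercise it with a stub before pointing it at the sandbox.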

3. Subscribe a workflow to the source

The source only watches. A subscription is what fires a workflow — it links the source to an activity, picks the event_types, and can filter the payload (e.g. PDFs only). Create one against the standard event-subscriptions collection.

curl -X POST "$BASE/event-subscriptions" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "data": {
      "name": "Process new invoices",
      "event_source": 12,
      "activity": 123,
      "event_types": ["file_created"],
      "filter": { "mime_type": "application/pdf" }
    }
  }'

r = requests.post(
    f"{base}/event-subscriptions",
    headers=headers,
    json={
        "data": {
            "name": "Process new invoices",
            "event_source": source["id"],
            "activity": 123,                       # your workflow/activity id
            "event_types": ["file_created"],
            "filter": {"mime_type": "application/pdf"},
        }
    },
)
print(r.json()["data"]["id"])

const sub = await fetch(`${base}/event-subscriptions`, {
  method: "POST",
  headers,
  body: JSON.stringify({
    data: {
      name: "Process new invoices",
      event_source: source.id,
      activity: 123,                          // your workflow/activity id
      event_types: ["file_created"],
      filter: { mime_type: "application/pdf" },
    },
  }),
});
console.log((await sub.json()).data.id);

Use event_types: ["*"] to catch every event type, and omit filter to accept all files. The filter matches against the event payload — mime_type, name (supports * wildcards), etc.
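To reason about whether a subscription would match before you upload a test file, it can help to model the rules locally. The sketch below is an assumption about the semantics, not TurfAI's matcher: the event type must be listed (or "*"), every filter key must equal the payload value, and only * is treated as a wildcard in string values. Both function names are hypothetical.

```python
import re
from typing import Any, Dict, List


def wildcard_match(pattern: str, value: str) -> bool:
    """Match value against pattern, where only * is special (any run of characters)."""
    regex = ".*".join(re.escape(part) for part in pattern.split("*"))
    return re.fullmatch(regex, value) is not None


def subscription_matches(event_type: str, payload: Dict[str, Any],
                         event_types: List[str], flt: Dict[str, Any]) -> bool:
    """Assumed rule: event type is listed (or "*") and every filter key matches the payload."""
    if "*" not in event_types and event_type not in event_types:
        return False
    for key, expected in flt.items():
        actual = payload.get(key)
        if isinstance(expected, str) and isinstance(actual, str):
            if not wildcard_match(expected, actual):
                return False
        elif actual != expected:
            return False
    return True


payload = {"mime_type": "application/pdf", "name": "inv-001.pdf"}
print(subscription_matches("file_created", payload, ["file_created"], {"name": "inv-*.pdf"}))
# prints: True
```

An empty filter dict matches every payload, which mirrors omitting filter on the subscription.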

4. The workflow that consumes the event

Here's a complete workflow_definition for the activity referenced above. Each node reads the event variables that the bus injected as inputs ({{file_id}}, {{file_name}}, {{folder_name}}) and passes its output to the next node: fetch the new file, extract its fields, then email the result.

{
  "nodes": [
    {
      "id": "fetch",
      "type": "google_drive_fetch_task",
      "data": {
        "label": "Fetch new file",
        "task_type": "google_drive_fetch_task",
        "config": { "file_id": "{{file_id}}" }
      }
    },
    {
      "id": "extract",
      "type": "extraction_task",
      "data": {
        "label": "Extract invoice fields",
        "task_type": "extraction_task",
        "config": {
          "source": "{{fetch.text}}",
          "schema": {
            "invoice_number": "string",
            "vendor": "string",
            "total_amount": "number",
            "due_date": "string"
          }
        }
      }
    },
    {
      "id": "notify",
      "type": "email_send_task",
      "data": {
        "label": "Email the summary",
        "task_type": "email_send_task",
        "config": {
          "to": "ap@yourco.com",
          "subject": "New invoice in {{folder_name}}: {{file_name}}",
          "body": "Vendor: {{extract.vendor}}\nInvoice #: {{extract.invoice_number}}\nTotal: {{extract.total_amount}}\nDue: {{extract.due_date}}\nFile: {{file_web_link}}"
        }
      }
    }
  ],
  "edges": [
    { "source": "fetch", "target": "extract" },
    { "source": "extract", "target": "notify" }
  ]
}
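Before saving a definition like this one, a quick local check can catch edges that point at missing nodes or form a cycle. The sketch below is a hypothetical pre-flight helper, not something TurfAI provides; it assumes only the nodes/edges shape shown above and uses Python's standard graphlib (3.9+).

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Any, Dict, List


def execution_order(definition: Dict[str, Any]) -> List[str]:
    """Return node ids in dependency order; raise on unknown nodes or cycles."""
    node_ids = {node["id"] for node in definition["nodes"]}
    sorter: TopologicalSorter = TopologicalSorter()
    for node_id in node_ids:
        sorter.add(node_id)
    for edge in definition["edges"]:
        if edge["source"] not in node_ids or edge["target"] not in node_ids:
            raise ValueError(f"edge references unknown node: {edge}")
        sorter.add(edge["target"], edge["source"])  # target depends on source
    return list(sorter.static_order())  # raises graphlib.CycleError on a cycle


definition = {
    "nodes": [{"id": "fetch"}, {"id": "extract"}, {"id": "notify"}],
    "edges": [
        {"source": "fetch", "target": "extract"},
        {"source": "extract", "target": "notify"},
    ],
}
print(execution_order(definition))
# prints: ['fetch', 'extract', 'notify']
```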

How the variables thread through:

  • The event payload lands directly in workflow inputs, so {{file_id}}, {{file_name}}, {{file_mime_type}}, {{file_web_link}}, {{folder_id}}, and {{folder_name}} are available to every node from the start, alongside {{trigger_type}} ("event") and {{event_type}}.
  • {{fetch.text}} and {{extract.FIELD}} reference the outputs of earlier nodes by id.
  • TurfAI has already downloaded the file into a DMS document by the time your workflow runs — {{file_document_id}} is available too, if your task type prefers a document id over a Drive fetch.
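The substitution described in the list above can be pictured with a small renderer. This is an illustrative sketch, not TurfAI's template engine: render, inputs, and outputs are hypothetical names, and it assumes {{name}} resolves from the event inputs while {{node.field}} resolves from an earlier node's output.

```python
import re
from typing import Any, Dict

PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render(template: str, inputs: Dict[str, Any], outputs: Dict[str, Dict[str, Any]]) -> str:
    """Replace {{name}} from event inputs and {{node.field}} from earlier node outputs."""
    def resolve(match):
        path = match.group(1)
        if "." in path:
            node_id, field = path.split(".", 1)
            return str(outputs[node_id][field])  # KeyError if the node has not run
        return str(inputs[path])                 # KeyError if the event lacks the variable
    return PLACEHOLDER.sub(resolve, template)


inputs = {"file_name": "inv-001.pdf", "folder_name": "Invoices"}
outputs = {"extract": {"vendor": "Acme"}}
print(render("New invoice in {{folder_name}}: {{file_name}}", inputs, outputs))
# prints: New invoice in Invoices: inv-001.pdf
```

Raising on a missing key makes a typo in a placeholder fail loudly in a local test instead of producing an email with a blank field.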

See Workflows & activities for the task-type catalog and how to create the activity that holds this definition.

5. Monitor

List sources, read the event log (the audit trail), and check aggregate stats.

# sources and their status
curl "$BASE/event-bus/sources" -H "Authorization: Bearer $TURFAI_JWT"

# recent events for one source
curl "$BASE/event-bus/logs?limit=20&source_id=12" -H "Authorization: Bearer $TURFAI_JWT"

# aggregate stats
curl "$BASE/event-bus/stats" -H "Authorization: Bearer $TURFAI_JWT"

sources = requests.get(f"{base}/event-bus/sources", headers=headers).json()["data"]
logs = requests.get(
    f"{base}/event-bus/logs",
    headers=headers,
    params={"limit": 20, "source_id": 12},
).json()
stats = requests.get(f"{base}/event-bus/stats", headers=headers).json()["data"]
print(stats["active_sources"], len(logs["data"]))

const sources = await (await fetch(`${base}/event-bus/sources`, { headers })).json();
const logs = await (
  await fetch(`${base}/event-bus/logs?limit=20&source_id=12`, { headers })
).json();
const stats = await (await fetch(`${base}/event-bus/stats`, { headers })).json();
console.log(stats.data.active_sources, logs.data.length);

A GET /event-bus/logs response — each row records the event, how many subscriptions matched, and which workflow executions it triggered:

{
  "success": true,
  "data": [
    {
      "id": 501,
      "event_id": "evt_a1b2c3d4",
      "source_type": "google_drive",
      "event_type": "file_created",
      "event_source": { "id": 12, "drive_item_name": "Invoices" },
      "subscriptions_matched": 1,
      "workflows_triggered": [{ "subscription_id": 7, "execution_id": 456 }],
      "status": "processed",
      "received_at": "2026-06-20T10:00:00.000Z",
      "processed_at": "2026-06-20T10:00:01.200Z"
    }
  ],
  "pagination": { "total": 1, "limit": 20, "offset": 0 }
}
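The pagination object suggests the log can be walked page by page. The sketch below assumes the endpoint accepts an offset query parameter alongside limit, which this guide shows only in the response; iter_event_logs and the injected get callable are hypothetical.

```python
from typing import Any, Callable, Dict, Iterator

# Hypothetical transport: get(path, params) -> parsed JSON response.
Get = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def iter_event_logs(get: Get, source_id: int, limit: int = 20) -> Iterator[Dict[str, Any]]:
    """Yield every log row for a source, paging until pagination.total is reached."""
    offset = 0
    while True:
        page = get("/event-bus/logs", {"limit": limit, "offset": offset, "source_id": source_id})
        rows = page["data"]
        yield from rows
        offset += len(rows)
        # Stop on an empty page too, so a wrong total cannot loop forever.
        if not rows or offset >= page["pagination"]["total"]:
            break
```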

GET /event-bus/stats returns live counts and the latest events:

{
  "success": true,
  "data": {
    "active_sources": 2,
    "active_subscriptions": 3,
    "recent_events": [{ "event_id": "evt_a1b2c3d4", "event_type": "file_created", "status": "processed" }]
  }
}

Follow the execution_id from a log row to the run with GET /workflow-executions/:id (see the synced webhook integration guide for polling). Each event-log status means:

status      meaning
processing  event received, matching/dispatch in progress
processed   at least one subscription matched and a workflow was queued
ignored     no active subscription matched (often the cause of "nothing happened")
failed      subscription matching or dispatch errored; see error_message
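When a source is busy, a small summary over the log rows makes these statuses easier to scan. This is a sketch over the row shape shown in the GET /event-bus/logs example; summarize_logs is a hypothetical helper.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def summarize_logs(rows: List[Dict[str, Any]]) -> Tuple[Counter, List[int], List[str]]:
    """Return (status counts, triggered execution ids, event ids needing attention)."""
    statuses = Counter(row["status"] for row in rows)
    execution_ids = [
        hit["execution_id"]
        for row in rows
        for hit in row.get("workflows_triggered") or []
    ]
    needs_attention = [row["event_id"] for row in rows if row["status"] in ("ignored", "failed")]
    return statuses, execution_ids, needs_attention


rows = [
    {"event_id": "evt_a1b2c3d4", "status": "processed",
     "workflows_triggered": [{"subscription_id": 7, "execution_id": 456}]},
    {"event_id": "evt_e5f6", "status": "ignored", "workflows_triggered": []},
]
statuses, execution_ids, needs_attention = summarize_logs(rows)
print(dict(statuses), execution_ids, needs_attention)
# prints: {'processed': 1, 'ignored': 1} [456] ['evt_e5f6']
```

The execution ids are the ones to follow up with GET /workflow-executions/:id.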

6. Troubleshooting

The workflow didn't trigger. Check, in order:

  1. Is there a subscription? A source alone never fires a workflow. Confirm a row in GET /event-subscriptions with the right event_source, activity, and is_active: true.
  2. Did an event even arrive? Look in GET /event-bus/logs?source_id=ID. No rows → Google isn't pushing (see watch issues below). A row with status: "ignored" → the event arrived but no subscription matched.
  3. Filter too strict. subscriptions_matched: 0 with the right subscription usually means the filter or event_types excluded the event. A new upload arrives as file_created, and a mime_type filter must match the type Google reports for the file (Google may report Office files as their converted type). Loosen to event_types: ["*"] and remove filter to confirm.
  4. OAuth. Re-check GET /google/oauth/status. If the owner's Google access was revoked, the source can't call changes.list(); the source's error_message will say so.
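The checklist above can be encoded as a small decision function once you have fetched the subscriptions and log rows. This is a sketch under assumptions: subscription rows are taken to expose event_source and activity as plain ids, the returned labels are made up for illustration, and the OAuth check is folded into the "no events" branch because a revoked token shows up as silence.

```python
from typing import Any, Dict, List


def diagnose(subscriptions: List[Dict[str, Any]], logs: List[Dict[str, Any]],
             source_id: int, activity_id: int, oauth_connected: bool) -> str:
    """Walk the troubleshooting checks in order and name the first one that fails."""
    linked = [
        s for s in subscriptions
        if s.get("event_source") == source_id
        and s.get("activity") == activity_id
        and s.get("is_active")
    ]
    if not linked:
        return "no_subscription"          # check 1
    if not logs:                          # check 2, with check 4 as the likely cause
        return "no_events_received" if oauth_connected else "oauth_disconnected"
    statuses = {row["status"] for row in logs}
    if "processed" in statuses:
        return "dispatched"               # the bus did its part; inspect the execution
    if "failed" in statuses:
        return "dispatch_failed"          # read error_message on the log row
    if "ignored" in statuses:
        return "filtered_out"             # check 3
    return "in_progress"
```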

Watch registration failed. Creating a source needs a connected Google account ("No Google credentials found" means connect Google first) and a public HTTPS URL — Google refuses to push to localhost/http://. In that case the source is created with is_active: false; once HTTPS is configured, call POST /event-bus/sources/:id/activate. A 403/domain error from Google means the push domain isn't verified.

Watch expired. Channels last ≤24h and renew automatically ~2h before expiry. If renewal fails, the source flips to is_active: false with an error_message and stops firing. Check watch_expiry and error_message on GET /event-bus/sources, then re-activate the source (or delete and recreate it).
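A scheduled health check over GET /event-bus/sources can flag a watch before it goes quiet. The sketch below relies only on the is_active and watch_expiry fields shown in step 1; watch_health and its return labels are hypothetical, and the 2-hour window is taken from the renewal timing described above.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

RENEWAL_WINDOW = timedelta(hours=2)  # renewal is expected roughly 2h before expiry


def watch_health(source: Dict[str, Any], now: Optional[datetime] = None) -> str:
    """Classify a source row as inactive, expired, renewal_due, or ok."""
    now = now or datetime.now(timezone.utc)
    if not source.get("is_active"):
        return "inactive"
    # fromisoformat on older Pythons does not accept a trailing "Z", so normalize it.
    expiry = datetime.fromisoformat(source["watch_expiry"].replace("Z", "+00:00"))
    if expiry <= now:
        return "expired"
    if expiry - now < RENEWAL_WINDOW:
        return "renewal_due"  # inside the window: renewal should already have run
    return "ok"


source = {"is_active": True, "watch_expiry": "2026-06-21T10:00:00.000Z"}
print(watch_health(source, now=datetime(2026, 6, 20, 12, 0, tzinfo=timezone.utc)))
# prints: ok
```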

Duplicate events / idempotency. Google sends multiple notifications per upload. TurfAI settles each file (waits for version/size to stabilize) and de-duplicates per (file_id, source), so you normally get exactly one file_created event per file. If you still see a file_modified after a file_created, that's a genuine later edit of the same file — make your workflow idempotent on {{file_id}} (e.g. upsert keyed on it) rather than assuming each run is a brand-new document.
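One way to get that idempotency is an upsert keyed on the file id in whatever store receives the extracted fields. The sketch below uses an in-memory SQLite table purely for illustration (it needs SQLite 3.24+ for ON CONFLICT); the invoices table, its columns, and record_invoice are hypothetical and not part of TurfAI.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE invoices ("
    "file_id TEXT PRIMARY KEY, invoice_number TEXT, total_amount REAL, runs INTEGER)"
)


def record_invoice(file_id: str, invoice_number: str, total_amount: float) -> None:
    """Insert the invoice, or overwrite the existing row when this file_id was seen before."""
    conn.execute(
        """
        INSERT INTO invoices (file_id, invoice_number, total_amount, runs)
        VALUES (?, ?, ?, 1)
        ON CONFLICT(file_id) DO UPDATE SET
            invoice_number = excluded.invoice_number,
            total_amount   = excluded.total_amount,
            runs           = runs + 1
        """,
        (file_id, invoice_number, total_amount),
    )
    conn.commit()


record_invoice("1XyZfile", "INV-001", 120.0)  # run triggered by file_created
record_invoice("1XyZfile", "INV-001", 125.5)  # later run triggered by file_modified
print(conn.execute("SELECT file_id, total_amount, runs FROM invoices").fetchall())
# prints: [('1XyZfile', 125.5, 2)]
```

Two runs for the same file leave one row holding the latest values, so a repeated or follow-up event never creates a second invoice.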

Roadmap

Beyond Drive, a Gmail (email) source is available in staging — create it with source_type: "email", an email_address, and optional label_ids (defaults to ["INBOX"]); it publishes message events through the same bus, subscriptions, and logs. Slack and other sources are on the roadmap and will reuse the identical source → subscription → dispatch model, so the workflow and subscription code above carries over unchanged.
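For the staging Gmail source, the request body could be assembled as below. This is a sketch under assumptions: it uses only the field names mentioned above, and it assumes the body is sent flat to the same POST /event-bus/sources endpoint as the Drive source, which this guide does not confirm. email_source_payload is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


def email_source_payload(email_address: str,
                         label_ids: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build the body for an email source; omit label_ids to accept the ["INBOX"] default."""
    payload: Dict[str, Any] = {"source_type": "email", "email_address": email_address}
    if label_ids is not None:
        payload["label_ids"] = label_ids
    return payload


print(email_source_payload("ap@yourco.com"))
# prints: {'source_type': 'email', 'email_address': 'ap@yourco.com'}
```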
