Event-driven automation
Trigger a workflow automatically when a file lands in a Google Drive folder.
What you'll build
A pipeline that fires every time a file is added to a watched Google Drive folder: TurfAI detects the new file, threads its details into a workflow, and the workflow fetches the file, extracts structured data, and emails the result — no manual trigger, no polling on your side.
You'll create the Drive event source, subscribe a workflow to it, see the exact event variables that flow into your nodes, and monitor and debug the whole thing. Read Event Bus first for the source → event → subscription → dispatch model.
Prerequisites
- Connected Google access (delegated OAuth) for the user who owns the source. Confirm with GET /google/oauth/status → { "connected": true }. See Authentication.
- The Drive folder ID you want to watch (the last path segment of the folder URL).
- A workflow (activity) that processes a file — created in the builder or via API. See Workflows & activities. We build a complete one below.
- A TurfAI JWT and the base URL.
```bash
export TURFAI_JWT="eyJhbGci…"
export BASE="https://apisandbox.turfai.in/api"
```

The flow
1. Create the Drive event source
TurfAI reads the owner's Google tokens from their connected integration, so you pass the folder — not credentials. The source records a Drive changes watch (channel expires ≤24h, renewed automatically).
```bash
curl -X POST "$BASE/event-bus/sources" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "source_type": "google_drive",
    "folder_id": "1AbCfolderId",
    "folder_name": "Invoices"
  }'
```

```python
import os

import requests

base = os.environ["BASE"]
headers = {"Authorization": f"Bearer {os.environ['TURFAI_JWT']}"}

r = requests.post(
    f"{base}/event-bus/sources",
    headers=headers,
    json={
        "source_type": "google_drive",
        "folder_id": "1AbCfolderId",
        "folder_name": "Invoices",
    },
)
source = r.json()["data"]
print(source["id"], source["is_active"])  # save the source id
```

```typescript
const base = process.env.BASE!;
const headers = {
  Authorization: `Bearer ${process.env.TURFAI_JWT}`,
  "Content-Type": "application/json",
};

const res = await fetch(`${base}/event-bus/sources`, {
  method: "POST",
  headers,
  body: JSON.stringify({
    source_type: "google_drive",
    folder_id: "1AbCfolderId",
    folder_name: "Invoices",
  }),
});
const { data: source } = await res.json();
console.log(source.id, source.is_active); // save the source id
```

Response:

```json
{
  "success": true,
  "data": {
    "id": 12,
    "name": "Invoices",
    "source_type": "google_drive",
    "drive_item_id": "1AbCfolderId",
    "drive_item_name": "Invoices",
    "is_active": true,
    "watch_expiry": "2026-06-21T10:00:00.000Z"
  }
}
```

2. Activate the source (if pending)
If the server's public HTTPS URL wasn't configured at creation time, the source returns
is_active: false. Activate it to register the real Drive push channel. (If step 1 already
returned is_active: true, skip this.)
```bash
curl -X POST "$BASE/event-bus/sources/12/activate" \
  -H "Authorization: Bearer $TURFAI_JWT"
```

```python
r = requests.post(f"{base}/event-bus/sources/{source['id']}/activate", headers=headers)
print(r.json()["data"]["is_active"])  # True once the watch is live
```

```typescript
const act = await fetch(`${base}/event-bus/sources/${source.id}/activate`, {
  method: "POST",
  headers,
});
console.log((await act.json()).data.is_active); // true once the watch is live
```

3. Subscribe a workflow to the source
The source only watches. A subscription is what fires a workflow — it links the source to
an activity, picks the event_types, and can filter the payload (e.g. PDFs only). Create one
against the standard event-subscriptions collection.
```bash
curl -X POST "$BASE/event-subscriptions" \
  -H "Authorization: Bearer $TURFAI_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "data": {
      "name": "Process new invoices",
      "event_source": 12,
      "activity": 123,
      "event_types": ["file_created"],
      "filter": { "mime_type": "application/pdf" }
    }
  }'
```

```python
r = requests.post(
    f"{base}/event-subscriptions",
    headers=headers,
    json={
        "data": {
            "name": "Process new invoices",
            "event_source": source["id"],
            "activity": 123,  # your workflow/activity id
            "event_types": ["file_created"],
            "filter": {"mime_type": "application/pdf"},
        }
    },
)
print(r.json()["data"]["id"])
```

```typescript
const sub = await fetch(`${base}/event-subscriptions`, {
  method: "POST",
  headers,
  body: JSON.stringify({
    data: {
      name: "Process new invoices",
      event_source: source.id,
      activity: 123, // your workflow/activity id
      event_types: ["file_created"],
      filter: { mime_type: "application/pdf" },
    },
  }),
});
console.log((await sub.json()).data.id);
```

Use event_types: ["*"] to catch every event type, and omit filter to accept all files. The filter matches against the event payload: mime_type, name (supports * wildcards), etc.
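Before you create a subscription, you can preview which uploads it would accept with a small local check. The sketch below is a hypothetical approximation, not TurfAI's matcher. It assumes exact, case-sensitive equality for every filter key except name, which it compares as a glob using Python's fnmatch. Note that fnmatch also interprets ? and [...], which the server may not. The helper names filter_matches and event_type_matches are illustrative.

```python
from fnmatch import fnmatchcase
from typing import Optional


def event_type_matches(event_types: list[str], event_type: str) -> bool:
    """A subscription's event_types accepts the event; "*" accepts every type."""
    return "*" in event_types or event_type in event_types


def filter_matches(filter_: Optional[dict], payload: dict) -> bool:
    """Hypothetical local approximation of a subscription filter.

    - No filter (None or {}) accepts every payload.
    - "name" is compared as a case-sensitive glob, so "INV-*.pdf" matches "INV-0042.pdf".
    - Every other key must equal the payload's value exactly.
    """
    if not filter_:
        return True
    for key, expected in filter_.items():
        actual = payload.get(key)
        if actual is None:
            return False  # a filtered field missing from the payload never matches
        if key == "name":
            if not fnmatchcase(str(actual), str(expected)):
                return False
        elif actual != expected:
            return False
    return True


# Would the "Process new invoices" subscription fire for this (illustrative) upload?
payload = {"name": "INV-0042.pdf", "mime_type": "application/pdf"}
print(event_type_matches(["file_created"], "file_created")
      and filter_matches({"mime_type": "application/pdf"}, payload))
```

If the local check says yes but the event log still shows subscriptions_matched: 0, the server's rules differ from these assumptions. Compare the actual mime_type in the logged payload first.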
4. The workflow that consumes the event
Here's a complete workflow_definition for the activity referenced above. Each node reads the
event variables that the bus injected as inputs — {{file_id}}, {{file_name}}, {{folder_name}}
— and passes its output to the next node. Fetch the new file, extract its fields, then email the
result.
```json
{
  "nodes": [
    {
      "id": "fetch",
      "type": "google_drive_fetch_task",
      "data": {
        "label": "Fetch new file",
        "task_type": "google_drive_fetch_task",
        "config": { "file_id": "{{file_id}}" }
      }
    },
    {
      "id": "extract",
      "type": "extraction_task",
      "data": {
        "label": "Extract invoice fields",
        "task_type": "extraction_task",
        "config": {
          "source": "{{fetch.text}}",
          "schema": {
            "invoice_number": "string",
            "vendor": "string",
            "total_amount": "number",
            "due_date": "string"
          }
        }
      }
    },
    {
      "id": "notify",
      "type": "email_send_task",
      "data": {
        "label": "Email the summary",
        "task_type": "email_send_task",
        "config": {
          "to": "ap@yourco.com",
          "subject": "New invoice in {{folder_name}}: {{file_name}}",
          "body": "Vendor: {{extract.vendor}}\nInvoice #: {{extract.invoice_number}}\nTotal: {{extract.total_amount}}\nDue: {{extract.due_date}}\nFile: {{file_web_link}}"
        }
      }
    }
  ],
  "edges": [
    { "source": "fetch", "target": "extract" },
    { "source": "extract", "target": "notify" }
  ]
}
```

How the variables thread through:

- The event payload lands directly in workflow inputs, so {{file_id}}, {{file_name}}, {{file_mime_type}}, {{file_web_link}}, {{folder_id}}, and {{folder_name}} are available to every node from the start, alongside {{trigger_type}} ("event") and {{event_type}}.
- {{fetch.text}} and {{extract.FIELD}} reference the outputs of earlier nodes by id.
- TurfAI has already downloaded the file into a DMS document by the time your workflow runs, so {{file_document_id}} is available too, if your task type prefers a document id over a Drive fetch.
See Workflows & activities for the task-type catalog and how to create the activity that holds this definition.
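The threading rules above can be illustrated with a small offline simulator. This is a hypothetical sketch, not TurfAI's workflow engine, and it makes three simplifications:

- It orders nodes by their edges using Python's graphlib.TopologicalSorter (Python 3.9+).
- It replaces each {{name}} with a workflow input and each {{node_id.field}} with a field from that earlier node's output.
- It uses a stand-in callback, fake_run_node, in place of real task types. The output shapes it returns (such as fetch.text) are assumptions.

```python
import re
from graphlib import TopologicalSorter
from typing import Any, Callable

# Matches {{file_id}} and {{node_id.field}} placeholders.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(value: Any, inputs: dict, outputs: dict) -> Any:
    """Recursively substitute placeholders in strings inside a node's config."""
    if isinstance(value, dict):
        return {k: render(v, inputs, outputs) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, inputs, outputs) for v in value]
    if not isinstance(value, str):
        return value

    def lookup(match: re.Match) -> str:
        name = match.group(1)
        if "." in name:  # {{node_id.field}}: output of an earlier node
            node_id, field = name.split(".", 1)
            return str(outputs[node_id][field])
        return str(inputs[name])  # {{file_id}} etc.: event-derived workflow input

    return PLACEHOLDER.sub(lookup, value)


def simulate(definition: dict, inputs: dict,
             run_node: Callable[[dict, dict], dict]) -> dict:
    """Run nodes in the dependency order given by edges; return {node_id: output}."""
    deps = {node["id"]: set() for node in definition["nodes"]}
    for edge in definition["edges"]:
        deps[edge["target"]].add(edge["source"])  # target runs after source
    by_id = {node["id"]: node for node in definition["nodes"]}
    outputs: dict = {}
    for node_id in TopologicalSorter(deps).static_order():
        config = render(by_id[node_id]["data"]["config"], inputs, outputs)
        outputs[node_id] = run_node(by_id[node_id], config)
    return outputs


def fake_run_node(node: dict, config: dict) -> dict:
    """Stand-in for real task types; pretends "fetch" returns the file's text."""
    if node["id"] == "fetch":
        return {"text": f"contents of {config['file_id']}"}
    return {"config": config}  # other nodes just echo their rendered config


# Trimmed two-node definition; "1XyZ" is a made-up file id.
definition = {
    "nodes": [
        {"id": "fetch", "data": {"config": {"file_id": "{{file_id}}"}}},
        {"id": "notify", "data": {"config": {
            "subject": "New file in {{folder_name}}: {{file_name}}",
            "body": "{{fetch.text}}",
        }}},
    ],
    "edges": [{"source": "fetch", "target": "notify"}],
}
inputs = {"file_id": "1XyZ", "file_name": "INV-0042.pdf", "folder_name": "Invoices"}
outputs = simulate(definition, inputs, fake_run_node)
print(outputs["notify"]["config"]["subject"])
```

A KeyError from render means a placeholder names an input the event did not supply, or a node that has not run yet. In a real definition, both usually point to a typo or a missing edge.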
5. Monitor
List sources, read the event log (the audit trail), and check aggregate stats.
```bash
# sources and their status
curl "$BASE/event-bus/sources" -H "Authorization: Bearer $TURFAI_JWT"

# recent events for one source
curl "$BASE/event-bus/logs?limit=20&source_id=12" -H "Authorization: Bearer $TURFAI_JWT"

# aggregate stats
curl "$BASE/event-bus/stats" -H "Authorization: Bearer $TURFAI_JWT"
```

```python
sources = requests.get(f"{base}/event-bus/sources", headers=headers).json()["data"]
logs = requests.get(
    f"{base}/event-bus/logs",
    headers=headers,
    params={"limit": 20, "source_id": 12},
).json()
stats = requests.get(f"{base}/event-bus/stats", headers=headers).json()["data"]
print(stats["active_sources"], len(logs["data"]))
```

```typescript
const sources = await (await fetch(`${base}/event-bus/sources`, { headers })).json();
const logs = await (
  await fetch(`${base}/event-bus/logs?limit=20&source_id=12`, { headers })
).json();
const stats = await (await fetch(`${base}/event-bus/stats`, { headers })).json();
console.log(stats.data.active_sources, logs.data.length);
```

In a GET /event-bus/logs response, each row records the event, how many subscriptions matched, and which workflow executions it triggered:
```json
{
  "success": true,
  "data": [
    {
      "id": 501,
      "event_id": "evt_a1b2c3d4",
      "source_type": "google_drive",
      "event_type": "file_created",
      "event_source": { "id": 12, "drive_item_name": "Invoices" },
      "subscriptions_matched": 1,
      "workflows_triggered": [{ "subscription_id": 7, "execution_id": 456 }],
      "status": "processed",
      "received_at": "2026-06-20T10:00:00.000Z",
      "processed_at": "2026-06-20T10:00:01.200Z"
    }
  ],
  "pagination": { "total": 1, "limit": 20, "offset": 0 }
}
```

GET /event-bus/stats returns live counts and the latest events:

```json
{
  "success": true,
  "data": {
    "active_sources": 2,
    "active_subscriptions": 3,
    "recent_events": [{ "event_id": "evt_a1b2c3d4", "event_type": "file_created", "status": "processed" }]
  }
}
```

Follow the execution_id from a log row to the run with GET /workflow-executions/:id (see the Webhook integration guide for polling). Each event-log status means:
| status | meaning |
|---|---|
| processing | event received; matching/dispatch in progress |
| processed | at least one subscription matched and a workflow was queued |
| ignored | no active subscription matched (often the cause of "nothing happened") |
| failed | subscription matching or dispatch errored; see error_message |
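When you are checking many events at once, a short helper can tally log rows by status and collect the execution ids to follow up with GET /workflow-executions/:id. This sketch reads only fields shown in the GET /event-bus/logs response above (event_id, status, workflows_triggered). summarize_logs is a hypothetical name, and it ignores pagination, so pass one page's data list or concatenate pages first.

```python
from collections import Counter


def summarize_logs(rows: list[dict]) -> dict:
    """Tally event-log rows by status and gather the execution ids they triggered."""
    by_status = Counter(row.get("status", "unknown") for row in rows)
    execution_ids = [
        run["execution_id"]
        for row in rows
        for run in row.get("workflows_triggered") or []
    ]
    # Rows worth a closer look: nothing matched, or matching/dispatch errored.
    needs_attention = [
        row["event_id"] for row in rows if row.get("status") in ("ignored", "failed")
    ]
    return {
        "by_status": dict(by_status),
        "execution_ids": execution_ids,
        "needs_attention": needs_attention,
    }


# Illustrative rows: the first mirrors the sample response, the second is made up.
sample = [
    {"event_id": "evt_a1b2c3d4", "status": "processed",
     "workflows_triggered": [{"subscription_id": 7, "execution_id": 456}]},
    {"event_id": "evt_e5f6", "status": "ignored", "workflows_triggered": []},
]
print(summarize_logs(sample))
```

Against the live API, pass the data list fetched in the Python monitoring snippet, for example summarize_logs(logs["data"]).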
6. Troubleshooting
The workflow didn't trigger. Check, in order:
- Is there a subscription? A source alone never fires a workflow. Confirm a row in GET /event-subscriptions with the right event_source, activity, and is_active: true.
- Did an event even arrive? Look in GET /event-bus/logs?source_id=ID. No rows → Google isn't pushing (see watch issues below). A row with status: "ignored" → the event arrived but no subscription matched.
- Filter too strict. subscriptions_matched: 0 with the right subscription usually means the filter or event_types excluded the event. Drive emits file_created; a mime_type filter must match the file (note Google may report Office files as their converted type). Loosen to event_types: ["*"] and remove filter to confirm.
- OAuth. Re-check GET /google/oauth/status. If the owner's Google access was revoked, the source can't call changes.list(); the source's error_message will say so.
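The checklist can also run as code over responses you have already fetched. This sketch takes three inputs:

- the body of GET /google/oauth/status
- the subscription rows from GET /event-subscriptions
- the rows from GET /event-bus/logs?source_id=ID

It returns every likely cause, in the checklist's order. It assumes subscription rows are flat objects carrying event_source, activity, and is_active. The collection may instead nest these fields or expand relations into objects, and the helper tolerates only the latter. diagnose and the message constants are hypothetical names.

```python
from typing import Any, Optional

NO_SUBSCRIPTION = "no active subscription links this source to the activity"
NO_EVENTS = "no events logged for this source: Google isn't pushing (check the watch)"
UNMATCHED = "events arrived but matched no subscription: loosen event_types or filter"
NO_OAUTH = "Google OAuth is not connected for the source owner"


def _ref_id(value: Any) -> Optional[int]:
    """Relations may come back as a bare id or as an object with an id (assumption)."""
    if isinstance(value, dict):
        return value.get("id")
    return value


def diagnose(source_id: int, activity_id: int, oauth_status: dict,
             subscriptions: list[dict], logs: list[dict]) -> list[str]:
    """Return every likely cause in checklist order; an empty list means nothing found."""
    findings = []
    # 1. Is there an active subscription linking this source to the activity?
    if not any(
        _ref_id(sub.get("event_source")) == source_id
        and _ref_id(sub.get("activity")) == activity_id
        and sub.get("is_active") is True
        for sub in subscriptions
    ):
        findings.append(NO_SUBSCRIPTION)
    # 2./3. Did events arrive, and did any of them match nothing?
    if not logs:
        findings.append(NO_EVENTS)
    elif any(row.get("status") == "ignored" or row.get("subscriptions_matched") == 0
             for row in logs):
        findings.append(UNMATCHED)
    # 4. Is the owner's Google access still connected?
    if oauth_status.get("connected") is not True:
        findings.append(NO_OAUTH)
    return findings
```

Returning all findings rather than the first one avoids guessing which problem came first. For example, a revoked OAuth grant and an empty event log often show up together.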
Watch registration failed. Creating a source needs a connected Google account
("No Google credentials found" means connect Google first) and a public HTTPS URL —
Google refuses to push to localhost/http://. In that case the source is created with
is_active: false; once HTTPS is configured, call
POST /event-bus/sources/:id/activate. A 403/domain error from Google means the push domain
isn't verified.
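Creating a source and then activating it if it comes back pending can be wrapped in one helper. This sketch uses only the two endpoints from steps 1 and 2. It accepts any requests-style session: an object whose .post(url, json=...) returns a response with .raise_for_status() and .json(). That lets you exercise it without network access. The helper name and the error handling are assumptions, not part of the TurfAI API.

```python
def create_drive_source(session, base: str, folder_id: str, folder_name: str) -> dict:
    """Create a Drive source; if it comes back pending, activate it once.

    `session` is any requests-style object: .post(url, json=...) returning a
    response with .raise_for_status() and .json().
    """
    resp = session.post(
        f"{base}/event-bus/sources",
        json={"source_type": "google_drive", "folder_id": folder_id, "folder_name": folder_name},
    )
    resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
    source = resp.json()["data"]
    if source.get("is_active"):
        return source

    # Pending: the public HTTPS URL wasn't configured when the source was created.
    source_id = source["id"]
    act = session.post(f"{base}/event-bus/sources/{source_id}/activate")
    act.raise_for_status()
    source = act.json()["data"]
    if not source.get("is_active"):
        raise RuntimeError(
            f"source {source_id} is still inactive: {source.get('error_message')}"
        )
    return source


# Hypothetical usage with requests (not run here):
# import os, requests
# s = requests.Session()
# s.headers["Authorization"] = f"Bearer {os.environ['TURFAI_JWT']}"
# source = create_drive_source(s, os.environ["BASE"], "1AbCfolderId", "Invoices")
```

If activation still leaves the source inactive, check the error_message the helper includes in the raised error. It usually points to the HTTPS or domain-verification issues described above.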
Watch expired. Channels last ≤24h and renew automatically ~2h before expiry. If renewal fails, the source flips to is_active: false with an error_message and stops firing. Delete and recreate (or re-activate) it, and check watch_expiry on GET /event-bus/sources.
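To catch a lapsed watch before it silently stops firing, a periodic check over the GET /event-bus/sources data can flag three cases: sources that are inactive, sources already past watch_expiry, and sources inside the roughly 2-hour window when automatic renewal should already have happened. The 2-hour threshold mirrors the renewal timing described above. sources_needing_attention is a hypothetical helper, and the timestamp parsing assumes the ISO-8601 shape used in this guide.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RENEWAL_WINDOW = timedelta(hours=2)  # TurfAI renews roughly 2h before expiry


def _parse(ts: str) -> datetime:
    # "2026-06-21T10:00:00.000Z" -> aware datetime (fromisoformat rejects "Z" before 3.11)
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def sources_needing_attention(sources: list[dict],
                              now: Optional[datetime] = None) -> list[tuple[int, str]]:
    """Return (source id, reason) for every source whose watch needs a look."""
    now = now or datetime.now(timezone.utc)
    flagged = []
    for src in sources:
        if not src.get("is_active"):
            reason = src.get("error_message") or "no error_message"
            flagged.append((src["id"], f"inactive: {reason}"))
            continue
        expiry = src.get("watch_expiry")
        if not expiry:
            continue
        remaining = _parse(expiry) - now
        if remaining <= timedelta(0):
            flagged.append((src["id"], "watch expired"))
        elif remaining < RENEWAL_WINDOW:
            # Renewal should already have pushed the expiry further out.
            flagged.append((src["id"], "renewal overdue"))
    return flagged
```

Because renewal timing is approximate, a source flagged "renewal overdue" may simply be mid-renewal. Re-run the check before deleting or re-activating it.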
Duplicate events / idempotency. Google sends multiple notifications per upload. TurfAI
settles each file (waits for version/size to stabilize) and de-duplicates per
(file_id, source), so you normally get exactly one file_created event per file. If you
still see a file_modified after a file_created, that's a genuine later edit of the same file —
make your workflow idempotent on {{file_id}} (e.g. upsert keyed on it) rather than assuming each
run is a brand-new document.
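Making the workflow's side effect idempotent on {{file_id}} means a repeated or later event for the same file updates one record instead of creating a second. The sketch below shows that pattern with SQLite's INSERT ... ON CONFLICT DO UPDATE upsert (SQLite 3.24+). The invoices table, its columns, and upsert_invoice are hypothetical stand-ins for wherever your workflow writes its results.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE invoices (
           file_id TEXT PRIMARY KEY,   -- Drive file id from {{file_id}}
           invoice_number TEXT,
           total_amount REAL,
           runs INTEGER NOT NULL DEFAULT 1
       )"""
)


def upsert_invoice(conn: sqlite3.Connection, file_id: str,
                   invoice_number: str, total_amount: float) -> None:
    """Insert the first time a file is seen; update in place on any later event for it."""
    conn.execute(
        """INSERT INTO invoices (file_id, invoice_number, total_amount)
           VALUES (?, ?, ?)
           ON CONFLICT(file_id) DO UPDATE SET
               invoice_number = excluded.invoice_number,
               total_amount = excluded.total_amount,
               runs = runs + 1""",
        (file_id, invoice_number, total_amount),
    )


# file_created, then a genuine later edit (file_modified) of the same file
upsert_invoice(conn, "1XyZ", "INV-0042", 1200.0)
upsert_invoice(conn, "1XyZ", "INV-0042", 1250.0)
print(conn.execute("SELECT COUNT(*), MAX(total_amount), MAX(runs) FROM invoices").fetchone())
```

Keying on file_id (rather than the event_id) is deliberate: a file_modified for an existing file should refresh the stored fields, not be treated as a new invoice.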
Roadmap
Beyond Drive, a Gmail (email) source is available in staging — create it with
source_type: "email", an email_address, and optional label_ids (defaults to ["INBOX"]);
it publishes message events through the same bus, subscriptions, and logs. Slack and other
sources are on the roadmap and will reuse the identical source → subscription → dispatch model, so
the workflow and subscription code above carries over unchanged.
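The Gmail source is staging-only, so the request body below is a sketch assembled only from the fields named above: source_type, email_address, and optional label_ids. Any other field is unconfirmed, as is whether the response matches the Drive source shape. email_source_body is a hypothetical helper. It leaves label_ids out when none are given, so the server-side ["INBOX"] default applies.

```python
import json
from typing import Optional


def email_source_body(email_address: str, label_ids: Optional[list[str]] = None) -> dict:
    """Build a POST /event-bus/sources body for the staging Gmail source."""
    body = {"source_type": "email", "email_address": email_address}
    if label_ids:  # omit to get the documented default of ["INBOX"]
        body["label_ids"] = label_ids
    return body


print(json.dumps(email_source_body("ap@yourco.com")))
# Sent like the Drive source (hypothetical usage against staging):
# requests.post(f"{base}/event-bus/sources", headers=headers, json=email_source_body(...))
```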
Related
- Event Bus — the model and watch lifecycle.
- Human-in-the-loop — add an approval step before the workflow acts.
- Webhook integration — the push alternative for non-Google sources, plus execution polling.