Using Google Cloud Workflows for ETL orchestration

Serverless data pipelines with Google Cloud Workflows, Cloud Run Functions, and dbt Cloud. How to build a fully orchestrated, observable pipeline with no infrastructure to manage.

Most teams building data pipelines default to what they know: a VM running cron jobs, an Airflow cluster someone set up two years ago, or a tangle of scripts that only one engineer fully understands. It works until it doesn't — and when it breaks, nobody knows why.

There's a cleaner approach. One that gives you orchestration, error handling, scheduling, and observability — without managing any infrastructure. This post walks through how we build serverless data pipelines on Google Cloud using Cloud Run Functions, Google Cloud Workflows, and dbt Cloud, and why this stack holds up in production.


The Stack at a Glance

Before the detail, here's what this architecture looks like end to end:




Four components. Zero servers to manage. Every execution logged. Every failure alerted.


Part 1 — Ingestion: Cloud Run Functions

The pipeline starts with two Cloud Run Functions — one per data domain. Both are HTTP-triggered, stateless, and use a watermark pattern to handle incremental syncing without re-pulling data that hasn't changed.


How the watermark pattern works

Each function reads the maximum timestamp from its target BigQuery table before calling the API. That timestamp becomes the "from" parameter for the next API call. If the table is empty, it falls back to a sensible default date.

This matters because APIs don't tell you what changed since your last run — you have to track that yourself. The watermark is how you do it without duplicating records or missing updates.


Clients function (call_clients):


# Assumes an initialized client: bq_client = bigquery.Client()

# Read the watermark from BigQuery
watermark_query = """
    SELECT COALESCE(MAX(LastModifiedDateTime), DATE('2023-01-01'))
    FROM `project.dataset.clients`
"""
# .result() returns a RowIterator; pull the single value out of the first row
watermark = next(iter(bq_client.query(watermark_query).result()))[0]

# Authenticate and pull records modified since the watermark
token = get_bearer_token()
records = paginate_api(endpoint="/clients", since=watermark, limit=500, token=token)

# Filter to schema, insert into BigQuery
cleaned = [filter_to_schema(r, BQ_SCHEMA) for r in records]
errors = bq_client.insert_rows_json("project.dataset.clients", cleaned)


The function also fires a Microsoft Teams alert if the most recent record is older than yesterday — a staleness check that catches silent API failures before they compound.
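That check is worth spelling out, because it's what turns a silently failing sync into a visible alert. Here's a minimal sketch, assuming the newest timestamp has already been read from BigQuery; the function names and webhook handling are illustrative, not the production code:

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone

def is_stale(latest_record_time, max_age_days=1):
    """True when the newest record is older than the allowed threshold."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    return latest_record_time < cutoff

def alert_if_stale(latest_record_time, webhook_url, max_age_days=1):
    """Post a Teams alert via an incoming webhook when the data is stale."""
    if not is_stale(latest_record_time, max_age_days):
        return False
    payload = json.dumps({
        "text": f"Staleness alert: newest record is {latest_record_time.isoformat()}"
    }).encode("utf-8")
    req = urllib.request.Request(
        webhook_url, data=payload, headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(req)
    return True
```

The sales function would call the same helper with `max_age_days=5` instead of the default.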

Sales function (call_sales): Same pattern, different watermark column (MAX(saleDateTime)), with a 5-day staleness threshold instead of 1. The tolerance is wider because sales data sometimes has legitimate reporting delays.

Both functions are configured via a config.ini file — no hardcoded credentials, easy to swap values per environment.
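The exact keys depend on the API, but a config.ini for one of these functions might look like the sketch below. The section and key names here are illustrative, not the actual file:

```python
import configparser

# Illustrative config.ini contents; the deployed function would call
# config.read("config.ini") instead of read_string().
SAMPLE_CONFIG = """
[api]
base_url = https://api.example.com
page_size = 500

[bigquery]
table = project.dataset.clients
default_watermark = 2023-01-01
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)

base_url = config.get("api", "base_url")
page_size = config.getint("api", "page_size")
table = config.get("bigquery", "table")
```

Secrets themselves (API client secrets, tokens) are better injected via environment variables or Secret Manager than committed to the file.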


Part 2 — Orchestration: Google Cloud Workflows

Once the functions exist, you need something to call them in order — and to handle failures cleanly. Google Cloud Workflows is a fully managed YAML-based orchestration service. You define steps, it handles execution, logging, and retry logic. No cluster, no infrastructure.


Why not Airflow?

Airflow is powerful and we use it on projects that need it. But for a pipeline this size — two ingestion steps and a dbt job — the overhead of maintaining an Airflow cluster doesn't make sense. Cloud Workflows gives you sequential execution, try/except error handling, and a full audit trail in Cloud Console for a fraction of the operational cost.


The full workflow YAML

main:
  steps:
  - run_pipeline:
      try:
        steps:
        # Step 1: Sync client records from external API into BigQuery
        - call_client_function:
            call: http.post
            args:
              url: "https://us-central1-<project>.cloudfunctions.net/get_client_auth"
              headers: { Content-Type: "application/json" }
              body: { source: "workflow" }
            result: client_response

        # Step 2: Sync sales records from external API into BigQuery
        - call_sales_function:
            call: http.post
            args:
              url: "https://us-central1-<project>.cloudfunctions.net/get_sales_auth"
              headers: { Content-Type: "application/json" }
              body: { source: "workflow" }
            result: sales_response

        # Step 3: Run dbt transformation on the fresh raw data
        - trigger_dbt:
            call: http.post
            args:
              url: "https://cloud.getdbt.com/api/v2/accounts/<account_id>/jobs/<job_id>/run/"
              headers:
                Authorization: "Bearer <dbt_cloud_token>"
                Content-Type: "application/json"
              body: { cause: "Triggered via Google Workflow" }
            result: dbt_response

        # Step 4: Notify on success
        - send_success_alert:
            call: http.post
            args:
              url: "<teams_webhook_url>"
              body: { text: "Pipeline completed successfully" }

        - finish:
            return: "Done"

      except:
        as: error
        steps:
        - send_failure_alert:
            call: http.post
            args:
              url: "<teams_webhook_url>"
              body: { text: "${'Pipeline failed: ' + error.message}" }
        - fail_workflow:
            raise: ${error}



Three things worth highlighting here:

Sequential guarantee. dbt only runs after both API syncs have returned a 2xx response. No partial transformations on stale data.

Centralised error handling. The except block catches any failure across any step — whether it's an API timeout, a BigQuery insert error, or a dbt job failure — and routes it to a single Teams alert. You don't need per-function error handling logic.

Audit trail out of the box. Every execution, every step result, every error message is logged in the Cloud Console. When something fails at 3am, you can see exactly where and why without digging through scattered application logs.
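That audit trail is also reachable from the CLI. For example (the workflow name and region below are placeholders):

```shell
# List the ten most recent executions of the workflow
gcloud workflows executions list etl-pipeline --location=us-central1 --limit=10

# Drill into a single failed execution to see the step and error message
gcloud workflows executions describe EXECUTION_ID \
  --workflow=etl-pipeline --location=us-central1
```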


Triggering dbt from the workflow

dbt Cloud exposes a REST API for job runs. The workflow calls it like any other HTTP endpoint — pass the account ID, job ID, and a Bearer token. The cause field appears in dbt Cloud's run history, so you can distinguish workflow-triggered runs from manual ones.

This keeps the transformation layer clean. dbt doesn't know or care about the ingestion logic upstream. It just runs when it's told, on data that's already fresh.
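The workflow above fires the dbt job and moves on. If a downstream step ever needed to wait for the run to finish, the dbt Cloud v2 API's runs endpoint can be polled. A hedged sketch, assuming the numeric status codes below (taken from the v2 API; verify against current dbt Cloud docs):

```python
import json
import time
import urllib.request

# dbt Cloud v2 run status codes: 1=Queued, 2=Starting, 3=Running,
# 10=Success, 20=Error, 30=Cancelled (verify against current docs).
TERMINAL_STATES = {10: "success", 20: "error", 30: "cancelled"}

def run_finished(status_code):
    """Return the terminal state name, or None while the run is still going."""
    return TERMINAL_STATES.get(status_code)

def poll_run(account_id, run_id, token, interval=30):
    """Block until the dbt Cloud run reaches a terminal state."""
    url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/runs/{run_id}/"
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    while True:
        with urllib.request.urlopen(req) as resp:
            status = json.load(resp)["data"]["status"]
        state = run_finished(status)
        if state:
            return state
        time.sleep(interval)
```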


Part 3 — Scheduling: Cloud Scheduler

Google Cloud Scheduler is managed cron. You point it at your workflow, set a cron expression, and it fires on schedule. Nothing to maintain.





The scheduler calls the workflow's HTTP trigger endpoint. The workflow handles everything from there.
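Setting this up is a single gcloud command: Cloud Scheduler posts to the Workflows Executions API on a cron schedule. The project, workflow, and service-account names below are placeholders:

```shell
# Trigger the workflow every day at 06:00 (names are placeholders)
gcloud scheduler jobs create http daily-etl-trigger \
  --location=us-central1 \
  --schedule="0 6 * * *" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/workflows/etl-pipeline/executions" \
  --http-method=POST \
  --oauth-service-account-email="scheduler-sa@PROJECT_ID.iam.gserviceaccount.com"
```

The service account needs the `roles/workflows.invoker` role on the workflow.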


What This Architecture Gives You

| Concern | How it's handled |
| --- | --- |
| Incremental ingestion | Watermark pattern in each Cloud Run Function |
| Sequencing | Google Cloud Workflows: steps run in strict order |
| Error handling | Single except block covers the entire pipeline |
| Observability | Cloud Console logs every execution automatically |
| Alerts | Teams webhooks on success and failure |
| Transformation | dbt Cloud, triggered only after successful ingestion |
| Scheduling | Cloud Scheduler (cron-as-a-service) |
| Infrastructure | None; fully serverless |


No VMs. No Kubernetes. No Airflow cluster to patch. Just clean, composable building blocks that each do one thing well.


When This Pattern Works — and When It Doesn't

This stack is well-suited for pipelines with a small number of sources, clear dependencies between steps, and predictable run times. It's what we'd reach for when a client needs a production-grade pipeline without the overhead of a full orchestration platform.

It's not the right call when you need complex DAG logic, dynamic task generation, or dozens of interdependent steps running in parallel. For those cases, Apache Airflow — or Databricks Workflows for compute-heavy pipelines — makes more sense.

The decision isn't about which tool is "better." It's about matching the tool to the actual complexity of the problem.


What to Do Next

If your pipeline is currently a cron job on a VM, a series of manually triggered scripts, or an Airflow cluster that one person understands — there's likely a simpler architecture that gives you more reliability with less maintenance burden.

We've built and migrated data pipelines across 50+ projects. If your ingestion layer is giving you problems, it's worth a conversation.

Ready to make your data work?

We've delivered 50+ data engineering projects across SaaS, e-commerce, and fintech. Official partners of Snowflake, dbt Labs, and Databricks.

Not Sure Which Fits?

We'll diagnose your situation in 30 minutes and tell you honestly what's broken and whether we can help.
