How to load parquet files from Cloud Storage to BigQuery

Feb 18, 2024

This is part 2 of a personal project called antren.

When it came to getting my parquet files out of my Cloud Storage bucket (see part 1) and into BigQuery, I used Airflow. There are certainly simpler solutions, but I wanted the opportunity to set up an Airflow DAG from scratch.

The outcome

Any time the Airflow DAG is run, the contents of all parquet files found in a Google Cloud Storage bucket are written to a BigQuery table.

Repo: alhankeser/antren-orchestration

Using GCSToBigQueryOperator

This is what the dedicated operator settings look like for the intended outcome:

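from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# PROJECT_ID, BUCKET_NAME, etc. are constants defined elsewhere (not shown).
with DAG(...) as dag:
    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bigquery",
        source_objects=["*.parquet"],  # wildcard: every parquet file in the bucket
        source_format="PARQUET",
        skip_leading_rows=1,  # only meaningful for CSV sources
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition="WRITE_TRUNCATE",  # replace the table contents on every run
        project_id=PROJECT_ID,
        bucket=BUCKET_NAME,
        location=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )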

To test that the operator works as expected (the DAG is named antren):

airflow dags test antren

This outputs something like:

[2024-02-18 10:51:35,858] {gcs_to_bigquery.py:379} INFO - Using existing BigQuery table for storing data...
[2024-02-18T10:51:35.858+0100] {gcs_to_bigquery.py:379} INFO - Using existing BigQuery table for storing data...
[2024-02-18 10:51:35,936] {gcs_to_bigquery.py:383} INFO - Executing: {'load': {'autodetect': True, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'antren', 'datasetId': 'antren_app', 'tableId': 'activities_raw'}, 'sourceFormat': 'PARQUET', 'sourceUris': ['gs://activity_files/*.parquet'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False}}
[2024-02-18T10:51:35.936+0100] {gcs_to_bigquery.py:383} INFO - Executing: {'load': {'autodetect': True, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'antren', 'datasetId': 'antren_app', 'tableId': 'activities_raw'}, 'sourceFormat': 'PARQUET', 'sourceUris': ['gs://activity_files/*.parquet'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False}}
[2024-02-18 10:51:35,937] {bigquery.py:1596} INFO - Inserting job airflow_antren_load_gcs_to_bigquery_2024_02_18T09_50_38_872420_00_00_637cb7a4eb45a3c3199ba9e7d60bd637
[2024-02-18T10:51:35.937+0100] {bigquery.py:1596} INFO - Inserting job airflow_antren_load_gcs_to_bigquery_2024_02_18T09_50_38_872420_00_00_637cb7a4eb45a3c3199ba9e7d60bd637
[2024-02-18T10:51:43.325+0100] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=antren, task_id=load_gcs_to_bigquery, execution_date=20240218T095038, start_date=, end_date=20240218T095143

Once Airflow is set up, it's as simple as that!
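
The "Executing" line in the log above is the load job configuration that gets sent to BigQuery. Below is a rough sketch of how the operator arguments appear to map onto it. build_load_config is a hypothetical helper written for illustration, not the operator's actual code, and the values are taken from the log. Note that skip_leading_rows does not show up in the logged configuration.

```python
def build_load_config(project_id, dataset_table, bucket, source_objects,
                      source_format="PARQUET",
                      write_disposition="WRITE_TRUNCATE"):
    """Hypothetical helper: approximates the load config seen in the log."""
    dataset_id, table_id = dataset_table.split(".")
    return {
        "load": {
            "autodetect": True,
            "createDisposition": "CREATE_IF_NEEDED",
            "destinationTable": {
                "projectId": project_id,
                "datasetId": dataset_id,
                "tableId": table_id,
            },
            "sourceFormat": source_format,
            # Each source object is prefixed with the bucket to form a gs:// URI.
            "sourceUris": [f"gs://{bucket}/{obj}" for obj in source_objects],
            "writeDisposition": write_disposition,
            "ignoreUnknownValues": False,
        }
    }


config = build_load_config(
    project_id="antren",
    dataset_table="antren_app.activities_raw",
    bucket="activity_files",
    source_objects=["*.parquet"],
)
print(config["load"]["sourceUris"])
```

With WRITE_TRUNCATE, every run replaces the table contents with whatever the wildcard matches at that moment, which is why the DAG can simply be re-run.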

The rest of the Airflow DAG

The other tasks in the DAG are two Cloud Run jobs: get_latest_activities runs before the load, and run_dbt runs after it. Here is the full DAG:

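from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    "antren",
    schedule=timedelta(days=1),  # run once a day
    start_date=datetime(2024, 1, 29),
    catchup=False,  # don't backfill runs between start_date and now
) as dag: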
    get_latest_activities = CloudRunExecuteJobOperator(
        task_id="get_latest_activities",
        job_name="app",
        project_id=PROJECT_ID,
        region=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )

    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bigquery",
        source_objects=["*.parquet"],
        source_format="PARQUET",
        skip_leading_rows=1,  # only meaningful for CSV sources
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition="WRITE_TRUNCATE",
        project_id=PROJECT_ID,
        bucket=BUCKET_NAME,
        location=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )

    run_dbt = CloudRunExecuteJobOperator(
        task_id="run_dbt",
        job_name="dbt",
        project_id=PROJECT_ID,
        region=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )

    get_latest_activities >> load_gcs_to_bigquery >> run_dbt
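
The last line of the DAG sets the task order. As a toy illustration of how that chaining reads, here is a minimal sketch with a made-up Task class (an assumption for illustration, not Airflow's implementation). Its >> records the right-hand task as downstream and returns it, so a >> b >> c links a to b and then b to c.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, for illustration)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Record `other` as downstream, then return it so chains keep going.
        self.downstream.append(other)
        return other


def run_order(first):
    """Follow a simple linear chain and return the task ids in order."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


get_latest_activities = Task("get_latest_activities")
load_gcs_to_bigquery = Task("load_gcs_to_bigquery")
run_dbt = Task("run_dbt")

get_latest_activities >> load_gcs_to_bigquery >> run_dbt

print(run_order(get_latest_activities))
```

In the real DAG this means the Cloud Run job that fetches activities finishes first, then the parquet files are loaded, and dbt runs last.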

I feel like Airflow is 99% setup and 1% DAG creation.

---
Last update: Mar 4, 2024