This is part 2 of a personal project called antren.
When it came to getting my parquet files out of my Cloud Storage bucket (see part 1) and into BigQuery, I used Airflow. There are certainly simpler solutions, but I wanted the opportunity to set up an Airflow DAG from scratch.
Any time the Airflow DAG is run, the contents of all parquet files found in a Google Cloud Storage bucket are written to a BigQuery table.
Repo: alhankeser/antren-orchestration
This is what the settings for the dedicated GCSToBigQueryOperator look like to achieve that:
with DAG(...) as dag:
    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bigquery",
        source_objects=["*.parquet"],
        source_format="PARQUET",
        skip_leading_rows=1,
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition="WRITE_TRUNCATE",
        project_id=PROJECT_ID,
        bucket=BUCKET_NAME,
        location=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )
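For context, the operator is essentially submitting a BigQuery load job (the exact configuration shows up in the test logs below). A rough one-off equivalent with the bq CLI, using the bucket and table names from those logs, would look something like:
bq load --project_id=antren --source_format=PARQUET --replace antren_app.activities_raw "gs://activity_files/*.parquet"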
To test that the operator is working as expected (the name of the DAG is antren):
airflow dags test antren
Which will output something like this:
[2024-02-18T10:51:35.858+0100] {gcs_to_bigquery.py:379} INFO - Using existing BigQuery table for storing data...
[2024-02-18T10:51:35.936+0100] {gcs_to_bigquery.py:383} INFO - Executing: {'load': {'autodetect': True, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'antren', 'datasetId': 'antren_app', 'tableId': 'activities_raw'}, 'sourceFormat': 'PARQUET', 'sourceUris': ['gs://activity_files/*.parquet'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False}}
[2024-02-18T10:51:35.937+0100] {bigquery.py:1596} INFO - Inserting job airflow_antren_load_gcs_to_bigquery_2024_02_18T09_50_38_872420_00_00_637cb7a4eb45a3c3199ba9e7d60bd637
[2024-02-18T10:51:43.325+0100] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=antren, task_id=load_gcs_to_bigquery, execution_date=20240218T095038, start_date=, end_date=20240218T095143
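To double-check the load from the BigQuery side, a quick row count against the destination table (assuming the bq CLI is authenticated against the same project):
bq query --project_id=antren --use_legacy_sql=false 'SELECT COUNT(*) FROM antren_app.activities_raw'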
Once Airflow is set up, it's as simple as that!
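A single task can also be tested in isolation with the standard Airflow CLI; the date below is just an example logical date:
airflow tasks test antren load_gcs_to_bigquery 2024-02-18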
The other parts of the Airflow DAG do the other things (that might be the dumbest sentence I've written so far): one Cloud Run job fetches the latest activities and another runs dbt. Here's the full DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    "antren",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 29),
    catchup=False,
) as dag:
    # Cloud Run job that pulls the latest activities and writes parquet files to the bucket (see part 1)
    get_latest_activities = CloudRunExecuteJobOperator(
        task_id="get_latest_activities",
        job_name="app",
        project_id=PROJECT_ID,
        region=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )
    # Load every parquet file in the bucket into the raw BigQuery table
    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bigquery",
        source_objects=["*.parquet"],
        source_format="PARQUET",
        skip_leading_rows=1,
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition="WRITE_TRUNCATE",
        project_id=PROJECT_ID,
        bucket=BUCKET_NAME,
        location=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )
    # Cloud Run job that runs the dbt project against the freshly loaded data
    run_dbt = CloudRunExecuteJobOperator(
        task_id="run_dbt",
        job_name="dbt",
        project_id=PROJECT_ID,
        region=GOOGLE_CLOUD_LOCATION,
        gcp_conn_id=GOOGLE_CLOUD_CONNECTION_ID,
    )
    get_latest_activities >> load_gcs_to_bigquery >> run_dbt
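The constants referenced in the DAG (PROJECT_ID, BUCKET_NAME, and so on) live outside the snippet. Here's a minimal sketch of how they might be defined, assuming environment variables; the defaults are placeholders taken from the logs above, not necessarily what the repo does:
import os

# Placeholder defaults for illustration only; real values come from the project's own config.
PROJECT_ID = os.environ.get("PROJECT_ID", "antren")
DATASET_NAME = os.environ.get("DATASET_NAME", "antren_app")
TABLE_NAME = os.environ.get("TABLE_NAME", "activities_raw")
BUCKET_NAME = os.environ.get("BUCKET_NAME", "activity_files")
GOOGLE_CLOUD_LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "europe-west1")  # assumed region
GOOGLE_CLOUD_CONNECTION_ID = os.environ.get("GOOGLE_CLOUD_CONNECTION_ID", "google_cloud_default")  # Airflow's default GCP connection id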
I feel like Airflow is 99% setup and 1% DAG creation.