I need help with dynamic parameter injection in my Airflow pipeline when working with Snowflake operators. I’m trying to create a workflow where one task generates a database object name and passes it to another task that executes SQL queries.
My main issues:
- How do I properly inject parameters into the Snowflake operator?
- How can I retrieve values from upstream tasks (through XCom or otherwise)? I've sketched what I think the intended pattern is right after this list.
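From reading the templating docs, my understanding is that `ti.xcom_pull` only renders inside *templated* fields, and that `params` is passed into the Jinja context as-is rather than being templated itself. So I'd expect the intended pattern to look roughly like this minimal sketch (the inline SQL and `dag_id` here are placeholders, not my real pipeline):

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Sketch only: `sql` is a templated field on SnowflakeOperator, so Jinja
# expressions (including ti.xcom_pull) are rendered there at runtime.
# `params`, by contrast, is only passed *into* the template context.
with DAG(
    dag_id="xcom_in_templated_sql_sketch",  # placeholder dag_id
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    data_extraction = SnowflakeOperator(
        task_id="data_extraction",
        snowflake_conn_id="my_snowflake_conn",
        sql=(
            "SELECT customer_id, order_date, amount "
            "FROM {{ ti.xcom_pull(task_ids='generate_target_table', "
            "key='target_table_name') }} "
            "WHERE order_date >= CURRENT_DATE - 30"
        ),
    )
```

Is that understanding correct, or is there a supported way to make `params` itself dynamic?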
Current setup:
DAG file:
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

DAG_DEFAULTS = {
    "owner": "data_team",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

my_dag = DAG(
    dag_id="dynamic_snowflake_query",
    default_args=DAG_DEFAULTS,
    start_date=days_ago(2),
    schedule_interval="@daily",
)

def generate_target_table(**context):
    # Push the generated table name so the downstream task can pull it.
    context["ti"].xcom_push(key="target_table_name", value="customer_data_staging")
    return "customer_data_staging"

table_generator = PythonOperator(
    task_id="generate_target_table",
    dag=my_dag,
    python_callable=generate_target_table,
)

data_extraction = SnowflakeOperator(
    dag=my_dag,
    task_id="data_extraction",
    snowflake_conn_id="my_snowflake_conn",
    sql="queries/extract_data.sql",
    params={"target_table": "{{ti.xcom_pull(task_ids='generate_target_table', key='target_table_name')}}"},
)

table_generator >> data_extraction
```
SQL file (extract_data.sql):
```sql
USE WAREHOUSE analytics_wh;

SELECT customer_id, order_date, amount
FROM {{target_table}}
WHERE order_date >= CURRENT_DATE - 30;
```
When data_extraction runs, I get template rendering errors: the `{{target_table}}` placeholder in the SQL file can't be resolved, and the Jinja string I put in `params` seems to be passed through as a literal, so the XCom reference is never resolved.
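One direction I've been considering (untested) is dropping `params` entirely and putting the `xcom_pull` call straight into the SQL file, since the file referenced by the `sql` field is itself rendered as a Jinja template:

```sql
-- extract_data.sql (sketch): resolve the XCom reference directly,
-- relying on Airflow rendering the whole .sql file with Jinja.
USE WAREHOUSE analytics_wh;

SELECT customer_id, order_date, amount
FROM {{ ti.xcom_pull(task_ids='generate_target_table', key='target_table_name') }}
WHERE order_date >= CURRENT_DATE - 30;
```

Is that the recommended approach, or is there a cleaner way to get the value in?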
Environment details:
- Python 3.7
- apache-airflow==2.2.2
- apache-airflow-providers-snowflake==2.3.0
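For what it's worth, I'd also be open to restructuring with the TaskFlow API (available on my Airflow version) if that makes the XCom handoff cleaner. Roughly this sketch is what I had in mind (untested; the DAG name and inline SQL are placeholders):

```python
from airflow.decorators import dag, task
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago

@dag(start_date=days_ago(2), schedule_interval="@daily")
def dynamic_snowflake_query_v2():  # placeholder dag_id
    @task
    def generate_target_table() -> str:
        # The return value is pushed to XCom under the key "return_value".
        return "customer_data_staging"

    table_name = generate_target_table()

    data_extraction = SnowflakeOperator(
        task_id="data_extraction",
        snowflake_conn_id="my_snowflake_conn",
        # Pull the upstream return value inside the templated sql field;
        # omitting the key argument uses the default "return_value".
        sql=(
            "SELECT customer_id, order_date, amount "
            "FROM {{ ti.xcom_pull(task_ids='generate_target_table') }} "
            "WHERE order_date >= CURRENT_DATE - 30"
        ),
    )

    # The template pull alone doesn't create a dependency, so wire it up.
    table_name >> data_extraction

dag_instance = dynamic_snowflake_query_v2()
```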
Has anyone successfully implemented parameter passing between tasks in similar Snowflake workflows? Any guidance would be really helpful.