Snowflake operator parameter passing in Airflow workflow tasks

I need help with dynamic parameter injection in my Airflow pipeline when working with Snowflake operators. I’m trying to create a workflow where one task generates a database object name and passes it to another task that executes SQL queries.

My main issues:

  1. How do I properly inject parameters into the Snowflake operator?
  2. How can I retrieve values from upstream tasks (either through XCom or other methods)?

Current setup:

DAG file:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

DAG_DEFAULTS = {
    "owner": "data_team",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False
}

my_dag = DAG(
    dag_id="dynamic_snowflake_query",
    default_args=DAG_DEFAULTS,
    start_date=days_ago(2),
    schedule_interval="@daily",
)

def generate_target_table(ti):
    ti.xcom_push(key="target_table_name", value="customer_data_staging")
    return "customer_data_staging"

table_generator = PythonOperator(
    task_id="generate_target_table",
    dag=my_dag,
    python_callable=generate_target_table,
)

data_extraction = SnowflakeOperator(
    dag=my_dag,
    task_id="data_extraction",
    snowflake_conn_id="my_snowflake_conn",
    sql='queries/extract_data.sql',
    params={"target_table": "{{ti.xcom_pull(task_ids='generate_target_table', key='target_table_name')}}"},
)

table_generator >> data_extraction

SQL file (extract_data.sql):

USE WAREHOUSE analytics_wh;

SELECT customer_id, order_date, amount 
FROM {{target_table}} 
WHERE order_date >= CURRENT_DATE - 30;

I get template rendering errors when trying to access the task instance variable. The workflow fails because it cannot resolve the XCom reference properly.

Environment details:

  • Python 3.7
  • apache-airflow==2.2.2
  • apache-airflow-providers-snowflake==2.3.0

Has anyone successfully implemented parameter passing between tasks in similar Snowflake workflows? Any guidance would be really helpful.

interesting setup! are you getting this error when the DAG parses or when the task runs? also, why are you using both xcom_push AND return in your python function? the return value gets auto-pushed to XCom under the key return_value, so you're storing two copies of the same string.
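to illustrate the comment above: since PythonOperator pushes the return value for you, the callable can be trimmed to just a return (a sketch, assuming you pull it later with key='return_value' instead of the custom key):

```python
def generate_target_table():
    # PythonOperator auto-pushes the return value to XCom under the
    # key "return_value" -- no explicit ti.xcom_push() call needed
    return "customer_data_staging"
```

downstream you'd then use {{ ti.xcom_pull(task_ids='generate_target_table') }}, which defaults to the return_value key.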

yeah, i had the same issue. the root cause is that params is NOT a templated field -- airflow never evaluates jinja strings you put inside it, so your xcom_pull expression is passed through verbatim. skip the params indirection and put the xcom_pull straight into your sql template instead. the sql field IS templated, so it gets rendered properly at runtime.
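you can reproduce the single-pass behavior with plain Jinja (a simplified sketch, not Airflow's actual rendering code):

```python
from jinja2 import Template

# Templated fields are rendered exactly once. A Jinja expression hidden
# inside a params value is substituted in as a literal string and never
# gets a second rendering pass.
sql = "SELECT * FROM {{ params.target_table }}"
params = {"target_table": "{{ ti.xcom_pull(task_ids='generate_target_table') }}"}

rendered = Template(sql).render(params=params)
print(rendered)  # the xcom_pull expression survives verbatim, unevaluated
```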

This issue arises from how Airflow renders templates. The params dictionary is not a templated field: any Jinja expression placed inside it is passed through verbatim, so the nested XCom reference is never evaluated (template rendering is single-pass). You have two options:

Option 1: Reserve params for static values only and move the XCom lookup into the SQL file itself, which does get rendered because sql is a templated field:

SELECT customer_id, order_date, amount
FROM {{ ti.xcom_pull(task_ids='generate_target_table', key='target_table_name') }}
WHERE order_date >= CURRENT_DATE - 30;

Option 2: Pass the templated SQL string directly to the operator's sql argument instead of a file path, with the xcom_pull expression inline.

Both approaches put the XCom reference in a field Airflow actually templates, which avoids the intermediate step that causes the rendering failure.
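For completeness, the inline-templating option would look roughly like this (an untested sketch against apache-airflow-providers-snowflake 2.x; connection ID and task IDs taken from your DAG):

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# The `sql` argument is a templated field, so the xcom_pull expression
# is rendered when the task runs -- no params indirection needed.
data_extraction = SnowflakeOperator(
    task_id="data_extraction",
    snowflake_conn_id="my_snowflake_conn",
    sql=(
        "SELECT customer_id, order_date, amount "
        "FROM {{ ti.xcom_pull(task_ids='generate_target_table', "
        "key='target_table_name') }} "
        "WHERE order_date >= CURRENT_DATE - 30;"
    ),
)
```

If you prefer keeping the query in a file, the same expression works inside extract_data.sql, since .sql files referenced by the sql argument are rendered as templates too.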