
Airflow XCom Exclusive

Avoid relying purely on the default return_value key. Use descriptive keys, for example ti.xcom_push(key='validated_user_ids', value=data), so that downstream tasks pull exactly what they intend to consume.
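As a minimal sketch of the named-key pattern, the two callables below push and pull under an explicit key. xcom_push(key=..., value=...) and xcom_pull(task_ids=..., key=...) are the TaskInstance methods; the task id validate_users, the sample IDs, and the InMemoryTI stand-in (there only so the snippet runs without an Airflow installation) are assumptions made for illustration.

```python
from typing import Any, Dict, Optional, Tuple


class InMemoryTI:
    """Hypothetical stand-in for an Airflow TaskInstance, for local runs only."""

    def __init__(self, task_id: str, store: Dict[Tuple[str, str], Any]) -> None:
        self.task_id = task_id
        self._store = store  # shared across tasks, like XComs within one DAG run

    def xcom_push(self, key: str, value: Any) -> None:
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        return self._store.get((task_ids, key))


def validate_users(ti) -> None:
    """Upstream callable: publish the result under a descriptive key."""
    data = [101, 102, 103]  # illustrative sample IDs
    ti.xcom_push(key="validated_user_ids", value=data)


def notify_users(ti) -> int:
    """Downstream callable: pull exactly the key it intends to consume."""
    user_ids = ti.xcom_pull(task_ids="validate_users", key="validated_user_ids")
    return len(user_ids or [])


# Local run: both "tasks" share one store, as they would within a DAG run.
store: Dict[Tuple[str, str], Any] = {}
validate_users(InMemoryTI("validate_users", store))
count = notify_users(InMemoryTI("notify_users", store))
```

In a real DAG the same two functions could be passed as python_callable, with Airflow supplying ti from the task context. Pulling without the key would look up return_value and find nothing here, which is the mismatch the descriptive key is meant to prevent.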

```python
import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "s3://"
    BUCKET_NAME = "my-company-airflow-xcom-bucket"

    @staticmethod
    def serialize_value(value, **kwargs):
        # This version sends every value to S3. Small strings/dicts could be
        # left to the default backend if you add a size or type check here.
        s3_hook = S3Hook(aws_conn_id="aws_default")
        key = f"xcom/{uuid.uuid4()}.json"

        # Serialize your data to a string
        string_data = json.dumps(value)

        # Upload to S3
        s3_hook.load_string(
            string_data,
            key=key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
        )

        # This return value is what gets written to the Airflow Metadata DB
        return BaseXCom.serialize_value(
            f"{S3XComBackend.PREFIX}{S3XComBackend.BUCKET_NAME}/{key}"
        )

    @staticmethod
    def deserialize_value(result):
        # Extract the DB stored string
        stored_uri = BaseXCom.deserialize_value(result)

        # Check if it points to our S3 backend
        if isinstance(stored_uri, str) and stored_uri.startswith(S3XComBackend.PREFIX):
            s3_hook = S3Hook(aws_conn_id="aws_default")

            # Parse the bucket and key out of the URI
            path = stored_uri[len(S3XComBackend.PREFIX):]
            bucket, key = path.split("/", 1)

            # Download file
            file_content = s3_hook.read_key(key, bucket_name=bucket)
            return json.loads(file_content)

        return stored_uri
```

Step 2: Configure Airflow to Use Your Backend

However, XCom is often misunderstood and misused. This guide explores best practices for using XCom in your Airflow pipelines, focusing on the philosophy of keeping XCom messages lightweight, manageable, and efficient.
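One way to make "lightweight" concrete is to measure a payload before pushing it. The sketch below assumes JSON-serializable values; the helper names and the 48 KiB budget are hypothetical choices for illustration, not an Airflow limit.

```python
import json
from typing import Any

# Hypothetical budget; choose a limit that suits your metadata database.
MAX_XCOM_BYTES = 48 * 1024


def xcom_size_bytes(value: Any) -> int:
    """Size of the value once JSON-encoded as UTF-8."""
    return len(json.dumps(value).encode("utf-8"))


def ensure_lightweight(value: Any, max_bytes: int = MAX_XCOM_BYTES) -> Any:
    """Return the value unchanged, or raise if it exceeds the budget."""
    size = xcom_size_bytes(value)
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes; budget is {max_bytes}. "
            "Store the data externally and pass a reference instead."
        )
    return value


# Metadata such as a row count and a storage path fits comfortably.
small = ensure_lightweight({"row_count": 1200, "path": "s3://bucket/key.json"})
```

A task callable could wrap its return value in ensure_lightweight so that an oversized result fails loudly in the task rather than silently bloating the metadata database.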

Downstream tasks pull data using xcom_pull.

At its foundational level, an XCom is a key-value pair explicitly designed to pass small amounts of metadata between tasks within the same Directed Acyclic Graph (DAG) run.

Push and Pull Dynamics

XCom communication operates on a push-and-pull model.

A custom XCom backend can offload payloads to Amazon S3. The S3XComBackend class in this guide serializes with json rather than pickle; you can substitute Parquet or pickle depending on your format and security requirements.
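The core idea behind such a backend is reference passing: the payload goes to external storage and only a short URI-like string is kept as the XCom value. The sketch below shows that round trip with a plain dict standing in for the S3 bucket; the mem:// prefix, the function names, and the in-memory store are assumptions for illustration and are not part of Airflow.

```python
import json
import uuid
from typing import Any, Dict

PREFIX = "mem://"  # hypothetical scheme marking externally stored values
_OBJECT_STORE: Dict[str, str] = {}  # stands in for the S3 bucket


def store_and_reference(value: Any) -> str:
    """Write the payload to the store and return a short reference string."""
    key = f"xcom/{uuid.uuid4()}.json"
    _OBJECT_STORE[key] = json.dumps(value)
    return f"{PREFIX}{key}"


def resolve(stored: Any) -> Any:
    """Load the payload if `stored` is a reference; otherwise pass it through."""
    if isinstance(stored, str) and stored.startswith(PREFIX):
        key = stored[len(PREFIX):]
        return json.loads(_OBJECT_STORE[key])
    return stored


reference = store_and_reference({"user_ids": [1, 2, 3]})
restored = resolve(reference)
```

Only the reference string would reach the metadata database, so its size stays constant however large the payload grows. Values that do not carry the prefix pass through resolve untouched, mirroring the fallback branch in deserialize_value.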

```bash
export AIRFLOW__CORE__XCOM_BACKEND="include.custom_xcom_backend.S3XComBackend"
```
