
A closer look at Airflow’s KubernetesPodOperator and XCom

Published July 11, 2019 in data - 2 Comments

The KubernetesPodOperator handles communicating XCom values differently than other operators. The basics are described in the operator documentation under the xcom_push parameter. I’ve written up a more detailed example that expands on that documentation.

A task run by the KubernetesPodOperator can write a JSON-serializable value, such as a dict, to the file /airflow/xcom/return.json (always the same path). A sidecar container named airflow-xcom-sidecar reads that file, and its contents become the task's XCom value. The sidecar is created only when the KubernetesPodOperator parameter xcom_push is True.

For instance, suppose a KubernetesPodOperator task instance executes the following Python code in a Docker container built from example_image:

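import json

# Placeholder values; any JSON-serializable data works here.
xcom_return = {"key1": "value1", "key2": "value2"}

# The sidecar reads this exact path once the main container finishes.
with open("/airflow/xcom/return.json", "w") as xcom_file:
    json.dump(xcom_return, xcom_file)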
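
The write step can also be wrapped in a small helper. This is only a sketch under a few assumptions: the name `write_xcom` and its `path` parameter are hypothetical and not part of Airflow, and creating the directory when it is missing is my own addition so the script can be tried outside a pod. The default path is the one named above.

```python
import json
import os

XCOM_PATH = "/airflow/xcom/return.json"  # fixed path read by the sidecar


def write_xcom(payload, path=XCOM_PATH):
    """Serialize payload to JSON and write it to the XCom return file.

    Hypothetical helper, not an Airflow API.
    """
    # Serialize first: a value that is not JSON-serializable raises
    # TypeError before any file or directory is touched.
    text = json.dumps(payload)

    # Assumption: create the directory if it does not exist, which is
    # convenient when running the script locally.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    with open(path, "w") as xcom_file:
        xcom_file.write(text)
    return path
```

Inside the container, calling `write_xcom({"key1": "value1", "key2": "value2"})` would then be the last step of the script.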

The KubernetesPodOperator task that runs this image is defined like this:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


example_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="exampletask",
    task_id="example_kubernetes_task",
    image="docker_repo/example_image",
    xcom_push=True,
    ...
)

The XCom values will be made available to downstream task instances under the ‘return_value’ key:

# task_ids must match the task_id of the upstream KubernetesPodOperator task.
xcom_data = context['ti'].xcom_pull(task_ids='example_kubernetes_task',
                                    key='return_value')
print(xcom_data["key1"]) # value1
print(xcom_data["key2"]) # value2

The default for xcom_pull's key parameter is 'return_value', so key is optional in this example.
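
To make the downstream side concrete, here is a minimal sketch of a callable that a PythonOperator could run. The names `read_k8s_xcom` and `FakeTaskInstance` are hypothetical. The stand-in class exists only so the snippet runs without an Airflow installation, and it mimics nothing beyond the `xcom_pull(task_ids=..., key=...)` call used above.

```python
def read_k8s_xcom(**context):
    """Pull the dict written by example_kubernetes_task and return its values."""
    # key defaults to 'return_value', so it is omitted here.
    xcom_data = context["ti"].xcom_pull(task_ids="example_kubernetes_task")
    if xcom_data is None:
        # Nothing was pushed, e.g. the upstream task ran without xcom_push.
        raise ValueError("no XCom value found for example_kubernetes_task")
    return xcom_data["key1"], xcom_data["key2"]


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, store):
        self._store = store

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Look up the stored value by (task id, key); None if absent.
        return self._store.get((task_ids, key))


ti = FakeTaskInstance(
    {("example_kubernetes_task", "return_value"): {"key1": "value1", "key2": "value2"}}
)
result = read_k8s_xcom(ti=ti)
print(result)
```

In Airflow 1.10, the PythonOperator passes the context to the callable only when provide_context=True is set on the operator.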

XCom values can also be pulled using Jinja templates in operator parameters that support templating. Each operator's documentation lists its templated parameters; see the lists for the KubernetesPodOperator and for the PythonOperator.

This is how to get the value of ‘key1’ in a Jinja template:

{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}

A useful way to use XCom values with templates is to pass arguments to a Docker container:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


another_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="anotherexampletask",
    task_id="another_kubernetes_task",
    image="docker_repo/another_example_image",
    arguments=[
        "--myarg",
        "{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}",
    ],
    ...
)
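
On the container side, the rendered value arrives as an ordinary command-line argument. The post does not show the entrypoint of another_example_image, so the following is only a sketch of what it might look like: the `parse_args` function is hypothetical, and only the `--myarg` flag comes from the example above. Command-line arguments are always strings, so the script has to convert the value itself if it needs another type.

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments the KubernetesPodOperator passes to the container."""
    parser = argparse.ArgumentParser(
        description="Consume a value passed in from an upstream task's XCom."
    )
    parser.add_argument(
        "--myarg",
        required=True,
        help="value rendered from the upstream XCom by the Jinja template",
    )
    # argv=None makes argparse read sys.argv[1:], which is what the
    # container entrypoint would use.
    return parser.parse_args(argv)


# Simulate the arguments list after Airflow has rendered the template.
args = parse_args(["--myarg", "value1"])
print(args.myarg)
```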

2 comments

shivarp - August 2, 2020

I am using Airflow 1.10.10. If I set do_xcom_push=True in my KubernetesPodOperator below, I get a 403 error, but with xcom_push set to False the code starts a pod and returns success! What could be the issue?

Error with xcom_push set to True:
... DEBUG - response body {... "pods \"NewPodTask-s232e\" is forbidden: failed quota: default: must specify requests.memory","reason":"Forbidden","details":{"name":"NewPodTask","kind":"pods"},"code":403}

Task definition:
PodOpTask = KubernetesPodOperator(namespace='Test-NS', image="python-3.6", cmds=["python","-c"],
arguments=["import os", "import json","Print ('copying result to /airflow/xcom/return.json')" ],
labels={"foo":"bar"}, name="NewPodTask", task_id="NewPodTask_1", get_logs=True, security_context=pod_security_context,
do_xcom_push=True, get_logs=True, is_delete_operator_pod=True,secrets=[secret_env],
resources={'request_cpu':'1','limit_cpu':'2','request_memory':'128Mi','limit_memory':'512Mi'}, dag=dag)

    Ayla Khan - August 2, 2020

    Can you get logs from the pod? There may be more information there. I can think of a couple of possibilities though: check your RBAC rules (here’s a StackOverflow example) or you could be exceeding your memory quota limit. What happens if you don’t specify ‘limit_memory’?
