The KubernetesPodOperator handles XCom values differently from other operators. The basics are described in the operator documentation under the xcom_push parameter; I've written up a more detailed example that expands on that documentation.
An Airflow task instance created by the KubernetesPodOperator can write a dict as JSON to the file /airflow/xcom/return.json (always this same path). An airflow-xcom-sidecar container reads that file, and its contents are turned into an XCom value. This sidecar is only created when the KubernetesPodOperator parameter xcom_push is True; later Airflow releases call this parameter do_xcom_push.
For instance, suppose a KubernetesPodOperator task instance runs the following Python code in the Docker image example_image:
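import json

# Placeholder values; in practice these are whatever your task computes
xcom_return = {"key1": "value1", "key2": "value2"}

# The sidecar reads this exact path, so the filename must not change
with open("/airflow/xcom/return.json", "w") as file:
    json.dump(xcom_return, file)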
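The file's contents come back to Airflow as JSON. That means only JSON-serializable values survive the trip, and some types change on the way. The sketch below simulates that round trip locally. `write_xcom` and `read_xcom` are hypothetical helper names, not Airflow APIs, and a temporary directory stands in for /airflow/xcom, which only exists inside the pod.

```python
import json
import os
import tempfile


def write_xcom(data, xcom_dir):
    """Write data the way the task in the pod does, to <xcom_dir>/return.json."""
    path = os.path.join(xcom_dir, "return.json")
    with open(path, "w") as f:
        json.dump(data, f)  # raises TypeError for values JSON cannot encode (e.g. sets)
    return path


def read_xcom(path):
    """Parse the file back, roughly what happens to the contents the sidecar hands over."""
    with open(path) as f:
        return json.loads(f.read())


with tempfile.TemporaryDirectory() as xcom_dir:  # stands in for /airflow/xcom
    path = write_xcom({"key1": "value1", "key2": (1, 2), 3: None}, xcom_dir)
    result = read_xcom(path)

print(result)  # {'key1': 'value1', 'key2': [1, 2], '3': None}
```

Tuples come back as lists and non-string keys come back as strings. A downstream task therefore sees `[1, 2]` rather than `(1, 2)`, and `'3'` rather than `3`.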
This is the KubernetesPodOperator task instance:
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

example_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="exampletask",
    task_id="example_kubernetes_task",
    image="docker_repo/example_image",
    xcom_push=True,  # creates the airflow-xcom-sidecar container
    # ...
)
The whole dict is stored as a single XCom value under the 'return_value' key, where downstream task instances can pull it:
# Inside a downstream task's callable, where context is the task context
xcom_data = context['ti'].xcom_pull(task_ids='example_kubernetes_task', key='return_value')
print(xcom_data["key1"])  # value1
print(xcom_data["key2"])  # value2
The default for xcom_pull's key parameter is 'return_value', so key is optional in this example.
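Here is a minimal sketch of a downstream callable that does this pull. It assumes Airflow 1.10-style usage, where a PythonOperator needs provide_context=True to receive the context as keyword arguments. `print_xcom_values` is a hypothetical name. So that it runs without Airflow, a small hypothetical `StubTaskInstance` stands in for the real task instance.

```python
def print_xcom_values(**context):
    """Callable for a PythonOperator downstream of example_kubernetes_task."""
    # key defaults to 'return_value', so it could be omitted here
    xcom_data = context["ti"].xcom_pull(task_ids="example_kubernetes_task", key="return_value")
    print(xcom_data["key1"])
    print(xcom_data["key2"])
    return xcom_data


class StubTaskInstance:
    """Stand-in for Airflow's TaskInstance, for trying the callable locally."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps (task_id, key) -> value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms[(task_ids, key)]


ti = StubTaskInstance({("example_kubernetes_task", "return_value"): {"key1": "value1", "key2": "value2"}})
data = print_xcom_values(ti=ti)  # prints value1, then value2
```

In a DAG, the callable would be passed as python_callable to a PythonOperator with provide_context=True. Airflow 2 passes the context without that flag.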
XCom values can also be pulled with Jinja templates, but only in operator parameters that support templating (the operator's template_fields). These are listed in each operator's documentation: here is the list of parameters for the KubernetesPodOperator, and also for the PythonOperator.
This is how to get the value of ‘key1’ in a Jinja template:
{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}
A handy use of XCom values in templates is passing them as arguments to a Docker container:
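from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

another_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="anotherexampletask",
    task_id="another_kubernetes_task",
    image="docker_repo/another_example_image",
    # arguments is a templated field, so the Jinja expression is rendered at run time
    arguments=[
        "--myarg",
        "{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}",
    ],
    # ...
)

# The pull only finds a value if this task runs after the task that pushed it
example_kubernetes_task >> another_kubernetes_task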
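On the receiving side, the container only sees the rendered string. Below is a minimal sketch of an entrypoint for another_example_image that parses it with argparse. The script itself is hypothetical; only the --myarg flag comes from the example above.

```python
import argparse


def parse_args(argv=None):
    # --myarg receives the rendered value of key1 from the upstream XCom
    parser = argparse.ArgumentParser(description="Entrypoint for another_example_image (hypothetical)")
    parser.add_argument("--myarg", required=True, help="value pulled from example_kubernetes_task's XCom")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    print(f"myarg = {args.myarg}")
    return args.myarg


# Simulate the argument list the container gets once the template is rendered
value = main(["--myarg", "value1"])  # prints: myarg = value1
```

In the real image you would call main() with no arguments, so argparse reads sys.argv. In Airflow 1.10, rendered templates are plain strings. If you pull the whole dict instead of ['key1'], the container receives the dict's Python string form rather than JSON.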
I am using Airflow 1.10.10. If I set do_xcom_push=True in my KubernetesPodOperator below, I get a 403 error, but with do_xcom_push set to False the code starts a pod and returns success. What could be the issue?
Error with do_xcom_push set to True:
..DEBUG - response body {..."pods \"NewPodTask-s232e\" is forbidden: failed quota: default: must specify requests.memory", "reason": "Forbidden", "details": {"name": "NewPodTask", "kind": "pods"}, "code": 403}
Task definition:
PodOpTask = KubernetesPodOperator(
    namespace='Test-NS',
    image="python-3.6",
    cmds=["python", "-c"],
    # python -c runs only the first argument as code; the rest become sys.argv
    arguments=["import os, json; print('copying result to /airflow/xcom/return.json')"],
    labels={"foo": "bar"},
    name="NewPodTask",
    task_id="NewPodTask_1",
    get_logs=True,
    security_context=pod_security_context,
    do_xcom_push=True,
    is_delete_operator_pod=True,
    secrets=[secret_env],
    resources={'request_cpu': '1', 'limit_cpu': '2', 'request_memory': '128Mi', 'limit_memory': '512Mi'},
    dag=dag,
)
Can you get logs from the pod? There may be more information there. I can think of a couple of possibilities though: check your RBAC rules (here’s a StackOverflow example) or you could be exceeding your memory quota limit. What happens if you don’t specify ‘limit_memory’?
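The error text itself points to one more thing to check. "must specify requests.memory" is what a ResourceQuota reports when a pod contains a container with no memory request, and the quota checks every container in the pod. The resources argument applies to the task's base container. Suppose the airflow-xcom-sidecar container is added without a memory request; that is an assumption about this Airflow version and worth confirming. In that case only the do_xcom_push=True pods would be rejected. A namespace LimitRange with a defaultRequest for memory is one common way to supply requests to containers that lack them. The sketch below checks a pod manifest dict for such containers. The manifest is a hypothetical, trimmed example, not the exact spec Airflow generates.

```python
def containers_missing_memory_request(pod_manifest):
    """Return names of containers that set no resources.requests.memory.

    A namespace ResourceQuota that tracks requests.memory rejects the pod
    if this list is non-empty.
    """
    missing = []
    spec = pod_manifest.get("spec", {})
    for container in spec.get("initContainers", []) + spec.get("containers", []):
        requests = container.get("resources", {}).get("requests", {})
        if "memory" not in requests:
            missing.append(container["name"])
    return missing


# Hypothetical, trimmed manifest: "base" gets its requests from the operator's
# resources argument; the sidecar is assumed to carry no memory request.
pod = {
    "spec": {
        "containers": [
            {"name": "base", "resources": {"requests": {"cpu": "1", "memory": "128Mi"},
                                           "limits": {"cpu": "2", "memory": "512Mi"}}},
            {"name": "airflow-xcom-sidecar"},
        ]
    }
}

print(containers_missing_memory_request(pod))  # ['airflow-xcom-sidecar']
```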
Hi shivarp,
have you also run a Python script (test.py) with the KubernetesPodOperator? Could you share an example? I'm trying to do the same.
Thanks
We have the same issue. Can anyone share what their solution was?
We are using Kubernetes 1.18 with the Flannel CNI.
We checked the service account, ClusterRole and ClusterRoleBinding; the service account has full permissions.
[2021-02-01 11:50:54,103] {taskinstance.py:1145} ERROR – (0)
Reason: failed CONNECT via proxy status: 403
Could it be that we need a service mesh CNI for this to work? (I'm assuming it uses a sidecar and that the error is a failed connection to a proxy.)
thank you,
Amir
Hi Ayla, thank you for this write up.
This setup works for me. When I push XComs from two different tasks using the same method and then assign them to env vars in a third task, the env vars from the second task are not getting assigned. Any ideas why this might be the case?
Please ignore my question; I had a typo in the task ID. Thanks anyway.
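A mistyped task_ids value tends to fail quietly. xcom_pull returns None when nothing was pushed under that task ID, so a template like the ones above can render as an empty value instead of raising. A quick check of template strings against the DAG's task IDs (for example, dag.task_ids) can catch this early. `unknown_task_ids` is a hypothetical helper, not an Airflow API. Its regular expression only recognizes a single string literal passed as task_ids='...'.

```python
import re

# Matches task_ids='...' or task_ids="..." with a single literal task ID
TASK_IDS_PATTERN = re.compile(r"task_ids\s*=\s*['\"]([^'\"]+)['\"]")


def unknown_task_ids(templates, known_task_ids):
    """Return task IDs referenced in xcom_pull templates that are not in the DAG."""
    referenced = set()
    for template in templates:
        referenced.update(TASK_IDS_PATTERN.findall(template))
    return sorted(referenced - set(known_task_ids))


# Hypothetical env_vars for a third task that pulls from two upstream tasks;
# the second template misspells the task ID.
env_vars = {
    "FIRST_VALUE": "{{ task_instance.xcom_pull(task_ids='first_task')['key1'] }}",
    "SECOND_VALUE": "{{ task_instance.xcom_pull(task_ids='secnd_task')['key1'] }}",
}

print(unknown_task_ids(env_vars.values(), ["first_task", "second_task", "third_task"]))
# ['secnd_task']
```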