
A closer look at Airflow’s KubernetesPodOperator and XCom

Published July 11, 2019 in data - 8 Comments

The KubernetesPodOperator handles communicating XCom values differently than other operators. The basics are described in the operator documentation under the xcom_push parameter. I’ve written up a more detailed example that expands on that documentation.

An Airflow task instance described by the KubernetesPodOperator can write a dict to the file /airflow/xcom/return.json (always the same file); that file is read and turned into an XCom value by an airflow-xcom-sidecar sidecar container. This sidecar is created when the KubernetesPodOperator parameter xcom_push is True.

For instance, suppose a KubernetesPodOperator task instance executes the following Python code in a container running the image example_image:

import json

# The sidecar will turn this dict into the task's XCom return value.
xcom_return = {"key1": "value1", "key2": "value2"}

with open("/airflow/xcom/return.json", "w") as file:
    json.dump(xcom_return, file)
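Conceptually, the sidecar just reads that file back and deserializes it before the pod is torn down. Here is a minimal stand-in for that round trip using only the standard library (a temporary file stands in for /airflow/xcom/return.json, purely for illustration):

```python
import json
import tempfile
from pathlib import Path

# Stand-in for /airflow/xcom/return.json, which the sidecar reads.
xcom_file = Path(tempfile.mkdtemp()) / "return.json"

# The task container writes its XCom payload as JSON...
xcom_return = {"key1": "value1", "key2": "value2"}
xcom_file.write_text(json.dumps(xcom_return))

# ...and the sidecar reads it back and hands it to Airflow as the XCom value.
pulled = json.loads(xcom_file.read_text())
print(pulled["key1"])  # value1
```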

This is the KubernetesPodOperator task instance:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


example_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="exampletask",
    task_id="example_kubernetes_task",
    image="docker_repo/example_image",
    xcom_push=True,
    ...
)

The XCom values will be made available to downstream task instances under the ‘return_value’ key:

xcom_data = context['ti'].xcom_pull(task_ids='example_kubernetes_task',
                                    key='return_value')
print(xcom_data["key1"])  # value1
print(xcom_data["key2"])  # value2

The default for xcom_pull’s key parameter is ‘return_value’, so key is an optional parameter in this example.

XCom values can also be pulled using Jinja templates in operator parameters that support templates, which are listed in operator documentation. Here is the list of parameters for the KubernetesPodOperator, and also for the PythonOperator.

This is how to get the value of ‘key1’ in a Jinja template:

{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}
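Since xcom_pull returns the deserialized dict, the trailing ['key1'] in the template is ordinary dict indexing. A plain-Python sketch of what the rendered expression evaluates to (the stub function below is hypothetical, standing in for task_instance.xcom_pull):

```python
# Hypothetical stand-in for task_instance.xcom_pull: it returns the dict
# that example_kubernetes_task wrote to /airflow/xcom/return.json.
def xcom_pull(task_ids, key="return_value"):
    return {"key1": "value1", "key2": "value2"}

# What the Jinja expression evaluates to when the template is rendered:
rendered = xcom_pull(task_ids="example_kubernetes_task", key="return_value")["key1"]
print(rendered)  # value1
```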

A useful way to use XCom values with templates is to pass arguments to a Docker container:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


another_kubernetes_task = KubernetesPodOperator(
    namespace="airflow",
    name="anotherexampletask",
    task_id="another_kubernetes_task",
    image="docker_repo/another_example_image",
    arguments=[
        "--myarg",
        "{{ task_instance.xcom_pull(task_ids='example_kubernetes_task', key='return_value')['key1'] }}",
    ],
    ...
)

8 comments

shivarp - August 2, 2020 Reply

I am using Airflow 1.10.10. If I set do_xcom_push=True in my KubernetesPodOperator below, I get a 403 error, but with it set to False the code starts a pod and returns success!! What could be the issue?

Error with do_xcom_push set to True:
..DEBUG - response body: {..."pods \"NewPodTask-s232e\" is forbidden: failed quota: default: must specify requests.memory","reason":"Forbidden","details":{"name":"NewPodTask","kind":"pods"},"code":403}

Task Definition:
PodOpTask = KubernetesPodOperator(namespace='Test-NS', image="python-3.6", cmds=["python", "-c"],
arguments=["import os", "import json", "print('copying result to /airflow/xcom/return.json')"],
labels={"foo": "bar"}, name="NewPodTask", task_id="NewPodTask_1", get_logs=True, security_context=pod_security_context,
do_xcom_push=True, is_delete_operator_pod=True, secrets=[secret_env],
resources={'request_cpu': '1', 'limit_cpu': '2', 'request_memory': '128Mi', 'limit_memory': '512Mi'}, dag=dag)

    Ayla Khan - August 2, 2020 Reply

    Can you get logs from the pod? There may be more information there. I can think of a couple of possibilities though: check your RBAC rules (here’s a StackOverflow example) or you could be exceeding your memory quota limit. What happens if you don’t specify ‘limit_memory’?

    adi - January 21, 2021 Reply

    hi shivarp,
    have you also executed a python script test.py with the KubernetesPodOperator? Could you share an example?
    thanks

    Amir Avraham - February 2, 2021 Reply

    We have the same issue. Can anyone share their solution?
    We are using k8s version 1.18 and the flannel CNI.

    We checked the service account, cluster role, and cluster role binding; the service account has full permissions.
    [2021-02-01 11:50:54,103] {taskinstance.py:1145} ERROR - (0)
    Reason: failed CONNECT via proxy status: 403

    Could it be that we need a service mesh CNI for it to work? (Assuming it uses a sidecar and the error is a failed connection to the proxy.)

    thank you,

    Amir

      Ayla Khan - February 2, 2021 Reply

      Yes, it uses a sidecar. I haven’t worked with flannel.

Meglo - January 10, 2021 Reply

Hi Ayla, thank you for this write up.
This setup works for me. However, when I push XComs from two different tasks using the same method and then assign them to env vars in a third task, the env vars from the second task are not getting assigned. Any ideas why this might be the case?

Meglo - January 10, 2021 Reply

Please ignore my question. I had a typo in the task id name. Thanks anyway.

    Ayla Khan - January 12, 2021 Reply

    Glad you found the error cause!
