Fluxo de ar - Instância de tarefa no operador EMR

No Airflow, estou enfrentando o problema de que preciso passar ojob_flow_id para um dos meus passos emr. Eu sou capaz de recuperar ojob_flow_id do operador, mas quando vou criar as etapas a serem enviadas ao cluster, otask_instance valor não está certo. Eu tenho o seguinte código:

def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]

dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:

    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',        
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr_recommendations >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print ae.message

O problema é que, quando verifico o EMR, em vez de ver o--cluster-id j-1234 noload_data passo, eu vejo--cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}", o que faz com que meu passo falhe.

Como posso obter o valor real dentro da minha função step?

Obrigado e boas festas

questionAnswers(1)

yourAnswerToTheQuestion