Airflow: ExternalTaskSensor não dispara a tarefa

Eu já viist eist perguntas sobre SO e fez as alterações adequadamente. No entanto, meu DAG dependente ainda fica preso no estado cutucando. Abaixo está o meu DAG mestre:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure

Abaixo é meu DAG dependente:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)

Abaixo estão os logs do DAG dependente assim que o DAG principal é executado:

[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

Abaixo estão os logs de execução do DAG mestre:

[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0

Minha suposição é que o fluxo de ar deve acionar o DAG dependente se o mestre funcionar bem? Eu tentei brincar comexecution_delta mas isso não parece funcionar.

Além disso,schedule_interval estart_date são iguais para os dois DAGs, então não pense que isso deve causar problema

Estou perdendo alguma coisa?

questionAnswers(2)

yourAnswerToTheQuestion