Так будет выглядеть

етил, что для запланированной задачи дата выполнения устанавливается в прошлом в соответствии с

Воздушный поток был разработан как решение для нужд ETL. В мире ETL вы обычно суммируете данные. Итак, если я хочу обобщить данные за 2016-02-19, я бы сделал это в полночь по Гринвичу 2016-02-20, что будет правильно после того, как станут доступны все данные за 2016-02-19.

однако, когда dag вызывает другой dag, время выполнения устанавливается в now ().

Есть ли способ, чтобы триггеры с одинаковым временем выполнения запускали даг? Конечно, я могу переписать шаблон и использовать вчерашний день, однако это сложное решение.

Ответы на вопрос(4)

Решение Вопроса

Следующий класс расширяетсяTriggerDagRunOperator чтобы разрешить передачу даты выполнения в виде строки, которая затем преобразуется обратно в дату и время. Это немного глупо, но это единственный способ найти работу.

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicity set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%[email protected].com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}"
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        python_callable=lambda: random.getrandbits(1),
        params={},
        dag=my_controller_dag
    )
    """
    template_fields = ('execution_date',)

    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
        ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
           *args, **kwargs
       )

    def execute(self, context):
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=self.execution_date,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")

Существует проблема, с которой вы можете столкнуться при использовании этого параметра, а не при настройкеexecution_date=now(): ваш оператор выдаст ошибку MySQL, если вы попытаетесь запустить DAG с идентичнымexecution_date дважды. Это потому чтоexecution_date а такжеdag_id используются для создания индекса строки, и строки с одинаковыми индексами не могут быть вставлены.

Я не могу вспомнить причину, по которой вы захотите запустить два одинаковых даг с одним и тем жеexecution_date в любом случае, но я столкнулся с этим во время тестирования, и вас это не должно пугать. Просто очистите старую работу или используйте другую дату и время.

 7yl4r22 мар. 2018 г., 17:24
Я работаю v1.8.0, так что вы можете быть правы, что это не проблема в 1.9+
 Ena19 июн. 2018 г., 16:26
Я использую airflow 1.9.0 и у меня та же ошибка.sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key Key (dag_id, execution_date) already exists.
 Tw UxTLi51Nus22 мар. 2018 г., 13:38
у текущего (v1.9.0) воздушного потока есть индекс (dag_id, run_id) ... Ваш комментарий относительно ошибки sql для более ранней версии воздушного потока?
 ozw1z5rd31 янв. 2018 г., 14:58
Это хорошее решение, далеко не самое лучшее, оно все еще помогает (очень).

TriggerDagRunOperator теперь имеетexecution_date Параметр для установки даты выполнения запуска. К сожалению, параметр отсутствует в полях шаблона. Если он будет добавлен в поля шаблона (или если вы переопределите оператор и измените значение template_fields), его можно будет использовать следующим образом:

my_trigger_task',
                                              trigger_dag_id="triggered_dag_id",
                                              python_callable=conditionally_trigger,
                                              execution_date= '{{execution_date}}',
                                              dag=dag)

Он еще не выпущен, но вы можете увидеть источники здесь:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py

Коммит, который сделал изменение, был:https://github.com/apache/incubator-airflow/commit/089c996fbd9ecb0014dbefedff232e8699ce6283#diff-41f9029188bd5e500dec9804fed26fb4

существует ли dag_run, если найден, перезапустите dag, используя функцию очистки воздушного потока. Это позволяет нам создать зависимость между дагами, потому что возможность перенести дату исполнения в сработавший даг открывает целую вселенную удивительных возможностей. Интересно, почему это не стандартное поведение в потоке воздуха.

   def execute(self, context):
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)

            if not trigger_dag.get_dagrun( self.execution_date ):
                dr = trigger_dag.create_dagrun(
                       run_id=dro.run_id,
                       state=State.RUNNING,
                       execution_date=self.execution_date,
                       conf=dro.payload,
                       external_trigger=True
                )
                logging.info("Creating DagRun {}".format(dr))
                session.add(dr)
                session.commit()
            else:
                trigger_dag.clear( 
                    start_date = self.execution_date,
                    end_date = self.execution_date,
                    only_failed = False,
                    only_running = False,
                    confirm_prompt = False, 
                    reset_dag_runs = True, 
                    include_subdags= False,
                    dry_run = False 
                )
                logging.info("Cleared DagRun {}".format(trigger_dag))

            session.close()
        else:
            logging.info("Criteria not met, moving on")

которая позволяет запускать метку с определенной датой выполнения.
https://github.com/apache/incubator-airflow/blob/master/airflow/api/common/experimental/trigger_dag.py

Вы можете вызвать эту функцию как частьPythonOperator и достичь цели.

Так будет выглядеть
from airflow.api.common.experimental.trigger_dag import trigger_dag

trigger_operator=PythonOperator(task_id='YOUR_TASK_ID',
                                python_callable=trigger_dag,
                                op_args=['dag_id'],
                                op_kwargs={'execution_date': datetime.now()})

Ваш ответ на вопрос