Sellerie stoppt die Ausführung einer Kette

Ich habe eine check_orders-Aufgabe, die regelmäßig ausgeführt wird. Es erstellt eine Gruppe von Tasks, damit ich die Ausführungsdauer der Tasks zeitlich festlegen und etwas ausführen kann, wenn alle erledigt sind (dies ist der Zweck von res.join [1] und grouped_subs). Die gruppierten Tasks sind Paare von verkettete Aufgaben.

Ich möchte, dass die erste Aufgabe eine Bedingung nicht erfüllt (fehlschlägt) und die zweite Aufgabe in der Kette nicht ausgeführt wird. Ich kann das für mein ganzes Leben nicht herausfinden, und ich bin der Meinung, dass dies eine ziemlich grundlegende Funktionalität für einen Jobwarteschlangenmanager ist. Wenn ich die Dinge ausprobiere, die ich nach [2] auskommentiert habe (Ausnahmen auslösen, Rückrufe entfernen) ... bleiben wir aus irgendeinem Grund beim join () in check_orders stecken (es zerstört die Gruppe). Ich habe versucht, ignore_result für all diese Aufgaben ebenfalls auf False zu setzen, aber es funktioniert immer noch nicht.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful'
    notify_user(args_sub_2)

Antworten auf die Frage(3)

Ihre Antwort auf die Frage