Celery Beat: ограничение до одного экземпляра задачи за раз

У меня есть стебель сельдерея и сельдерей (четыре рабочих), чтобы сделать несколько этапов обработки навалом. Одна из этих задач примерно такая же, какдля каждого Х, который неЯ создал Y, создай Y. "

Задача запускается периодически с полу-быстрой скоростью (10 секунд). Задача завершается очень быстро. Есть и другие задачи.

Мы сталкивались с проблемой несколько раз, когда задачи ритма, по-видимому, становились заблокированными, и поэтому одна и та же задача (из разных периодов ритма) выполнялась одновременно, вызывая неправильно дублированную работу. Также представляется, что задачи выполняются не по порядку.

Можно ли ограничить ритм сельдерея, чтобы обеспечить только один выдающийся экземпляр задачи за раз? Настраивает что-то вродеrate_limit=5 на задании "правильный" способ сделать это?

Можно ли обеспечить выполнение задач ритма, например, вместо отправки задачи, бит добавляет ее в цепочку задач?

Какие'лучший способ справиться с этим, если не считать, что эти задачи выполняются атомарно и безопасны для одновременного выполнения? Это не было ограничением, которое я ожидал от битовых заданий ...

Сама задача определена наивноVely:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

Вот'Фактический (очищенный) журнал:

[00:00.000] foocorp.tasks.add_y_to_xs отправлено. id-># 1[00:00.001] Полученное задание: foocorp.tasks.add_y_to_xs [# 1][00:10.009] foocorp.tasks.add_y_to_xs отправлено. id-># 2[00:20.024] foocorp.tasks.add_y_to_xs отправлено. id-># 3[00:26.747] Полученное задание: foocorp.tasks.add_y_to_xs [# 2][00:26.748] TaskPool: Применить № 2[00:26.752] Полученное задание: foocorp.tasks.add_y_to_xs [# 3][00:26.769] Задание принято: foocorp.tasks.add_y_to_xs [# 2] pid: 26528[00:26.775] Задача foocorp.tasks.add_y_to_xs [# 2] выполнена успешно в 0.0197986490093s: нет[00:26.806] TaskPool: применить # 1[00:26.836] TaskPool: Применить № 3[01:30.020] Задание принято: foocorp.tasks.add_y_to_xs [# 1] pid: 26526[01:30.053] Задание принято: foocorp.tasks.add_y_to_xs [# 3] pid: 26529[01:30.055] foocorp.tasks.add_y_to_xs [# 1]: добавление Y для X id # 9725[01:30.070] foocorp.tasks.add_y_to_xs [# 3]: добавление Y для X id # 9725[01:30.074] Задача foocorp.tasks.add_y_to_xs [# 1] выполнена успешно в 0.0594762689434s: нет[01:30.087] Задача foocorp.tasks.add_y_to_xs [# 3] выполнена успешно в 0.0352867960464s: нет

Мы'В настоящее время используется Celery 3.1.4 с RabbitMQ в качестве транспорта.

РЕДАКТИРОВАТЬ Дэн, здесьЧто я придумал:

Дэн, здесьЧто я в итоге использовал:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield

И декоратор задачи сельдерея (используется только для периодических задач):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task

Ответы на вопрос(5)

Ваш ответ на вопрос