Celery Beat: ограничение до одного экземпляра задачи за раз
У меня есть стебель сельдерея и сельдерей (четыре рабочих), чтобы сделать несколько этапов обработки навалом. Одна из этих задач примерно такая же, какдля каждого Х, который неЯ создал Y, создай Y. "
Задача запускается периодически с полу-быстрой скоростью (10 секунд). Задача завершается очень быстро. Есть и другие задачи.
Мы сталкивались с проблемой несколько раз, когда задачи ритма, по-видимому, становились заблокированными, и поэтому одна и та же задача (из разных периодов ритма) выполнялась одновременно, вызывая неправильно дублированную работу. Также представляется, что задачи выполняются не по порядку.
Можно ли ограничить ритм сельдерея, чтобы обеспечить только один выдающийся экземпляр задачи за раз? Настраивает что-то вродеrate_limit=5
на задании "правильный" способ сделать это?
Можно ли обеспечить выполнение задач ритма, например, вместо отправки задачи, бит добавляет ее в цепочку задач?
Какие'лучший способ справиться с этим, если не считать, что эти задачи выполняются атомарно и безопасны для одновременного выполнения? Это не было ограничением, которое я ожидал от битовых заданий ...
Сама задача определена наивноVely:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
Вот'Фактический (очищенный) журнал:
[00:00.000]
foocorp.tasks.add_y_to_xs отправлено. id-># 1[00:00.001]
Полученное задание: foocorp.tasks.add_y_to_xs [# 1][00:10.009]
foocorp.tasks.add_y_to_xs отправлено. id-># 2[00:20.024]
foocorp.tasks.add_y_to_xs отправлено. id-># 3[00:26.747]
Полученное задание: foocorp.tasks.add_y_to_xs [# 2][00:26.748]
TaskPool: Применить № 2[00:26.752]
Полученное задание: foocorp.tasks.add_y_to_xs [# 3][00:26.769]
Задание принято: foocorp.tasks.add_y_to_xs [# 2] pid: 26528[00:26.775]
Задача foocorp.tasks.add_y_to_xs [# 2] выполнена успешно в 0.0197986490093s: нет[00:26.806]
TaskPool: применить # 1[00:26.836]
TaskPool: Применить № 3[01:30.020]
Задание принято: foocorp.tasks.add_y_to_xs [# 1] pid: 26526[01:30.053]
Задание принято: foocorp.tasks.add_y_to_xs [# 3] pid: 26529[01:30.055]
foocorp.tasks.add_y_to_xs [# 1]: добавление Y для X id # 9725[01:30.070]
foocorp.tasks.add_y_to_xs [# 3]: добавление Y для X id # 9725[01:30.074]
Задача foocorp.tasks.add_y_to_xs [# 1] выполнена успешно в 0.0594762689434s: нет[01:30.087]
Задача foocorp.tasks.add_y_to_xs [# 3] выполнена успешно в 0.0352867960464s: нетМы'В настоящее время используется Celery 3.1.4 с RabbitMQ в качестве транспорта.
РЕДАКТИРОВАТЬ Дэн, здесьЧто я придумал:
Дэн, здесьЧто я в итоге использовал:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
И декоратор задачи сельдерея (используется только для периодических задач):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task