Celery Beat: ограничение до одного экземпляра задачи за раз
У меня есть стебель сельдерея и сельдерей (четыре рабочих), чтобы сделать несколько этапов обработки навалом. Одна из этих задач примерно такая: «для каждого X, для которого не было создано Y, создайте Y».
Задача запускается периодически с полу-быстрой скоростью (10 секунд). Задача завершается очень быстро. Есть и другие задачи.
Я сталкивался с проблемой несколько раз, когда задачи ритма, по-видимому, становились незаполненными, и поэтому одна и та же задача (из разных периодов ритма) выполнялась одновременно, вызывая неправильно дублированную работу. Также представляется, что задачи выполняются не по порядку.
Можно ли ограничить ритм сельдерея, чтобы обеспечить только один выдающийся экземпляр задачи за раз? Настраивает что-то вродеrate_limit=5
на задании "правильный" способ сделать это?
Можно ли обеспечить выполнение задач ритма, например, вместо отправки задачи, бит добавляет ее в цепочку задач?
Как лучше всего справиться с этим, если не считать, что эти задачи выполняются атомарно и безопасны для одновременного выполнения? Это не было ограничением, которое я ожидал от битовых заданий ...
Сама задача определяется наивно:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
Вот фактический (очищенный) журнал:
[00:00.000]
foocorp.tasks.add_y_to_xs отправлено. ID -> # 1[00:00.001]
Полученное задание: foocorp.tasks.add_y_to_xs [# 1][00:10.009]
foocorp.tasks.add_y_to_xs отправлено. ID -> # 2[00:20.024]
foocorp.tasks.add_y_to_xs отправлено. ID -> # 3[00:26.747]
Полученное задание: foocorp.tasks.add_y_to_xs [# 2][00:26.748]
TaskPool: Применить № 2[00:26.752]
Полученное задание: foocorp.tasks.add_y_to_xs [# 3][00:26.769]
Задание принято: foocorp.tasks.add_y_to_xs [# 2] pid: 26528[00:26.775]
Задача foocorp.tasks.add_y_to_xs [# 2] выполнена успешно в 0.0197986490093s: нет[00:26.806]
TaskPool: применить # 1[00:26.836]
TaskPool: Применить № 3[01:30.020]
Задание принято: foocorp.tasks.add_y_to_xs [# 1] pid: 26526[01:30.053]
Задание принято: foocorp.tasks.add_y_to_xs [# 3] pid: 26529[01:30.055]
foocorp.tasks.add_y_to_xs [# 1]: добавление Y для X id # 9725[01:30.070]
foocorp.tasks.add_y_to_xs [# 3]: добавление Y для X id # 9725[01:30.074]
Задача foocorp.tasks.add_y_to_xs [# 1] выполнена успешно в 0.0594762689434s: нет[01:30.087]
Задача foocorp.tasks.add_y_to_xs [# 3] выполнена успешно в 0.0352867960464s: нетВ настоящее время мы используем Celery 3.1.4 с RabbitMQ в качестве транспорта.
РЕДАКТИРОВАТЬ Дэн, вот что я придумал:
Дэн, вот что я в итоге использовал:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
И декоратор задачи сельдерея (используется только для периодических задач):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task