Я только что понял, что мы используем слегка устаревшую версию django-celery, какую версию вы используете?
запускается тестовый пример Django, он создает изолированную тестовую базу данных, поэтому запись в базу данных откатывается после завершения каждого теста. Я пытаюсь создать интеграционный тест с Celery, но не могу понять, как подключить Celery к этой эфемерной тестовой базе данных. В наивной настройке объекты, сохраненные в Django, невидимы для сельдерея, а объекты, сохраненные в сельдерее, сохраняются бесконечно.
Вот пример теста:
import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
class MyTestCase(APITestCase):
@classmethod
def setUpTestData(cls):
# This object is not visible to Celery
MyModel(id='test_object').save()
def test_celery_integration(self):
# This view spawns a Celery task
# Task should see MyModel.objects.get(id='test_object'), but can't
http_response = self.client.post('/', 'test_data', format='json')
result = get_result_from_response(http_response)
result.get() # Wait for task to finish before ending test case
# Objects saved by Celery task should be deleted, but persist
У меня есть два вопроса:
Как сделать так, чтобы Celery мог видеть объекты, которые тестирует Django?
Как сделать так, чтобы все объекты, сохраненные Celery, автоматически откатывались после завершения теста?
Я готов вручную очистить объекты, если это невозможно сделать автоматически, но удаление объектов вtearDown
даже вAPISimpleTestCase
похоже откатился назад.