Я только что понял, что мы используем слегка устаревшую версию django-celery, какую версию вы используете?

запускается тестовый пример Django, он создает изолированную тестовую базу данных, поэтому запись в базу данных откатывается после завершения каждого теста. Я пытаюсь создать интеграционный тест с Celery, но не могу понять, как подключить Celery к этой эфемерной тестовой базе данных. В наивной настройке объекты, сохраненные в Django, невидимы для сельдерея, а объекты, сохраненные в сельдерее, сохраняются бесконечно.

Вот пример теста:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

У меня есть два вопроса:

Как сделать так, чтобы Celery мог видеть объекты, которые тестирует Django?

Как сделать так, чтобы все объекты, сохраненные Celery, автоматически откатывались после завершения теста?

Я готов вручную очистить объекты, если это невозможно сделать автоматически, но удаление объектов вtearDown даже вAPISimpleTestCase похоже откатился назад.

Ответы на вопрос(2)

Ваш ответ на вопрос