Постоянный многопроцессорный общий кэш в Python с stdlib или минимальными зависимостями

Я только что попробовал Pythonоткладывать в долгий ящик модуль в качестве постоянного кэша для данных, извлекаемых из внешней службы.Полный пример здесь.

Мне было интересно, как лучше всего подойти, если я хочу сделать этот многопроцессный процесс безопасным? Я в курсе Redis, Memcached и тому подобное "реальные решения ", но я'Я хотел бы использовать только части стандартной библиотеки Python или очень минимальные зависимости, чтобы сохранить мой код компактным и не вносить ненужную сложность при запуске кода в одном процессе - однопоточной модели.

Это'Легко придумать решение с одним процессом, но это не очень хорошо работает во время работы Python. В частности, проблема заключается в том, что в среде Apache + mod_wsgi

Только один процесс обновляет кэшированные данные один раз (блокировка файлов, как-нибудь?)

Другие процессы используют кэшированные данные во время обновления

Если процессу не удается обновить кэшированные данные, штрафуется в течение N минут, прежде чем другой процесс может повторить попытку (чтобы предотвратитьгромоподобное стадо и такой) - как сигнализировать это между процессами mod_wsgi

Вы не используете "тяжелые инструменты " для этого только стандартные библиотеки Python и UNIX

Также, если какой-то пакет PyPi делает это без внешних зависимостей, сообщите мне об этом, пожалуйста. Альтернативные подходы и рекомендации, вроде "просто используйте sqlite " Добро пожаловать

Пример:

import datetime
import os
import shelve
import logging


logger = logging.getLogger(__name__)


class Converter:

    def __init__(self, fpath):
        self.last_updated = None
        self.data = None

        self.data = shelve.open(fpath)

        if os.path.exists(fpath):
            self.last_updated = datetime.datetime.fromtimestamp(os.path.getmtime(fpath))

    def convert(self, source, target, amount, update=True, determiner="24h_avg"):
        # Do something with cached data
        pass

    def is_up_to_date(self):
        if not self.last_updated:
            return False

        return datetime.datetime.now() < self.last_updated + self.refresh_delay

    def update(self):
        try:
            # Update data from the external server
            self.last_updated = datetime.datetime.now()
            self.data.sync()
        except Exception as e:
            logger.error("Could not refresh market data: %s %s", self.api_url, e)
            logger.exception(e)

Ответы на вопрос(3)

Ваш ответ на вопрос