Какой лучший способ получить противодавление для Кассандры Пишет?

У меня есть служба, которая потребляет сообщения из очереди со скоростью, которую я контролирую. Я выполняю некоторую обработку, а затем пытаюсь выполнить запись в кластер Cassandra через Java-клиент Datastax. Я настроил свой кластер Cassandra сmaxRequestsPerConnection а такжеmaxConnectionsPerHost, Тем не менее, в тестировании я обнаружил, что когда я достигmaxConnectionsPerHost а такжеmaxRequestsPerConnection звонки вsession.executeAsync не блокируйте

То, что я делаю сейчас, используетnew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) и увеличивая его перед каждым асинхронным запросом и уменьшая его, когда будущее возвращаетсяexecuteAsync завершается. Это работает достаточно хорошо, но кажется избыточным, так как драйвер уже отслеживает запросы и соединения внутри.

Кто-нибудь придумал лучшее решение этой проблемы?

Одно предостережение: я хотел бы, чтобы запрос считался невыполненным, пока он не завершится. этовключает в себя повторные попытки! Ситуация, когда я получаю повторяющиеся сбои из кластера (такие как тайм-ауты в ожидании согласованности), является основной ситуацией, когда я хочу создать обратное давление и прекратить потребление сообщений из очереди.

Проблема:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Текущее решение:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

Кроме того, кто-нибудь может увидеть какие-либо очевидные проблемы с этим решением?

Ответы на вопрос(2)

Ваш ответ на вопрос