Какой лучший способ получить противодавление для Кассандры Пишет?
У меня есть служба, которая потребляет сообщения из очереди со скоростью, которую я контролирую. Я выполняю некоторую обработку, а затем пытаюсь выполнить запись в кластер Cassandra через Java-клиент Datastax. Я настроил свой кластер Cassandra сmaxRequestsPerConnection
а такжеmaxConnectionsPerHost
, Тем не менее, в тестировании я обнаружил, что когда я достигmaxConnectionsPerHost
а такжеmaxRequestsPerConnection
звонки вsession.executeAsync
не блокируйте
То, что я делаю сейчас, используетnew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
и увеличивая его перед каждым асинхронным запросом и уменьшая его, когда будущее возвращаетсяexecuteAsync
завершается. Это работает достаточно хорошо, но кажется избыточным, так как драйвер уже отслеживает запросы и соединения внутри.
Кто-нибудь придумал лучшее решение этой проблемы?
Одно предостережение: я хотел бы, чтобы запрос считался невыполненным, пока он не завершится. этовключает в себя повторные попытки! Ситуация, когда я получаю повторяющиеся сбои из кластера (такие как тайм-ауты в ожидании согласованности), является основной ситуацией, когда я хочу создать обратное давление и прекратить потребление сообщений из очереди.
Проблема:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
Текущее решение:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
Кроме того, кто-нибудь может увидеть какие-либо очевидные проблемы с этим решением?