Qual é a melhor maneira de obter contrapressão para o Cassandra Writes?
Eu tenho um serviço que consome mensagens de uma fila a uma taxa que eu controle. Faço algum processamento e, em seguida, tento gravar em um cluster Cassandra por meio do cliente Java Datastax. Eu configurei meu cluster Cassandra commaxRequestsPerConnection
emaxConnectionsPerHost
. No entanto, nos testes, descobri que, quando atingimaxConnectionsPerHost
emaxRequestsPerConnection
chama parasession.executeAsync
não bloqueie.
O que estou fazendo agora é usar umnew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
e incrementando-o antes de cada solicitação assíncrona e diminuindo-o quando o futuro retornado porexecuteAsync
completa. Isso funciona bem o suficiente, mas parece redundante, pois o driver já está rastreando solicitações e conexões internamente.
Alguém veio com uma solução melhor para esse problema?
Uma ressalva: gostaria que um pedido fosse considerado excepcional até que seja concluído. esteinclui novas tentativas! A situação em que estou recebendo falhas repetíveis no cluster (como tempos limite aguardando consistência) é a principal situação em que quero fazer uma contrapressão e parar de consumir mensagens da fila.
Problema:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
Solução atual:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
Além disso, alguém pode ver algum problema óbvio com esta solução?