Qual é a melhor maneira de obter contrapressão para o Cassandra Writes?

Eu tenho um serviço que consome mensagens de uma fila a uma taxa que eu controle. Faço algum processamento e, em seguida, tento gravar em um cluster Cassandra por meio do cliente Java Datastax. Eu configurei meu cluster Cassandra commaxRequestsPerConnection emaxConnectionsPerHost. No entanto, nos testes, descobri que, quando atingimaxConnectionsPerHost emaxRequestsPerConnection chama parasession.executeAsync não bloqueie.

O que estou fazendo agora é usar umnew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) e incrementando-o antes de cada solicitação assíncrona e diminuindo-o quando o futuro retornado porexecuteAsync completa. Isso funciona bem o suficiente, mas parece redundante, pois o driver já está rastreando solicitações e conexões internamente.

Alguém veio com uma solução melhor para esse problema?

Uma ressalva: gostaria que um pedido fosse considerado excepcional até que seja concluído. esteinclui novas tentativas! A situação em que estou recebendo falhas repetíveis no cluster (como tempos limite aguardando consistência) é a principal situação em que quero fazer uma contrapressão e parar de consumir mensagens da fila.

Problema:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Solução atual:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

Além disso, alguém pode ver algum problema óbvio com esta solução?

questionAnswers(2)

yourAnswerToTheQuestion