Was ist der beste Weg, um Gegendruck für Cassandra Writes zu bekommen?

Ich habe einen Dienst, der Nachrichten mit einer von mir kontrollierten Rate aus einer Warteschlange verbraucht. Ich verarbeite etwas und versuche dann, über den Datastax Java-Client in einen Cassandra-Cluster zu schreiben. Ich habe meinen Cassandra-Cluster mit @ eingerichtemaxRequestsPerConnection undmaxConnectionsPerHost. Beim Testen habe ich jedoch festgestellt, dass, wenn ich @ erreicht hamaxConnectionsPerHost undmaxRequestsPerConnection Anrufe ansession.executeAsync nicht blockieren.

Was ich gerade tue, ist die Verwendung einesnew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) und vor jeder asynchronen Anforderung inkrementieren und dekrementieren, wenn die Zukunft von @ zurückgegeben wiexecuteAsync wird abgeschlossen. Dies funktioniert gut genug, scheint jedoch überflüssig zu sein, da der Treiber Anforderungen und Verbindungen bereits intern verfolgt.

Hat jemand eine bessere Lösung für dieses Problem gefunden?

Eine Einschränkung: Ich möchte, dass eine Anfrage als ausstehend betrachtet wird, bis sie abgeschlossen ist. Diesumfasst Wiederholungsversuche! Die Situation, in der ich wiederholbare Fehler vom Cluster erhalte (z. B. Zeitüberschreitungen beim Warten auf Konsistenz), ist die primäre Situation, in der ich den Gegendruck aufheben und keine Nachrichten aus der Warteschlange mehr verbrauchen möcht

Problem

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Aktuelle Lösung:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

Auch kann jemand offensichtliche Probleme mit dieser Lösung sehen?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage