Secuencia de eventos de Akka Persistence Query y CQRS

Estoy tratando de implementar el lado de lectura en mi arquitectura ES-CQRS. Digamos que tengo un actor persistente como este:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

Según tengo entendido, cada vez que el evento persiste, podemos publicarlo a través deAkka Persistence Query. Ahora, no estoy seguro de cuál sería una forma adecuada de suscribirse a estos eventos para poder conservarlo en mi base de datos del lado de lectura. Una de las ideas es enviar inicialmente unUsersStream mensaje de mi actor del lado de lectura aUserWrite actor y eventos de "hundimiento" en ese actor de lectura.

EDITAR

Siguiendo la sugerencia de @cmbaxter, implementé el lado de lectura de esta manera:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

Hay algunos problemas como: La secuencia de eventos parece ser lenta. Es decir.UserRead El actor puede responder con un conjunto de usuarios antes de que se guarde el usuario recién agregado.

EDITAR 2

Aumenté el intervalo de actualización del diario de consultas de cassandra, que resolvió un problema menos con la secuencia de eventos lenta. Parece que el diario de eventos de Cassandra es por defecto, sondeado cada 3 segundos. En miapplication.conf Yo añadí:

cassandra-query-journal {
  refresh-interval = 20ms
}

EDITAR 3

En realidad, no disminuya el intervalo de actualización. Eso aumentará el uso de memoria, pero eso no es peligroso, ni un punto. En general, el concepto de CQRS es que el lado de escritura y lectura son asíncronos. Por lo tanto, después de escribir datos, nunca estará disponible de inmediato para su lectura. ¿Tratando con la IU? Acabo de abrir la transmisión y enviar datos a través de eventos enviados por el servidor después de que el lado de lectura los reconozca.

Respuestas a la pregunta(2)

Su respuesta a la pregunta