Поток событий Akka Persistence Query и CQRS

Я пытаюсь реализовать сторону чтения в моей архитектуре ES-CQRS. Допустим, у меня есть постоянный актер, как это:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

Насколько я понимаю, каждый раз, когда событие сохраняется, мы можем опубликовать его черезAkka Persistence Query, Теперь я не уверен, что будет правильным способом подписаться на эти события, чтобы я мог сохранить его в своей базе данных для чтения? Одна из идей состоит в том, чтобы изначально отправитьUsersStream сообщение от моего прочитанного стороннего актераUserWrite Актер и «утонуть» события в этом актере чтения.

РЕДАКТИРОВАТЬ

Следуя предложению @cmbaxter, я реализовал read сторону таким образом:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

Есть некоторые проблемы, такие как: Поток событий кажется медленным. То естьUserRead Актер может ответить с набором пользователей, прежде чем новый пользователь будет сохранен.

РЕДАКТИРОВАТЬ 2

Я увеличил интервал обновления журнала запросов кассандры, что более менее решило проблему с медленным потоком событий. Похоже, что журнал событий Cassandra по умолчанию опрашивается каждые 3 секунды. В моемapplication.conf Я добавил:

cassandra-query-journal {
  refresh-interval = 20ms
}

РЕДАКТИРОВАТЬ 3

На самом деле, не уменьшайте интервал обновления. Это увеличит использование памяти, но это не опасно. В целом концепция CQRS заключается в том, что сторона записи и чтения асинхронна. Поэтому после записи данные никогда не будут сразу доступны для чтения. Имеете дело с пользовательским интерфейсом? Я просто открываю поток и отправляю данные через отправленные сервером события после того, как сторона чтения подтверждает их.

Ответы на вопрос(1)

Ваш ответ на вопрос