Akka Persistence Query Event Stream und CQRS

Ich versuche, die Leseseite in meine ES-CQRS-Architektur zu implementieren. Nehmen wir an, ich habe einen hartnäckigen Schauspieler wie diesen:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

oweit ich weiß, können wir jedes Mal, wenn das Ereignis andauert, es über @ veröffentlicheAkka Persistence Query. Ich bin mir nicht sicher, wie ich diese Ereignisse ordnungsgemäß abonnieren kann, damit ich sie in meiner Datenbank auf der Leseseite beibehalten kann. Eine der Ideen ist, zunächst ein @ zu sendUsersStream Nachricht von meinem Leseseitenschauspieler anUserWrite actor und "sink" -Ereignisse in diesem gelesenen Schauspieler.

BEARBEITE

Folgend dem Vorschlag von @cmbaxter habe ich die Leseseite folgendermaßen implementiert:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

Es gibt einige Probleme wie: Der Ereignisstrom scheint langsam zu sein. Das heißtUserRead actor kann mit mehreren Benutzern antworten, bevor der neu hinzugefügte Benutzer gespeichert wird.

EDIT 2

Ich habe das Aktualisierungsintervall des Cassandra-Abfragejournals erhöht, wodurch das Problem mit dem langsamen Ereignisstrom weniger gelöst wurde. Es scheint, dass das Cassandra-Ereignisjournal standardmäßig alle 3 Sekunden abgefragt wird. In meinemapplication.conf Ich fügte hinzu

cassandra-query-journal {
  refresh-interval = 20ms
}

EDIT 3

Verringern Sie das Aktualisierungsintervall nicht. Das erhöht die Speichernutzung, aber das ist weder gefährlich noch ein Punkt. Im Allgemeinen ist das Konzept von CQRS, dass Schreib- und Leseseite asynchron sind. Nach dem Schreiben stehen die Daten daher niemals sofort zum Lesen zur Verfügung. Umgang mit der Benutzeroberfläche? Ich öffne einfach den Stream und schicke Daten über vom Server gesendete Ereignisse, nachdem die Leseseite sie bestätigt hat.

Antworten auf die Frage(4)

Ihre Antwort auf die Frage