Akka Persistence Query Event Stream und CQRS
Ich versuche, die Leseseite in meine ES-CQRS-Architektur zu implementieren. Nehmen wir an, ich habe einen hartnäckigen Schauspieler wie diesen:
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
oweit ich weiß, können wir jedes Mal, wenn das Ereignis andauert, es über @ veröffentlicheAkka Persistence Query
. Ich bin mir nicht sicher, wie ich diese Ereignisse ordnungsgemäß abonnieren kann, damit ich sie in meiner Datenbank auf der Leseseite beibehalten kann. Eine der Ideen ist, zunächst ein @ zu sendUsersStream
Nachricht von meinem Leseseitenschauspieler anUserWrite
actor und "sink" -Ereignisse in diesem gelesenen Schauspieler.
BEARBEITE
Folgend dem Vorschlag von @cmbaxter habe ich die Leseseite folgendermaßen implementiert:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
Es gibt einige Probleme wie: Der Ereignisstrom scheint langsam zu sein. Das heißtUserRead
actor kann mit mehreren Benutzern antworten, bevor der neu hinzugefügte Benutzer gespeichert wird.
EDIT 2
Ich habe das Aktualisierungsintervall des Cassandra-Abfragejournals erhöht, wodurch das Problem mit dem langsamen Ereignisstrom weniger gelöst wurde. Es scheint, dass das Cassandra-Ereignisjournal standardmäßig alle 3 Sekunden abgefragt wird. In meinemapplication.conf
Ich fügte hinzu
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 3
Verringern Sie das Aktualisierungsintervall nicht. Das erhöht die Speichernutzung, aber das ist weder gefährlich noch ein Punkt. Im Allgemeinen ist das Konzept von CQRS, dass Schreib- und Leseseite asynchron sind. Nach dem Schreiben stehen die Daten daher niemals sofort zum Lesen zur Verfügung. Umgang mit der Benutzeroberfläche? Ich öffne einfach den Stream und schicke Daten über vom Server gesendete Ereignisse, nachdem die Leseseite sie bestätigt hat.