Secuencia de eventos de Akka Persistence Query y CQRS
Estoy tratando de implementar el lado de lectura en mi arquitectura ES-CQRS. Digamos que tengo un actor persistente como este:
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
Según tengo entendido, cada vez que el evento persiste, podemos publicarlo a través deAkka Persistence Query
. Ahora, no estoy seguro de cuál sería una forma adecuada de suscribirse a estos eventos para poder conservarlo en mi base de datos del lado de lectura. Una de las ideas es enviar inicialmente unUsersStream
mensaje de mi actor del lado de lectura aUserWrite
actor y eventos de "hundimiento" en ese actor de lectura.
EDITAR
Siguiendo la sugerencia de @cmbaxter, implementé el lado de lectura de esta manera:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
Hay algunos problemas como: La secuencia de eventos parece ser lenta. Es decir.UserRead
El actor puede responder con un conjunto de usuarios antes de que se guarde el usuario recién agregado.
EDITAR 2
Aumenté el intervalo de actualización del diario de consultas de cassandra, que resolvió un problema menos con la secuencia de eventos lenta. Parece que el diario de eventos de Cassandra es por defecto, sondeado cada 3 segundos. En miapplication.conf
Yo añadí:
cassandra-query-journal {
refresh-interval = 20ms
}
EDITAR 3
En realidad, no disminuya el intervalo de actualización. Eso aumentará el uso de memoria, pero eso no es peligroso, ni un punto. En general, el concepto de CQRS es que el lado de escritura y lectura son asíncronos. Por lo tanto, después de escribir datos, nunca estará disponible de inmediato para su lectura. ¿Tratando con la IU? Acabo de abrir la transmisión y enviar datos a través de eventos enviados por el servidor después de que el lado de lectura los reconozca.