Coletar Observáveis em uma Lista não parece emitir a coleção de uma só vez
Estou usando o RxJava para coletar essencialmente a lista de observáveis emitidos individualmente e combiná-los em uma lista de observáveis (essencialmente o oposto do flatMap). Aqui está o meu código:
// myEvent.findMemberships() returns an Observable<List<Membership>>
myEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.toList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(List<User> users) {
Timber.d("%d users emitted", users.size());
}
})
Percebo que meu onNext nunca é chamado. Eu não consigo entender isso. Se eu remover a chamada .toList e basicamente emitir os usuários individuais (como mostrado abaixo), ele funcionará emitindo cada item.
subscriptions //
.add(currentEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(User user) {
Timber.d("%d users emitted", user.getName());
}
}));
Q1 O meu entendimento de .toList está incorreto?
Q2 Como se vasculha um fluxo de emissões emitidas individualmenteObservable<Object>
s em um únicoObservable<List<Object>>
?
** EDIT
@kjones acertou em cheio na questão. Eu não estava ligando para Concluir com minha chamada findMemberships. Adicionei o snippet de código abaixo. Meu caso de uso real foi um pouco mais complicado com várias transformações, e é por isso que eu precisava usar a chamada .toList. Como o @zsxwing também apontou, justamente neste caso de uso, um mapa simples será suficiente.
public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
@Override
public void call(Subscriber<? super List<Membership>> subscriber) {
try {
// .....
List<Membership> memberships = queryMyDb();
subscriber.onNext(memberships);
// BELOW STATEMENT FIXES THE PROBLEM ><
// subscriber.onCompleted();
} catch (SQLException e) {
// ...
}
}
});
}