Сбор Observables в Список, кажется, не испускает коллекцию сразу
Я использую RxJava, чтобы собрать список наблюдаемых по отдельности и объединить их в список наблюдаемых (по сути, в противоположность flatMap). Вот мой код:
// myEvent.findMemberships() returns an Observable<List<Membership>>
myEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.toList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(List<User> users) {
Timber.d("%d users emitted", users.size());
}
})
Я замечаю, что мой onNext никогда не вызывается. Я не могу понять это. Если я удаляю вызов .toList и в основном выводю отдельных пользователей (как показано ниже), он работает, испуская каждый элемент.
subscriptions //
.add(currentEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(User user) {
Timber.d("%d users emitted", user.getName());
}
}));
Q1. Мое понимание .toList неверно?
Q2. Как можно расчесывать поток индивидуально излучаемыхObservable<Object>
в одинObservable<List<Object>>
?
** РЕДАКТИРОВАТЬ
@kjones полностью прибили проблему. Я не звонил onComplete с моим вызовом findMemberships. Я добавил фрагмент кода ниже. Мой реальный пример использования был немного более запутанным с кучей дополнительных преобразований, поэтому мне нужно было использовать вызов .toList. Как правильно заметил @zsxwing, для этого случая использования достаточно простой карты.
public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
@Override
public void call(Subscriber<? super List<Membership>> subscriber) {
try {
// .....
List<Membership> memberships = queryMyDb();
subscriber.onNext(memberships);
// BELOW STATEMENT FIXES THE PROBLEM ><
// subscriber.onCompleted();
} catch (SQLException e) {
// ...
}
}
});
}