as Sammeln von Observables in einer Liste scheint die Sammlung nicht sofort zu sende

Ich benutze RxJava, um im Wesentlichen die Liste der einzeln ausgegebenen Observables zu sammeln und zu einer Liste der Observables zu kombinieren (im Wesentlichen das Gegenteil von flatMap). Hier ist mein Code:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             })

Ich stelle fest, dass mein onNext niemals aufgerufen wird. Ich kann das nicht verstehen. Wenn ich den Aufruf ".toList" entferne und im Grunde genommen die einzelnen Benutzer ausgebe (siehe unten), wird jedes Element ausgegeben.

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("%d users emitted", user.getName());
               }
             }));

Q1. Ist mein Verständnis von .toList falsch?

Q2. Wie kämmt man einen Strom von einzeln emittiertenObservable<Object>s in ein einzelnesObservable<List<Object>> ?

** EDIT

@ kjones hat das Problem vollkommen gelöst. Ich habe mit meinem findMemberships-Aufruf nicht aufComplete angerufen. Ich habe das Code-Snippet unten hinzugefügt. Mein wirklicher Anwendungsfall war ein wenig komplizierter, und ich musste einige Transformationen durchführen, weshalb ich den Aufruf .toList verwenden musste. Wie @zsxwing auch zu Recht hervorhob, reicht für diesen Anwendungsfall eine einfache Karte aus.

public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
  @Override
  public void call(Subscriber<? super List<Membership>> subscriber) {
    try {
      // .....
      List<Membership> memberships = queryMyDb();

      subscriber.onNext(memberships);

      // BELOW STATEMENT FIXES THE PROBLEM ><
      // subscriber.onCompleted();

    } catch (SQLException e) {
      // ...
    }
  }
});

}

Antworten auf die Frage(1)

Ihre Antwort auf die Frage