Asynchrone RestAPIs mit RxJava / Jersey2. Fragen zum Thema?

Wir sind dabei, ein REST-API mithilfe reaktiver Programmierung zu prototypisieren. Wie im Diagramm gezeigt, behalten wir 3 Ebenen bei, die wir in unseren vorherigen Sync-API-Designs verwendet habe

http: //oi59.tinypic.com/339hhki.jp

API Layer implementiert mit Jersey2, das die Anforderung / Deserialisierung von JSON und die Übergabe an Service Layer verarbeitet.Service Layer, der die Business-Logik implementiert. Implementiert mit reaktiver Programmierung (RxJava)Dao Layer, der von Service Layer für Persistenzoperationen verwendet wird. Da CouchBase verwendet wird, wird CouchBase RxClient verwendet.

ach meinem Verständnis ist der Ablauf wie folg

einenn eine HTTP-Anforderung eingeht, verarbeitet Jersery das Anforderungsmodell "request / parse JSON / deserialize" in einem RequestThread aus dem "Container-Thread-Pool".

b) Wenn Jersey2 Async unterstützt wird, kehrt RequestThread zum Container-Thread-Pool zurück und der Service-Layer wird im Scheduler Schedulers.computation () ausgeführt.

@Path("/resource")
public class AsyncUserResource {
    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
 
       Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation


       user.subscribe(new Observer<User>() {

            @Override
            public void onCompleted() { 

            }

            @Override
            public void onError(Throwable e) {
                //handle error using ExceptionMappers

            }

            @Override
            public void onNext(User user) {
               asyncResponse.resume(user); 

            }});
    }        


}

c) Alle E / A-Vorgänge in DAOs verwenden Schedulers.io (), um diese Vorgänge mit langer Verarbeitungsdauer in einem separaten Thread auszuführen.

Meine Fragen sind:

Wenn DAOs / Services implementiert werden, sollte ich die verwendeten Zeitpläne (Threading) in der Implementierung ausblenden.

eg Dao:

public interface UserDao {
  public Observable<User> getUser();
}

Bei der Implementierung empfiehlt es sich, den Zeitplan wie folgt anzugeben;

public Observable<User> getUser() {

        Observable<User> ret = Observable.create((subscriber)->{
            try {

                 //Do DB call
                 User u = null;
                 subscriber.onNext(u);
                 subscriber.onCompleted();

            }catch (Exception e) {
                subscriber.onError(e);  
            }

        });
        return ret.subscribeOn(Schedulers.io());
}

Oder ist es besser, einfach das Observable zurückzugeben, und die obere Schicht verwendet dementsprechend ein bestimmtes Schedular?

Da DAOs hauptsächlich IO- / Netzwerkaufrufe abwickeln, gehe ich davon aus, dass Schedulars.io () verwendet werden sollte. Wie wäre es mit der Geschäftslogik in der Service-Schicht? Sollten sie in Schedulers.computation () (Event Loop) ausgeführt werden?

In der JVM befinden sich zwei Thread-Pools. Einer ist "Container-Thread-Pool" und der andere ist "RxThread-Pool", der von Schedulers.io () verwendet wird. Wie konfiguriere ich die Pooleinstellungen / Größe von RxJava?

Antworten auf die Frage(2)

Ihre Antwort auf die Frage