Asynchrone RestAPIs mit RxJava / Jersey2. Fragen zum Thema?
Wir sind dabei, ein REST-API mithilfe reaktiver Programmierung zu prototypisieren. Wie im Diagramm gezeigt, behalten wir 3 Ebenen bei, die wir in unseren vorherigen Sync-API-Designs verwendet habe
http: //oi59.tinypic.com/339hhki.jp
API Layer implementiert mit Jersey2, das die Anforderung / Deserialisierung von JSON und die Übergabe an Service Layer verarbeitet.Service Layer, der die Business-Logik implementiert. Implementiert mit reaktiver Programmierung (RxJava)Dao Layer, der von Service Layer für Persistenzoperationen verwendet wird. Da CouchBase verwendet wird, wird CouchBase RxClient verwendet.ach meinem Verständnis ist der Ablauf wie folg
einenn eine HTTP-Anforderung eingeht, verarbeitet Jersery das Anforderungsmodell "request / parse JSON / deserialize" in einem RequestThread aus dem "Container-Thread-Pool".
b) Wenn Jersey2 Async unterstützt wird, kehrt RequestThread zum Container-Thread-Pool zurück und der Service-Layer wird im Scheduler Schedulers.computation () ausgeführt.
@Path("/resource")
public class AsyncUserResource {
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
user.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//handle error using ExceptionMappers
}
@Override
public void onNext(User user) {
asyncResponse.resume(user);
}});
}
}
c) Alle E / A-Vorgänge in DAOs verwenden Schedulers.io (), um diese Vorgänge mit langer Verarbeitungsdauer in einem separaten Thread auszuführen.
Meine Fragen sind:
Wenn DAOs / Services implementiert werden, sollte ich die verwendeten Zeitpläne (Threading) in der Implementierung ausblenden.eg Dao:
public interface UserDao {
public Observable<User> getUser();
}
Bei der Implementierung empfiehlt es sich, den Zeitplan wie folgt anzugeben;
public Observable<User> getUser() {
Observable<User> ret = Observable.create((subscriber)->{
try {
//Do DB call
User u = null;
subscriber.onNext(u);
subscriber.onCompleted();
}catch (Exception e) {
subscriber.onError(e);
}
});
return ret.subscribeOn(Schedulers.io());
}
Oder ist es besser, einfach das Observable zurückzugeben, und die obere Schicht verwendet dementsprechend ein bestimmtes Schedular?
Da DAOs hauptsächlich IO- / Netzwerkaufrufe abwickeln, gehe ich davon aus, dass Schedulars.io () verwendet werden sollte. Wie wäre es mit der Geschäftslogik in der Service-Schicht? Sollten sie in Schedulers.computation () (Event Loop) ausgeführt werden?
In der JVM befinden sich zwei Thread-Pools. Einer ist "Container-Thread-Pool" und der andere ist "RxThread-Pool", der von Schedulers.io () verwendet wird. Wie konfiguriere ich die Pooleinstellungen / Größe von RxJava?