RestAPIs assíncronos com RxJava / Jersey2. Enfiando perguntas?

Estamos no processo de prototipar uma API REST usando programação reativa. Conforme mostrado no diagrama, mantemos três camadas iguais às usadas em nossos projetos de API de sincronização anteriores;

http://oi59.tinypic.com/339hhki.jpg

Camada de API implementada usando Jersey2, que processará a solicitação / desserialização de JSON e a transferência para a Camada de serviço.Camada de serviço que implementa a lógica de negócios. Implementada usando programação reativa (RxJava)Camada Dao, que é usada para operações de persistência pela Camada de Serviço. Desde que usamos o CouchBase, ele usará o CouchBase RxClient.

Para meu entendimento, o fluxo é o seguinte:

a) A solicitação HTTP é recebida, o Jersery processará o modelo de solicitação JSON / desserialize de solicitação / análise dentro de um RequestThread do "conjunto de threads do contêiner".

b) Com o suporte Jersey2 Async, o RequestThread retornará ao Pool de Threads de Contêiner e a Camada de Serviço será executada no agendador Schedulers.computation ().

@Path("/resource")
public class AsyncUserResource {
    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
 
       Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation


       user.subscribe(new Observer<User>() {

            @Override
            public void onCompleted() { 

            }

            @Override
            public void onError(Throwable e) {
                //handle error using ExceptionMappers

            }

            @Override
            public void onNext(User user) {
               asyncResponse.resume(user); 

            }});
    }        


}

c) Quaisquer operações de E / S dentro do DAOs usarão Schedulers.io () para executar essas operações de processamento longo em um thread separado.

Minhas perguntas são:

Ao implementar DAOs / Services, devo ocultar os agendamentos em uso (Threading) dentro da implementação.

por exemplo, Dao:

public interface UserDao {
  public Observable<User> getUser();
}

Na implementação, é uma boa prática especificar o Agendamento como abaixo;

public Observable<User> getUser() {

        Observable<User> ret = Observable.create((subscriber)->{
            try {

                 //Do DB call
                 User u = null;
                 subscriber.onNext(u);
                 subscriber.onCompleted();

            }catch (Exception e) {
                subscriber.onError(e);  
            }

        });
        return ret.subscribeOn(Schedulers.io());
}

Ou é melhor simplesmente devolver o Observable, e a camada superior usará um agendamento particular de acordo?

Como os DAOs geralmente invocam chamadas io / rede, presumo que Schedulars.io () deve ser usado. Que tal para a lógica de negócios ao lado da camada de serviço? Eles devem ser executados dentro de Schedulers.computation () (Event Loop)?

Existem dois conjuntos de encadeamentos dentro da JVM.One é "Conjunto de Encadeamentos de Contêiner" e o outro é "Conjunto de RxThread" usado por Schedulers.io (). Como definir as configurações do pool / tamanho do RxJava?

questionAnswers(1)

yourAnswerToTheQuestion