Асинхронные RestAPI с RxJava / Jersey2. Вопросы по теме?
Мы находимся в процессе создания прототипа REST API с использованием реактивного программирования. Как показано на диаграмме, мы оставляем 3 уровня такими же, как мы использовали в наших предыдущих проектах API синхронизации;
http://oi59.tinypic.com/339hhki.jpg
Уровень API реализован с использованием Jersey2, который будет обрабатывать запрос / десериализацию JSON и передачу обслуживания на уровень обслуживания.Сервисный уровень, который реализует бизнес-логику. Реализуется с помощью реактивного программирования (RxJava)Dao Layer, который используется для персистентных операций Service Layer. Так как мы используем CouchBase, он будет использовать CouchBase RxClient.Насколько я понимаю, поток выглядит следующим образом:
а) При поступлении HTTP-запроса Jersery будет обрабатывать модель запроса / синтаксического анализа JSON / десериализации внутри RequestThread из «пула потоков контейнера».
б) При поддержке Jersey2 Async RequestThread будет возвращен обратно в пул потоков контейнера, а уровень обслуживания будет выполнен в планировщике Schedulers.computation ().
@Path("/resource")
public class AsyncUserResource {
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
user.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//handle error using ExceptionMappers
}
@Override
public void onNext(User user) {
asyncResponse.resume(user);
}});
}
}
с) Любые операции ввода-вывода внутри DAO будут использовать Schedulers.io () для запуска этих операций длительной обработки в отдельном потоке.
Мои вопросы:
При реализации DAO / Services я должен скрывать используемые графики (Threading) внутри реализации.например, Дао:
public interface UserDao {
public Observable<User> getUser();
}
В реализации, это хорошая практика, чтобы указать Schedular, как показано ниже;
public Observable<User> getUser() {
Observable<User> ret = Observable.create((subscriber)->{
try {
//Do DB call
User u = null;
subscriber.onNext(u);
subscriber.onCompleted();
}catch (Exception e) {
subscriber.onError(e);
}
});
return ret.subscribeOn(Schedulers.io());
}
Или лучше просто вернуть Observable, и верхний слой будет соответственно использовать Schedular?
Поскольку DAO чаще всего используют вызовы io / network, я предполагаю, что следует использовать Schedulars.io (). Как насчет бизнес-логики на стороне сервисного уровня? Должны ли они выполняться внутри Schedulers.computation () (цикл обработки событий)?
Внутри JVM есть два пула потоков. Один - «Пул потоков контейнера», а другой - «Пул RxThread», используемый Schedulers.io (). Как настроить параметры пула / размер RxJava?