RxJava Observando na chamada / assinatura do thread
Estou com alguns problemas para entender como o subscribeOn / observeOn funciona no RxJava. Eu criei um aplicativo simples com observável que emite nomes de planetas do sistema solar, faz alguns mapeamentos e filtros e imprime resultados.
Pelo que entendi, o agendamento do trabalho para o thread em segundo plano é feito viasubscribeOn
operador (e parece funcionar bem).
A observação no encadeamento em segundo plano também funciona bem comobserveOn
operador.
Mas eu tenho problemas para entender como observar o thread de chamada (se é o thread principal ou qualquer outro). Isso é feito facilmente no Android comAndroidSchedulers.mainThread()
operador, mas não sei como conseguir isso em java puro.
Aqui está o meu código:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < 5; i++) {
Thread thread = new Thread("Thread-" + i) {
@Override
public void run() {
stringObservable
.buffer(5)
.subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
}
};
thread.start();
}
}
private static String getCurrentThreadInfo() {
return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
}
}
Observável no criado e no trabalho é inscrito em um dos três segmentos do executor. Isso funciona conforme o esperado. Mas como observar resultados nesses segmentos criados dinamicamente no loop for? Existe uma maneira de criar o Agendador a partir do thread atual?
Além disso, descobri que, depois de executar esse código, ele nunca termina e não sei por quê? :(