с параметром параллелизма.

отаю над чем-то, что записывает данные, поступающие из очереди. Было достаточно легко обработать очередь в Observable, чтобы в моем коде было несколько конечных точек, получающих информацию в очереди.

Кроме того, я могу быть уверен, что информация поступает в порядке. Этот бит работает хорошо, так как Обсерватории гарантируют это.Ноодин хитрый момент в том, что я не хочу, чтобы наблюдатель был уведомлен о следующей вещи, пока он не завершит обработку предыдущей вещи. Но обработка, выполняемая Observer, является асинхронной.

В качестве более конкретного примера, который, вероятно, достаточно прост для подражания. Представьте, что моя очередь содержит URL. Я выставляю их как Observable в своем коде. Я подписываю Обозревателя, чья работа состоит в том, чтобы извлекать URL-адреса и записывать содержимое на диск (это надуманный пример, поэтому не стоит спорить с этими особенностями). Важным моментом является то, что выборка и сохранение являются асинхронными. Моя проблема в том, что я не хочу, чтобы наблюдателю давали «следующий» URL-адрес из наблюдаемой, пока они не завершат предыдущую обработку.

Но призыв кnext на интерфейс Observer возвращаетvoid, Поэтому у Обозревателя нет возможности связаться со мной, который фактически выполнил асинхронную задачу.

Какие-либо предложения? Я подозреваю, что, возможно, существует какой-то оператор, который мог бы быть закодирован, который бы в основном удерживал будущие значения (помещать их в очередь в памяти?), Пока он каким-то образом не знал, что Наблюдатель готов к этому. Но я надеялся, что нечто подобное уже существовало по какой-то установленной схеме.

Ответы на вопрос(3)

что вы описываете, звучит как «противодавление». Вы можете прочитать об этом в документации RxJS 4https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md, Однако здесь упоминаются операторы, которых нет в RxJS 5. Например, посмотрите на «Контролируемые наблюдаемые», которые должны ссылаться на то, что вам нужно.

Я думаю, что вы могли бы достичь того же сconcatMap и экземпляр субъекта:

const asyncOperationEnd = new Subject();

source.concatMap(val => asyncOperationEnd
    .mapTo(void 0)
    .startWith(val)
    .take(2) // that's `val` and the `void 0` that ends this inner Observable
  )
  .filter(Boolean) // Always ignore `void 0`
  .subscribe(val => {
    // do some async operation...
    // call `asyncOperationEnd.next()` and let `concatMap` process another value
  });

Из вашего описания на самом деле кажется, что «наблюдатель», о котором вы говорите, работает как Subject, так что было бы, возможно, более разумно создать собственный класс Subject, который вы могли бы использовать в любой цепочке Observable.

window.document.onkeydown=(e)=>{
  return false
}
let count=0;
let asyncTask=(name,time)=>{
  time=time || 2000
  return Rx.Observable.create(function(obs) {
      setTimeout(function() {
       count++
        obs.next('task:'+name+count);
           console.log('Task:',count ,'   ', time, 'task complete') 
        obs.complete();
      }, time);
    });
}

let subject=new Rx.Subject()
let queueExec$=new Rx.Subject()


Rx.Observable.fromEvent(btnA, 'click').subscribe(()=>{
 queueExec$.next(asyncTask('A',4000)) 
})

Rx.Observable.fromEvent(btnB, 'click').subscribe(()=>{
 queueExec$.next(asyncTask('B',4000)) 
})

Rx.Observable.fromEvent(btnC, 'click').subscribe(()=>{
 queueExec$.next(asyncTask('C',4000)) 
})

  queueExec$.concatMap(value=>value)
    .subscribe(function(data) {
      console.log('onNext', data);
    }, 
    function(error) {
      console.log('onError', error);
    },function(){
 console.log('completed') 
});

 Fan Cheung28 сент. 2017 г., 18:56
С этим шаблоном вы можете бросить все что угодно в асинхронную очередь, если только это можно наблюдать. В некоторых случаях это более гибкий
 Michael Tiller28 сент. 2017 г., 18:21
Так что, если я это понимаю, у вас естьasyncTask вернуть наблюдаемое, а не значение. Этот Observable публикует ровно одно значение, а затем закрывается. Но когда вы тогда бежитеconcatMap чтобы объединить их все вместе, эффект состоит в том, что вы не получите значения из следующей наблюдаемой, пока первая не получитзавершено (что он делает, когда выполняется асинхронный). Это интересная идея. Я нашел что-то лучше подходящее для моего случая, но это определенно интересная идея для других случаев использования.

Разве это не простоconcatMap?

// Requests are coming in a stream, with small intervals or without any.
const requests=Rx.Observable.of(2,1,16,8,16)
    .concatMap(v=>Rx.Observable.timer(1000).mapTo(v));

// Fetch, it takes some time.
function fetch(query){
  return Rx.Observable.timer(100*query)
      .mapTo('!'+query).startWith('?'+query);
}

requests.concatMap(q=>fetch(q));

https://rxviz.com/v/Mog1rmGJ

Если вы хотите разрешить несколько выборок одновременно, используйтеmergeMap с параметром параллелизма.

Ваш ответ на вопрос