Angular 2 - Как изменить интервал наблюдаемой RxJS

Я использую rxJS Observable Interval для обновления извлекаемых данных. Я не могу понять, как изменить настройки интервала. Я видел кое-что об использовании класса Subject, предоставляемого rxJS, но мне не удалось заставить его работать.

Я привел упрощенный пример в этомшлепнуть

В AppComponent у меня есть этот метод.

getTime() {
        this.timeService.getTime(this.refreshInterval)
          .subscribe(t => {
            this.currentTime = t;
            console.log('Refresh interval is: ' + this.refreshInterval);
          }
        );
}

И в сервисном компоненте у меня в настоящее время есть этот код.

getTime(refreshInterval: number) {
  return Observable.interval(refreshInterval)
        .startWith(0)
        .map((res: any) => this.getDate())
        .catch(this.handleError)
}

Может кто-нибудь, возможно, предоставит мне рабочий пример, это было бы здорово!

Ответы на вопрос(3)

Вам не нужно уничтожать и воссоздавать весь наблюдаемый поток, чтобы изменитьrefreshInterval, Вам нужно только обновить часть потока, которая зависит от интервала изменения.

Сначала упростите ваши услугиgetTime() поэтому он не отвечает за определение частоты выходов. Все, что он делает, это возвращает время:

getTime() { return (new Date()).toString(); }

Теперь вызывающий код будет определять расписание. Всего 3 простых шага:

1. Функция источника, которая настраивается на желаемый интервал:

/** Observable waits for the current interval, then emits once */
refreshObs() {return Observable.timer(this.refreshInterval)}

2. Наблюдаемая цепочка, которая используетrepeat оператор для непрерывного повторного выполнения потока:

getTime$ = Observable.of(null)
            .switchMap(e=>this.refreshObs()) // wait for interval, then emit
            .map(() => this.timeService.getTime()) // get new time
            .repeat(); // start over

3. Подписка для запуска всего этого:

ngOnInit(){
    this.getTime$.subscribe(t => {
        this.currentTime = t;
        console.log('refresh interval = '+this.refreshInterval);
    });
}

Это работает, потому чтоrefreshObs() возвращает новую наблюдаемую информацию каждый раз, когда поток повторяется, и эта новая наблюдаемая будет ожидать в соответствии с текущим установленным интервалом, прежде чем испустить.

Live демо

 BeetleJuice28 авг. 2019 г., 16:50
Привет @Arvand Код ведет себя как описанный ОП. Указанный интервал соблюдается, и только тогда вступает в силу следующий интервал. Если вы хотите, чтобы изменение задержки немедленно отменяло текущий запущенный интервал и заменяло его новым, это можно сделать, но тогда вы просите что-то другое.
 Arvand09 авг. 2019 г., 20:53
Одно серьезное предостережение в отношении кода: когда я уменьшу интервал, я ожидаю, что он снова запустится с новым значением и повторится с более коротким интервалом. В вашей демонстрации, если установить интервал в 10 минут, а затем установить его на 1сек, я должен ждать 10 минут, чтобы увидеть обновление.

Я хотел опираться на предыдущие ответы здесь (и в других местах Stack Overflow). Мой пример имеет общееRefreshService что различные компоненты могут использовать для подписок. Таким образом, сайт может иметь один компонент «Обновлять каждые X секунд», на который может подписаться каждый компонент.

https://plnkr.co/edit/960yztjl3dqXQD2XPSei?p=preview

Сервис предоставляет функциюwithRefresh это обеспечиваетObservable, Это использует в своих интересахBehaviorSubject, который сразу же вызовет событие в подписке.

export class RefreshService {

  static interval$: BehaviorSubject<number> = new BehaviorSubject<number>(30000);

  setInterval(newInterval: number){
    RefreshService.interval$.next(newInterval);
  }

  public withRefresh() {
    return RefreshService.interval$
      .switchMap((int: number) => Observable
        .interval(int)
        .startWith(0)
      );
  }
}

Тогда любой компонент может использовать этот сервис следующим образом:

this.refreshService
  .withRefresh()
  .switchMap(() => /* do something on each interval of the timer */);
Решение Вопроса

Как я понял из вашего plnkr, ваша цель - позволить пользователю изменять интервалы таймера.

Вы ожидаете, что изменение refreshInterval изменит заявленный поток rxJs:

    this.timeService.getTime(this.refreshInterval)
      .subscribe(t => {
        this.currentTime = t;
        console.log('Refresh interval is: ' + this.refreshInterval);
      }
    );

и это неправильно.

каждый раз, когда вы обновляете refreshInterval, вам необходимо:

отписаться или уничтожить предыдущий поток.создать новый поток и подписаться снова
 BeetleJuice05 окт. 2016 г., 05:40
На самом деле, этот ответ может быть неточным. Я смог достичь поставленной цели, не разрушая и не воссоздавая потоки после того, как сам некоторое время боролся с этим

Ваш ответ на вопрос