RxJS Promise Composition (Datenübergabe)

Ich bin brandneu bei Rx und finde es schwierig, Unterlagen zum Verfassen von Versprechungen zu finden, sodass Daten aus der ersten Versprechung in die zweite weitergegeben werden und so weiter. Hier sind drei grundsätzliche Versprechen: Die Berechnungen der Daten sind nicht wichtig, nur, dass etwas Asynchrones mit den Daten aus dem vorherigen Versprechen gemacht werden muss.

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
      const isEven = x => x % 2 === 0;
      return Promise.resolve(isEven(x));
 };

er traditionelle Weg, um die Komposition zu erreichen, von der ich sprech

 pl().then(p2).then(p3).then(console.log);

Meine Lieblingsimplementierung ist Ramdas composeP und pipeP:

R.pipeP(p1, p2, p3, console.log)()

Es scheint wahrscheinlich, dass Rx in der Lage ist, diese Art von Situation ziemlich fließend zu handhaben. Allerdings ist der Vergleich zwischen RxJS und Async (Bibliothek) hier der bisher am nächsten liegendhttps: //github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.m:

 var Rx = require('rx'),
     fs = require('fs'),
     path = require('path');
 var file = path.join(__dirname, 'file.txt'),
     dest = path.join(__dirname, 'file1.txt'),
     exists = Rx.Observable.fromCallback(fs.exists),
     rename = Rx.Observable.fromNodeCallback(fs.rename),
     stat = Rx.Observable.fromNodeCallback(fs.stat);
 exists(file)
    .concatMap(function (flag) {
     return flag ?
         rename(file, dest) :
         Rx.Observable.throw(new Error('File does not exist.'));
    })
    .concatMap(function () {
        return stat(dest);
    })
   .forEach(
      function (fsStat) {
          console.log(JSON.stringify(fsStat));
      },
      function (err) {
          console.log(err);
      }
    );

concatMap scheint vielversprechend, aber der obige Code sieht ziemlich schrecklich aus. Ich hatte auch Probleme mit meinem Beispiel, weil Rx.Observable.fromPromise (p1) nicht funktioniert, da es ein Versprechen selbst erwartet, keine Funktion, und Rx.Observable.defer (p1) keine Parameter wie das übergibt Beispiel

Vielen Dank

Ähnliche Frage, aber ohne Datenübertragung:Kettenversprechen mit RxJS