Wie kann man in Rx Ereignisse nach ID gruppieren und jede Gruppe nach mehreren TimeSpans drosseln?

Ich bin sozusagen in einen Rx-Bummel geraten, und diese Frage hängt mit meiner zusammenHier undHier. Vielleicht sind diese jedoch für jemanden hilfreich, da ich sie als nützliche Variationen desselben Themas ansehen könnte.

Frage: Wie könnte man einen zufälligen Strom vonint (z. B. nach Intervall [0, 10], das in zufälligen Intervallen erstellt wird) gruppieren Sie Objekte und geben Sie für die Suchgruppe eine variable Anzahl von Ereignismeldungen an, bei denen keine Ereignisse vorliegen (mangels besserer Definition, für den weiteren Hintergrund siehe verknüpfte Beiträge). Genauer gesagt, wie könnte man mit Code die Einstellungen für Mehrfachdrosselung pro Gruppe wie folgt definieren:

var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));

Hier wird die Abonnementfunktion aufgerufen, wenn mehr als eine Sekunde lang keine IDs pro Gruppe vorhanden sind. Was ist, wenn drei verschiedene Werte für die Abwesenheit von Ereignissen definiert werden sollen (z. B. eine Sekunde, fünf Sekunden und zehn Sekunden) und alle abgebrochen werden sollen, wenn ein Ereignis eintrifft? Woran ich denken kann sind:

Teilen Sie jede ID inidStream in mehrere synthetische und bieten eine bijektive Zuordnung zwischen den realen IDs und den synthetischen. Zum Beispiel in diesem Fall ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 und definieren Sie dann eine Auswahlfunktion inThrottle wie soFunc<int, Timespan>(i => /* switch(i)...*/) und dann wannSubscribe angerufen werden, ordnen Sie die ID zurück. Siehe auch die verknüpften Fragen für weitere Hintergrundinformationen.Erstellen Sie eine verschachtelte Gruppierung, in der die IDs gruppiert werden. Anschließend werden die ID-Gruppen entsprechend den Drosselungswerten in Gruppen kopiert / repliziert / gegabelt (ich kenne den richtigen Begriff nicht). Ich denke, dieser Ansatz ist ziemlich kompliziert, und ich bin mir nicht sicher, ob er der beste wäre. Ich würde mich auf jeden Fall für eine solche Anfrage interessieren.

Ich vermute, dass es in einer allgemeineren Umgebung eine Situation gibt, in der es mehrere Bearbeiter pro Gruppe gibt, obwohl ich es nicht geschafft habe, etwas im Zusammenhang damit zu finden.

<edit: Als (hoffentlich klarstellendes) BeispielidStream drückt eine ID: 1, bei der drei verschiedene Zähler ausgelöst werden, von denen jeder darauf wartet, dass das nächste Ereignis eintritt, oder alarmiert, wenn keine neue ID 1 rechtzeitig erkannt wird. Zähler 1 (C1) wartet fünf Sekunden, Zähler 2 (C2) sieben Sekunden und Zähler 3 (C3) zehn Sekunden. Wird innerhalb von [0, 5] Sekunden eine neue ID 1 empfangen, werden alle Zähler mit den oben genannten Werten neu initialisiert und es wird kein Alarm gesendet. Wenn innerhalb der Intervalle [0, 7) Sekunden eine neue ID empfangen wird, werden die Alarme C1 und C2 und C3 neu initialisiert. Ebenso, wenn eine neue ID innerhalb eines Intervalls von [0, 10] Sekunden empfangen wird, feuern C1 und C2, aber C3 wird gerade neu initialisiert.

Das heißt, unter bestimmten Bedingungen würde es mehrere "Abwesenheitsalarme" oder allgemein ergriffene Maßnahmen gegenüber einer ID geben. Ich bin mir nicht sicher, was ein gutes Analogon wäre ... Vielleicht stapeln Sie "Warnlichter" in einem Turm, sodass zuerst grün, dann gelb und zuletzt rot sind. Da das Fehlen einer ID immer länger anhält, leuchtet eine Farbe nach der anderen (in diesem Fall ist Rot die letzte). Wenn dann eine ID erkannt wird, werden alle Lichter ausgeschaltet.

<edit 2: Nachdem ich James 'Code wie folgt auf das Beispiel umgerüstet und den Rest wie geschrieben belassen hatte, entdeckte ich denSubscribe wird direkt beim ersten Ereignis in beiden Alarmstufen aufgerufen.

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

Mal sehen, was hier passiert und obMaxLevels könnte dynamisch bereitgestellt werden ...

<edit 3: James 'Code funktioniert. Das Problem war zwischen dem Stuhl und der Tastatur! Es hat sicher geholfen, die Zeit auf etwas Vernünftigeres zu stellen. Tatsächlich habe ich sie in größere Figuren geändert, aber das war es.FromTicks und es entging mir für ein paar Minuten.

Antworten auf die Frage(1)

Ihre Antwort auf die Frage