En Rx, ¿cómo agrupar eventos por ID y controlar cada grupo por múltiples TimeSpans?

Me metí en una ola de Rx, por así decirlo, y esta pregunta está relacionada con la míaaquí yaquí. Sin embargo, tal vez sean de ayuda para alguien, ya que podría verlos como variaciones útiles del mismo tema.

Pregunta: ¿Cómo podría uno agrupar un flujo aleatorio deint (por ejemplo, en el intervalo [0, 10] producido en un intervalo aleatorio) los objetos se agrupan y proporcionan al grupo de búsqueda un número variable de alarmas de ausencia de eventos (para la falta de una mejor definición, para más información, consulte las publicaciones relacionadas). Más específicamente con el código, ¿cómo podría uno definir la configuración del acelerador de multipe por grupo en lo siguiente:

var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));

Aquí se llamará a la función de suscripción si hay más de un segundo de ausencia de ID por grupo. ¿Qué sucede si uno quisiera definir tres valores diferentes para la ausencia de eventos (por ejemplo, un segundo, cinco segundos y diez segundos) y todos se cancelan cuando llega un evento? Lo que se me ocurre son:

Dividir cada ID enidStream en varios sintéticos y proporciona un mapeo biyectivo entre las ID reales y las sintéticas. Por ejemplo, en este caso ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 y luego define una función de selección enThrottle al igual queFunc<int, Timespan>(i => /* switch(i)...*/) y luego cuandoSubscribe se llamará, mapear la ID de nuevo. Consulte también las preguntas vinculadas para obtener más información.Cree una agrupación anidada en la que se agrupen los ID y luego los grupos de ID se copiarán / replicarán / bifurcarán (no conozco el término adecuado) en grupos de acuerdo con los valores de regulación. Creo que este enfoque es bastante complicado y no estoy seguro de si sería el mejor. Sin embargo, estoy seguro de que estaría interesado en ver tal consulta.

En un entorno más general, sospecho, esta es una situación en la que hay varios manejadores por grupo, aunque no he logrado encontrar nada relacionado con esto.

<editar: Como un (con suerte clarificando) un ejemploidStream empuja una ID: 1 sobre la cual se iniciarían tres contadores diferentes, cada uno de los cuales espera que ocurra el siguiente evento o es alarmante si no se detecta una nueva ID a tiempo. El contador 1 (C1) espera cinco segundos, el contador 2 (C2) durante siete segundos y el contador 3 (C3) durante diez segundos. Si se recibe una nueva ID 1 dentro del intervalo [0, 5] segundos, todos los contadores se reiniciarán con los valores mencionados anteriormente y no se enviará ninguna alarma. Si se recibe una nueva ID dentro del intervalo [0, 7) segundos, las alarmas C1 y C2 y C3 se reiniciarán. De manera similar, si se recibe una nueva ID dentro de un intervalo [0, 10) segundos, C1 y C2 se disparan, pero C3 se reinicializa.

Es decir, habría múltiples "alarmas de ausencia" o, en general, acciones tomadas, hacia una ID en ciertas condiciones. No estoy seguro de lo que sería un buen análogo ... Quizás apilar "luces de alerta" en una torre para que primero sea verde, luego amarilla y por último roja. Como la ausencia de una identificación continúa más y más, se encenderá un color tras otro (en este caso, el rojo es el último). Luego, cuando se detecta una identificación, todas las luces se apagarán.

<edit 2: Al adaptar el código de James al ejemplo siguiente y dejar el resto como está escrito, descubrí laSubscribe se llamará directamente al primer evento en los dos niveles de alarma.

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

A ver que esta pasando aqui y siMaxLevels Podría proporcionarse dinámicamente ...

<edit 3: el código de James funciona. ¡El problema estaba entre la silla y el teclado! Cambiar el tiempo a algo más sensato seguro que ayudó. De hecho, los cambié a figuras más grandes, pero fue.FromTicks y se me escapó por unos minutos.

Respuestas a la pregunta(1)

Su respuesta a la pregunta