В Rx, как группировать события по идентификатору и регулировать каждую группу по нескольким временным интервалам?
Я получил, так сказать, веселье Rx, и этот вопрос связан с моимВот а такжеВот, Тем не менее, может быть, это кому-то поможет, так как я мог бы рассматривать их как полезные варианты одной и той же темы.
Вопрос: Как можно сгруппировать случайный потокint
(скажем, на интервале [0, 10], создаваемом на случайном интервале) объекты объединяются в группы и предусматривают для каждой группы переменное число сообщений об отсутствии событий (отсутствие лучшего определения для дальнейшего фона см. в связанных сообщениях). Более конкретно, с помощью кода, как можно определить настройки нескольких дросселей для каждой группы следующим образом:
var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));
В этом случае функция подписки будет вызываться, если идентификаторы на группу более одной секунды. Что делать, если вы хотите определить три разных значения для отсутствия событий (скажем, одна секунда, пять секунд и десять секунд) и все они будут отменены, когда событие придет? Что я могу думать о:
Разделить каждый ID вidStream
на несколько синтетических и обеспечивают биективное отображение между реальными идентификаторами и синтетическими. Например, в этом случае ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203, а затем определить функцию выбора вThrottle
вот такFunc<int, Timespan>(i => /* switch(i)...*/)
а потом когдаSubscribe
будет вызван, сопоставьте идентификатор обратно. Смотрите также связанные вопросы для дальнейшей информации.Создайте вложенную группировку, в которой идентификаторы сгруппированы, а затем группы идентификаторов будут скопированы / реплицированы / разветвлены (я не знаю подходящий термин) в группы в соответствии со значениями регулирования. Этот подход, я думаю, довольно сложен, и я не уверен, будет ли он лучшим. Тем не менее, мне было бы интересно увидеть такой запрос.В более общей ситуации, я подозреваю, это ситуация, когда в одной группе есть несколько обработчиков, хотя мне не удалось найти ничего, связанного с этим.
<Редактировать: В качестве (надеюсь уточняющего) примераidStream
выдвигает один ID: 1, после которого будут инициированы три разных счетчика, каждый из которых ожидает следующего события или вызывает тревогу, если новый ID 1 не обнаружен во времени. Счетчик 1 (C1) ожидает в течение пяти секунд, счетчик 2 (C2) в течение семи секунд и счетчик 3 (C3) в течение десяти секунд. Если в течение интервала [0, 5] секунд будет получен новый идентификатор 1, все счетчики будут повторно инициализированы с вышеуказанными значениями, и сигнал тревоги не будет отправлен. Если новый идентификатор будет получен в течение интервала [0, 7) секунд, аварийные сигналы C1 и C2 и C3 будут повторно инициализированы. Точно так же, если новый идентификатор будет получен в течение интервала [0, 10) секунд, C1 и C2 срабатывают, но C3 будет просто повторно инициализирован.
То есть было бы несколько «сигналов об отсутствии» или вообще предпринятых действий в отношении одного идентификатора при определенных условиях. Я не уверен, что было бы хорошим аналогом ... Возможно, расположить "сигнальные огни" в башне, чтобы сначала зеленый, затем желтый и, наконец, красный. Поскольку отсутствие идентификатора продолжается дольше и дольше, цвет за цветом будет гореть (в этом случае красный - последний). Затем при обнаружении одного идентификатора все огни будут выключены.
<edit 2: После модификации кода Джеймса в следующем примере и оставив все остальное, как написано, я обнаружилSubscribe
будет вызван сразу после первого события на обоих уровнях тревоги.
const int MaxLevels = 2;
var idAlarmStream = idStream
.Select(i => i)
.AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
.Subscribe(i =>
{
Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
});
Давайте посмотрим, что здесь происходит, и еслиMaxLevels
может быть предоставлен динамически ...
<edit 3: код Джеймса работает. Проблема была между креслом и клавиатурой! Изменение времени на что-то более разумное, безусловно, помогло. На самом деле, я изменил их на большие цифры, но это было.FromTicks
и это ускользнуло от меня на несколько минут.