, Я добавлю простой пример.

ужно импортировать данные о клиентах из устаревшей БД и выполнить несколько преобразований в процессе. Это означает, что одна запись должна выполнять дополнительные «события» (синхронизировать продукты, создавать счета и т. Д.).

Моим первоначальным решением был простой параллельный подход. Работает нормально, ноиногда у него есть проблемы. Если обработанные в данный момент клиенты должны ждать событий того же типа, их очереди обработки могут зависнуть и в конечном итоге истечь, что приведет к сбою каждого основного события (они зависят от того, который произошел сбой). Это не происходит все время, но это раздражает.

Так что у меня появилась другая идея - работать партиями. Я имею в виду не только ограничение количества одновременно обрабатываемых клиентов, но и количество событий, которые передаются в очереди. В поисках идей я нашелэто ответ, который указывает наTPL DataFlow.

Я сделал скелет, чтобы познакомиться с ним. Я настроил простой конвейер, но меня немного смущает использованиеComplete() и в ожиданииCompletion().

Шаги следующие

Составьте список номеров (идентификаторы клиентов, которые будут импортированы) - это вне логики импорта, оно просто там, чтобы иметь возможность активировать остальную часть логикиСоздатьBatchBlock (чтобы иметь возможность ограничить количество клиентов, обрабатываемых одновременно)Создать синглMyClass1 элемент на основе идентификатора (TransformBlock<int, MyClass1>)Выполните некоторую логику и сгенерируйте коллекциюMyClass2 (TransformManyBlock<MyClass1, MyClass2>) - как, например, спать в течение 1 секундыВыполните некоторую логику для каждого элемента коллекции (ActionBlock<MyClass2>) - как, например, спать в течение 1 секунды

Вот полный код:

public static class Program
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });

            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });
            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);
            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure

            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
               {
                   new MyClass2(1, myClass1.Id+ " did this"),
                   new MyClass2(2, myClass1.Id+ " did that"),
                   new MyClass2(3, myClass1.Id+ " did this again")
               };
    }
}

public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

public class MyClass2
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

Таким образом, точка, с которой я борюсь, это конец, где мне нужно позвонитьComplete() или ждатьCompletion, Я не могу найти правильную комбинацию. Я хотел бы видеть вывод следующим образом:

TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)

[the rest of the items]


Press any key to exit...   

Кто-нибудь может указать мне правильное направление?

Ответы на вопрос(1)

Ваш ответ на вопрос