TPL completo vs finalización

Necesito importar datos relacionados con el cliente desde la base de datos heredada y realizar varias transformaciones durante el proceso. Esto significa que una sola entrada necesita realizar "eventos" adicionales (sincronizar productos, crear facturas, etc.).

Mi solución inicial fue un enfoque paralelo simple. Funciona bien, peroa veces Tiene problemas. Si los clientes procesados actualmente necesitan esperar el mismo tipo de eventos, sus colas de procesamiento pueden quedar bloqueadas y, finalmente, agotar el tiempo de espera, lo que hace que todos los eventos subyacentes también fallen (dependen del que falló). No sucede todo el tiempo, pero es molesto.

Así que tengo otra idea, trabajar en lotes. Me refiero no solo a limitar la cantidad de clientes que se procesan al mismo tiempo, sino también a la cantidad de eventos que se transmiten a las colas. Mientras buscaba ideas, encontréesta respuesta, que apunta a laTPL DataFlow.

Hice un esqueleto para familiarizarme con él. Configuré una tubería simple, pero estoy un poco confundido sobre el uso deComplete() y esperandoCompletion().

Los pasos son los siguientes.

Haga una lista de números (los identificadores de los clientes que se importarán): esto está fuera de la lógica de importación, solo allí para poder activar el resto de la lógicaCrear unBatchBlock (para poder limitar el número de clientes que se procesarán al mismo tiempo)Crea un soloMyClass1 elemento basado en la identificación (TransformBlock<int, MyClass1>)Realizar alguna lógica y generar una colección deMyClass2 (TransformManyBlock<MyClass1, MyClass2>) - como ejemplo, duerme 1 segundoRealice alguna lógica en cada elemento de la colección (ActionBlock<MyClass2>) - como ejemplo, duerme 1 segundo

Aquí está el código completo:

public static class Program
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });

            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });
            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);
            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure

            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
               {
                   new MyClass2(1, myClass1.Id+ " did this"),
                   new MyClass2(2, myClass1.Id+ " did that"),
                   new MyClass2(3, myClass1.Id+ " did this again")
               };
    }
}

public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

public class MyClass2
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

Entonces, el punto con el que lucho es el final, donde tendría que llamarComplete() o esperaCompletion. Parece que no puedo encontrar la combinación correcta. Me gustaría ver una salida de la siguiente manera:

TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)

[the rest of the items]


Press any key to exit...   

¿Alguien puede señalarme en la dirección correcta?

Respuestas a la pregunta(1)

Su respuesta a la pregunta