Reintentar la política dentro de ITargetBlock <TInput>

Necesito introducir una política de reintento al flujo de trabajo. Digamos que hay 3 bloques que están conectados de tal manera:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

Entonces, hay un búfer que acumula datos, luego los envía al bloque de transformación que procesa no más de 3 elementos a la vez, y luego el resultado se envía al bloque de acción.

Potencialmente, durante el procesamiento de la transformación se pueden producir errores transitorios, y quiero volver a intentar el bloqueo si el error es transitorio varias veces.

Sé que los bloques generalmente no son reintentables (los delegados que pasaron a los bloques podrían reintentarse). Y una de las opciones es envolver el delegado pasado para admitir el reintento.

También sé que hay una muy buena biblioteca.TransientFaultHandling.Core que proporciona los mecanismos de reintento a fallas transitorias. Esta es una excelente biblioteca pero no en mi caso. Si envuelvo el delegado que se pasa al bloque de transformación en elRetryPolicy.ExecuteAsync método, elmensaje dentro del bloque de transformación se bloqueará, y hasta que el reintento se complete o falle, el bloque de transformación no podrá recibir un mensaje nuevo. Imagínese, si los 3 mensajes se ingresan en el reintento (digamos, el próximo intento de reintento será en 2 minutos) y fallará, el bloque de transformación se atascará hasta que al menos un mensaje salga del bloque de transformación.

La única solución que veo es extender elTranformBlock (actualmente,ITargetBlock será suficiente también), y hacer el reintento manualmente (como desdeaquí):

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

yo G. para poner el mensaje dentro del bloque de transformación nuevamente con un retraso, pero en este caso, el contexto de reintento (número de reintentos restantes, etc.) también se debe pasar a este bloque. Suena demasiado complejo ...

¿Alguien ve un enfoque más sencillo para implementar la política de reintento para un bloque de flujo de trabajo?

Respuestas a la pregunta(2)

Su respuesta a la pregunta