Entidad marco de subprocesos múltiples: La conexión no se cerró. El estado actual de la conexión es la conexión.

Así que tengo un proceso de servicio de Windows que realiza un proceso de flujo de trabajo. El back-end utiliza Repository y UnitofWork Pattern and Unity sobre Entity Framework con la clase de entidades generada desde edmx. No voy a entrar en muchos detalles ya que no es necesario, pero básicamente hay 5 pasos por los que pasa el flujo de trabajo. Un proceso en particular puede estar en cualquier etapa en cualquier momento (en orden, por supuesto). El paso uno solo genera datos para el paso dos, que valida los datos a través de un proceso de larga ejecución en otro servidor. Entonces el paso allí genera un pdf con esos datos. Para cada etapa, generamos un temporizador, sin embargo, es configurable para permitir que se genere más de un temporizador para cada etapa. Ahí radica el problema. Cuando agrego un procesador a una etapa en particular, aparece el siguiente error al azar:

La conexión no estaba cerrada. El estado actual de la conexión se está conectando.

Al leer esto, parece obvio que esto está sucediendo porque el contexto está tratando de acceder a la misma entidad desde dos hilos. Pero aquí es donde me está lanzando como un bucle. Toda la información que puedo encontrar en este estado indica que deberíamos estar usando un contexto de instancia por hilo. Que, por lo que puedo decir, lo estoy haciendo (ver el código a continuación). No estoy usando un patrón de singleton o estática o algo así, así que no estoy realmente seguro de por qué sucede esto o cómo evitarlo. He publicado los bits relevantes de mi código a continuación para su revisión.

El repositorio base:

 public class BaseRepository
{
    /// <summary>
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public BaseRepository(IUnitOfWork unitOfWork)
    {
        if (unitOfWork == null) throw new ArgumentException("unitofWork");
        UnitOfWork = unitOfWork;
    }


    /// <summary>
    /// Returns a <see cref="DbSet"/> of entities.
    /// </summary>
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam>
    /// <returns></returns>
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class
    {

        return Context.Set<TEntity>();
    }

    /// <summary>
    /// Sets the state of an entity.
    /// </summary>
    /// <param name="entity">object to set state.</param>
    /// <param name="entityState"><see cref="EntityState"/></param>
    protected virtual void SetEntityState(object entity, EntityState entityState)
    {
        Context.Entry(entity).State = entityState;
    }

    /// <summary>
    /// Unit of work controlling this repository.       
    /// </summary>
    protected IUnitOfWork UnitOfWork { get; set; }

    /// <summary>
    /// 
    /// </summary>
    /// <param name="entity"></param>
    protected virtual void Attach(object entity)
    {
        if (Context.Entry(entity).State == EntityState.Detached)
            Context.Entry(entity).State = EntityState.Modified;
    }

    protected virtual void Detach(object entity)
    {
        Context.Entry(entity).State = EntityState.Detached;
    }

    /// <summary>
    /// Provides access to the ef context we are working with
    /// </summary>
    internal StatementAutoEntities Context
    {
        get
        {                
            return (StatementAutoEntities)UnitOfWork;
        }
    }
}

StatementAutoEntities es la clase EF generada automáticamente.

La implementación del repositorio:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{

    /// <summary>
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
    {
    }

    /// <summary>
    /// Create a new <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Create(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Add(Queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Updates a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Update(ProcessingQueue queue)
    {
        //Attach(queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Delete a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Delete(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Remove(Queue);  
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public ProcessingQueue GetById(int id)
    {
        return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
    }

    /// <summary>
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status
    /// </summary>
    /// <param name="status"></param>
    /// <returns></returns>
    public IList<ProcessingQueue> GetByStatus(int status)
    {
        return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
    }

    /// <summary>
    /// Gets a list of all <see cref="ProcessingQueue"/> entries
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueue> GetAll()
    {
        return (from e in Context.ProcessingQueue_Select() select e).ToList();
    }

    /// <summary>
    /// Gets the next pending item id in the queue for a specific work        
    /// </summary>
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param>
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param>
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param>
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns>
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
    {
        var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId,  operationId).SingleOrDefault();
        return id.HasValue ? id.Value : -1;
    }

    /// <summary>
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all
    /// active entries in the queue
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries()
    {
        return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
    }
    /// <summary>
    /// Bumps an entry to the front of the queue 
    /// </summary>
    /// <param name="processingQueueId"></param>
    public void Bump(int processingQueueId)
    {
        Context.ProcessingQueue_Bump(processingQueueId);
    }
}

Usamos Unity para inyección de dependencia, por ejemplo, un código de llamada:

#region Members
    private readonly IProcessingQueueRepository _queueRepository;       
    #endregion

    #region Constructors
    /// <summary>Initializes ProcessingQueue services with repositories</summary>
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>        
    public ProcessingQueueService(IProcessingQueueRepository queueRepository)
    {
        Check.Require(queueRepository != null, "processingQueueRepository is required");
        _queueRepository = queueRepository;

    }
    #endregion

El código en el servicio de Windows que inicia los temporizadores es el siguiente:

            _staWorkTypeConfigLock.EnterReadLock();
        foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
                              let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
                              select new StaTimer
                            {
                                Interval = _runImmediate ? 5000 : interval*1000,
                                Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
                            })
        {
            timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
            timer.Enabled = true;
            Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);                
        }
        _staWorkTypeConfigLock.ExitReadLock();

StaTimer es solo una envoltura en el temporizador que agrega el tipo de operación. ApxQueueProcessingOnElapsedInterval luego básicamente asigna trabajo al proceso basado en la operación.

También agregaré un poco del código de Intervalo de ApxQueueProcessingOnElapsed en el que estamos generando tareas.

            _staTasksLock.EnterWriteLock();
        for (var x = 0; x < tasksNeeded; x++)
        {
            var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
                             CreateQueueProcessConfig(true, operation), _cancellationToken);


            _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t));

            t.Start();
            Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
        }
        _staTasksLock.ExitWriteLock();

Respuestas a la pregunta(3)

Su respuesta a la pregunta