Multithreading Entity Framework: Połączenie nie zostało zamknięte. Bieżący stan połączenia się łączy

Mam więc proces usługi systemu Windows, który wykonuje proces przepływu pracy. Tylny koniec używa repozytorium i wzorca UnitofWork oraz Unity na szczycie Entity Framework z klasą encji wygenerowaną z edmx. Nie będę wdawał się w wiele szczegółów, ponieważ nie jest to konieczne, ale zasadniczo istnieje 5 kroków, przez które przechodzi przepływ pracy. Określony proces może znajdować się na dowolnym etapie w dowolnym momencie (oczywiście oczywiście). Krok pierwszy generuje dane dla kroku drugiego, który weryfikuje dane za pomocą długiego procesu na innym serwerze. Następnie krok tam generuje pdf z tymi danymi. Na każdym etapie odradzamy czasomierz, jednak można go skonfigurować, aby umożliwić więcej niż jeden zegar na każdym etapie. W tym problem leży. Kiedy dodaję procesor do określonego etapu, losowo pojawia się następujący błąd:

Połączenie nie zostało zamknięte. Bieżący stan połączenia się łączy.

W związku z tym wydaje się oczywiste, że dzieje się tak, ponieważ kontekst próbuje uzyskać dostęp do tego samego podmiotu z dwóch wątków. Ale to jest rodzaj rzucania mnie na pętlę. Wszystkie informacje, które mogę znaleźć w tym stanie, że powinniśmy używać kontekstu instancji dla każdego wątku. O ile mi wiadomo, robię to (zobacz poniższy kod). Nie używam pojedynczego wzorca ani statyki ani niczego, więc nie jestem pewien, dlaczego tak się dzieje lub jak tego uniknąć. Poniżej zamieściłem odpowiednie fragmenty mojego kodu do recenzji.

Repozytorium podstawowe:

 public class BaseRepository
{
    /// <summary>
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public BaseRepository(IUnitOfWork unitOfWork)
    {
        if (unitOfWork == null) throw new ArgumentException("unitofWork");
        UnitOfWork = unitOfWork;
    }


    /// <summary>
    /// Returns a <see cref="DbSet"/> of entities.
    /// </summary>
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam>
    /// <returns></returns>
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class
    {

        return Context.Set<TEntity>();
    }

    /// <summary>
    /// Sets the state of an entity.
    /// </summary>
    /// <param name="entity">object to set state.</param>
    /// <param name="entityState"><see cref="EntityState"/></param>
    protected virtual void SetEntityState(object entity, EntityState entityState)
    {
        Context.Entry(entity).State = entityState;
    }

    /// <summary>
    /// Unit of work controlling this repository.       
    /// </summary>
    protected IUnitOfWork UnitOfWork { get; set; }

    /// <summary>
    /// 
    /// </summary>
    /// <param name="entity"></param>
    protected virtual void Attach(object entity)
    {
        if (Context.Entry(entity).State == EntityState.Detached)
            Context.Entry(entity).State = EntityState.Modified;
    }

    protected virtual void Detach(object entity)
    {
        Context.Entry(entity).State = EntityState.Detached;
    }

    /// <summary>
    /// Provides access to the ef context we are working with
    /// </summary>
    internal StatementAutoEntities Context
    {
        get
        {                
            return (StatementAutoEntities)UnitOfWork;
        }
    }
}

StatementAutoEntities jest automatycznie generowaną klasą EF.

Implementacja repozytorium:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{

    /// <summary>
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
    {
    }

    /// <summary>
    /// Create a new <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Create(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Add(Queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Updates a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Update(ProcessingQueue queue)
    {
        //Attach(queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Delete a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Delete(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Remove(Queue);  
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public ProcessingQueue GetById(int id)
    {
        return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
    }

    /// <summary>
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status
    /// </summary>
    /// <param name="status"></param>
    /// <returns></returns>
    public IList<ProcessingQueue> GetByStatus(int status)
    {
        return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
    }

    /// <summary>
    /// Gets a list of all <see cref="ProcessingQueue"/> entries
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueue> GetAll()
    {
        return (from e in Context.ProcessingQueue_Select() select e).ToList();
    }

    /// <summary>
    /// Gets the next pending item id in the queue for a specific work        
    /// </summary>
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param>
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param>
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param>
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns>
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
    {
        var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId,  operationId).SingleOrDefault();
        return id.HasValue ? id.Value : -1;
    }

    /// <summary>
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all
    /// active entries in the queue
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries()
    {
        return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
    }
    /// <summary>
    /// Bumps an entry to the front of the queue 
    /// </summary>
    /// <param name="processingQueueId"></param>
    public void Bump(int processingQueueId)
    {
        Context.ProcessingQueue_Bump(processingQueueId);
    }
}

Używamy Unity do wstrzykiwania zależności, na przykład kodu wywołującego:

#region Members
    private readonly IProcessingQueueRepository _queueRepository;       
    #endregion

    #region Constructors
    /// <summary>Initializes ProcessingQueue services with repositories</summary>
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>        
    public ProcessingQueueService(IProcessingQueueRepository queueRepository)
    {
        Check.Require(queueRepository != null, "processingQueueRepository is required");
        _queueRepository = queueRepository;

    }
    #endregion

Kod w usłudze Windows, który uruchamia liczniki czasu, jest następujący:

            _staWorkTypeConfigLock.EnterReadLock();
        foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
                              let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
                              select new StaTimer
                            {
                                Interval = _runImmediate ? 5000 : interval*1000,
                                Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
                            })
        {
            timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
            timer.Enabled = true;
            Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);                
        }
        _staWorkTypeConfigLock.ExitReadLock();

StaTimer to po prostu opakowanie na typ operacji dodawania timera. ApxQueueProcessingOnElapsedInterval następnie po prostu przypisuje pracę do procesu na podstawie operacji.

Dodam także trochę kodu ApxQueueProcessingOnElapsedInterval, w którym wykonujemy zadania.

            _staTasksLock.EnterWriteLock();
        for (var x = 0; x < tasksNeeded; x++)
        {
            var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
                             CreateQueueProcessConfig(true, operation), _cancellationToken);


            _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t));

            t.Start();
            Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
        }
        _staTasksLock.ExitWriteLock();

questionAnswers(3)

yourAnswerToTheQuestion