Многопоточность Entity Framework: Соединение не было закрыто. Текущее состояние соединения

Итак, у меня есть процесс службы Windows, который выполняет рабочий процесс. Бэкэнд использует Repository и UnitofWork Pattern и Unity поверх Entity Framework с классом сущностей, сгенерированным из edmx. Я выиграл'Не вдавайтесь в подробности, так как в этом нет необходимости, но в основном есть 5 этапов, через которые проходит рабочий процесс. Определенный процесс может быть на любой стадии в любой момент времени (в порядке, конечно). Шаг первый просто генерирует данные для шага два, который проверяет данные через длительный процесс на другом сервере. Затем шаг там генерирует PDF с этими данными. Для каждого этапа мы порождаем таймер, однако его можно настроить, чтобы разрешить создание более одного таймера для каждого этапа. В этом и заключается проблема. Когда я добавляю процессор к определенной стадии, это следующая ошибка случайным образом:

Соединение не было закрыто. Связь'Текущее состояние подключено.

Читая об этом, кажется очевидным, что это происходит, потому что контекст пытается получить доступ к одной и той же сущности из двух потоков. Но это то, где меня бросают в петлю. Вся информация, которую я могу найти по этому вопросу, говорит о том, что мы должны использовать контекст экземпляра для каждого потока. Что, насколько я могу судить, я делаю (см. Код ниже). Я не использую шаблон синглтона или статику или что-то еще, поэтому я не совсем уверен, почему это происходит или как этого избежать. Я разместил соответствующие биты моего кода ниже для вашего обзора.

Базовый репозиторий:

 public class BaseRepository
{
    /// 
    /// Initializes a repository and registers with a 
    /// 
    /// 
    public BaseRepository(IUnitOfWork unitOfWork)
    {
        if (unitOfWork == null) throw new ArgumentException("unitofWork");
        UnitOfWork = unitOfWork;
    }


    /// 
    /// Returns a  of entities.
    /// 
    /// Entity type the dbset needs to return.
    /// 
    protected virtual DbSet GetDbSet() where TEntity : class
    {

        return Context.Set();
    }

    /// 
    /// Sets the state of an entity.
    /// 
    /// object to set state.
    /// 
    protected virtual void SetEntityState(object entity, EntityState entityState)
    {
        Context.Entry(entity).State = entityState;
    }

    /// 
    /// Unit of work controlling this repository.       
    /// 
    protected IUnitOfWork UnitOfWork { get; set; }

    /// 
    /// 
    /// 
    /// 
    protected virtual void Attach(object entity)
    {
        if (Context.Entry(entity).State == EntityState.Detached)
            Context.Entry(entity).State = EntityState.Modified;
    }

    protected virtual void Detach(object entity)
    {
        Context.Entry(entity).State = EntityState.Detached;
    }

    /// 
    /// Provides access to the ef context we are working with
    /// 
    internal StatementAutoEntities Context
    {
        get
        {                
            return (StatementAutoEntities)UnitOfWork;
        }
    }
}

StatementAutoEntities - это автоматически сгенерированный класс EF.

Реализация репозитория:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{

    /// 
    /// Creates a new repository and associated with a 
    /// 
    /// 
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
    {
    }

    /// 
    /// Create a new  entry in database
    /// 
    /// 
    ///     
    /// 
    public void Create(ProcessingQueue Queue)
    {
        GetDbSet().Add(Queue);
        UnitOfWork.SaveChanges();
    }

    /// 
    /// Updates a  entry in database
    /// 
    /// 
    ///     
    /// 
    public void Update(ProcessingQueue queue)
    {
        //Attach(queue);
        UnitOfWork.SaveChanges();
    }

    /// 
    /// Delete a  entry in database
    /// 
    /// 
    ///     
    /// 
    public void Delete(ProcessingQueue Queue)
    {
        GetDbSet().Remove(Queue);  
        UnitOfWork.SaveChanges();
    }

    /// 
    /// Gets a  by its unique Id
    /// 
    /// 
    /// 
    public ProcessingQueue GetById(int id)
    {
        return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
    }

    /// 
    /// Gets a list of  entries by status
    /// 
    /// 
    /// 
    public IList GetByStatus(int status)
    {
        return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
    }

    /// 
    /// Gets a list of all  entries
    /// 
    /// 
    public IList GetAll()
    {
        return (from e in Context.ProcessingQueue_Select() select e).ToList();
    }

    /// 
    /// Gets the next pending item id in the queue for a specific work        
    /// 
    /// Unique id of the server that will process the item in the queue
    /// type of  we are looking for
    /// if defined only operations of the type indicated are considered.
    /// Next pending item in the queue for the work type or null if no pending work is found
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
    {
        var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId,  operationId).SingleOrDefault();
        return id.HasValue ? id.Value : -1;
    }

    /// 
    /// Returns a list of s objects with all
    /// active entries in the queue
    /// 
    /// 
    public IList GetActiveStatusEntries()
    {
        return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
    }
    /// 
    /// Bumps an entry to the front of the queue 
    /// 
    /// 
    public void Bump(int processingQueueId)
    {
        Context.ProcessingQueue_Bump(processingQueueId);
    }
}

Мы используем Unity для внедрения зависимостей, например, некоторый вызывающий код:

#region Members
    private readonly IProcessingQueueRepository _queueRepository;       
    #endregion

    #region Constructors
    /// Initializes ProcessingQueue services with repositories
    ///         
    public ProcessingQueueService(IProcessingQueueRepository queueRepository)
    {
        Check.Require(queueRepository != null, "processingQueueRepository is required");
        _queueRepository = queueRepository;

    }
    #endregion

Код в службе Windows, который запускает таймеры, выглядит следующим образом:

            _staWorkTypeConfigLock.EnterReadLock();
        foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
                              let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
                              select new StaTimer
                            {
                                Interval = _runImmediate ? 5000 : interval*1000,
                                Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
                            })
        {
            timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
            timer.Enabled = true;
            Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);                
        }
        _staWorkTypeConfigLock.ExitReadLock();

StaTimer - это просто оболочка для типа операции добавления таймера. ApxQueueProcessingOnElapsedInterval затем просто назначает работу процессу на основе операции.

Я также добавлю немного кода ApxQueueProcessingOnElapsedInterval, где мы создаем задачи.

            _staTasksLock.EnterWriteLock();
        for (var x = 0; x < tasksNeeded; x++)
        {
            var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
                             CreateQueueProcessConfig(true, operation), _cancellationToken);


            _staTasks.Add(new Tuple(operation, DateTime.Now,t));

            t.Start();
            Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
        }
        _staTasksLock.ExitWriteLock();

Ответы на вопрос(3)

Ваш ответ на вопрос