Многопоточность Entity Framework: Соединение не было закрыто. Текущее состояние соединения
Итак, у меня есть процесс службы Windows, который выполняет рабочий процесс. Бэкэнд использует Repository и UnitofWork Pattern и Unity поверх Entity Framework с классом сущностей, сгенерированным из edmx. Я выиграл'Не вдавайтесь в подробности, так как в этом нет необходимости, но в основном есть 5 этапов, через которые проходит рабочий процесс. Определенный процесс может быть на любой стадии в любой момент времени (в порядке, конечно). Шаг первый просто генерирует данные для шага два, который проверяет данные через длительный процесс на другом сервере. Затем шаг там генерирует PDF с этими данными. Для каждого этапа мы порождаем таймер, однако его можно настроить, чтобы разрешить создание более одного таймера для каждого этапа. В этом и заключается проблема. Когда я добавляю процессор к определенной стадии, это следующая ошибка случайным образом:
Соединение не было закрыто. Связь'Текущее состояние подключено.
Читая об этом, кажется очевидным, что это происходит, потому что контекст пытается получить доступ к одной и той же сущности из двух потоков. Но это то, где меня бросают в петлю. Вся информация, которую я могу найти по этому вопросу, говорит о том, что мы должны использовать контекст экземпляра для каждого потока. Что, насколько я могу судить, я делаю (см. Код ниже). Я не использую шаблон синглтона или статику или что-то еще, поэтому я не совсем уверен, почему это происходит или как этого избежать. Я разместил соответствующие биты моего кода ниже для вашего обзора.
Базовый репозиторий:
public class BaseRepository
{
///
/// Initializes a repository and registers with a
///
///
public BaseRepository(IUnitOfWork unitOfWork)
{
if (unitOfWork == null) throw new ArgumentException("unitofWork");
UnitOfWork = unitOfWork;
}
///
/// Returns a of entities.
///
/// Entity type the dbset needs to return.
///
protected virtual DbSet GetDbSet() where TEntity : class
{
return Context.Set();
}
///
/// Sets the state of an entity.
///
/// object to set state.
///
protected virtual void SetEntityState(object entity, EntityState entityState)
{
Context.Entry(entity).State = entityState;
}
///
/// Unit of work controlling this repository.
///
protected IUnitOfWork UnitOfWork { get; set; }
///
///
///
///
protected virtual void Attach(object entity)
{
if (Context.Entry(entity).State == EntityState.Detached)
Context.Entry(entity).State = EntityState.Modified;
}
protected virtual void Detach(object entity)
{
Context.Entry(entity).State = EntityState.Detached;
}
///
/// Provides access to the ef context we are working with
///
internal StatementAutoEntities Context
{
get
{
return (StatementAutoEntities)UnitOfWork;
}
}
}
StatementAutoEntities - это автоматически сгенерированный класс EF.
Реализация репозитория:
public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{
///
/// Creates a new repository and associated with a
///
///
public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
{
}
///
/// Create a new entry in database
///
///
///
///
public void Create(ProcessingQueue Queue)
{
GetDbSet().Add(Queue);
UnitOfWork.SaveChanges();
}
///
/// Updates a entry in database
///
///
///
///
public void Update(ProcessingQueue queue)
{
//Attach(queue);
UnitOfWork.SaveChanges();
}
///
/// Delete a entry in database
///
///
///
///
public void Delete(ProcessingQueue Queue)
{
GetDbSet().Remove(Queue);
UnitOfWork.SaveChanges();
}
///
/// Gets a by its unique Id
///
///
///
public ProcessingQueue GetById(int id)
{
return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
}
///
/// Gets a list of entries by status
///
///
///
public IList GetByStatus(int status)
{
return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
}
///
/// Gets a list of all entries
///
///
public IList GetAll()
{
return (from e in Context.ProcessingQueue_Select() select e).ToList();
}
///
/// Gets the next pending item id in the queue for a specific work
///
/// Unique id of the server that will process the item in the queue
/// type of we are looking for
/// if defined only operations of the type indicated are considered.
/// Next pending item in the queue for the work type or null if no pending work is found
public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
{
var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault();
return id.HasValue ? id.Value : -1;
}
///
/// Returns a list of s objects with all
/// active entries in the queue
///
///
public IList GetActiveStatusEntries()
{
return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
}
///
/// Bumps an entry to the front of the queue
///
///
public void Bump(int processingQueueId)
{
Context.ProcessingQueue_Bump(processingQueueId);
}
}
Мы используем Unity для внедрения зависимостей, например, некоторый вызывающий код:
#region Members
private readonly IProcessingQueueRepository _queueRepository;
#endregion
#region Constructors
/// Initializes ProcessingQueue services with repositories
///
public ProcessingQueueService(IProcessingQueueRepository queueRepository)
{
Check.Require(queueRepository != null, "processingQueueRepository is required");
_queueRepository = queueRepository;
}
#endregion
Код в службе Windows, который запускает таймеры, выглядит следующим образом:
_staWorkTypeConfigLock.EnterReadLock();
foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o)
let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval
select new StaTimer
{
Interval = _runImmediate ? 5000 : interval*1000,
Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
})
{
timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
timer.Enabled = true;
Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);
}
_staWorkTypeConfigLock.ExitReadLock();
StaTimer - это просто оболочка для типа операции добавления таймера. ApxQueueProcessingOnElapsedInterval затем просто назначает работу процессу на основе операции.
Я также добавлю немного кода ApxQueueProcessingOnElapsedInterval, где мы создаем задачи.
_staTasksLock.EnterWriteLock();
for (var x = 0; x < tasksNeeded; x++)
{
var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
CreateQueueProcessConfig(true, operation), _cancellationToken);
_staTasks.Add(new Tuple(operation, DateTime.Now,t));
t.Start();
Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
}
_staTasksLock.ExitWriteLock();