Как применить последовательность очереди сообщений с несколькими экземплярами службы WCF
Я хочу создать службу WCF, которая использует привязку MSMQ, поскольку у меня большой объем уведомлений, которые служба должна обработать. Важно, чтобы клиенты не задерживались службой и чтобы уведомления обрабатывались в том порядке, в котором они были подняты, следовательно, реализация очереди.
Другое соображение - это устойчивость. Я знаю, что мог бы сам кластеризовать MSMQ, чтобы сделать очередь более устойчивой, но я хочу иметь возможность запускать экземпляр моей службы на разных серверах, поэтому в случае сбоя сервера уведомления не накапливаются в очереди, а другой сервер продолжает обработку ,
Я экспериментировал с привязкой MSMQ и обнаружил, что вы можете иметь несколько экземпляров службы, прослушивающих одну и ту же очередь, и предоставляя самим себе, они в итоге выполняют своего рода циклический перебор с распределением нагрузки между доступными службами. Это здорово, но в итоге я теряю последовательность очереди, поскольку разные экземпляры занимают разное количество времени для обработки запроса.
Для экспериментов мы использовали простое консольное приложение, которое представляет собой эпический дамп кода ниже. Когда это's запустить, я получаю вывод как это:
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
Я хочу, чтобы это произошло:
host1 open
host2 open
S1: 01
S2: 02
S1: 03
S2: 04
S1: 05
S1: 06
etc.
Я бы подумал, что, поскольку S2 еще не завершен, он все равно может завершиться ошибкой и вернуть сообщение, которое он обрабатывал, в очередь. Поэтому S1 не должно быть позволено извлечь другое сообщение из очереди. Моя очередь нас транзакционная, и я попытался установитьTransactionScopeRequired = true
на службе, но безрезультатно.
Это вообще возможно? Я поступаю неправильно? Есть ли какой-то другой способ создания службы отработки отказа без какого-либо механизма центральной синхронизации?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory factory = new ChannelFactory(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}