Jak wymusić sekwencję kolejki komunikatów z wieloma instancjami usługi WCF
Chcę utworzyć usługę WCF, która używa powiązania MSMQ, ponieważ mam dużą ilość powiadomień, które usługa ma przetwarzać. Ważne jest, aby klienci nie byli wstrzymywani przez usługę i aby powiadomienia były przetwarzane w kolejności, w jakiej zostały podniesione, stąd implementacja kolejki.
Inną kwestią jest odporność. Wiem, że mogłem sam połączyć MSMQ, aby kolejka była bardziej niezawodna, ale chcę mieć możliwość uruchomienia instancji mojej usługi na różnych serwerach, więc jeśli serwer zawiesza się, powiadomienia nie gromadzą się w kolejce, ale inny serwer kontynuuje przetwarzanie .
Eksperymentowałem z wiązaniem MSMQ i odkryłem, że możesz mieć wiele instancji usługi nasłuchujących na tej samej kolejce i pozostawieni samym sobie robią coś w rodzaju rundy robota z obciążeniem rozłożonym na dostępne usługi. To wspaniałe, ale w końcu tracę kolejkowanie kolejki, ponieważ różne instancje wymagają innej ilości czasu na przetworzenie żądania.
Do eksperymentowania używam prostej aplikacji konsoli, która jest epickim zrzutem kodu poniżej. Po uruchomieniu otrzymuję dane wyjściowe w następujący sposób:
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
Chcę:
host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.
Pomyślałbym, że ponieważ S2 nie zostało ukończone, może nadal zawieść i zwrócić przetwarzaną wiadomość do kolejki. Dlatego S1 nie powinien mieć możliwości wyłączenia kolejnej wiadomości z kolejki. Moja kolejka jest transakcyjna i próbowałem ustawićTransactionScopeRequired = true
na usługi, ale bezskutecznie.
Czy to możliwe? Czy to nie tak? Czy jest jakiś inny sposób na zbudowanie usługi przełączania awaryjnego bez jakiegoś centralnego mechanizmu synchronizacji?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}