So erzwingen Sie die Reihenfolge der Nachrichtenwarteschlangen mit mehreren WCF-Dienstinstanzen

Ich möchte einen WCF-Dienst erstellen, der eine MSMQ-Bindung verwendet, da der Dienst eine große Anzahl von Benachrichtigungen verarbeiten soll. Es ist wichtig, dass Clients nicht vom Service aufgehalten werden und dass die Benachrichtigungen in der Reihenfolge verarbeitet werden, in der sie ausgelöst werden, daher die Implementierung der Warteschlange.

Ein weiterer Gesichtspunkt ist die Belastbarkeit. Ich weiß, dass ich MSMQ selbst zu Clustern zusammenfassen könnte, um die Warteschlange robuster zu machen, aber ich möchte eine Instanz meines Dienstes auf verschiedenen Servern ausführen können. Wenn also ein Server abstürzt, werden keine Benachrichtigungen in der Warteschlange aufgebaut, sondern ein anderer Server wird weiter verarbeitet .

Ich habe mit der MSMQ-Bindung experimentiert und festgestellt, dass mehrere Instanzen eines Dienstes in derselben Warteschlange empfangsbereit sind, und sie haben sich selbst überlassen, eine Art Round-Robin mit der auf die verfügbaren Dienste verteilten Last durchzuführen. Das ist großartig, aber am Ende geht die Sequenzierung der Warteschlange verloren, da verschiedene Instanzen unterschiedlich viel Zeit für die Verarbeitung der Anforderung benötigen.

Ich habe eine einfache Konsolen-App verwendet, um zu experimentieren. Dies ist der epische Codeabzug unten. Wenn es läuft, bekomme ich eine Ausgabe wie diese:

host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed

Was ich möchte ist:

host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.

Ich hätte gedacht, dass S2, da es noch nicht abgeschlossen ist, möglicherweise immer noch fehlschlägt und die Nachricht, die es verarbeitet, an die Warteschlange zurückgibt. Aus diesem Grund darf S1 keine weitere Nachricht aus der Warteschlange ziehen. Meine Warteschlange uns Transaktions und ich habe versucht, EinstellungTransactionScopeRequired = true auf den Service, aber ohne Erfolg.

Ist das überhaupt möglich? Gehe ich falsch vor Gibt es eine andere Möglichkeit, einen Failover-Dienst ohne einen zentralen Synchronisationsmechanismus zu erstellen?

class WcfMsmqProgram
{
    private const string QueueName = "testq1";

    static void Main()
    {
        // Create a transactional queue
        string qPath = ".\\private$\\" + QueueName;
        if (!MessageQueue.Exists(qPath))
            MessageQueue.Create(qPath, true);
        else
            new MessageQueue(qPath).Purge();

        // S1 processes as fast as it can
        IService s1 = new ServiceImpl("S1");
        // S2 is slow
        IService s2 = new ServiceImpl("S2", 2000);

        // MSMQ binding
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Host S1
        ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
        ConfigureService(host1, binding);
        host1.Open();
        Console.WriteLine("host1 open");

        // Host S2
        ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
        ConfigureService(host2, binding);
        host2.Open();
        Console.WriteLine("host2 open");

        // Create a client 
        ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
        IService client = factory.CreateChannel();

        // Periodically call the service with a new number
        int counter = 1;
        using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
        {
            // Enter to stop
            Console.ReadLine();
        }

        host1.Close();
        Console.WriteLine("host1 closed");
        host2.Close();
        Console.WriteLine("host2 closed");

        // Wait for exit
        Console.ReadLine();
    }

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
    {
        var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
    }

    [ServiceContract]
    interface IService
    {
        [OperationContract(IsOneWay = true)]
        void EchoNumber(int number);
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    class ServiceImpl : IService
    {
        public ServiceImpl(string name, int sleep = 0)
        {
            this.name = name;
            this.sleep = sleep;
        }

        private string name;
        private int sleep;

        public void EchoNumber(int number)
        {
            Thread.Sleep(this.sleep);
            Console.WriteLine("{0}: {1:00}", this.name, number);
        }
    }
}

Antworten auf die Frage(2)

Ihre Antwort auf die Frage