Eventos Socket.io, cluster, express e sync

Eu tenho um grande problema desde 1 semana. Eu tento converter meu projeto node.JS, na verdade, é executado em núcleo único para núcleo múltiplo com cluster.

Com os websockets, no momento, não tenho problemas para eventos, mas, para xhr-polling ou jsonp-polling, tenho grandes problemas com o socket.io no modo de cluster.

esta é a minha configuração de servidor:

00-generic.js

'use strict';

var http            = require('http'),
    os              = require('os'),
    cluster         = require('cluster');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    if (cluster.isMaster) {
        console.info('Creating HTTP server cluster with %d workers', size);

        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        http.createServer(app).listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

03-socket.io.js

"use strict";
var _               = require('underscore'),
    socketio        = require('socket.io'),
    locomotive      = require('locomotive'),
    RedisStore      = require("socket.io/lib/stores/redis"),
    redis           = require("socket.io/node_modules/redis"),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket'),
    config          = require(__dirname + '/../app/global'),
    cluster         = require('cluster');

module.exports = function () {
    if (!cluster.isMaster) {
        this.io = socketio.listen(this.server);

        var pub             = redis.createClient(),
            sub             = redis.createClient(),
            client          = redis.createClient();

        this.io.enable('browser client minification');  // send minified client
        this.io.enable('browser client etag');          // apply etag caching logic based on version number
        this.io.enable('browser client gzip');          // gzip the file

        this.io.set("store", new RedisStore({
            redisPub        : pub,
            redisSub        : sub,
            redisClient     : client
        }));
        this.io.set('log level', 2);
        this.io.set('transports', [
            'websocket',
            'jsonp-polling'
        ]);
        this.io.set('close timeout', 24*60*60);
        this.io.set('heartbeat timeout', 24*60*60);

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));
    }
};

Com a pesquisa, o cliente se conecta de tempos em tempos em um processo diferente daquele iniciado pelos ouvintes. Da mesma forma, o servidor de comunicação para o cliente com emit.

Com um pouco de pesquisa, achei necessário passar por uma loja para socket.io para compartilhar a conexão de dados. Então, eu construí o RedisStore socket.io, conforme mostrado na documentação, mas mesmo assim, encontro eventos que não chegam com segurança e ainda recebo esta mensagem de erro:

warn: client not handshaken client should reconnect

EDITAR

Agora, o erro de aviso não é chamado. Altero o redisStore para socket.io-clusterhub MAS agora, os eventos nem sempre são chamados. Às vezes, como se a solicitação de pesquisa fosse capturada por outro trabalhador que não o que iniciou os ouvintes e, portanto, nada acontece. Aqui está a nova configuração:

'use strict';

var http            = require('http'),
    locomotive      = require('locomotive'),
    os              = require('os'),
    cluster         = require('cluster'),
    config          = require(__dirname + '/../app/global'),
    _               = require('underscore'),
    socketio        = require('socket.io'),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    this.clusterStore = new (require('socket.io-clusterhub'));

    if (cluster.isMaster) {
        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        var server = http.createServer(app);

        this.io = socketio.listen(server);

        this.io.configure(function() {
            this.io.enable('browser client minification');  // send minified client
            this.io.enable('browser client etag');          // apply etag caching logic based on version number
            this.io.enable('browser client gzip');          // gzip the file

            this.io.set('store', this.clusterStore);
            this.io.set('log level', 2);
            this.io.set('transports', [
                'websocket',
                'jsonp-polling'
            ]);
            //this.io.set('close timeout', 24*60*60);
            //this.io.set('heartbeat timeout', 24*60*60);
        }.bind(this));

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);
            console.log('connected to worker: ' + cluster.worker.id);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));

        server.listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

questionAnswers(3)

yourAnswerToTheQuestion