Como evitar a corrida de dados com `asio :: ip :: tcp :: iostream`?

Minha pergunta

Como evito uma corrida de dados ao usar dois threads para enviar e receber mais de umasio::ip::tcp::iostream?

desenhar

Estou escrevendo um programa que usa umasio::ip::tcp::iostream para entrada e saída. O programa aceita comandos do usuário (remoto) pela porta 5555 e envia mensagens pela mesma conexão TCP para o usuário. Como esses eventos (comandos recebidos do usuário ou mensagens enviadas ao usuário) ocorrem de forma assíncrona, tenho segmentos de transmissão e recebimento separados.

Nesta versão de brinquedo, os comandos são "um", "dois" e "sair". Claro que "sair" sai do programa. Os outros comandos não fazem nada e qualquer comando não reconhecido faz com que o servidor feche a conexão TCP.

As mensagens transmitidas são simples mensagens numeradas em série que são enviadas uma vez por segundo.

Na versão do brinquedo e no código real que estou tentando escrever, os processos de transmissão e recepção estão usando E / S de bloqueio, portanto, não parece haver uma boa maneira de usar umstd::mutex ou outro mecanismo de sincronização. (Nas minhas tentativas, um processo pegava o mutex e depois bloqueava, o que não funcionaria para isso.)

Construa e teste

Para criar e testar isso, estou usando o gcc versão 7.2.1 e o valgrind 3.13 em uma máquina Linux de 64 bits. Construir:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

Para testar, eu executo o servidor com este comando:

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

Então eu usotelnet 127.0.0.1 5555 em outra janela para criar uma conexão com o servidor. o quehelgrind corretamente aponta é que há uma corrida de dados porque ambosrunTx erunRx estão tentando acessar o mesmo fluxo de forma assíncrona:

== 16188 == Possível corrida de dados durante a leitura do tamanho 1 em 0x1FFEFFF1CC pelo segmento # 1

== 16188 == Fechaduras retidas: nenhuma

... muito mais linhas elided

concurrent.cpp
#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two", 
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios, 
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

questionAnswers(1)

yourAnswerToTheQuestion