Como evitar a corrida de dados com `asio :: ip :: tcp :: iostream`?
Como evito uma corrida de dados ao usar dois threads para enviar e receber mais de umasio::ip::tcp::iostream
?
Estou escrevendo um programa que usa umasio::ip::tcp::iostream
para entrada e saída. O programa aceita comandos do usuário (remoto) pela porta 5555 e envia mensagens pela mesma conexão TCP para o usuário. Como esses eventos (comandos recebidos do usuário ou mensagens enviadas ao usuário) ocorrem de forma assíncrona, tenho segmentos de transmissão e recebimento separados.
Nesta versão de brinquedo, os comandos são "um", "dois" e "sair". Claro que "sair" sai do programa. Os outros comandos não fazem nada e qualquer comando não reconhecido faz com que o servidor feche a conexão TCP.
As mensagens transmitidas são simples mensagens numeradas em série que são enviadas uma vez por segundo.
Na versão do brinquedo e no código real que estou tentando escrever, os processos de transmissão e recepção estão usando E / S de bloqueio, portanto, não parece haver uma boa maneira de usar umstd::mutex
ou outro mecanismo de sincronização. (Nas minhas tentativas, um processo pegava o mutex e depois bloqueava, o que não funcionaria para isso.)
Para criar e testar isso, estou usando o gcc versão 7.2.1 e o valgrind 3.13 em uma máquina Linux de 64 bits. Construir:
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
Para testar, eu executo o servidor com este comando:
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
Então eu usotelnet 127.0.0.1 5555
em outra janela para criar uma conexão com o servidor. o quehelgrind
corretamente aponta é que há uma corrida de dados porque ambosrunTx
erunRx
estão tentando acessar o mesmo fluxo de forma assíncrona:
== 16188 == Possível corrida de dados durante a leitura do tamanho 1 em 0x1FFEFFF1CC pelo segmento # 1
== 16188 == Fechaduras retidas: nenhuma
... muito mais linhas elided
concurrent.cpp#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream *in, std::ostream *out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream *in);
int runRx(std::ostream *out);
bool want_quit;
bool want_reset;
};
int Console::runTx(std::istream *in) {
static const std::array<std::string, 3> cmds{
"quit", "one", "two",
};
std::string command;
while (!want_quit && !want_reset && *in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream *out) {
for (int i=0; !(want_reset || want_quit); ++i) {
(*out) << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
out->flush();
}
return 0;
}
int Console::run(std::istream *in, std::ostream *out) {
want_reset = false;
std::thread t1{&Console::runRx, this, out};
int status = runTx(in);
t1.join();
return status;
}
int main()
{
Console con;
asio::io_service ios;
// IPv4 address, port 5555
asio::ip::tcp::acceptor acceptor(ios,
asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
con.run(&stream, &stream);
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}