Como garantir que não estou compartilhando o mesmo soquete entre dois threads ao mesmo tempo? [duplicado]

Esta pergunta já tem uma resposta aqui:

Não compartilhe o mesmo soquete entre dois threads ao mesmo tempo 7 respostas

Eu tenho um código no qual estou lidando com soquetes e preciso garantir que nãocompartilham o mesmo soquete entre dois threads. No meu código abaixo, eu tenho um thread em segundo plano que é executado a cada 60 segundos e chamaupdateLiveSockets() método. NoupdateLiveSockets() , itero todos os soquetes que tenho e, em seguida, começo a executá-los um a um chamandosend método deSendToQueue classe e base na resposta, eu os marquei como vivos ou mortos.

Agora todos os tópicos do leitor chamarãogetNextSocket() ao mesmo tempo para obter o próximo soquete disponível ao vivo, para que ele tenha um thread thread safe e eu preciso garantir que todos os threads do leitor vejam o mesmo estado consistente deSocketHolder eSocket.

Abaixo está o meuSocketManager classe:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

E aqui está o meuSendToQueue classe:

  // this method will be called by multiple threads concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
    PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
    cache.put(address, m);
    return doSendAsync(m, socket);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      // Alternatively (checks that address points to m):
      // cache.asMap().remove(address, m);
      cache.invalidate(address);
    }
  }

Declaração do Problema

Agora, como você pode ver, estou compartilhando o mesmo soquete entre dois threads. ParecegetNextSocket() poderia retornar um0MQ socket parathread A. Simultaneamente, otimer thread pode acessar o mesmo0MQ socket para fazer ping. Nesse casothread A e atimer thread estão mudando o mesmo0MQ socket, o que pode levar a problemas. Então, estou tentando encontrar uma maneira de impedir que threads diferentes enviem dados para o mesmo soquete ao mesmo tempo e destruam meus dados.

Por isso, decidi sincronizar o soquete para que dois threads não possam acessar o mesmo soquete ao mesmo tempo. Abaixo está a alteração que fiz emupdateLiveSockets método. Sincronizei no soquete no método abaixo:

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // using the socket as its own lock
        synchronized (socket) {
            // pinging to see whether a socket is live or not
            boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
            boolean isLive = (status) ? true : false;

            SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
            liveUpdatedSockets.add(zmq);
        }
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)),;
    }
  }

E abaixo está a mudança que eu fizdoSendAsync método. Neste também sincronizei no soquete antes de enviá-lo.

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A by synchronizing on it
      synchronized (socket) {
        return msg.send(socket);
      }
    } finally {
      msg.destroy();
    }
  }

Qual é a melhor maneira pela qual posso garantir que não estou compartilhando os mesmos soquetes entre dois threads? Em geral, tenho cerca de 60 soquetes e 20 threads acessando esses soquetes.

Se muitos threads usarem o mesmo soquete, os recursos não serão bem utilizados. Além disso, semsg.send(socket); está bloqueado (tecnicamente não deveria) todos os threads que aguardam esse soquete estão bloqueados. Portanto, acho que pode haver uma maneira melhor de garantir que cada encadeamento use um soquete ativo diferente ao mesmo tempo, em vez de sincronizar em um soquete específico. Também há algum caso de canto ou borda que eu perdi que pode levar a algum bug?

questionAnswers(4)

yourAnswerToTheQuestion