метод, я перебираю все сокеты и пингую их, чтобы понять, живы они или нет?

я есть класс, в котором я заполняю картуliveSocketsByDatacenter из одного фонового потока каждые 30 секунд внутриupdateLiveSockets() метод, а затем у меня есть методgetNextSocket() который будет вызываться несколькими нитями читателя, чтобы получить доступный живой сокет, который использует ту же карту для получения этой информации.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the modifier synchronized to prevent concurrent modification
  // it is needed because to build the new map we first need to get the
  // old one so both must be done atomically to prevent concistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

Как вы можете видеть в моем классе:

Из одного фонового потока, который запускается каждые 30 секунд, я заполняюliveSocketsByDatacenter карта со всеми живыми сокетами вupdateLiveSockets() метод.А потом из нескольких потоков, я называюgetNextSocket() метод, чтобы дать мне живой сокет, который используетliveSocketsByDatacenter карта, чтобы получить необходимую информацию.

Мой код работает нормально, без проблем, и я хотел посмотреть, есть ли лучший или более эффективный способ написать это. Я также хотел получить мнение по вопросам безопасности потоков или любых условий гонки, если таковые имеются, но до сих пор я не видел ни одного, но могу ошибаться.

Я больше всего беспокоюсь оupdateLiveSockets() метод иgetLiveSocketX() метод. Я перебираюliveSockets который являетсяList изSocketHolder на линии А, а затем делает новыйSocketHolder объект и добавление в другой новый список. Это нормально здесь?

Примечание: SocketHolder неизменный класс. И вы можете игнорироватьZeroMQ вещи у меня есть.

Ответы на вопрос(2)

Ваш ответ на вопрос