Przekazywanie zestawu obiektów między wątkami

Bieżący projekt, nad którym pracuję, wymaga zaimplementowania sposobu wydajnego przekazywania zestawu obiektów z jednego wątku, który działa nieprzerwanie, do głównego wątku. Obecna konfiguracja jest następująca.

Mam główny wątek, który tworzy nowy wątek. Ten nowy wątek działa nieprzerwanie i wywołuje metodę opartą na zegarze. Ta metoda pobiera grupę wiadomości ze źródła online i organizuje je w TreeSet.

Ten TreeSet musi być następnie przekazany z powrotem do głównego wątku, aby zawarte w nim wiadomości mogły być obsługiwane niezależnie od cyklicznego zegara.

Dla lepszego odniesienia mój kod wygląda następująco

// Called by the main thread on start.  
void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task,
            0, this.SQSWindow, TimeUnit.MILLISECONDS);

    this.IsWindowing = true;
}

/////////////////////////////////////////////////////////////////////////////////

private void WindowCallback()
{
    ArrayList<Message> messages = new ArrayList<Message>();

    //TODO create Monitor
    if((!CancellationToken))
    {
        try
        {
            //TODO fix epochWindowTime
            long epochWindowTime = 0;
            int numberOfMessages = 0;
            Map<String, String> attributes;

            // Setup the SQS client
            AmazonSQS client = new AmazonSQSClient(new 
                    ClasspathPropertiesFileCredentialsProvider());

            client.setEndpoint(this.AWSSQSServiceUrl);

            // get the NumberOfMessages to optimize how to 
            // Receive all of the messages from the queue

            GetQueueAttributesRequest attributesRequest = 
                    new GetQueueAttributesRequest();
            attributesRequest.setQueueUrl(this.QueueUrl);
            attributesRequest.withAttributeNames(
                    "ApproximateNumberOfMessages");
            attributes = client.getQueueAttributes(attributesRequest).
                    getAttributes();

            numberOfMessages = Integer.valueOf(attributes.get(
                    "ApproximateNumberOfMessages")).intValue();

            // determine if we need to Receive messages from the Queue
            if (numberOfMessages > 0)
            {

                if (numberOfMessages < 10)
                {
                    // just do it inline it's less expensive than 
                    //spinning threads
                    ReceiveTask(numberOfMessages);
                }
                else
                {
                    //TODO Create a multithreading version for this
                    ReceiveTask(numberOfMessages);
                }
            }

            if (!CancellationToken)
            {

                //TODO testing
                _setLock.lock();

                Iterator<Message> _setIter = _set.iterator();
                //TODO
                while(_setIter.hasNext())
                {
                    Message temp = _setIter.next();

                    Long value = Long.valueOf(temp.getAttributes().
                            get("Timestamp"));
                    if(value.longValue() < epochWindowTime)
                    {
                        messages.add(temp);
                        _set.remove(temp);
                    }
                }

                _setLock.unlock();

                // TODO deduplicate the messages

                // TODO reorder the messages

                // TODO raise new Event with the results
            }

            if ((!CancellationToken) && (messages.size() > 0))
            {
                if (messages.size() < 10)
                {
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
                else
                {
                    //TODO Create a way to divide this work among 
                    //several threads
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
            }
        }catch (AmazonServiceException ase){
            ase.printStackTrace();
        }catch (AmazonClientException ace) {
            ace.printStackTrace();
        }
    }
}

Jak zauważyli niektórzy z komentujących, moim obecnym preferowanym sposobem radzenia sobie z tym jest utworzenie zdarzenia w wątku timera, jeśli istnieją wiadomości. Główny wątek będzie nasłuchiwał tego zdarzenia i odpowiednio go obsługiwał.

Obecnie nie znam się na tym, jak Java obsługuje zdarzenia lub jak je tworzyć / słuchać. Nie wiem też, czy możliwe jest tworzenie zdarzeń i przekazywanie zawartych w nich informacji między wątkami.

Czy ktoś może udzielić mi rady / wglądu, czy moje metody są możliwe? Jeśli tak, to gdzie mogę znaleźć informacje o tym, jak je wdrożyć, ponieważ moje obecne próby poszukiwania nie okazują się owocne.

Jeśli nie, czy mogę uzyskać pewne sugestie na temat tego, jak to zrobić, pamiętając, że chciałbym uniknąć konieczności zarządzania gniazdami, jeśli to możliwe.

EDYTUJ 1:

Główny wątek będzie również odpowiedzialny za wydawanie poleceń na podstawie otrzymywanych wiadomości lub wydawanie poleceń w celu uzyskania wymaganych informacji. Z tego powodu główny wątek nie może czekać na otrzymywanie wiadomości i powinien obsługiwać je w sposób oparty na zdarzeniach.

questionAnswers(1)

yourAnswerToTheQuestion