Передача набора объектов между потоками

Текущий проект, над которым я работаю, требует, чтобы я реализовал способ эффективной передачи набора объектов из одного потока, который выполняется непрерывно, в основной поток. Текущая настройка примерно такая.

У меня есть основная тема, которая создает новую тему. Этот новый поток работает непрерывно и вызывает метод, основанный на таймере. Этот метод выбирает группу сообщений из онлайн-источника и организует их в TreeSet.

Затем этот TreeSet необходимо передать обратно в основной поток, чтобы содержащиеся в нем сообщения могли обрабатываться независимо от повторяющегося таймера.

Для лучшего ознакомления мой код выглядит следующим образом

// Called by the main thread on start.  
void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task,
            0, this.SQSWindow, TimeUnit.MILLISECONDS);

    this.IsWindowing = true;
}

/////////////////////////////////////////////////////////////////////////////////

private void WindowCallback()
{
    ArrayList<Message> messages = new ArrayList<Message>();

    //TODO create Monitor
    if((!CancellationToken))
    {
        try
        {
            //TODO fix epochWindowTime
            long epochWindowTime = 0;
            int numberOfMessages = 0;
            Map<String, String> attributes;

            // Setup the SQS client
            AmazonSQS client = new AmazonSQSClient(new 
                    ClasspathPropertiesFileCredentialsProvider());

            client.setEndpoint(this.AWSSQSServiceUrl);

            // get the NumberOfMessages to optimize how to 
            // Receive all of the messages from the queue

            GetQueueAttributesRequest attributesRequest = 
                    new GetQueueAttributesRequest();
            attributesRequest.setQueueUrl(this.QueueUrl);
            attributesRequest.withAttributeNames(
                    "ApproximateNumberOfMessages");
            attributes = client.getQueueAttributes(attributesRequest).
                    getAttributes();

            numberOfMessages = Integer.valueOf(attributes.get(
                    "ApproximateNumberOfMessages")).intValue();

            // determine if we need to Receive messages from the Queue
            if (numberOfMessages > 0)
            {

                if (numberOfMessages < 10)
                {
                    // just do it inline it's less expensive than 
                    //spinning threads
                    ReceiveTask(numberOfMessages);
                }
                else
                {
                    //TODO Create a multithreading version for this
                    ReceiveTask(numberOfMessages);
                }
            }

            if (!CancellationToken)
            {

                //TODO testing
                _setLock.lock();

                Iterator<Message> _setIter = _set.iterator();
                //TODO
                while(_setIter.hasNext())
                {
                    Message temp = _setIter.next();

                    Long value = Long.valueOf(temp.getAttributes().
                            get("Timestamp"));
                    if(value.longValue() < epochWindowTime)
                    {
                        messages.add(temp);
                        _set.remove(temp);
                    }
                }

                _setLock.unlock();

                // TODO deduplicate the messages

                // TODO reorder the messages

                // TODO raise new Event with the results
            }

            if ((!CancellationToken) && (messages.size() > 0))
            {
                if (messages.size() < 10)
                {
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
                else
                {
                    //TODO Create a way to divide this work among 
                    //several threads
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
            }
        }catch (AmazonServiceException ase){
            ase.printStackTrace();
        }catch (AmazonClientException ace) {
            ace.printStackTrace();
        }
    }
}

Как видно из некоторых комментариев, мой текущий предпочтительный способ справиться с этим - создать событие в потоке таймера, если есть сообщения. Основной поток будет прослушивать это событие и обрабатывать его соответствующим образом.

В настоящее время я не знаком с тем, как Java обрабатывает события или как их создавать / прослушивать. Я также не знаю, возможно ли создавать события и передавать содержащуюся в них информацию между потоками.

Может кто-нибудь дать мне совет / понимание того, возможны ли мои методы? Если так, где я могу найти некоторую информацию о том, как их реализовать, поскольку мои текущие попытки поиска не оказываются плодотворными.

Если нет, могу ли я получить некоторые предложения о том, как мне поступить, помня о том, что я бы хотел избежать управления сокетами, если это вообще возможно.

РЕДАКТИРОВАТЬ 1:

Основной поток также будет отвечать за выдачу команд на основе полученных сообщений или за выдачу команд для получения необходимой информации. По этой причине основной поток не может ожидать получения сообщений и должен обрабатывать их на основе событий.

Ответы на вопрос(1)

Ваш ответ на вопрос