Передача набора объектов между потоками
Текущий проект, над которым я работаю, требует, чтобы я реализовал способ эффективной передачи набора объектов из одного потока, который выполняется непрерывно, в основной поток. Текущая настройка примерно такая.
У меня есть основная тема, которая создает новую тему. Этот новый поток работает непрерывно и вызывает метод, основанный на таймере. Этот метод выбирает группу сообщений из онлайн-источника и организует их в TreeSet.
Затем этот TreeSet необходимо передать обратно в основной поток, чтобы содержащиеся в нем сообщения могли обрабатываться независимо от повторяющегося таймера.
Для лучшего ознакомления мой код выглядит следующим образом
// Called by the main thread on start.
void StartProcesses()
{
if(this.IsWindowing)
{
return;
}
this._windowTimer = Executors.newSingleThreadScheduledExecutor();
Runnable task = new Runnable() {
public void run() {
WindowCallback();
}
};
this.CancellationToken = false;
_windowTimer.scheduleAtFixedRate(task,
0, this.SQSWindow, TimeUnit.MILLISECONDS);
this.IsWindowing = true;
}
/////////////////////////////////////////////////////////////////////////////////
private void WindowCallback()
{
ArrayList<Message> messages = new ArrayList<Message>();
//TODO create Monitor
if((!CancellationToken))
{
try
{
//TODO fix epochWindowTime
long epochWindowTime = 0;
int numberOfMessages = 0;
Map<String, String> attributes;
// Setup the SQS client
AmazonSQS client = new AmazonSQSClient(new
ClasspathPropertiesFileCredentialsProvider());
client.setEndpoint(this.AWSSQSServiceUrl);
// get the NumberOfMessages to optimize how to
// Receive all of the messages from the queue
GetQueueAttributesRequest attributesRequest =
new GetQueueAttributesRequest();
attributesRequest.setQueueUrl(this.QueueUrl);
attributesRequest.withAttributeNames(
"ApproximateNumberOfMessages");
attributes = client.getQueueAttributes(attributesRequest).
getAttributes();
numberOfMessages = Integer.valueOf(attributes.get(
"ApproximateNumberOfMessages")).intValue();
// determine if we need to Receive messages from the Queue
if (numberOfMessages > 0)
{
if (numberOfMessages < 10)
{
// just do it inline it's less expensive than
//spinning threads
ReceiveTask(numberOfMessages);
}
else
{
//TODO Create a multithreading version for this
ReceiveTask(numberOfMessages);
}
}
if (!CancellationToken)
{
//TODO testing
_setLock.lock();
Iterator<Message> _setIter = _set.iterator();
//TODO
while(_setIter.hasNext())
{
Message temp = _setIter.next();
Long value = Long.valueOf(temp.getAttributes().
get("Timestamp"));
if(value.longValue() < epochWindowTime)
{
messages.add(temp);
_set.remove(temp);
}
}
_setLock.unlock();
// TODO deduplicate the messages
// TODO reorder the messages
// TODO raise new Event with the results
}
if ((!CancellationToken) && (messages.size() > 0))
{
if (messages.size() < 10)
{
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
else
{
//TODO Create a way to divide this work among
//several threads
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
}
}catch (AmazonServiceException ase){
ase.printStackTrace();
}catch (AmazonClientException ace) {
ace.printStackTrace();
}
}
}
Как видно из некоторых комментариев, мой текущий предпочтительный способ справиться с этим - создать событие в потоке таймера, если есть сообщения. Основной поток будет прослушивать это событие и обрабатывать его соответствующим образом.
В настоящее время я не знаком с тем, как Java обрабатывает события или как их создавать / прослушивать. Я также не знаю, возможно ли создавать события и передавать содержащуюся в них информацию между потоками.
Может кто-нибудь дать мне совет / понимание того, возможны ли мои методы? Если так, где я могу найти некоторую информацию о том, как их реализовать, поскольку мои текущие попытки поиска не оказываются плодотворными.
Если нет, могу ли я получить некоторые предложения о том, как мне поступить, помня о том, что я бы хотел избежать управления сокетами, если это вообще возможно.
РЕДАКТИРОВАТЬ 1:
Основной поток также будет отвечать за выдачу команд на основе полученных сообщений или за выдачу команд для получения необходимой информации. По этой причине основной поток не может ожидать получения сообщений и должен обрабатывать их на основе событий.