Спасибо, я посмотрю в LinkedTransferQueue и посмотрю, подходит ли он для моего приложения.

ользую BlockingQueue: s (пытаюсь и ArrayBlockingQueue, и LinkedBlockingQueue) передавать объекты между различными потоками в приложении, над которым я сейчас работаю. Производительность и задержка относительно важны в этом приложении, поэтому мне было любопытно, сколько времени требуется для передачи объектов между двумя потоками с помощью BlockingQueue. Чтобы измерить это, я написал простую программу с двумя потоками (один потребитель и один производитель), где я позволил производителю передать временную метку (полученную с помощью System.nanoTime ()) для потребителя, см. Код ниже.

Я вспоминаю, что читал где-то на каком-то форуме, что кому-то, кто пытался это сделать, потребовалось около 10 микросекунд (не знаю, на какой ОС и оборудовании это было), поэтому я не был слишком удивлен, когда мне потребовалось ~ 30 микросекунд на моем компьютере. Windows 7 (процессор Intel E7500 Core 2 Duo, 2,93 ГГц), в то время как многие другие приложения работают в фоновом режиме. Однако я был весьма удивлен, когда провел такой же тест на нашем гораздо более быстром Linux-сервере (два четырехъядерных процессора Intel X5677 3,46 ГГц и Debian 5 с ядром 2.6.26-2-amd64). Я ожидал, что задержка будет ниже, чем на моем окне Windows, но, наоборот, она была намного выше - ~ 75 - 100 микросекунд! Оба теста были выполнены с использованием Sun Hotspot JVM версии 1.6.0-23.

Кто-нибудь еще проводил подобные тесты с похожими результатами в Linux? Или кто-нибудь знает, почему в Linux он работает намного медленнее (с лучшим оборудованием), может ли быть так, что переключение потоков просто намного медленнее в Linux по сравнению с Windows? Если это так, кажется, что Windows на самом деле гораздо лучше подходит для каких-либо приложений. Любая помощь в понимании относительно высоких цифр очень ценится.

Редактировать:
После комментария от DaveC я также провел тест, в котором ограничил JVM (на компьютере с Linux) одним ядром (то есть все потоки работали на одном и том же ядре). Это резко изменило результаты - задержка опустилась ниже 20 микросекунд, то есть лучше, чем результаты на компьютере с Windows. Я также провел несколько тестов, в которых я ограничил поток производителя одним ядром, а поток потребителя - другим (пытаясь установить их на одном и том же сокете и на разных сокетах), но это, похоже, не помогло - задержка все еще составляла ~ 75 микросекунд. Кстати, это тестовое приложение - почти все, что я запускаю на машине во время выполнения теста.

Кто-нибудь знает, имеют ли эти результаты смысл? Должно ли это быть намного медленнее, если производитель и потребитель работают на разных ядрах? Любой вклад действительно ценится.

Отредактировано снова (6 января):
Я экспериментировал с различными изменениями в коде и рабочей среде:

Я обновил ядро ​​Linux до 2.6.36.2 (с 2.6.26.2). После обновления ядра измеренное время изменилось до 60 микросекунд с очень небольшими изменениями, с 75-100 до обновления. Установка привязки ЦП к потокам производителя и потребителя не имела никакого эффекта, за исключением ограничения их одним и тем же ядром. При работе на том же ядре измеренная задержка составляла 13 микросекунд.

В исходном коде я заставлял производителя переходить в спящий режим на 1 секунду между каждой итерацией, чтобы дать потребителю достаточно времени для вычисления истекшего времени и его вывода на консоль. Если я удаляю вызов Thread.sleep () и вместо этого позволяю как производителю, так и потребителю вызывать барьер.await () на каждой итерации (потребитель вызывает его после печати истекшего времени на консоли), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд. Если потоки выполняются на одном и том же ядре, задержка становится меньше 1 мкс. Кто-нибудь может объяснить, почему это так значительно уменьшило время ожидания? Моим первым предположением было то, что изменение вызвало то, что производитель вызвал queue.put () до того, как потребитель вызвал queue.take (), поэтому потребителю никогда не приходилось блокировать, но после игры с измененной версией ArrayBlockingQueue я обнаружил, что это предположение, чтобы быть ложным - потребитель фактически заблокировал. Если у вас есть другие предположения, пожалуйста, дайте мне знать. (Между прочим, если я позволю производителю вызвать и Thread.sleep () и барьер.await (), задержка останется на уровне 60 микросекунд).

Я также попробовал другой подход - вместо вызова queue.take () я вызвал queue.poll () с таймаутом в 100 микросекунд. Это уменьшило среднюю задержку до уровня ниже 10 микросекунд, но, конечно, намного более интенсивно использует ЦП (но, вероятно, менее интенсивно использует ЦП при ожидании?).

Отредактировано снова (10 января) - Проблема решена:
Ниндзяль предположил, что задержка ~ 60 микросекунд была вызвана тем, что процессору приходилось выходить из более глубоких состояний сна - и он был совершенно прав! После отключения C-состояний в BIOS задержка была уменьшена до <10 микросекунд. Это объясняет, почему у меня намного больше задержка в пункте 2 выше - когда я отправлял объекты чаще, процессор был достаточно занят, чтобы не переходить в более глубокие состояния сна. Большое спасибо всем, кто нашел время, чтобы прочитать мой вопрос и поделился своими мыслями здесь!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

Ответы на вопрос(0)

Ваш ответ на вопрос