Также в отношении «Обратите внимание, что фабрика считывателей будет сериализована и отправлена исполнителям, затем будет создан считыватель данных для исполнителей и будет выполняться фактическое чтение». Реализация сокета соблюдает это - просто статическая часть буфера данных сериализуется как часть фабрики и читается.

аюсь написать собственный приемник дляStructured Streaming что будет потреблять сообщения отRabbitMQ. Spark недавно выпущенный DataSource V2 API, который кажется очень перспективным. Поскольку он абстрагирует многие детали, я хочу использовать этот API для простоты и производительности. Однако, поскольку он довольно новый, источников не так много. Мне нужны разъяснения от опытныхSpark ребята, так как они поймут ключевые моменты легче. Вот так:

Моя отправная точка - серия постов в блоге, с первой частьюВот, Он показывает, как реализовать источник данных, без возможности потоковой передачи. Чтобы сделать потоковый источник, я немного изменил их, так как мне нужно реализоватьMicroBatchReadSupport вместо (или в дополнение к)DataSourceV2.

Чтобы быть эффективным, целесообразно иметь несколько искровых исполнителей, потребляющихRabbitMQ одновременно, то есть из одной и той же очереди. Если я не запутался, каждый раздел ввода -inSparkтерминология - соответствует потребителю из очереди - вRabbitMQ терминология. Таким образом, нам нужно иметь несколько разделов для входного потока, верно?

Похоже начасть 4 серииЯ реализовалMicroBatchReader следующее:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactor,ies() {
    int partition = options.getInt(RMQ.PARTITICN, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}

Я возвращаю список фабрик и надеюсь, что каждый экземпляр в списке будет использован для создания ридера, который также будет потребителем. Это правильный подход?

Я хочу, чтобы мой получатель был надежным, то есть после каждого обработанного сообщения (или, по крайней мере, записи в каталог chekpoint для дальнейшей обработки), мне нужно вернуть его обратно вRabbitMQ, Проблема начинается после того, как здесь: эти фабрики создаются в драйвере, а фактический процесс чтения происходит у исполнителей черезDataReaders. Однакосовершить метод является частьюMicroBatchReaderнеDataReader, Так как у меня многоDataReaderс заMicroBatchReaderкак я должен подтвердить эти сообщенияRabbitMQ? Или я должен подтвердить, когдаследующий метод вызываетсяDataReader? Это безопасно? Если да, то какова цельcommit функция тогда?

ПОЯСНЕНИЯ: запутывания: Ссылка, приведенная в ответе о переименовании некоторых классов / функций (в дополнение к объяснениям там) сделала всегораздо понятнее хуже чем когда-либо, Цитировать изтам:

переименовывает:

DataReaderFactory вInputPartition

DataReader вInputPartitionReader

...

InputPartitionЦель состоит в том, чтобы управлять жизненным циклом ассоциированного ридера, который теперь называетсяInputPartitionReader, с явной операцией создания, чтобы отразить операцию закрытия. Это больше не было ясно из API, потому чтоDataReaderFactory оказалось более общим, чем оно есть, и не ясно, почему набор из них создается для чтения.

РЕДАКТИРОВАТЬ: Однакодокументы Ясно скажу, что «фабрика считывателей будет сериализована и отправлена исполнителям, затем будет создан считыватель данных по исполнителям и выполнению фактического чтения».

Чтобы сделать потребителя надежным, мне нужно подтверждать конкретное сообщение только после того, как оно будет передано на стороне Spark.Обратите внимание, что сообщения должны быть подтверждены тем же соединением, через которое они были доставлены, но функция фиксации вызывается на узле драйвера. Как я могу зафиксировать на узле работник / исполнитель?

Ответы на вопрос(1)

Ваш ответ на вопрос