Использование неограниченных данных в окнах с триггером по умолчанию
у меня естьPub / Sub тема + подписка и хотите использовать и объединять неограниченные данные из подписки вПоток данных, Я использую фиксированное окно и записываю агрегаты в BigQuery.
Чтение и запись (без окон и агрегации) работает нормально. Но когда я передаю данные в фиксированное окно (для подсчета элементов в каждом окне), окно никогда не открываетсясрабатывает, И, таким образом, агрегаты не записываются.
Вот мое слово издатель (он использует kinglear.txt изПримеры как входной файл):
public static class AddCurrentTimestampFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
}
}
public static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z']+");
for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
}
}
// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
.apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
.apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
.apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();
Вот мой счетчик слов в окнах:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
.withSchema(o.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
Window.Bound<String> w = Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)));
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
.apply("FixedWindow", w)
.apply("CountWords", Count.<String>perElement())
.apply("CreateRows", ParDo.of(new WordCountToRowFn()))
.apply("WriteRows", tablePipe);
p.run();
Вышеупомянутый подписчик не будет работать, так как окно, кажется, не вызывает использованиетриггер по умолчанию, Тем не менее, если я вручную определяю триггер, код работает и счет записывается в BigQuery.
Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
Я хотел бы избежать указания пользовательских триггеров, если это возможно.
Вопросы:
Почему мое решение не работает с Dataflowтриггер по умолчанию?Как мне изменить моего издателя или подписчика для запуска окон с помощьютриггер по умолчанию?