Как объединить потоковые данные с большим набором исторических данных в Dataflow / Beam
Я изучаю обработку журналов из сеансов веб-пользователей через Google Dataflow / Apache Beam, и мне нужно объединить журналы пользователя по мере их поступления (потоковую передачу) с историей сеанса пользователя за последний месяц.
Я посмотрел на следующие подходы:
Используйте 30-дневное фиксированное окно: скорее всего, оно будет умещаться в памяти большого размера, и мне не нужно обновлять историю пользователя, просто обратитесь к немуИспользуйте CoGroupByKey для объединения двух наборов данных, но оба набора данных должны иметь одинаковый размер окна (https://cloud.google.com/dataflow/model/group-by-key#join), что не так в моем случае (24 часа против 30 дней)Используйте боковой ввод для получения истории сеанса пользователя для данногоelement
вprocessElement(ProcessContext processContext)
Насколько я понимаю, данные загружаются через.withSideInputs(pCollectionView)
должен вписаться в память. Я знаю, что могу поместить всю историю сессий одного пользователя в память, но не могувсе истории сессий.
У меня вопрос, есть ли способ загрузки / потоковой передачи данных с бокового ввода, который имеет отношение только к текущей сессии пользователя?
Я представляю себе функцию parDo, которая загружала бы сеанс истории пользователя с бокового ввода, указывая идентификатор пользователя. Но только сеанс истории текущего пользователя уместился бы в память; погрузкавсе история сессий через сторонний ввод была бы слишком большой.
Некоторый псевдокод для иллюстрации:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}