Как объединить потоковые данные с большим набором исторических данных в Dataflow / Beam

Я изучаю обработку журналов из сеансов веб-пользователей через Google Dataflow / Apache Beam, и мне нужно объединить журналы пользователя по мере их поступления (потоковую передачу) с историей сеанса пользователя за последний месяц.

Я посмотрел на следующие подходы:

Используйте 30-дневное фиксированное окно: скорее всего, оно будет умещаться в памяти большого размера, и мне не нужно обновлять историю пользователя, просто обратитесь к немуИспользуйте CoGroupByKey для объединения двух наборов данных, но оба набора данных должны иметь одинаковый размер окна (https://cloud.google.com/dataflow/model/group-by-key#join), что не так в моем случае (24 часа против 30 дней)Используйте боковой ввод для получения истории сеанса пользователя для данногоelement вprocessElement(ProcessContext processContext)

Насколько я понимаю, данные загружаются через.withSideInputs(pCollectionView) должен вписаться в память. Я знаю, что могу поместить всю историю сессий одного пользователя в память, но не могувсе истории сессий.

У меня вопрос, есть ли способ загрузки / потоковой передачи данных с бокового ввода, который имеет отношение только к текущей сессии пользователя?

Я представляю себе функцию parDo, которая загружала бы сеанс истории пользователя с бокового ввода, указывая идентификатор пользователя. Но только сеанс истории текущего пользователя уместился бы в память; погрузкавсе история сессий через сторонний ввод была бы слишком большой.

Некоторый псевдокод для иллюстрации:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}

Ответы на вопрос(1)

Ваш ответ на вопрос