So kombinieren Sie Streaming-Daten mit umfangreichen Verlaufsdaten in Dataflow / Beam

Ich untersuche die Verarbeitung von Protokollen aus Webbenutzersitzungen über Google Dataflow / Apache Beam und muss die Protokolle des Benutzers beim Eingang (Streaming) mit dem Verlauf der Sitzung eines Benutzers aus dem letzten Monat kombinieren.

Ich habe mir folgende Ansätze angeschaut:

Verwenden Sie ein 30-Tage-Fenster: Es ist sehr wahrscheinlich, dass ein zu großes Fenster in den Arbeitsspeicher passt, und ich muss den Verlauf des Benutzers nicht aktualisieren. Beziehen Sie sich einfach darauf.Verwenden Sie CoGroupByKey, um zwei Datensätze zu verbinden, aber die beiden Datensätze müssen dieselbe Fenstergröße haben https: //cloud.google.com/dataflow/model/group-by-key#joi), was in meinem Fall nicht zutrifft (24h vs 30 Tage) Verwenden Sie Side Input, um den Sitzungsverlauf des Benutzers für ein bestimmtes @ abzurufeelement improcessElement(ProcessContext processContext)

Mein Verständnis ist, dass die Daten über @ gelad.withSideInputs(pCollectionView) muss in den Speicher passen. Ich weiß, dass ich den gesamten Sitzungsverlauf eines einzelnen Benutzers in den Speicher einpassen kann, aber nichtall Sitzungsverläufe.

Meine Frage ist, ob es eine Möglichkeit gibt, Daten von einer Nebeneingabe zu laden / zu streamen, die nur für die aktuelle Benutzersitzung relevant ist?

Ich stelle mir eine parDo-Funktion vor, mit der die Verlaufssitzung des Benutzers von der Seite geladen werden kann, indem die ID des Benutzers angegeben wird. Es würde jedoch nur die Verlaufssitzung des aktuellen Benutzers in den Speicher passen. Wird geladenall Protokollsitzungen durch Nebeneingabe wären zu groß.

Einige Pseudocodes zur Veranschaulichung:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}

Antworten auf die Frage(2)

Ihre Antwort auf die Frage