GCP Dataflow 2.0 PubSub a GCS

Me está costando entender los conceptos de .withFileNamePolicy de TextIO.write (). Los requisitos para suministrar una FileNamePolicy parecen increíblemente complejos para hacer algo tan simple como especificar un depósito de GCS para escribir un archivo transmitido.

En un nivel alto, tengo mensajes JSON que se transmiten a un tema de PubSub, y me gustaría escribir esos mensajes en bruto en archivos en GCS para almacenamiento permanente (también haré otro procesamiento en los mensajes). Inicialmente comencé con este Pipeline, pensando que sería bastante simple:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket);

        p.run();

    }

Recibí el error sobre la necesidad de WindowedWrites, que apliqué, y luego necesité un FileNamePolicy. Aquí es donde las cosas se ponen difíciles.

Fui a los documentos de Beam y reviséFilenamePolicy. Parece que necesitaría extender esta clase que luego también requiere extender otras clases abstractas para que esto funcione. Desafortunadamente, la documentación sobre Apache es un poco escasa y no puedo encontrar ningún ejemplo para Dataflow 2.0 haciendo esto, excepto porEl ejemplo de Wordcount, que incluso entonces utiliza implementa estos detalles en una clase auxiliar.

Entonces, probablemente podría hacer que esto funcione simplemente copiando gran parte del ejemplo de WordCount, pero estoy tratando de comprender mejor los detalles de esto. Algunas preguntas que tengo:

1) ¿Hay algún elemento de hoja de ruta para abstraer mucha de esta complejidad? Parece que debería poder suministrar un depósito de GCS como lo haría en una escritura sin ventana, y luego proporcionar algunas opciones básicas como la regla de tiempo y nombre de archivo. Sé que escribir datos en ventana de transmisión a archivos es más complejo que simplemente abrir un puntero de archivo (o equivalente de almacenamiento de objetos).

2) Parece que esto funciona, necesito crear un objeto WindowedContext que requiera suministrar una clase abstracta BoundedWindow y una clase de objeto PaneInfo, y luego información de fragmentos. La información disponible para estos es bastante simple y estoy teniendo dificultades para saber lo que realmente se necesita para todo esto, especialmente dado mi caso de uso simple. ¿Hay algún buen ejemplo disponible que implemente estos? Además, también parece que necesito establecer el número de fragmentos como parte de TextIO.write, pero también proporcionar # fragmentos como parte del fileNamePolicy?

¡Gracias por ayudarme a comprender los detalles detrás de esto, con la esperanza de aprender algunas cosas!

Editar 20/07/17 Así que finalmente pude ejecutar esta tubería con la extensión de FilenamePolicy. Mi desafío fue definir la ventana de transmisión de datos desde PubSub. Aquí hay una representación bastante cercana del código:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getShardNumber()
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}

Respuestas a la pregunta(2)

Su respuesta a la pregunta