GCP Dataflow 2.0 PubSub para GCS

Estou com dificuldades para entender os conceitos de .withFileNamePolicy de TextIO.write (). Os requisitos para fornecer uma FileNamePolicy parecem incrivelmente complexos para fazer algo tão simples quanto especificar um bucket do GCS para gravar arquivado em fluxo contínuo.

Em um nível alto, tenho mensagens JSON sendo transmitidas para um tópico do PubSub e gostaria de gravar essas mensagens brutas em arquivos no GCS para armazenamento permanente (também estarei fazendo outro processamento nas mensagens). Inicialmente, comecei com esse pipeline, pensando que seria bem simples:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket);

        p.run();

    }

Eu recebi o erro sobre a necessidade de WindowedWrites, que eu apliquei e, em seguida, a necessidade de um FileNamePolicy. É aqui que as coisas ficam peludas.

Eu fui aos documentos do Beam e fiz check-outFilenamePolicy. Parece que eu precisaria estender essa classe, que também exigia a extensão de outras classes abstratas para fazer esse trabalho. Infelizmente, a documentação do Apache é um pouco escassa e não consigo encontrar exemplos para o Dataflow 2.0, exceto porO exemplo de contagem de palavras, que ainda usa implementa esses detalhes em uma classe auxiliar.

Então, eu provavelmente poderia fazer isso funcionar apenas copiando grande parte do exemplo do WordCount, mas estou tentando entender melhor os detalhes. Algumas perguntas que tenho:

1) Existe algum item do roteiro para abstrair muita dessa complexidade? Parece que eu deveria poder fornecer um bucket do GCS como faria em um nonWindowedWrite e fornecer apenas algumas opções básicas, como a regra de tempo e nomeação de arquivos. Eu sei que gravar dados em janela de streaming em arquivos é mais complexo do que apenas abrir um ponteiro de arquivo (ou equivalente a armazenamento de objetos).

2) Parece que, para fazer isso funcionar, preciso criar um objeto WindowedContext que exija o fornecimento de uma classe abstrata BoundedWindow, de PaneInfo Object Class e de algumas informações de fragmentos. As informações disponíveis para essas informações são bastante nuas e estou tendo dificuldades para saber o que é realmente necessário para todas elas, especialmente devido ao meu caso de uso simples. Existem bons exemplos disponíveis para implementá-los? Além disso, também parece que eu preciso definir o número de shards como parte do TextIO.write, mas também fornecer # shards como parte do fileNamePolicy?

Obrigado por qualquer coisa para me ajudar a entender os detalhes por trás disso, na esperança de aprender algumas coisas!

Editar 20/07/17 Então, finalmente consegui executar esse pipeline com a extensão da FilenamePolicy. Meu desafio foi precisar definir a janela dos dados de streaming do PubSub. Aqui está uma representação bem próxima do código:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getShardNumber()
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}

questionAnswers(2)

yourAnswerToTheQuestion