Como usar destinos dinâmicos io de texto io de fluxo de dados em java

Olá, estou muito confuso Com a API de destinos de arquivos dinâmicos e não há documentos, então aqui estou.

A situação é que eu tenho um PCollection e contém eventos pertencentes a diferentes partições. Quero dividi-los e gravá-los em diferentes pastas no gcs.

Aqui está o que eu tenho.

Objeto de destino dinâmico:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

Eu acredito que esta é a lógica correta, pergunto a cada elemento qual é sua partição de destino e, em seguida, que são passados para o getFileNamePolicy e, a partir daí, um nome de arquivo é criado. Para formatar o registro, basta convertê-lo para json.

O problema é integrar isso ao TextIO, tentei isso

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))

mas exige que o tipo de fonte seja string, tecnicamente isso poderia funcionar, mas eu precisaria desserializar várias vezes. Encontrei nos documentos para destinos dinâmicos de texto io

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

Então vamos tentar isso

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))

Isso ainda não é compilado, pois writeCustomType retorna internamenteTypedWrite<UserT, Void>&nbsp;e isso tem o efeito de exigir que o parâmetro do segundo tipo do meu objeto de destino dinâmico seja nulo. Claramente eu exijo que seja uma string ou pelo menos algo diferente de Void

Estou claramente sentindo falta de algo