Cómo usar destinos de texto io de flujo de datos dinámicos en java

Hola, estoy muy confundido por la API de destinos de archivos dinámicos y no hay documentos, así que aquí estoy.

La situación es que tengo una PCollection y contiene eventos que pertenecen a diferentes particiones. Quiero dividirlos y escribirlos en diferentes carpetas en gcs.

Aquí está lo que tengo.

Objeto de destino dinámico:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

Creo que esta es la lógica correcta, le pregunto a cada elemento cuál es su partición de destino y luego se pasa a getFileNamePolicy y desde allí se crea un nombre de archivo. Para formatear el registro, simplemente lo convierto a json.

El problema es integrar esto con TextIO, probé esto

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))

pero requiere que el tipo de fuente sea string, técnicamente esto podría funcionar pero tendría que deserializar varias veces. Encontré en los documentos para texto io destinos dinámicos

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

Entonces intentemos eso

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))

Esto todavía no se compila, ya que writeCustomType devuelve internamenteTypedWrite<UserT, Void> y eso tiene el efecto de exigir que el segundo parámetro de tipo de mi objeto de destino dinámico sea nulo. Claramente, requiero que sea una cadena o al menos algo distinto de Vacío

Claramente me falta algo

Respuestas a la pregunta(3)

Su respuesta a la pregunta