в качестве выходного пункта назначения. Дайте мне знать, если вы хотите больше информации об этом бите.

ствуйте, я очень смущен динамическим назначением файлов API, и нет документов, так что я здесь.

Ситуация у меня есть PCollection, и он содержит события, принадлежащие к различным разделам. Я хочу разделить их и записать в разные папки в gcs.

Вот что у меня есть.

Динамический целевой объект:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

Я верю, что это правильная логика, я спрашиваю каждый элемент, каков его целевой раздел, а затем он передается в getFileNamePolicy, и оттуда создается имя файла. Чтобы отформатировать запись, я просто конвертирую ее в JSON.

Проблема интеграции этого с TextIO, я попробовал это

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))

но это требует, чтобы тип источника был строковым, технически это могло бы работать, но мне пришлось бы десериализоваться несколько раз. Я нашел в документах для текста IO динамических направлений

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

Итак, давайте попробуем это

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))

Это все еще не компилируется, поскольку writeCustomType внутренне возвращаетTypedWrite<UserT, Void>&nbsp;и это влияет на требование, чтобы параметр 2-го типа моего динамического целевого объекта был Void. Очевидно, я требую, чтобы это была строка или хотя бы что-то отличное от Void

Я явно что-то упустил