в качестве выходного пункта назначения. Дайте мне знать, если вы хотите больше информации об этом бите.

ствуйте, я очень смущен динамическим назначением файлов API, и нет документов, так что я здесь.

Ситуация у меня есть PCollection, и он содержит события, принадлежащие к различным разделам. Я хочу разделить их и записать в разные папки в gcs.

Вот что у меня есть.

Динамический целевой объект:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

Я верю, что это правильная логика, я спрашиваю каждый элемент, каков его целевой раздел, а затем он передается в getFileNamePolicy, и оттуда создается имя файла. Чтобы отформатировать запись, я просто конвертирую ее в JSON.

Проблема интеграции этого с TextIO, я попробовал это

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))

но это требует, чтобы тип источника был строковым, технически это могло бы работать, но мне пришлось бы десериализоваться несколько раз. Я нашел в документах для текста IO динамических направлений

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

Итак, давайте попробуем это

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))

Это все еще не компилируется, поскольку writeCustomType внутренне возвращаетTypedWrite<UserT, Void> и это влияет на требование, чтобы параметр 2-го типа моего динамического целевого объекта был Void. Очевидно, я требую, чтобы это была строка или хотя бы что-то отличное от Void

Я явно что-то упустил

 jkff21 дек. 2017 г., 21:57
Ваш второй фрагмент кода должен скомпилироваться. TypedWrite.to (DynamicDestination) меняет тип назначения: public <NewDestinationT> TypedWrite <UserT, NewDestinationT> на (DynamicDestination <UserT, DestinationT, String> dynamicDestination). Можете ли вы включить полное сообщение об ошибке компиляции? Кроме того, вы также можете попробовать FileIO.write () (добавленный вчера на master), который имеет более удобный API - см.github.com/apache/beam/blob/master/sdks/java/core/src/main/java/... и используйте TextIO.sink (). Для этого вам понадобится версия 2.3.0-SNAPSHOT.

Ответы на вопрос(1)

writeCustomType().to(DynamicDestinations) не был проверен, и мы этого не заметили, но в опечатке типа была опечатка. PRhttps://github.com/apache/beam/pull/4319 находится в обзоре. Вам все равно понадобится 2.3.0-SNAPSHOT, чтобы поднять его, и в этом случае я все равно рекомендую просто использоватьFileIO.write().

 jkff23 дек. 2017 г., 21:22
Динамические назначения являются более общими, например, Вы можете использовать боковые входы в вашем отображении.
 Luke De Feo23 дек. 2017 г., 20:53
Хорошо, имеет смысл, мой ответ влияет так же, какова разница между двумя API

Ваш ответ на вопрос