в качестве выходного пункта назначения. Дайте мне знать, если вы хотите больше информации об этом бите.
ствуйте, я очень смущен динамическим назначением файлов API, и нет документов, так что я здесь.
Ситуация у меня есть PCollection, и он содержит события, принадлежащие к различным разделам. Я хочу разделить их и записать в разные папки в gcs.
Вот что у меня есть.
Динамический целевой объект:
class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
override def getDestination(element: Event): String = {
element.partition //this returns a string which is a gcs folder path
}
override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
println(destination)
val overallPrefix = s"$prefix/$destination/part-"
DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
FileSystems.matchNewResource(overallPrefix, false)),
null, ".jsonl", true)
}
override def formatRecord(record: Event): String = {
implicit val f = DefaultFormats
write(record.toDataLakeFormat())
}
override def getDefaultDestination: String = "default"
}
Я верю, что это правильная логика, я спрашиваю каждый элемент, каков его целевой раздел, а затем он передается в getFileNamePolicy, и оттуда создается имя файла. Чтобы отформатировать запись, я просто конвертирую ее в JSON.
Проблема интеграции этого с TextIO, я попробовал это
TextIO.
write()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
но это требует, чтобы тип источника был строковым, технически это могло бы работать, но мне пришлось бы десериализоваться несколько раз. Я нашел в документах для текста IO динамических направлений
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
Итак, давайте попробуем это
TextIO
.writeCustomType[Event]()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
Это все еще не компилируется, поскольку writeCustomType внутренне возвращаетTypedWrite<UserT, Void>
и это влияет на требование, чтобы параметр 2-го типа моего динамического целевого объекта был Void. Очевидно, я требую, чтобы это была строка или хотя бы что-то отличное от Void
Я явно что-то упустил