Параметризованный тип вывода потока данных в файл avro
У меня есть конвейер, который успешно выводит файл Avro следующим образом:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
T foo;
S bar;
Boolean baz;
public MyOutput_T_S() {}
}
@DefaultCoder(AvroCoder.class)
class T {
String id;
public T() {}
}
@DefaultCoder(AvroCoder.class)
class S {
String id;
public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
Как я могу воспроизвести это точное поведение, кроме как с параметризованным выводомMyOutput<T, S>
(гдеT
а такжеS
оба Avro-кодируются с использованием отражения).
Основная проблема заключается в том, что Avro Reflection не работает для параметризованных типов. Итак, на основании этих ответов:
Установка пользовательских кодеров и обработка параметризованных типовИспользование Avrocoder для пользовательских типов с обобщенными1) думаю, мне нужно написать кастомCoderFactory
но мне трудно понять, как именно это работает (у меня проблемы с поиском примеров). Как ни странно, совершенно наивная фабрика кодировщиков позволяет мне запустить конвейер и проверить правильность вывода с помощью DataflowAssert:
cr.RegisterCoder(MyOutput.class, new CoderFactory() {
@Override
public Coder<?> create(List<? excents Coder<?>> componentCoders) {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
+ "\"name\":\"MyOutput\","
+ "\"namespace\":\"mypackage"\","
+ "\"fields\":[]}"
return AvroCoder.of(MyOutput.class, schema);
}
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
List components = new ArrayList();
return components;
}
Хотя теперь я могу успешно утверждать вывод, я ожидаю, что это не обрезает его для записи в файл. Я не понял, как я должен использовать предоставленнуюcomponentCoders
генерировать правильную схему, и если я пытаюсь просто засунуть схемуT
или жеS
вfields
Я получил:
java.lang.IllegalArgumentException: Unable to get field id from class null
2) Предполагая, что я выясняю, как кодироватьMyOutput
, Что я передаюAvroIO.Write.withSchema
? Если я прохожу либоMyOutput.class
или схема, я получаю ошибки несоответствия типов.