Параметризованный тип вывода потока данных в файл avro

У меня есть конвейер, который успешно выводит файл Avro следующим образом:

@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
  T foo;
  S bar;
  Boolean baz;
  public MyOutput_T_S() {}
}

@DefaultCoder(AvroCoder.class)
class T {
  String id;
  public T() {}
}

@DefaultCoder(AvroCoder.class)
class S {
  String id;
  public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));

Как я могу воспроизвести это точное поведение, кроме как с параметризованным выводомMyOutput<T, S> (гдеT а такжеS оба Avro-кодируются с использованием отражения).

Основная проблема заключается в том, что Avro Reflection не работает для параметризованных типов. Итак, на основании этих ответов:

Установка пользовательских кодеров и обработка параметризованных типовИспользование Avrocoder для пользовательских типов с обобщенными

1) думаю, мне нужно написать кастомCoderFactory но мне трудно понять, как именно это работает (у меня проблемы с поиском примеров). Как ни странно, совершенно наивная фабрика кодировщиков позволяет мне запустить конвейер и проверить правильность вывода с помощью DataflowAssert:

cr.RegisterCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? excents Coder<?>> componentCoders) {
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
      + "\"name\":\"MyOutput\","
      + "\"namespace\":\"mypackage"\","
      + "\"fields\":[]}"
    return AvroCoder.of(MyOutput.class, schema);
  }
  @Override
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    List components = new ArrayList();
    return components;
  }

Хотя теперь я могу успешно утверждать вывод, я ожидаю, что это не обрезает его для записи в файл. Я не понял, как я должен использовать предоставленнуюcomponentCoders генерировать правильную схему, и если я пытаюсь просто засунуть схемуT или жеS вfields Я получил:

java.lang.IllegalArgumentException: Unable to get field id from class null

2) Предполагая, что я выясняю, как кодироватьMyOutput, Что я передаюAvroIO.Write.withSchema? Если я прохожу либоMyOutput.class или схема, я получаю ошибки несоответствия типов.

Ответы на вопрос(1)

Ваш ответ на вопрос