Dataflow gibt den parametrisierten Typ in die AVRO-Datei @ a

Ich habe eine Pipeline, die eine Avro-Datei wie folgt erfolgreich ausgibt:

@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
  T foo;
  S bar;
  Boolean baz;
  public MyOutput_T_S() {}
}

@DefaultCoder(AvroCoder.class)
class T {
  String id;
  public T() {}
}

@DefaultCoder(AvroCoder.class)
class S {
  String id;
  public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));

Wie kann ich genau dieses Verhalten reproduzieren, außer mit einem parametrierten AusgangMyOutput<T, S> (woT undS sind beide Avro-fähig (mittels Reflektion).

Das Hauptproblem ist, dass die Avro-Reflexion bei parametrisierten Typen nicht funktioniert. Also basierend auf diesen Antworten:

Benutzerdefinierte Codierer einstellen und parametrisierte Typen handhabenVerwenden von Avrocoder für benutzerdefinierte Typen mit Generika

1) Ich denke, ich muss ein benutzerdefiniertes @ schreibCoderFactory aber ich habe Schwierigkeiten, genau herauszufinden, wie das funktioniert (ich habe Probleme, Beispiele zu finden). Seltsamerweise lässt mich eine völlig naive Coder-Factory die Pipeline ausführen und die ordnungsgemäße Ausgabe mit DataflowAssert überprüfen:

cr.RegisterCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? excents Coder<?>> componentCoders) {
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
      + "\"name\":\"MyOutput\","
      + "\"namespace\":\"mypackage"\","
      + "\"fields\":[]}"
    return AvroCoder.of(MyOutput.class, schema);
  }
  @Override
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    List components = new ArrayList();
    return components;
  }

Während ich es jetzt erfolgreich gegen die Ausgabe behaupten kann, gehe ich davon aus, dass diese nicht zum Schreiben in eine Datei abgeschnitten wird. Ich habe nicht herausgefunden, wie ich das bereitgestellte @ verwenden socomponentCoders, um das richtige Schema zu generieren und wenn ich versuche, einfach das Schema von @ zu verschiebT oderSinfields Ich bekomme

java.lang.IllegalArgumentException: Unable to get field id from class null

2) Angenommen, ich finde heraus, wie man @ codieMyOutput. Was übergebe ich anAvroIO.Write.withSchema? Wenn ich entwederMyOutput.class oder das Schema, für das ich Typenkonfliktfehler erhalte.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage