Excepción de transmisión estructurada cuando se utiliza el modo de salida anexa con marca de agua

A pesar de que estoy usandowithWatermark(), Recibo el siguiente mensaje de error cuando ejecuto mi trabajo de chispa:

Excepción en el subproceso "main" org.apache.spark.sql.AnalysisException: el modo de salida de anexos no es compatible cuando hay agregaciones de transmisión en DataFrames / DataSets sin marca de agua;

Por lo que puedo ver en elguía de programación, esto coincide exactamente con el uso previsto (y el código de ejemplo). ¿Alguien sabe qué podría estar mal?

¡Gracias por adelantado!

Código relevante (Java 8, Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();

Respuestas a la pregunta(1)

Su respuesta a la pregunta