Exceção de fluxo estruturado ao usar o modo de saída acréscimo com marca d'água

Apesar do fato de eu estar usandowithWatermark(), Estou recebendo a seguinte mensagem de erro quando executo meu trabalho de faísca:

Exceção no encadeamento "main" org.apache.spark.sql.AnalysisException: anexar modo de saída não suportado quando houver agregações de streaming no DataFrames / DataSets sem marca d'água;

Pelo que posso ver noguia de programação, isso corresponde exatamente ao uso pretendido (e ao código de exemplo). Alguém sabe o que pode estar errado?

Desde já, obrigado!

Código relevante (Java 8, Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();

questionAnswers(1)

yourAnswerToTheQuestion