Exceção de fluxo estruturado ao usar o modo de saída acréscimo com marca d'água
Apesar do fato de eu estar usandowithWatermark()
, Estou recebendo a seguinte mensagem de erro quando executo meu trabalho de faísca:
Exceção no encadeamento "main" org.apache.spark.sql.AnalysisException: anexar modo de saída não suportado quando houver agregações de streaming no DataFrames / DataSets sem marca d'água;
Pelo que posso ver noguia de programação, isso corresponde exatamente ao uso pretendido (e ao código de exemplo). Alguém sabe o que pode estar errado?
Desde já, obrigado!
Código relevante (Java 8, Spark 2.2.0):
StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();