Когда вы используете неразрешенные столбцы, Spark определит правильные столбцы для вас.
тря на то, что я используюwithWatermark()
Я получаю следующее сообщение об ошибке при запуске задания на искру:
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при потоковой агрегации потоковых DataFrames / DataSets без водяного знака ;;
Из того, что я вижу вруководство по программированиюэто точно соответствует предполагаемому использованию (и коду примера). Кто-нибудь знает, что может быть не так?
Заранее спасибо!
Соответствующий код (Java 8, Spark 2.2.0):
StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();