Просто для записи - Zeppelin может использоваться с предоставленной пользователем установкой Spark, независимо от встроенной, и поддерживает Spark 2.2 (как указано в вопросе). Таким образом, вы можете использовать формат «скорость», если хотите.

всех сил, чтобы получитьconsole раковина работает сPySpark Структурированная потоковая передача когда бегут от Zeppelin. По сути, я не вижу никаких результатов, напечатанных на экране, или в любых лог-файлах, которые я нашел.

Мой вопрос: У кого-нибудь есть рабочий пример использования PySpark Structured Streaming с приемником, который производит вывод, видимый в Apache Zeppelin? В идеале он также использовал бы источник сокетов, поскольку это легко проверить.

Я использую:

Ubuntu 16.04искровым 2.2.0-бен-hadoop2.7Дирижабль-0.7.3-бен-всеpython3

Я основал свой код наПример structd_network_wordcount.py, Работает при запуске из оболочки PySpark (./bin/pyspark --master local[2]); Я вижу таблицы за партию.

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

print("Starting...")
query.awaitTermination(20)

Я ожидаю увидеть распечатки результатов для каждой партии, но вместо этого я просто вижуStarting..., а потомFalse, возвращаемое значениеquery.awaitTermination(20).

В отдельном терминале я ввожу некоторые данные вnc -lk 9999 Сеанс Netcat, пока работает выше.

Ответы на вопрос(2)

Ваш ответ на вопрос