Как отфильтровать поток данных с помощью операции преобразования и внешнего RDD?

я использовалtransform метод в аналогичном случае использования, как описано вОперация преобразования разделПреобразования на DStreams:

spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

Мой код выглядит следующим образом:

sc = SparkContext("local[4]", "myapp")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2)
filtered_count = counts.transform(
    lambda rdd: rdd.join(filter_rdd).filter(lambda k, (v1, v2): v1 and not v2)
)
filtered_count.pprint()
ssc.start()
ssc.awaitTermination()

Но я получаю следующую ошибку

Похоже, вы пытаетесь передать СДР или сослаться на СДР из действия или преобразования. Преобразования и действия СДР могут вызываться только драйвером, а не внутри других преобразований; например, rdd1.map (lambda x: rdd2.values.count () * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063.

Как мне использовать мой внешний RDD для фильтрации элементов из dstream?

Ответы на вопрос(1)

Ваш ответ на вопрос