Как запустить независимые преобразования параллельно с помощью PySpark?

Я пытаюсь запустить 2 функции, делая полностью независимые преобразования на одном RDD параллельно, используя PySpark. Какими методами можно сделать то же самое?

def doXTransforms(sampleRDD):
    (X transforms)

def doYTransforms(sampleRDD):
    (Y Transforms)

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    p1 = Process(target=doXTransforms , args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))  
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

Это не работает, и теперь я понимаю, что это не будет работать. Но есть ли альтернативный способ сделать эту работу? В частности, есть ли какие-либо конкретные решения для Python-Spark?

 ShuaiYuan27 июн. 2016 г., 11:02
Если каждое из ваших преобразований может использовать (почти) 100% ресурса кластера, как это обычно бывает, их параллельная работа фактически замедляет работу.

Ответы на вопрос(1)

Решение Вопроса

Просто используйте потоки и убедитесь, что у кластера достаточно ресурсов для одновременной обработки обеих задач.

from threading import Thread
import time

def process(rdd, f):
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2  = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()

Возможно, это не так часто полезно на практике, но в остальном должно работать просто отлично.

Вы можете в дальнейшем использоватьпланирование в приложении сFAIR планировщик и пулы планировщика для лучшего контроля над стратегией исполнения.

Вы также можете попробоватьpyspark-asyncactions (отказ от ответственности - автор этого ответа также является автором пакета), который предоставляет набор оберток вокруг Spark API иconcurrent.futures:

import asyncactions
import concurrent.futures

f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
 zero32327 июн. 2016 г., 19:14
Это не важно Единственное, что происходит в этом коде, это вызовы RPC. Это не касается реальных вычислений. Вы также можете обрабатывать это в одном потоке с помощью асинхронных вызовов. Смотрите такжеэтот ответ и мои комментарии ниже.
 preitam ojha27 июн. 2016 г., 18:54
В Python потоки не являются действительно параллельными из-за GIL. Так что, если я воспользуюсь описанным выше методом, я не смогу использовать несколько ядер, верно?
 preitam ojha27 июн. 2016 г., 22:34
Это сработало, спасибо! Единственное изменение, которое я должен был сделать, чтобы это работало, было изменение yarn.scheduler.capacity.maximum-am-resource -cent с 0,1 до 0,5 в /etc/hadoop/conf/capacity-scheduler.xml.

Ваш ответ на вопрос