если хочешь. Очевидно, что он не может пережить родительский переводчик, над которым вы не имеете никакого контроля. В противном случае вы можете легко добавить ведение журнала и использовать отладчик, чтобы увидеть, что инициализация применяется только при первом вызове.

апуске следующего фрагмента кода PySpark:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

Я получаю следующую ошибку:_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

Я предполагаю, что это потому, что PySpark не может сериализовать этот пользовательский класс. Но как я могу избежать накладных расходов на создание этого дорогостоящего объекта при каждом запускеparse_ingredients_line функционировать?

Ответы на вопрос(3)

Допустим, вы хотите использоватьIdentity класс определяется так (identity.py):

class Identity(object):                   
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

Вы можете, например, использовать вызываемый объект (f.py) и хранитьIdentity экземпляр как член класса:

from identity import Identity

class F(object):                          
    identity = None

    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

и используйте их, как показано ниже:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

или автономная функция и закрытие:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f(): 
    dict_ = {}                 
    @udf()              
    def f_(x):                 
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+
 Vitaliy01 апр. 2018 г., 07:49
Я не совсем понимаю пример. Где вы показываете, что можете сохранять состояние между выполнением udf?
 user691041101 апр. 2018 г., 12:37
@Vitaliy Это стандартный код Python - в обоих случаях мы сохраняем интересующий объект во внешней области видимости, поэтому его время жизни не ограничивается самой областью действия. Вы могли бы использоватьnonlocal вместо изменчивогоdict если хочешь. Очевидно, что он не может пережить родительский переводчик, над которым вы не имеете никакого контроля. В противном случае вы можете легко добавить ведение журнала и использовать отладчик, чтобы увидеть, что инициализация применяется только при первом вызове.

Редактировать: этот ответ неверный. Объект все еще сериализуется, а затем десериализуется, когда он транслируется, поэтому сериализации не избежать. (Советы по правильному использованию больших переменных вещания?)

Попробуйте использоватьшироковещательная переменная.

sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
 Thomas Nys12 окт. 2017 г., 09:17
Это предлагаемое решение дает мне ту же ошибку.
Решение Вопроса

https://github.com/scikit-learn/scikit-learn/issues/6975), сделав все зависимости класса NLPFunctions сериализуемыми.

Ваш ответ на вопрос