передать значение столбца в качестве параметра функции

отаю с PySpark над огромным набором данных, где я хочу отфильтровать фрейм данных на основе строк в другом фрейме данных. Например,

dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).,toDF('domains')
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com                    |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy                   |
|something.good.com.cy.mal.org           |
+----------------------------------------+  

dd1 =  spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+

Я предполагаю чтоdomains а такжеgooddomains действительные доменные имена.

Что я хочу сделать, это отфильтровать соответствующие строки вdd которые не заканчиваютсяdd1, Итак, в приведенном выше примере я хочу отфильтровать строки 1 и 3, чтобы в конечном итоге

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
+----------------------------------------+  

Мое текущее решение (как показано ниже) может учитывать только домены до 3 «слов». Если бы я добавил, скажем,verygood.co.ac.uk вdd1 (то есть белый список), то это не удастся.

def split_filter(x, whitelist):
    splitted1 = x.select(F.split(x['domains'], '\.').alias('splitted_domains'))
    last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
    last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['domains'], how = 'left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['domains'], how = 'left_anti')
    return df2.drop('id')

Я использую Spark 2.3.0 с Python 2.7.5.

Ответы на вопрос(2)

Ваш ответ на вопрос