передать значение столбца в качестве параметра функции
отаю с PySpark над огромным набором данных, где я хочу отфильтровать фрейм данных на основе строк в другом фрейме данных. Например,
dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).,toDF('domains')
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy |
|something.good.com.cy.mal.org |
+----------------------------------------+
dd1 = spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+
Я предполагаю чтоdomains
а такжеgooddomains
действительные доменные имена.
Что я хочу сделать, это отфильтровать соответствующие строки вdd
которые не заканчиваютсяdd1
, Итак, в приведенном выше примере я хочу отфильтровать строки 1 и 3, чтобы в конечном итоге
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
+----------------------------------------+
Мое текущее решение (как показано ниже) может учитывать только домены до 3 «слов». Если бы я добавил, скажем,verygood.co.ac.uk
вdd1
(то есть белый список), то это не удастся.
def split_filter(x, whitelist):
splitted1 = x.select(F.split(x['domains'], '\.').alias('splitted_domains'))
last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
x = x.withColumn('id', F.monotonically_increasing_id())
last_two = last_two.withColumn('id', F.monotonically_increasing_id())
last_three = last_three.withColumn('id', F.monotonically_increasing_id())
final_d = x.join(last_two, ['id']).join(last_three, ['id'])
df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['domains'], how = 'left_anti')
df2 = df1.join(whitelist, df1['last_three'] == whitelist['domains'], how = 'left_anti')
return df2.drop('id')
Я использую Spark 2.3.0 с Python 2.7.5.