чтобы получить контекст для каждого теста. Это обычно добавляет много времени на выполнение тестов, так как создание нового контекста искры в настоящее время стоит дорого.
я есть сценарий вpyspark
как ниже. Я хочу провести модульное тестированиеfunction
в этом сценарии.
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_Я написалunittest
как ниже, чтобы проверить функцию.
Но я не знаю, как отправитьunittest
, я сделалspark-submit
который ничего не делает.
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Как я могу интегрировать этот скрипт для работы в качествеunittest
что я могу запустить это на узле, где у меня естьpyspark
установлен?
))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
Я написалunittest
как ниже, чтобы проверить функцию.
Но я не знаю, как отправитьunittest
, я сделалspark-submit
который ничего не делает.
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Как я могу интегрировать этот скрипт для работы в качествеunittest
что я могу запустить это на узле, где у меня естьpyspark
установлен?