Código pyspark de teste de unidade usando python
Eu tenho script empyspark
como abaixo. Eu quero testar uma unidadefunction
neste script.
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_Eu escrevi umunittest
como abaixo para testar a função.
Mas não sei como enviar ounittest
. Eu fizspark-submit
o que não faz nada.
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Como posso integrar esse script para funcionar como umunittest
o que posso executar isso em um nó em que tenhopyspark
instalado?
))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
Eu escrevi umunittest
como abaixo para testar a função.
Mas não sei como enviar ounittest
. Eu fizspark-submit
o que não faz nada.
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Como posso integrar esse script para funcionar como umunittest
o que posso executar isso em um nó em que tenhopyspark
instalado?