Prueba de unidad de código pyspark usando python
Tengo script enpyspark
como abajo. Quiero probar unfunction
en este guion
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_Yo escribi ununittest
como a continuación para probar la función.
Pero no sé cómo enviar elunittest
. he hechospark-submit
que no hace nada
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
¿Cómo puedo integrar este script para que funcione comounittest
¿Qué puedo ejecutar esto en un nodo donde tengopyspark
instalado?
))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
Yo escribi ununittest
como a continuación para probar la función.
Pero no sé cómo enviar elunittest
. he hechospark-submit
que no hace nada
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
¿Cómo puedo integrar este script para que funcione comounittest
¿Qué puedo ejecutar esto en un nodo donde tengopyspark
instalado?