Resultados de la búsqueda a petición "apache-spark"
Iniciando Ipython con Spark 2
Tengo mi script de inicio de ipthon de la siguiente manera IPYTHON_OPTS="notebook --port 8889 \ --notebook-dir='/usr/hdp/2.3.2.0-2950/spark/' \ --ip='*' --no-browser" pyspark Funciona bien para Spark mayores. Pero cuando cambio a Spark2, aparece ...
Cómo extraer un elemento de una matriz en pyspark
Tengo un marco de datos con el siguiente tipo col1|col2|col3|col4 xxxx|yyyy|zzzz|[1111],[2222]Quiero que mi salida sea del siguiente tipo col1|col2|col3|col4|col5 xxxx|yyyy|zzzz|1111|2222Mi col4 es una matriz y quiero convertirlo en una columna ...
¿Por qué el nodo trabajador no ve las actualizaciones del acumulador en otros nodos trabajadores?
Estoy usando unLongAccumulator como contador compartido en operaciones de mapas. Pero parece que no lo estoy usando correctamente porque el estado del contador en los nodos de trabajo no está actualizado. Así es como se ve mi clase ...
Spark - ¿Ventana con recursividad? - Propagación condicional de valores entre filas
Tengo el siguiente marco de datos que muestra los ingresos de las compras. +-------+--------+-------+ |user_id|visit_id|revenue| +-------+--------+-------+ | 1| 1| 0| | 1| 2| 0| | 1| 3| 0| | 1| 4| 100| | 1| 5| 0| | 1| 6| 0| | 1| 7| 200| | 1| 8| ...
Cómo usar COGROUP para grandes conjuntos de datos
tengo dosrdd's a saberval tab_a: RDD[(String, String)] yval tab_b: RDD[(String, String)] Estoy usandocogroup para esos conjuntos de datos como: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } }Estoy ...
pyspark aprox Función cuántica
Tengo un marco de datos con estas columnasid, price, timestamp. Me gustaría encontrar el valor medio agrupado porid. Estoy usando este código para encontrarlo, pero me está dando este error. from pyspark.sql import DataFrameStatFunctions as ...
El trabajo asíncrono de chispa falla con un error
Estoy escribiendo código para spark en java. Cuando usoforeachAsync chispa falla y me dajava.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. En este código: JavaSparkContext sparkContext = new ...
Motivo _ razón de creación temporal
¿Por qué spark, al guardar el resultado en un sistema de archivos, carga los archivos de resultados en un directorio _temporary y luego los mueve a la carpeta de salida en lugar de cargarlos directamente a la carpeta de salida?
restar dos columnas con nulo en el marco de datos de chispa
Soy nuevo en spark, tengo dataframe df: +----------+------------+-----------+ | Column1 | Column2 | Sub | +----------+------------+-----------+ | 1 | 2 | 1 | +----------+------------+-----------+ | 4 | null | null ...
PySpark 2.1: Importar módulo con UDF rompe la conectividad de Hive
Actualmente estoy trabajando con Spark 2.1 y tengo un script principal que llama a un módulo auxiliar que contiene todos mis métodos de transformación. En otras palabras: main.py helper.pyEn la parte superior de mihelper.py archivo Tengo varias ...