Resultados da pesquisa a pedido "apache-spark"

1 a resposta

Função definida pelo usuário a ser aplicada à janela no PySpark?

Estou tentando aplicar uma função definida pelo usuário para Window no PySpark. Eu li que o UDAF pode ser o caminho a seguir, mas não consegui encontrar nada concreto. Para dar um exemplo (extraído daqui:Blog de tecnologia de ...

1 a resposta

udf Nenhum TypeTag disponível para o tipo string

Eu não entendo um comportamento de faísca. Crio um udf que retorna um número inteiro como abaixo import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object Show { def main(args: Array[String]): Unit = { ...

2 a resposta

Como compor o nome da coluna usando o valor de outra coluna para withColumn no Scala Spark

Estou tentando adicionar uma nova coluna a umDataFrame. O valor desta coluna é o valor de outra coluna cujo nome depende de outras colunas da mesmaDataFrame. Por exemplo, dado o seguinte: +---+---+----+----+ | A| B| A_1| B_2| ...

3 a resposta

Spark Structured Streaming com integração Hbase

Estamos fazendo streaming de dados kafka que estão sendo coletados no MySQL. Agora que todas as análises estiverem concluídas, quero salvar meus dados diretamente no Hbase. Passei pelo documento de streaming estruturado do spark, mas não consegui ...

2 a resposta

Consistir consistência de streaming estruturado entre coletores

Gostaria de entender melhor o modelo de consistência do streaming estruturado do Spark 2.2 no seguinte caso: uma fonte (Kinesis)2 consultas desta fonte para 2 coletores diferentes: um coletor de arquivos para fins de arquivamento (S3) e outro ...

1 a resposta

Como usar um predicado durante a leitura da conexão JDBC?

Por padrão,spark_read_jdbc() lê uma tabela inteira do banco de dados no Spark. Eu usei a seguinte sintaxe para criar essas conexões. library(sparklyr) library(dplyr) config <- spark_config() config$`sparklyr.shell.driver-class-path` ...

0 a resposta

Por que o cache de chamada demora muito tempo em um conjunto de dados Spark?

Estou carregando grandes conjuntos de dados e, em seguida, armazenando-os em cache para referência em todo o meu código. O código se parece com isso: val conversations = sqlContext.read .format("com.databricks.spark.redshift") .option("url", ...

2 a resposta

Como usar o COGROUP para grandes conjuntos de dados

Eu tenho doisrdd's nomeadamenteval tab_a: RDD[(String, String)] eval tab_b: RDD[(String, String)] estou a usarcogroup para esses conjuntos de dados como: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { ...

2 a resposta

Como desserializar registros do Kafka usando o Structured Streaming em Java?

Eu uso o Spark2.1. Estou tentando ler registros do Kafka usando o Spark Structured Streaming, desserializá-los e aplicar agregações posteriormente. Eu tenho o seguinte código: SparkSession spark = SparkSession .builder() ...

2 a resposta

Altere o carimbo de data e hora para o formato UTC no Pyspark

Eu tenho um quadro de dados de entrada (ip_df), os dados desse quadro de dados são os seguintes: id timestamp_value 1 2017-08-01T14:30:00+05:30 2 2017-08-01T14:30:00+06:30 3 2017-08-01T14:30:00+07:30Preciso criar um novo quadro de dados (op_df), ...