Arrays de mesclagem / combinação de faíscas no grupo

O código Spark a seguir demonstra corretamente o que eu quero fazer e gera a saída correta com um pequeno conjunto de dados de demonstração.

Quando executo esse mesmo tipo geral de código em um grande volume de dados de produção, estou tendo problemas de tempo de execução. O trabalho do Spark é executado no meu cluster por ~ 12 horas e falha.

Apenas olhando para o código abaixo, parece ineficiente explodir cada linha, apenas para mesclá-la novamente. No conjunto de dados de teste fornecido, a quarta linha com três valores em array_value_1 e três valores em array_value_2, que explodirão para 3 * 3 ou nove linhas explodidas.

Portanto, em um conjunto de dados maior, uma linha com cinco dessas colunas da matriz e dez valores em cada coluna explodiria para 10 ^ 5 linhas explodidas?

Observando as funções Spark fornecidas, não há funções prontas para uso que fariam o que eu quero. Eu poderia fornecer uma função definida pelo usuário. Existem desvantagens de velocidade nisso?

val sparkSession = SparkSession.builder.
  master("local")
  .appName("merge list test")
  .getOrCreate()

val schema = StructType(
  StructField("category", IntegerType) ::
    StructField("array_value_1", ArrayType(StringType)) ::
    StructField("array_value_2", ArrayType(StringType)) ::
    Nil)

val rows = List(
  Row(1, List("a", "b"), List("u", "v")),
  Row(1, List("b", "c"), List("v", "w")),
  Row(2, List("c", "d"), List("w")),
  Row(2, List("c", "d", "e"), List("x", "y", "z"))
)

val df = sparkSession.createDataFrame(rows.asJava, schema)

val dfExploded = df.
  withColumn("scalar_1", explode(col("array_value_1"))).
  withColumn("scalar_2", explode(col("array_value_2")))

// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")

val dfOutput = dfExploded.groupBy("category").agg(
  collect_set("scalar_1").alias("combined_values_2"),
  collect_set("scalar_2").alias("combined_values_2"))

dfOutput.show()

questionAnswers(1)

yourAnswerToTheQuestion