Organização do código Spark e práticas recomendadas [fechado]

Assim, depois de passar muitos anos em um mundo orientado a objetos com reutilização de código, padrões de design e práticas recomendadas sempre levadas em consideração, me deparo um pouco com a organização e a reutilização de código no mundo do Spark.

Se eu tentar escrever código de uma maneira reutilizável, ele quase sempre vem com um custo de desempenho e acabo reescrevendo-o para o que for ideal para o meu caso de uso específico. Essa constante "escrever o que é ideal para esse caso de uso específico" também afeta a organização do código, porque é difícil dividir o código em diferentes objetos ou módulos quando "tudo realmente se encaixa" e, portanto, acabo com muito poucos objetos "Deus" que contêm cadeias de transformações complexas. De fato, freqüentemente penso que, se eu tivesse examinado a maior parte do código Spark que estou escrevendo agora quando estava trabalhando no mundo orientado a objetos, eu teria estremecido e descartado como "código espaguete".

Naveguei na internet tentando encontrar algum tipo de equivalente às melhores práticas do mundo orientado a objetos, mas sem muita sorte. Posso encontrar algumas "práticas recomendadas" para programação funcional, mas o Spark apenas adiciona uma camada extra, porque o desempenho é um fator tão importante aqui.

Portanto, minha pergunta para você é: algum de vocês, gurus do Spark, encontrou algumas práticas recomendadas para escrever código Spark que você pode recomendar?

EDITAR

Conforme escrito em um comentário, eu realmente não esperava que alguém publicasse uma resposta sobre comoresolver esse problema, mas eu esperava que alguém dessa comunidade tivesse encontrado algum tipo de Martin Fowler, que tivesse escrito alguns artigos ou postagens de blog em algum lugar sobre como resolver problemas com a organização de códigos no mundo do Spark.

@DanielDarabos sugeriu que eu pudesse dar um exemplo de uma situação em que a organização e o desempenho do código são conflitantes. Embora descubra que frequentemente tenho problemas com isso no meu trabalho diário, acho um pouco difícil resumir um bom exemplo mínimo;), mas tentarei.

No mundo orientado a objetos, sou um grande fã do Princípio da Responsabilidade Única, para garantir que meus métodos sejam responsáveis apenas por uma coisa. Torna-os reutilizáveis e facilmente testáveis. Portanto, se eu tivesse que, digamos, calcular a soma de alguns números em uma lista (correspondendo a alguns critérios) e tivesse que calcular a média do mesmo número, definitivamente criaria dois métodos - um que calculasse a soma e outro que calculou a média. Como isso:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

É claro que posso continuar honrando o SRP no Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

Mas porque meudf pode conter bilhões de linhas, eu preferiria não ter que executar ofilter duas vezes. De fato, o desempenho está diretamente acoplado ao custo do EMR, então eu REALMENTE não quero isso. Para superá-lo, eu decido violar o SRP e simplesmente coloco as duas funções em uma e asseguro que eu chamo de persistir no filtro filtrado pelo país.DataFrame, como isso:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Agora, este exemplo é, obviamente, uma enorme simplificação do que é encontrado na vida real. Aqui eu poderia simplesmente resolvê-lo filtrando e persistindodf antes entregando-o às funções sum e avg (que também seriam mais SRP), mas na vida real pode haver vários cálculos intermediários em andamento que são necessários repetidamente. Em outras palavras, ofilter função aqui é apenas uma tentativa de fazer umasimples exemplo de algo que se beneficiará da persistência. Na verdade, acho que as chamadas parapersist é uma palavra-chave aqui. Chamandopersist irá acelerar muito o meu trabalho, mas o custo é que eu tenho que juntar firmemente todo o código que depende da persistênciaDataFrame - mesmo se eles estiverem logicamente separados.

questionAnswers(1)

yourAnswerToTheQuestion