Organización de código chispa y mejores prácticas [cerrado]

Entonces, después de haber pasado muchos años en un mundo orientado a objetos con la reutilización del código, los patrones de diseño y las mejores prácticas siempre tomados en cuenta, me encuentro luchando un poco con la organización del código y la reutilización del código en el mundo de Spark.

Si trato de escribir código de manera reutilizable, casi siempre viene con un costo de rendimiento y termino reescribiéndolo a lo que sea óptimo para mi caso de uso particular. Esta constante "escribir lo que es óptimo para este caso de uso en particular" también afecta la organización del código, porque dividir el código en diferentes objetos o módulos es difícil cuando "todo realmente pertenece" y, por lo tanto, termino con muy pocos objetos "Dios" que contienen mucho cadenas de transformaciones complejas. De hecho, con frecuencia pienso que si hubiera echado un vistazo a la mayoría del código Spark que estoy escribiendo ahora cuando estaba trabajando en el mundo orientado a objetos, habría hecho una mueca y lo descarté como "código spaghetti".

He navegado por Internet tratando de encontrar algún tipo de equivalente a las mejores prácticas del mundo orientado a objetos, pero sin mucha suerte. Puedo encontrar algunas "mejores prácticas" para la programación funcional, pero Spark simplemente agrega una capa adicional, porque el rendimiento es un factor muy importante aquí.

Entonces, mi pregunta es, ¿alguno de ustedes, gurús de Spark, encontró algunas mejores prácticas para escribir código de Spark que puedan recomendar?

EDITAR

Como está escrito en un comentario, en realidad no esperaba que nadie publicara una respuesta sobre cómoresolver este problema, sino que esperaba que alguien en esta comunidad se hubiera encontrado con algún tipo de Martin Fowler, que había escrito algunos artículos o publicaciones de blog en algún lugar sobre cómo abordar los problemas con la organización del código en el mundo de Spark.

@DanielDarabos sugirió que podría poner un ejemplo de una situación en la que la organización del código y el rendimiento son conflictivos. Si bien encuentro que frecuentemente tengo problemas con esto en mi trabajo diario, me resulta un poco difícil resumirlo en un buen ejemplo mínimo;) pero lo intentaré.

En el mundo orientado a objetos, soy un gran admirador del Principio de responsabilidad única, por lo que me aseguraría de que mis métodos solo fueran responsables de una cosa. Los hace reutilizables y fácilmente comprobables. Entonces, si tuviera que, por ejemplo, calcular la suma de algunos números en una lista (coincidiendo con algunos criterios) y tuviera que calcular el promedio del mismo número, definitivamente crearía dos métodos: uno que calcula la suma y otro que calculó el promedio. Me gusta esto:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

Por supuesto, puedo seguir honrando a SRP en Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

Pero porque midf puede contener miles de millones de filas que preferiría no tener que realizar elfilter dos veces. De hecho, el rendimiento está directamente relacionado con el costo de EMR, por lo que REALMENTE no quiero eso. Para superarlo, decido violar SRP y simplemente pongo las dos funciones en una y me aseguro de llamar a persistir en el país filtradoDataFrame, Me gusta esto:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Ahora, este ejemplo es, por supuesto, una gran simplificación de lo que se encuentra en la vida real. Aquí podría resolverlo simplemente filtrando y persistiendodf antes de entregándolo a las funciones sum y avg (que también serían más SRP), pero en la vida real puede haber una serie de cálculos intermedios que se necesitan una y otra vez. En otras palabras, elfilter funcionar aquí es simplemente un intento de hacer unsencillo ejemplo de algo que se beneficiará de ser persistente. De hecho, creo que llama apersist Es una palabra clave aquí. Vocaciónpersist acelerará enormemente mi trabajo, pero el costo es que tengo que acoplar firmemente todo el código que depende de la persistenciaDataFrame - Incluso si están lógicamente separados.

Respuestas a la pregunta(1)

Su respuesta a la pregunta