¿Por qué las llamadas de caché tardan tanto en un conjunto de datos de Spark?

Estoy cargando grandes conjuntos de datos y luego los almacena en caché como referencia en todo mi código. El código se parece a esto:

val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials","true")
  .option("query", "SELECT * FROM my_table "+
                   "WHERE date <= '2017-06-03' "+
                   "AND date >= '2017-03-06' ")
  .load()
  .cache()

Si dejo el caché, el código se ejecuta rápidamente porque los conjuntos de datos se evalúan perezosamente. Pero si pongo el caché (), el bloque tarda mucho en ejecutarse.

Desde la línea de tiempo de eventos de la UI de Spark en línea, parece que la tabla SQL se transmite a los nodos de trabajo y luego se almacena en caché en los nodos de trabajo.

¿Por qué la caché se ejecuta de inmediato? El código fuente parece marcarlo solo para el almacenamiento en caché cuando se calculan los datos:

loscódigo fuente para el conjunto de datos llama a este códigoen CacheManager.scala cuando se llama cache o persistir:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }

Lo que solo parece marcar para el almacenamiento en caché en lugar de realmente almacenar en caché los datos. Y esperaría que el almacenamiento en caché regrese inmediatamente en función de otras respuestas en Stack Overflow también.

¿Alguien más ha visto el almacenamiento en caché inmediatamente antes de unacción se realiza en el conjunto de datos? ¿Por qué pasó esto?

Respuestas a la pregunta(0)

Su respuesta a la pregunta