La transmisión no se produce al unir marcos de datos en Spark 1.6

A continuación se muestra el código de muestra que estoy ejecutando. cuando se ejecuta este trabajo de chispa, las uniones de trama de datos se realizan utilizando sortmergejoin en lugar de broadcastjoin.

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  

El broadcastjoin no ocurre incluso cuando especifico una sugerencia broadcast () en la declaración de unión.

El optimizador está dividiendo hash el marco de datos y está causando sesgo de datos.

¿Alguien ha visto este comportamiento?

Estoy ejecutando esto en hilo usando Spark 1.6 y HiveContext como SQLContext. El trabajo de chispa se ejecuta en 200 ejecutores. y el tamaño de los datos de la tabla txn es de 240 GB y el tamaño de datos de los países es de 5 MB.

Respuestas a la pregunta(1)

Su respuesta a la pregunta