Broadcast wird beim Verbinden von Datenrahmen in Spark 1.6 nicht gesendet

Below ist der Beispielcode, den ich ausführe. Wenn dieser Spark-Job ausgeführt wird, werden Dataframe-Joins mithilfe von sortmergejoin anstelle von broadcastjoin ausgeführt.

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  

Der broadcastjoin-Befehl wird nicht ausgeführt, auch wenn ich in der join-Anweisung einen broadcast () -Hinweis angegeben habe.

Der Optimierer partitioniert den Datenrahmen mit einem Hash und verursacht einen Datenversatz.

Hat jemand dieses Verhalten gesehen?

Ich verwende Spark 1.6 und HiveContext als SQLContext für Garn. Der Spark-Job läuft auf 200 Executoren. und die Datengröße des txnTable ist 240GB und die Datengröße von CountriesDf ist 5mb.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage