Spark Streaming: Wie kann ich meinem DStream weitere Partitionen hinzufügen?
Ich habe eine Spark-Streaming-App, die so aussieht:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
Und ich starte es auf einem Garncluster mit
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
Wenn ich versuche mich anzumeldenkafkaDF.rdd.partitions.size
, das Ergebnis ist meistens '1' oder '5'. Ich bin verwirrt. Kann ich die Anzahl der Partitionen meines DataFrame steuern?KafkaUtils.createStream
scheint keine Parameter zu akzeptieren, die sich auf die Anzahl der Partitionen beziehen, die ich für die Festplatte haben möchte. Ich habe es versuchtkafkaDF.rdd.repartition( int )
, aber es scheint auch nicht zu funktionieren.
Wie kann ich mehr Parallelität in meinem Code erreichen? Wenn mein Ansatz falsch ist, wie kann ich ihn dann richtig erreichen?