Spark Streaming: Wie kann ich meinem DStream weitere Partitionen hinzufügen?

Ich habe eine Spark-Streaming-App, die so aussieht:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

Und ich starte es auf einem Garncluster mit

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

Wenn ich versuche mich anzumeldenkafkaDF.rdd.partitions.size, das Ergebnis ist meistens '1' oder '5'. Ich bin verwirrt. Kann ich die Anzahl der Partitionen meines DataFrame steuern?KafkaUtils.createStream scheint keine Parameter zu akzeptieren, die sich auf die Anzahl der Partitionen beziehen, die ich für die Festplatte haben möchte. Ich habe es versuchtkafkaDF.rdd.repartition( int ), aber es scheint auch nicht zu funktionieren.

Wie kann ich mehr Parallelität in meinem Code erreichen? Wenn mein Ansatz falsch ist, wie kann ich ihn dann richtig erreichen?

Antworten auf die Frage(2)

Ihre Antwort auf die Frage