Spark cartesian product

I need to compare coordinates to compute the distance between them. To do this I load the data with sc.textFile() and build a cartesian product. The text file has roughly 2,000,000 lines, so there are 2,000,000 x 2,000,000 coordinate pairs to compare.
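
To get a sense of the scale I'm dealing with, here is a quick back-of-envelope estimate of what that cartesian product has to materialize (the bytes-per-pair figure is only an assumed value for illustration):

n = 2000000                              # lines in the text file
pairs = n * n                            # records produced by cartesian()
bytes_per_pair = 100                     # assumed average size of one serialized pair
print(pairs)                             # 4,000,000,000,000 pairs
print(pairs * bytes_per_pair / 1e12)     # on the order of 400 TB to produce and process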

I tested the code with about 2,000 coordinates and it ran within seconds. With the large file, however, it seems to stall at some point and I don't know why. The code looks like this:

from math import cos, asin, sqrt
import numpy as np

def concat(x, y):
    # Merge partial results in reduceByKey into one flat list of
    # (globalid, distance) tuples, whether the inputs are lists or single tuples.
    if isinstance(x, list) and isinstance(y, list):
        return x + y
    if isinstance(x, list) and isinstance(y, tuple):
        return x + [y]
    if isinstance(x, tuple) and isinstance(y, list):
        return [x] + y
    return [x, y]

def haversian_dist(pair):
    # pair = (record_a, record_b); each record is [lat, globalid, lon]
    lat1 = float(pair[0][0])
    lat2 = float(pair[1][0])
    lon1 = float(pair[0][2])
    lon2 = float(pair[1][2])
    p = 0.017453292519943295  # pi / 180, degrees to radians
    # haversine formula; 12742 km is the Earth's diameter
    a = 0.5 - cos((lat2 - lat1) * p) / 2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(pair[0][1])  # debug output
    return (int(float(pair[0][1])), (int(float(pair[1][1])), 12742 * asin(sqrt(a))))

def sort_val(kv):
    # kv = (globalid, list of (other_id, distance)); sort the neighbours by distance
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(kv[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (kv[0], sorted_mins)


def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))  # each line becomes [lat, globalid, lon]
    data = data.repartition(100).cache()
    data.collect()  # forces evaluation; the collected list is not used
    matrix = data.cartesian(data)  # all pairs of records
    values = matrix.map(haversian_dist)  # (id, (other_id, distance))
    values = values.reduceByKey(concat)  # collect all distances per id into one list
    values = values.map(sort_val)  # sort each list by distance
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))  # keep the rangeval nearest; index 0 is the self-pair
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))  # keep only the neighbour ids
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)  # hp is a project-local helper module
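
For completeness, this is roughly how the function gets called; the SparkContext setup, paths and parameter values below are just placeholders, not my real ones:

from pyspark import SparkContext

sc = SparkContext(appName="distance-matrix")  # placeholder app name
calc_matrix(sc, "coords.txt", rangeval=10, savepath="out/", name="neighbours")  # placeholder paths; keep the 10 nearest neighbours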

A file with about 15,000 entries doesn't work either. I know the cartesian product is the cause of the O(n^2) runtime. But shouldn't Spark be able to handle that? Or is something else wrong? The only lead I have is an error message, but I don't know whether it points to the actual problem:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Datenübergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
