Bereite meine BigData mit Spark via Python vor

Meine 100 m große, quantisierte Daten:

(1424411938', [3885, 7898])
(3333333333', [3885, 7898])

Erwünschtes Ergebnis

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

Also, was ich will, ist, die Daten so zu transformieren, dass ich 3885 (zum Beispiel) mit allen @ gruppierdata[0] das habe ich). Hier ist was ich in @ gemacht haPytho:

def prepare(data):
    result = []
    for point_id, cluster in data:
        for index, c in enumerate(cluster):
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            if(found == 0):
                result.append((c, []))
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result

aber wenn ichmapPartitions() 'eddata RDD mitprepare(), es scheint nur in der aktuellen Partition das zu tun, was ich will, also ein größeres Ergebnis als das gewünschte zurückgeben.

Zum Beispiel, wenn der 1. Datensatz am Anfang in der 1. Partition und der 2. in der 2. Partition war, dann würde ich als Ergebnis erhalten:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

Wie ändere ich meinprepare(), um den gewünschten Effekt zu erzielen? Alternativ, wie Sie das Ergebnis verarbeiten, dassprepare() produziert, damit ich das gewünschte Ergebnis bekomme?

Wie Sie vielleicht bereits aus dem Code bemerkt haben, ist mir die Geschwindigkeit überhaupt nicht wichtig.

Hier ist ein Weg, um die Daten zu erstellen:

data = []
from random import randint
for i in xrange(0, 10):
    data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)

Antworten auf die Frage(2)

Ihre Antwort auf die Frage