Preparar meus dados grandes com Spark via Python

Meus 100m de tamanho, dados quantizados:

(1424411938', [3885, 7898])
(3333333333', [3885, 7898])

Resultado desejado:

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

Então, o que eu quero é transformar os dados para agrupar 3885 (por exemplo) com todos osdata[0] que tem). Aqui está o que eu fiz emPitão:

def prepare(data):
    result = []
    for point_id, cluster in data:
        for index, c in enumerate(cluster):
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            if(found == 0):
                result.append((c, []))
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result

mas quando eumapPartitions()'eddata RDD comprepare(), parece fazer o que eu quero apenas na partição atual, retornando assim um resultado maior que o desejado.

Por exemplo, se o 1º registro no início estivesse na 1ª partição e o 2º na 2ª, obteria como resultado:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

Como modificar meuprepare() para obter o efeito desejado? Como alternativa, como processar o resultado queprepare() produz, para que eu possa obter o resultado desejado?

Como você já deve ter notado no código, não me importo com a velocidade.

Aqui está uma maneira de criar os dados:

data = []
from random import randint
for i in xrange(0, 10):
    data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)

questionAnswers(1)

yourAnswerToTheQuestion