como construir um gráfico a partir de tuplas no graphx e rotular os nós depois?

Algum contexto pode ser encontradoaqui, a ideia é que eu criei um gráfico a partir de tuplas coletadas de uma solicitação em uma tabela do Hive. Elas correspondem às relações comerciais entre os países. Tendo construído o gráfico dessa maneira, os vértices não são rotulados. Quero estudar a distribuição de diplomas e obter os nomes dos países mais conectados. Eu tentei 2 opções:

Primeiro : Tentei mapear o índice dos vértices com os nomes de string dos vértices com a função idMapbis dentro da função que está coletando e imprimindo os dez principais graus conectados.Segundo : Tentei adicionar rótulo aos vértices do próprio gráfico.

Nos dois casos, recebo o seguinte erro: a tarefa não é serializável

Código global:
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays 

os casais têm esta aparência: Matriz [(Qualquer, Qualquer)] = Matriz ((MWI, MOZ), (WSM, AUS), (MDA, CRI), (KNA, HTI), (PER, ERI), (SWE, CUB ), ...

val idMap = sc.broadcast(couples 
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct 
.zipWithIndex  
.map{case (k, v) => (k, v.toLong)}  
.toMap) 

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})

val graph = Graph.fromEdgeTuples(edges, 1)

construídos dessa maneira, os vértices se parecem com (68,1), por exemplo

val degrees: VertexRDD[Int] = graph.degrees.cache()

//Most connected vertices 
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

Temos: (79.1016), (64.912), (55.889) ...

Primeira opção para recuperar os nomes:
val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)} 
.distinct 
.zipWithIndex  
.map{case (k, v) => (v,k)}  
.toMap)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

A tarefa não é serializável, mas a função idMapbis está funcionando, pois não há erro com idMapbis.value (graph.vertices.take (1) (0) ._ 1.toInt)

Opção 2:
graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}

A tarefa não pode ser serializada novamente (para o contexto, veja como topNamesAndDegrees é modificado para obter os nomes dos vértices mais conectados nessa opção)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

Estou interessado em entender como melhorar uma dessas opções, talvez as duas se alguém ver como.

questionAnswers(1)

yourAnswerToTheQuestion