como construir um gráfico a partir de tuplas no graphx e rotular os nós depois?
Algum contexto pode ser encontradoaqui, a ideia é que eu criei um gráfico a partir de tuplas coletadas de uma solicitação em uma tabela do Hive. Elas correspondem às relações comerciais entre os países. Tendo construído o gráfico dessa maneira, os vértices não são rotulados. Quero estudar a distribuição de diplomas e obter os nomes dos países mais conectados. Eu tentei 2 opções:
Primeiro : Tentei mapear o índice dos vértices com os nomes de string dos vértices com a função idMapbis dentro da função que está coletando e imprimindo os dez principais graus conectados.Segundo : Tentei adicionar rótulo aos vértices do próprio gráfico.Nos dois casos, recebo o seguinte erro: a tarefa não é serializável
Código global:import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays
os casais têm esta aparência: Matriz [(Qualquer, Qualquer)] = Matriz ((MWI, MOZ), (WSM, AUS), (MDA, CRI), (KNA, HTI), (PER, ERI), (SWE, CUB ), ...
val idMap = sc.broadcast(couples
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct
.zipWithIndex
.map{case (k, v) => (k, v.toLong)}
.toMap)
val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})
val graph = Graph.fromEdgeTuples(edges, 1)
construídos dessa maneira, os vértices se parecem com (68,1), por exemplo
val degrees: VertexRDD[Int] = graph.degrees.cache()
//Most connected vertices
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
(id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
Temos: (79.1016), (64.912), (55.889) ...
Primeira opção para recuperar os nomes:val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct
.zipWithIndex
.map{case (k, v) => (v,k)}
.toMap)
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
(id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
A tarefa não é serializável, mas a função idMapbis está funcionando, pois não há erro com idMapbis.value (graph.vertices.take (1) (0) ._ 1.toInt)
Opção 2:graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}
A tarefa não pode ser serializada novamente (para o contexto, veja como topNamesAndDegrees é modificado para obter os nomes dos vértices mais conectados nessa opção)
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
(id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
Estou interessado em entender como melhorar uma dessas opções, talvez as duas se alguém ver como.