Reemplazar bigrams según su frecuencia en Scala y Spark

Quiero reemplazar todos los bigrams cuyo conteo de frecuencia es mayor que un umbral con este patrón(word1.concat("-").concat(word2)), y he intentado:

import org.apache.spark.{SparkConf, SparkContext}

object replace {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("replace")

    val sc = new SparkContext(conf)
    val rdd = sc.textFile("data/ddd.txt")

    val threshold = 2

    val searchBigram=rdd.map {
      _.split('.').map { substrings =>
        // Trim substrings and then tokenize on spaces
        substrings.trim.split(' ').

          // Remove non-alphanumeric characters and convert to lowercase
          map {
          _.replaceAll( """\W""", "").toLowerCase()
        }.
          sliding(2)

      }.flatMap {
        identity
      }
        .map {
        _.mkString(" ")
      }
        .groupBy {
        identity
      }
        .mapValues {
        _.size
      }
    }.flatMap {
      identity
    }.reduceByKey(_ + _).collect
      .sortBy(-_._2)
      .takeWhile(_._2 >= threshold)
      .map(x=>x._1.split(' '))
      .map(x=>(x(0), x(1))).toVector


    val sample1 = sc.textFile("data/ddd.txt")
    val sample2 = sample1.map(s=> s.split(" ") // split on space
      .sliding(2)                       // take continuous pairs
      .map{ case Array(a, b) => (a, b) }
      .map(elem => if (searchBigram.contains(elem)) (elem._1.concat("-").concat(elem._2)," ") else elem)
      .map{case (e1,e2) => e1}.mkString(" "))
    sample2.foreach(println)
  }
}

pero este código elimina la última palabra de cada documento y muestra algunos errores cuando lo ejecuto en un archivo que contiene muchos documentos.

supongamos que mi archivo de entrada contiene estos documentos:

surprise heard thump opened door small seedy man clasping package wrapped.

upgrading system found review spring two thousand issue moody audio mortgage backed.

omg left gotta wrap review order asap . understand issue moody hand delivered dali lama

speak hands wear earplugs lives . listen maintain link long .

buffered lightning two thousand volts cables burned revivification place .

cables volts cables finally able hear auditory issue moody gem long rumored music .

y mi salida favorita es:

surprise heard thump opened door small-man clasping package wrapped.

upgrading system found review spring two-thousand issue-moody audio mortgage backed.

omg left gotta wrap review order asap . understand issue-moody hand delivered dali lama

speak hands wear earplugs lives . listen maintain link long small-man .

buffered lightning two-thousand volts-cables burned revivification place .

cables volts-cables finally able hear auditory issue-moody gem long rumored music .

Alguien puede ayudarme?

Respuestas a la pregunta(2)

Su respuesta a la pregunta