How to write to PostgreSQL hstore using a Spark Dataset

I am trying to write a Spark Dataset into an existing PostgreSQL table (whose metadata, such as column types, I cannot change). One of the columns of this table is of type hstore, and it is causing trouble.

I see the following exception when the write kicks off (in this case the original map is empty, which yields an empty string once escaped):

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted.  Call getNextException to see the cause.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419)
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$saveTable$1.apply(JdbcUtils.scala:300)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$saveTable$1.apply(JdbcUtils.scala:299)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying
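
The last line is the real cause: the driver declares the escaped map as character varying, and PostgreSQL does not implicitly cast that to hstore. One workaround I know of is the PostgreSQL JDBC driver's stringtype=unspecified connection parameter, which makes the driver send string parameters as untyped so the server can infer hstore itself. A minimal sketch, assuming the stock org.postgresql.Driver and the props object shown further down (untested against this exact table):

// Sketch only: ask the driver to send strings as untyped parameters, so the
// server can cast the escaped hstore literal on its own.
props.put("stringtype", "unspecified")
// or, equivalently, on the connection URL:
// jdbc:postgresql://host:5432/mydb?stringtype=unspecified   (hypothetical host/db)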

This is how I am doing it:

// Render a Scala Map as an hstore-style literal: "key"=>value pairs, comma-separated
def escapePgHstore[A, B](hmap: Map[A, B]) = {
  hmap.map { case (key, value) => s""" "${key}"=>${value} """ }.mkString(",")
}
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1).write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)
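
The definition of escape_pg_hstore_udf fell into the "..." above; for completeness, here is a hypothetical sketch of how it could be wired up (my reconstruction, not the original code), simply lifting escapePgHstore onto the Map[String, Long] column:

import org.apache.spark.sql.functions.udf

// Hypothetical reconstruction of the UDF referenced above: wraps escapePgHstore
// so it runs per row on the map column, guarding against null maps.
val escape_pg_hstore_udf = udf { hmap: Map[String, Long] =>
  if (hmap == null) null else escapePgHstore(hmap)
}

Note that hstore literals also accept quoted values; if keys or values could contain double quotes or backslashes, they would need escaping as well.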

If I do not escape original_column from Map[String, Long] to String with escapePgHstore, I see the following error instead:

java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$getJdbcType$2.apply(JdbcUtils.scala:137)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$getJdbcType$2.apply(JdbcUtils.scala:137)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$getJdbcType(JdbcUtils.scala:136)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$7.apply(JdbcUtils.scala:293)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$anonfun$7.apply(JdbcUtils.scala:292)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$anonfun$main$1.apply(App.scala:76)
    at scala.App$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
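
This second failure is independent of PostgreSQL: Spark's JDBC writer (JdbcUtils.getJdbcType, visible in the trace) only maps atomic Catalyst types to java.sql.Types, and map<string,bigint> has no JDBC equivalent, so converting the map column to something writable, as the UDF above does, is unavoidable. As a sketch, the columns that would trip the writer can be listed up front (assuming the ds value from the snippet above):

import org.apache.spark.sql.types.MapType

// List the map-typed columns, which have no JDBC mapping and must be converted
// (escaped to a string, exploded, etc.) before calling .jdbc().
val mapColumns = ds.schema.fields.collect {
  case f if f.dataType.isInstanceOf[MapType] => f.name
}
// e.g. Array(original_column)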

What is the correct way to get Spark to write a valid hstore data type?
