PySpark - ValueError: could not convert string to float / invalid literal for float()

I'm trying to use data from a Spark dataframe as input to my k-means model. However, I keep getting errors. (See the section after the code.)

My Spark dataframe looks like this (and has around 1 million rows):

ID            col1           col2        Latitude         Longitude
13            ...            ...           22.2             13.5
62            ...            ...           21.4             13.8
24            ...            ...           21.8             14.1
71            ...            ...           28.9             18.0
...           ...            ...           ....             ....

Here is my code:

from pyspark.ml.clustering import KMeans 
from pyspark.ml.linalg import Vectors


df = spark.read.csv("file.csv", header=True)

spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"])))
feature_df = spark_rdd.toDF(["ID", "features"])    

kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(feature_df)

sum_of_square_error = model.computeCost(feature_df)
print(str(sum_of_square_error))

centers = model.clusterCenters()

for center in centers:
    print(center)

However, I get this error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-145-f50a6cbe7243> in <module>()
      7 
      8 kmeans = KMeans().setK(2).setSeed(1)
----> 9 model = kmeans.fit(feature_df)
     10 
     11 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    234 
    235     def _fit(self, dataset):
--> 236         java_model = self._fit_java(dataset)
    237         return self._create_model(java_model)
    238 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    231         """
    232         self._transfer_params_to_java()
--> 233         return self._java_obj.fit(dataset._jdf)
    234 
    235     def _fit(self, dataset):

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3552.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 457.0 failed 4 times, most recent failure: Lost task 5.3 in stage 457.0 (TID 2308, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
    process()
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
    return DenseVector(elements)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLF


    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:156)
    at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:152)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$8$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$anonfun$takeSample$1.apply(RDD.scala:567)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.takeSample(RDD.scala:556)
    at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:353)
    at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
    at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
    at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
    at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
    process()
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
    return DenseVector(elements)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLF

    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:156)
    at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:152)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$8$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

The strange thing is that the error is different every time I run it. The 3 kinds of errors I get are:

UnicodeEncodeError: 'decimal' codec can't encode characters in position 3-5: invalid decimal Unicode string

invalid literal for float(): 2017-04

ValueError: could not convert string to float: GOLF

Correct me if I'm wrong, but I suspect some of the values in the columns are bad (e.g., occasional strings in the Latitude and Longitude columns).

Is there a way to check whether the value in each row of 'Latitude' is in fact a float? Is there a way to check whether the value in each row of 'ID' is an integer? I would like to drop the rows that contain values of the wrong data type. Maybe there is a way to do this with df.filter()?
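
For what it's worth, here is the kind of thing I had in mind (an untested sketch; I am assuming that cast() returns null for values that cannot be parsed, so filtering on isNotNull() would drop the malformed rows):

from pyspark.sql.functions import col

# Assumption: cast() yields null when a string cannot be parsed,
# so isNotNull() keeps only the rows with well-formed values.
clean_df = (df
    .filter(col("ID").cast("int").isNotNull())
    .filter(col("Latitude").cast("float").isNotNull())
    .filter(col("Longitude").cast("float").isNotNull()))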

I would really appreciate any help. Thanks.

UPDATE: I even tried df.describe('ID', 'Latitude', 'Longitude').show() and it returns numeric values for count, mean, stddev, min, and max for each column, which suggests to me that they should all be numbers..?
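
In case it is relevant, this is how I understand the column types and the offending rows could be inspected directly (again a rough, untested sketch, reusing the cast-to-null assumption from above):

from pyspark.sql.functions import col

# Without inferSchema, spark.read.csv() reads every column as string,
# so printSchema() should reveal whether the columns are really numeric.
df.printSchema()

# Surface a few rows whose Latitude does not parse as a float
df.filter(col("Latitude").cast("float").isNull()).show(5)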
