Pyspark - ValueError: não foi possível converter a seqüência de caracteres em float / literal inválido para float ()
Estou tentando usar dados de um quadro de dados spark como entrada para o meu modelo k-means. No entanto, continuo recebendo erros. (Verifique a seção após o código)
Meu spark dataframe e se parece com isso (e tem cerca de 1 milhão de linhas):
ID col1 col2 Latitude Longitude
13 ... ... 22.2 13.5
62 ... ... 21.4 13.8
24 ... ... 21.8 14.1
71 ... ... 28.9 18.0
... ... ... .... ....
Aqui está o meu código:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
df = spark.read.csv("file.csv")
spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"])))
feature_df = spark_rdd.toDF(["ID", "features"])
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(feature_df)
sum_of_square_error = model.computeCost(feature_df)
print str(sum_of_square_error)
centers = model.clusterCenters()
for center in centers:
print(center)
No entanto, recebo o erro:
Py4JJavaError Traceback (most recent call last)
<ipython-input-145-f50a6cbe7243> in <module>()
7
8 kmeans = KMeans().setK(2).setSeed(1)
----> 9 model = kmeans.fit(feature_df)
10
11
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
62 return self.copy(params)._fit(dataset)
63 else:
---> 64 return self._fit(dataset)
65 else:
66 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
234
235 def _fit(self, dataset):
--> 236 java_model = self._fit_java(dataset)
237 return self._create_model(java_model)
238
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
231 """
232 self._transfer_params_to_java()
--> 233 return self._java_obj.fit(dataset._jdf)
234
235 def _fit(self, dataset):
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o3552.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 457.0 failed 4 times, most recent failure: Lost task 5.3 in stage 457.0 (TID 2308, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
process()
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
return DenseVector(elements)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLF
at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfunATUALIZAÇÃO: Eu até tentei$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$takeSample$1.apply(RDD.scala:567)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.takeSample(RDD.scala:556)
at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:353)
at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
process()
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
return DenseVector(elements)
File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLFE
at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfunATUALIZAÇÃO: Eu até tentei$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
O estranho é que o erro é diferente toda vez que eu o executo. Os 3 tipos de erros que recebo são:
UnicodeEncodeError: 'decimal' codec can't encode characters in position 3-5: invalid decimal Unicode string
invalid literal for float(): 2017-04
ValueError: could not convert string to float: GOLF
Corrija-me se estiver errado, mas acho que algum valor dos dados nas colunas pode estar incorreto (por exemplo, cadeias ocasionais na coluna de latitude e longitude)
Existe uma maneira de verificar se o valor em cada linha do 'Latitude' é de fato um valor flutuante? Existe uma maneira de verificar se o valor em cada linha de 'ID' é um número inteiro? Gostaria de descartar as linhas que contêm valores do tipo de dados incorreto. Talvez haja uma maneira de fazer isso usandodf.filter()
?
Eu apreciaria muito qualquer ajuda. Obrigado.
ATUALIZAÇÃO: Eu até tenteidf.describe('ID', 'Latitude', 'Longitude').show()
e retorna valores numéricos para valores count, mean, stddev, min, max para cada coluna, indicando para mim que todos eles devem ser números ..?