Spark error when loading files from S3 with a wildcard

I am using the PySpark shell and trying to read data from S3 using Spark's file wildcard support, but I get the following error:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.6 (default, Jul 24 2015 16:07:07)
SparkContext available as sc.
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'AWS_ACCESS_KEY_ID')
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'AWS_SECRET_ACCESS_KEY')
>>> sc.textFile("s3n://myBucket/path/files-*", use_unicode=False).count()
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB)
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB)
16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB)
16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect.
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
        at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
        at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
        at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:780)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
        at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179)
        at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
        at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
        at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
        at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
        ... 44 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569)
        at java.lang.StringBuffer.append(StringBuffer.java:369)
        at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160)
        at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
        at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
        at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
        at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
        at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
        at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
        at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)

When I try to load a single file (without using the wildcard), the code works. Since I need to read roughly 100,000 files, I am wondering what the best way is to load all of them into a single RDD.
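For reference, one approach I have considered instead of the wildcard is to build one RDD per file and union them; a minimal sketch is below (the two paths are just examples of my files, and I doubt this scales to ~100,000 of them):

# Sketch only: union one RDD per file instead of globbing.
# The paths below are example files; this is not what I'd want to do
# for ~100k files, but it avoids the wildcard.
files = [
    "s3n://myBucket/path/files-2016-01-01-02-00",
    "s3n://myBucket/path/files-2016-01-01-02-01",
]
rdd = sc.union([sc.textFile(f, use_unicode=False) for f in files])
print(rdd.count())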

Update

It seems the problem is that the S3 directory my key prefix points into contains over 300,000 files (all of my files live under it). My files have a date as a suffix:

s3://myBucket/path/files-2016-01-01-02-00
s3://myBucket/path/files-2016-01-01-02-01
s3://myBucket/path/files-2016-01-01-03-00
s3://myBucket/path/files-2016-01-01-03-01

I tried using the wildcard to select only a few files by date, e.g. s3n://myBucket/path/files-2016-01-01-03-*. When I enabled debug logging, I saw that Spark listed all files in the S3 directory (s3://myBucket/path/) rather than only the files matching the key prefix I specified (s3://myBucket/path/files-2016-01-01-03-). So even though I only wanted to read 2 files, all 300k keys were listed, and that is probably why it runs out of memory.
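A workaround I am considering, sketched below, is to do the prefix listing myself with boto (so S3 filters the keys server-side) and then pass the matching keys to textFile as a comma-separated path string, which Spark accepts. This assumes boto is installed on the driver; the credentials, bucket name, and prefix are placeholders matching the examples above:

# Sketch: list only the keys under the exact prefix with boto,
# then hand Spark an explicit comma-separated list of paths
# instead of a glob, so Hadoop never lists the whole directory.
import boto

conn = boto.connect_s3('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
bucket = conn.get_bucket('myBucket')
keys = bucket.list(prefix='path/files-2016-01-01-03-')
paths = ",".join("s3n://myBucket/" + k.name for k in keys)
rdd = sc.textFile(paths, use_unicode=False)
print(rdd.count())

I have not verified that this avoids the problem entirely; I would still be interested in a way to make the wildcard itself work.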
