Как прочитать входные данные из S3 в кластерном приложении Spark Streaming EC2

Я пытаюсь, чтобы приложение Spark Streaming считывало его входные данные из каталога S3, но я получаю это исключение после запуска его с помощью сценария spark-submit:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Я устанавливаю эти переменные через этот блок кода, как предложено здесьhttp://spark.apache.org/docs/latest/ec2-scripts.html (внизу страницы):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args (2) и args (3) - это, конечно же, мой идентификатор ключа доступа AWS и секретный ключ доступа.

Почему он продолжает говорить, что они не установлены?

РЕДАКТИРОВАТЬ: Я пытался так же, но я получаю то же исключение:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")

Ответы на вопрос(9)

Ваш ответ на вопрос