Чтение нескольких файлов из S3 в Spark по периоду даты

Описание

У меня есть приложение, которое отправляет данные в AWS Kinesis Firehose, и оно записывает данные в мое хранилище S3. Firehose использует формат «гггг / мм / дд / чч» для записи файлов.

Как в этом примере S3 путь:

s3://mybucket/2016/07/29/12

Теперь у меня есть приложение Spark, написанное на Scala, где мне нужно читать данные за определенный период времени. У меня есть даты начала и окончания. Данные в формате JSON, и поэтому я используюsqlContext.read.json() неsc.textFile().

Как я могу читать данные быстро и эффективно?

Что я пробовал?

Wildcards - Я могу выбрать данные из всех часов определенной даты или всех дат определенного месяца, например:

val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

Но если мне нужно прочитать данные за несколько дней, например, 2016-07-29 - 2016-07-30, я не могу использовать подход с использованием групповых символов таким же образом.

Что подводит меня к следующему пункту ...

Использование нескольких путей или CSV каталогов, представленныхsamthebest вэтот решение. Кажется, что разделение каталогов запятыми работает только сsc.textFile() и неsqlContext.read.json().

союз - Второе решение из предыдущей ссылкиоблако предлагает прочитать каждый каталог отдельно, а затем объединить их вместе. Хотя он предлагает объединить RDD-ы, существует также возможность объединения DataFrames. Если я сгенерирую строки даты из заданного периода дат вручную, то я могу создать путь, который не существует, и вместо того, чтобы игнорировать его, произойдет сбой всего чтения. Вместо этого я мог бы использовать AWS SDK и использовать функциюlistObjects от AmazonS3Client, чтобы получить все ключи, как вiMKanchwalaрешение по предыдущей ссылке.

Единственная проблема в том, что мои данные постоянно меняются. Еслиread.json() Функция получает все данные в виде единого параметра, читает все необходимые данные и достаточно умна, чтобы вывести из этих данных схему json. Если я читаю 2 каталога по отдельности и их схемы не совпадают, то я думаю, что объединение этих двух информационных фреймов становится проблемой.

Глоб (?) Синтаксис - это решение поnhahtdh немного лучше, чем варианты1 а также2 потому что они предоставляют возможность указывать даты и каталоги более подробно и как единый «путь», поэтому он работает также сread.json().

Но опять же, возникает знакомая проблема с отсутствующими каталогами. Допустим, мне нужны все данные с 20.07 по 30.07, я могу объявить это так:

val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

Но если мне не хватает данных, скажем, 25 июля, то путь..16/07/25/ не существует, и вся функция не работает.

И, очевидно, становится сложнее, когда запрашиваемый период, например, 25.11.2015-12.02.2016, тогда мне нужно программно (в моем скрипте Scala) создать путь к строке примерно так:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

И, создавая его, я бы хотел как-то убедиться, что все эти 25-30 и 01-12 интервалы имеют соответствующие пути, и если один из них отсутствует, он снова завершается неудачей. (К счастью, Asterisk имеет дело с отсутствующими каталогами, так как читает все, что существует)

Как я могу одновременно прочитать все необходимые данные из одного пути к каталогу без возможности сбоя из-за отсутствия каталога между интервалами дат?

Ответы на вопрос(1)

Ваш ответ на вопрос