Lectura de varios archivos de S3 en Spark por período de fecha

Descripción

Tengo una aplicación que envía datos a AWS Kinesis Firehose y los escribe en mi bucket de S3. Firehose utiliza el formato "aaaa / MM / dd / HH" para escribir los archivos.

Como en este ejemplo de ruta S3:

s3://mybucket/2016/07/29/12

Ahora tengo una aplicación Spark escrita en Scala, donde necesito leer datos de un período de tiempo específico. Tengo fechas de inicio y fin. Los datos están en formato JSON y es por eso que usosqlContext.read.json() nosc.textFile().

¿Cómo puedo leer los datos de manera rápida y eficiente?

Que he probado

Comodines - Puedo seleccionar los datos de todas las horas de una fecha específica o de todas las fechas de un mes específico, por ejemplo:

val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

Pero si tengo que leer datos del período de la fecha de algunos días, por ejemplo, 2016-07-29 - 2016-07-30, no puedo usar el enfoque comodín de la misma manera.

Lo que me lleva a mi siguiente punto...

Usando múltiples caminos o un CSV de directorios presentado porsamthebest enesta solución. Parece que separar directorios con comas solo funciona consc.textFile() y nosqlContext.read.json().

Unión - Una segunda solución del enlace anterior pornube sugiere leer cada directorio por separado y luego unirlos. Aunque sugiere unir los RDD-s, también hay una opción para unir DataFrames. Si genero las cadenas de fecha del período de fecha dado manualmente, entonces puedo crear una ruta que no existe y, en lugar de ignorarla, falla toda la lectura. En cambio, podría usar AWS SDK y usar la funciónlistObjects de AmazonS3Client para obtener todas las claves como eniMKanchwalaLa solución del enlace anterior.

El único problema es que mis datos cambian constantemente. Siread.json() La función obtiene todos los datos como un solo parámetro, lee todos los datos necesarios y es lo suficientemente inteligente como para inferir el esquema json de los datos. Si leo 2 directorios por separado y sus esquemas no coinciden, creo que unir estos dos marcos de datos se convierte en un problema.

Sintaxis de Glob (?) - Esta solución pornhahtdh es un poco mejor que las opciones1 y2 porque ofrecen la opción de especificar fechas y directorios con más detalle y como una "ruta" única para que funcione también conread.json().

Pero, de nuevo, se produce un problema familiar sobre los directorios que faltan. Digamos que quiero todos los datos del 20.07 al 30.07, puedo declararlo así:

val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

Pero si me faltan datos del 25 de julio, entonces el camino..16/07/25/ no existe y toda la función falla.

Y, obviamente, se vuelve más difícil cuando el período solicitado es, por ejemplo, 25.11.2015-12.02.2016, entonces necesitaría crear mediante programación (en mi script Scala) una ruta de cadena similar a esta:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

Y al crearlo, de alguna manera me aseguraría de que estos intervalos 25-30 y 01-12 tengan todos los caminos correspondientes, si falta uno, falla nuevamente. (Afortunadamente, Asterisk trata con los directorios que faltan, ya que lee todo lo que existe)

¿Cómo puedo leer todos los datos necesarios de una sola ruta de directorio de una vez sin la posibilidad de fallar debido a que falta un directorio entre algún intervalo de fechas?

Respuestas a la pregunta(1)

Su respuesta a la pregunta