Calcule las diferencias entre registros sucesivos en Hadoop con Hive Queries

Tengo una tabla Hive que contiene datos de llamadas de clientes. Para simplificar, tenga en cuenta que tiene 2 columnas, la primera columna contiene la identificación del cliente y la segunda columna contiene la marca de tiempo de la llamada (marca de hora de Unix).

Puedo consultar esta tabla para encontrar todas las llamadas para cada cliente:

SELECT * FROM mytable SORT BY customer_id, call_time;

El resultado es:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

¿Es posible crear una consulta de Hive que devuelva, para cada cliente, a partir de la segunda llamada, el intervalo de tiempo entre dos llamadas sucesivas? Para el ejemplo anterior, esa consulta debe devolver:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

He intentado adaptar las soluciones delsolución sql, pero estoy atascado con las limitaciones de Hive:acepta subconsultas solo en FROM yLas uniones deben contener solo igualdades.

Gracias.

EDIT1:

He intentado usar una función UDF de Hive:

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

y usarlo con el nombre "delta".

Pero parece (a partir de los registros y el resultado) que se está utilizando en el momento de MAP. 2 problemas surgen de esto:

Primero: Los datos de la tabla se deben ordenar por ID de cliente y marca de tiempo ANTES de usar esta función. La consulta:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

no funciona porque la parte de clasificación se realiza en el momento de REDUCIR, mucho después de que se esté utilizando mi función.

Puedo ordenar los datos de la tabla antes de usar la función, pero no estoy contento con esto porque es una sobrecarga que espero evitar.

Segundo: En el caso de una configuración distribuida de Hadoop, los datos se dividen entre los rastreadores de trabajos disponibles. Así que creo que habrá varias instancias de esta función, una para cada asignador, por lo que es posible tener la misma división de datos de clientes entre 2 asignadores. En este caso perderé las llamadas de los clientes, lo cual no es aceptable.

No sé cómo resolver este problema. Sé que DISTRIBUTE BY garantiza que todos los datos con un valor específico se envíen al mismo reductor (asegurando así que SORT funcione como se esperaba), ¿alguien sabe si hay algo similar para el asignador?

A continuación, planeo seguir la sugerencia de libjack para usar un script de reducción. Este "cálculo" es necesario entre otras consultas de Hive, por lo que quiero probar todo lo que Hive ofrece, antes de pasar a otra herramienta, como lo sugiere Balaswamy vaddeman.

EDIT2:

Comencé a investigar la solución de scripts personalizados. Pero, en la primera página del capítulo 14 del libro Programming Hive (este capítulo presenta los scripts personalizados), encontré el siguiente párrafo:

La transmisión por lo general es menos eficiente que codificar las UDF comparables o los objetos InputFormat. Serializar y deserializar los datos para pasarlos dentro y fuera de la tubería es relativamente ineficiente. También es más difícil depurar todo el programa de una manera unificada. Sin embargo, es útil para la creación rápida de prototipos y para aprovechar el código existente que no está escrito en Java. Para los usuarios de Hive que no desean escribir código Java, puede ser un enfoque muy efectivo.

Entonces quedó claro que los scripts personalizados no son la mejor solución en términos de eficiencia.

Pero, ¿cómo debo mantener mi función UDF, pero asegurarme de que funcione como se espera en una configuración distribuida de Hadoop? Encontré la respuesta a esta pregunta en la sección Internos de UDF de la página wiki de UDF del Manual de idiomas. Si escribo mi consulta:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

se ejecuta en el momento de REDUCE y las construcciones DISTRIBUTE BY y SORT BY garantizan que todos los registros del mismo cliente estén siendo procesados ​​por el mismo reductor, en orden de llamadas.

Así que el UDF anterior y esta construcción de consulta resuelven mi problema.

(Lo siento por no agregar los enlaces, pero no tengo permiso para hacerlo porque no tengo suficientes puntos de reputación)

Respuestas a la pregunta(3)

Su respuesta a la pregunta