Вычисление различий между последовательными записями в Hadoop с Hive Queries

У меня есть таблица Hive, которая содержит данные о клиентских звонках. Для простоты рассмотрим, что он имеет 2 столбца, первый столбец содержит идентификатор клиента, а второй столбец содержит временную метку вызова (временная метка unix).

Я могу запросить эту таблицу, чтобы найти все звонки для каждого клиента:

SELECT * FROM mytable SORT BY customer_id, call_time;

Результат:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

Можно ли создать запрос Hive, который возвращает для каждого клиента, начиная со второго вызова, интервал времени между двумя последовательными вызовами? Для приведенного выше примера этот запрос должен вернуть:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

Я пытался адаптировать решения отрешение sql, но я'Я застрял с ограничениями Hive:он принимает подзапросы только в ОТ а такжеобъединения должны содержать только равенства.

Спасибо.

EDIT1:

Я попытался использовать функцию Hive UDF:

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

и использовать его с именемдельта».

Но кажется (из журналов и результатов), что он используется во время MAP. 2 проблемы возникают из-за этого:

Первый: Данные таблицы должны быть отсортированы по идентификатору клиента и отметке времени ДО использования этой функции. Запрос:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

не работает, потому что сортировка выполняется во время REDUCE, долгое время после использования моей функции.

Я могу отсортировать данные таблицы перед использованием функции, но яЯ не доволен этим, потому что это накладные расходы, которые я надеюсь избежать.

Во-вторых: В случае распределенной конфигурации Hadoop данные распределяются между доступными трекерами заданий. Поэтому я полагаю, что будет несколько экземпляров этой функции, по одному для каждого сопоставителя, так что можно иметь одинаковые данные о клиентах, разделенные между двумя сопоставителями. В этом случае я потеряю звонки клиентов, что недопустимо.

Я нене знаю, как решить эту проблему. Я знаю, что DISTRIBUTE BY гарантирует, что все данные с определенным значением отправляются одному и тому же редуктору (таким образом гарантируя, что SORT работает должным образом), кто-нибудь знает, есть ли что-то подобное для маппера?

Далее я планирую следовать за libjack 'Предложение использовать редукционный скрипт. Это "вычисление» Он необходим между некоторыми другими запросами улья, поэтому я хочу попробовать все, что предлагает Hive, прежде чем переходить к другому инструменту, как предлагает Balaswamy vaddeman.

EDIT2:

Я начал исследовать решение пользовательских скриптов. Но на первой странице главы 14 в книге «Программирование кустов» (в этой главе представлены пользовательские сценарии) я обнаружил следующий абзац:

Потоковая передача обычно менее эффективна, чем кодирование сопоставимых UDF или объектов InputFormat. Сериализация и десериализация данных для их передачи в канал и из него относительно неэффективна. Также сложнее отладить всю программу единым образом. Тем не менее, это полезно для быстрого создания прототипов и для использования существующего кода, который не написан на Java. Для пользователей Hive, которые неЯ не хочу писать код Java, это может быть очень эффективным подходом.

Таким образом, было ясно, что пользовательские сценарии не являются лучшим решением с точки зрения эффективности.

Но как сохранить функцию UDF, но убедиться, что она работает должным образом в распределенной конфигурации Hadoop? Я нашел ответ на этот вопрос в разделе «UDF Internals» на странице «Руководство по языку UDF». Если я напишу свой запрос:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

он выполняется во время REDUCE, а конструкции DISTRIBUTE BY и SORT BY гарантируют, что все записи от одного и того же клиента обрабатываются одним и тем же редуктором в порядке вызовов.

Таким образом, вышеупомянутая UDF и эта конструкция запроса решают мою проблему.

(Извините, что не добавляю ссылки, но яМне не разрешено делать это, потому что я нерепутации не хватает)

Ответы на вопрос(3)

Ваш ответ на вопрос