Processando a ordenação total de eventos por chave usando o Apache Beam

Contexto do Problema

Eu estou tentando gerar uma ordem total (linear) de itens de evento por chave de um fluxo em tempo real em que o pedido é o tempo do evento (derivado da carga útil do evento).

Aproximação

Eu tentei implementar isso usando o streaming da seguinte maneira:

1) Configure janelas sequenciais não sobrepostas, por exemplo duração 5 minutos

2) Estabelecer um atraso permitido - é bom descartar eventos tardios

3) Defina o modo de acumulação para reter todos os painéis acionados

4) Use o gatilho "AfterwaterMark"

5) Ao manipular um painel acionado, considere o painel apenas se for o final

6) Use GroupBy.perKey para garantir que todos os eventos nesta janela para essa chave sejam processados como uma unidade em um único recurso

Embora essa abordagem garanta uma ordem linear para cada chave em uma determinada janela, ela não garante essa garantia em várias janelas, por exemplo, pode haver uma janela de eventos para a chave que ocorre depois que ela está sendo processada ao mesmo tempo que a janela anterior, isso pode acontecer facilmente se a primeira janela falhar e precisar ser repetida.

Estou pensando em adaptar essa abordagem em que o fluxo em tempo real pode ser processado primeiro para que ele particione os eventos por chave e os grave em arquivos nomeados pelo intervalo de janelas. Devido à natureza paralela do processamento do feixe, esses arquivos também serão gerados fora de ordem. Um único coordenador de processo pode enviar esses arquivos seqüencialmente para um pipeline em lote - somente enviando o próximo quando receber o arquivo anterior e se o processamento posterior tiver sido concluído com êxito.

O problema é que o Apache Beam acionará um painel apenas se houvesse pelo menos um elemento de tempo nessa janela de tempo. Portanto, se houver falhas nos eventos, poderá haver falhas nos arquivos gerados - ou seja, arquivos ausentes. O problema de ter arquivos ausentes é que o processador de lote de coordenação não pode fazer a distinção entre saber se a janela de tempo passou sem dados ou se houve uma falha; nesse caso, ela não pode continuar até que o arquivo finalmente chegue.

Uma maneira de forçar o acionamento das janelas de eventos pode ser adicionar de alguma forma eventos simulados ao fluxo para cada partição e janela de tempo. No entanto, isso é complicado de fazer ... se houver grandes lacunas na sequência de tempo, se esses eventos fictícios ocorrerem cercados por eventos muito mais tarde, eles serão descartados como atrasados.

Existem outras abordagens para garantir que haja um gatilho para cada janela de evento possível, mesmo que isso resulte na saída de arquivos vazios?

A geração de um pedido total por chave de um fluxo em tempo real é um problema tratável com o Apache Beam? Existe outra abordagem que eu deveria considerar?

questionAnswers(1)

yourAnswerToTheQuestion