Logstash -> Elasticsearch - actualizar datos desnormalizados

Explicación del caso de uso

Tenemos una base de datos relacional con datos sobre nuestras operaciones diarias. El objetivo es permitir a los usuarios buscar datos importantes con un motor de búsqueda de texto completo. Los datos están normalizados y, por lo tanto, no están en la mejor forma para realizar consultas de texto completo, por lo que la idea era desnormalizar un subconjunto de datos y copiarlo en tiempo real a Elasticsearch, lo que nos permite crear una aplicación de búsqueda rápida y precisa. .

Ya tenemos un sistema que permiteAbastecimiento de eventos de nuestras operaciones de base de datos (inserciones, actualizaciones, eliminaciones). Los eventos solo contienen las columnas modificadas y las claves principales (en una actualización no obtenemos toda la fila). Logstash ya recibe una notificación para cada evento, por lo que esta parte ya se maneja.

Problema real

Ahora estamos llegando a nuestro problema. Dado que el plan es desnormalizar nuestros datos, tendremos que asegurarnos de que las actualizaciones de los objetos primarios se propaguen a los objetos secundarios desnormalizados en Elasticsearch. ¿Cómo podemos configurar logstash para hacer esto?

Ejemplo

Digamos que mantenemos una lista deEmployees en Elasticsearch. CadaEmployee se asigna a unCompany. Dado que los datos están desnormalizados (con el fin de una búsqueda más rápida), cadaEmployee también lleva el nombre y la dirección delCompany. Una actualización cambia el nombre de unCompany - ¿Cómo podemos configurar logstash para actualizar el nombre de la empresa en todosEmployees, asignado a laCompany?

Explicación adicional

@Darth_Vader: El problema al que nos enfrentamos es que tenemos un evento queCompany ha cambiado, pero queremos modificar documentos de tipoEmployee en Elasticsearch, porque llevan los datos sobre la empresa en sí. Su respuesta espera que obtengamos un evento para cadaEmployee, Que no es el caso.

Quizás esto lo aclare. Tenemos 3 empleados en Elasticsearch:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Luego ocurre una actualización en la base de datos de origen.

UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;

Tenemos un evento en logstash, donde dice algo como esto:

{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}

Esto debería propagarse a Elasticsearch, de modo que los empleados resultantes sean:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Note que el campocompany.name cambiado

Respuestas a la pregunta(1)

Su respuesta a la pregunta