La entrada de logstash jdbc está duplicando resultados
Estoy usando el complemento jdbc de entrada logstash para leer dos (o más) bases de datos y enviar los datos a elasticsearch, y estoy usando kibana 4 para visualizar estos datos.
Esta es mi configuración de logstash:
input {
jdbc {
type => "A"
jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
jdbc_user => "user"
jdbc_password => "pass"
schedule => "5 * * * *"
statement => "SELECT id, date, content, status from test_table"
}
jdbc {
type => "B"
jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
jdbc_user => "user"
jdbc_password => "pass"
schedule => "5 * * * *"
statement => "SELECT id, date, content, status from test_table"
}
}
filter {
}
output {
if [type] == "A" {
elasticsearch {
host => "localhost"
protocol => http
index => "logstash-servera-%{+YYYY.MM.dd}"
}
}
if [type] == "B" {
elasticsearch {
host => "localhost"
protocol => http
index => "logstash-serverb-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
El problema es que cada vez que ejecuta logstash, comienza a guardar todos los datos que ya están en la búsqueda elástica.
Después de ejecutar con la cláusula where = date> '2015-09-10' detuve el logstash y ejecuté nuevamente (con --debug) con el 'parámetro especial': sql_last_date. Después del inicio de logstash Comienza a mostrar esto en el registro:
←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2)
AS 'DDD',\nCASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO' \n W
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t ELSE 'ERRO'\n END AS 'TIPO_MENSAGEM
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jr,ub
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=
>"107", :method=>"execute_statement"}←[0m
Esta vez corrí con la declaración 'real' que es:
SELECT
SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'
WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
ELSE 'ERRO'
END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA
FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC
Alguien sabe como resolverlo?
¡Gracias!