Confluent 4.1.0 -> KSQL: junção STREAM-TABLE -> dados da tabela null

PASSO 1: Execute o produtor para criar dados de amostra

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic stream-test-topic \
         --property schema.registry.url=http://localhost:8081 \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'

Dados de amostra :

{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}

PASSO 2: Abra outro terminal e execute o consumidor para testar os dados.

./bin/kafka-avro-console-consumer --topic stream-test-topic \
         --bootstrap-server localhost:9092 \
         --property schema.registry.url=http://localhost:8081 \
         --from-beginning

PASSO 3: Abra outro terminal e execute o produtor.

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"EXPENSE_CODE","type":"string"},{"name":"EXPENSE_DESC","type":"string"}]}'

Dados:

"pk1":{"EXPENSE_CODE":"EXP001", "EXPENSE_DESC":"Regulatory Deposit"}
"pk2":{"EXPENSE_CODE":"EXP002", "EXPENSE_DESC":"ABC - Sofia"}
"pk3":{"EXPENSE_CODE":"EXP003", "EXPENSE_DESC":"Apple Corporation"}
"pk4":{"EXPENSE_CODE":"EXP004", "EXPENSE_DESC":"Confluent Europe"}
"pk5":{"EXPENSE_CODE":"EXP005", "EXPENSE_DESC":"Air India"}
"pk6":{"EXPENSE_CODE":"EXP006", "EXPENSE_DESC":"KLM International"}

PASSO 4: Abra outro terminal e execute o consumidor

./bin/kafka-avro-console-consumer --topic expense-test-topic \
         --bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
         --from-beginning

PASSO 5: Faça login no cliente KSQL.

./bin/ksql http://localhost:8088

crie o seguinte fluxo e tabela e execute a consulta de junção.

KSQL:

CORRENTE:

    CREATE STREAM SAMPLE_STREAM 
       (DEAL_ID VARCHAR, DEAL_EXPENSE_CODE varchar, DEAL_BRANCH VARCHAR) 
       WITH (kafka_topic='stream-test-topic',value_format='AVRO', key = 'DEAL_ID');

MESA:

CREATE TABLE SAMPLE_TABLE 
   (EXPENSE_CODE varchar, EXPENSE_DESC VARCHAR)
   WITH (kafka_topic='expense-test-topic',value_format='AVRO', key = 'EXPENSE_CODE');

A seguir está a SAÍDA:

ksql> SELECT STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC 
       from SAMPLE_STREAM STREAM1 LEFT JOIN SAMPLE_TABLE TABLE1 
       ON STREAM1.DEAL_EXPENSE_CODE = TABLE1.EXPENSE_CODE  
       WINDOW TUMBLING (SIZE 3 MINUTE) 
       GROUP BY STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC;

EXP001 | null
EXP001 | null
EXP002 | null
EXP003 | null
EXP004 | null
EXP005 | null
EXP006 | null
EXP002 | null
EXP002 | null

questionAnswers(1)

yourAnswerToTheQuestion