У меня есть файл siddhi для чтения из kafka topi c и записи в базу данных.
@store(type='rdbms' , datasource='WSO2_CLUSTER',table.name='sample_table')
define table sample_table(col1 string, col2 string, corr_id string);
from kafkaInputMetricCorr
select col1, col2, corr_id
update or insert into sample_table
on sample_table.corr_id == corr_id;
Когда я использую эту функцию «обновить или вставить», она обновляет все поля (col1, col2 , corr_id), хотя поле не доступно в событии Кафки. Когда сообщение kafka имеет "col1 и corr_id", я хочу, чтобы обновлялись только col1 и corr_id. Что происходит сейчас, это обновление col2, чтобы быть нулевым. Я даже попытался изменить значение на
select *
, надеясь, что будут обновлены только поля, доступные в событии Kafka. Но это не работает. Пожалуйста, помогите в этом. Спасибо.