KGroupedTable.count () возвращает отрицательные значения?
idAndJobTransaction
.filter((k,v) -> v!=null)
.mapValues(jobTransaction -> {
jobTransaction.setCount(0);
jobTransaction.setId(0L);
jobTransaction.setRunsheet_id(0L);
jobTransaction.setTimestamp(0L);
if(jobTransaction.getDelete_flag() == 1)
return null;
else
return jobTransaction;
} )
.groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
.count()
.toStream()
.mapValues((k,v)-> new JobSummary(k,v))
.peek((k,v)->{
log.info(k.toString());
log.info(v.toString());
}).selectKey((k,v)-> v.getCompany_id()) // So that the count is consumed in order for each company
.to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
Метод подсчета иногда возвращает отрицательные значения.Около 1% процентов значений являются отрицательными.Как это возможно?
РЕДАКТИРОВАТЬ 1:
Я помещаю результаты этой агрегации в таблицу Postgres.Отрицательные значения не ограничены -1, но доходят до очень высоких значений.
Я использую 2 потребителя.Имеет ли это какое-то значение?
Может ли это быть проблемой с потоками Кафки?или я должен искать другие возможные причины?
![enter image description here](https://i.stack.imgur.com/MkLYG.png)
РЕДАКТИРОВАТЬ 3: мне удалось захватить некоторые из доступных журналов, и я действительно увидел отрицательные значенияв кратчайшие сроки:
![enter image description here](https://i.stack.imgur.com/QroJc.png)
Что касается класса JobSummary, то это действительно очень простой класс POJO.Вот конструктор, вызываемый в приложении KStream.
public JobSummary(JobTransaction j, Long count){
this.setUser_id(j.getUser_id());
this.setHub_id(j.getHub_id());
this.setCity_id(j.getCity_id());
this.setCompany_id(j.getCompany_id());
this.setJob_master_id(j.getJob_master_id());
this.setJob_status_id(j.getJob_status_id());
this.setCount(count);
this.setDate(j.getDate());
}