У меня два потока исходят из двух тем orders
и fsource
. В основном заказы имеют статус c и обновляются редко, в то время как fsource
обновляется со скоростью 1000 в секунду. Здесь я использовал KTable-KTabke
join, потому что у них одинаковый ключ.
PObject:
private String orderId;{1,2,3,4,5,6}
private Double price;
private Long oTNum;//{1,2,3,4,5,6}
FSource:
private String orderId;{1,2,3,4,5,6}
private Double adPrice;
private Long fTNum;//{1,2,3,4,5.........} Sequence number for for each event
EnOrder:
private String orderId;
private Double price;
private Double adPrice;
private Long oTNum;
private Long fTNum;
private Long eTNum;//{1,2,3,4,5.........}
public class EnStreamApp implements PTMS{
private static final Logger logger = Logger.getLogger(EnStreamApp.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app1-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, URL_KAFKA_BROKERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); //commit as fast as possible
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
StreamsBuilder builder = new StreamsBuilder();
KTable<String, POrder> pOrderTable = builder.<String, POrder>table("orders"); // Static KTable
KTable<String, FSource> fTable = builder.<String, FSource>table("fsource"); // Events 1000 per seconds
KTable<String, EnOrder> enrichedTable = pOrderTable.join(fTable, new ValueJoiner<POrder, FSource, EnOrder>() {
@Override
public EnOrder apply(POrder order, FSource fSource) {
EnOrder enOrder = EnOrder.builder()
.orderId(order.getOrderId())
.price(order.getPrice())
.oTNum(order.getOTNum())
.adPrice(fSource!=null ? fSource.getAdPrice():null)
.fTNum(fSource!=null ? fSource.getFTNum():0)
.eTNum(AtomicSequenceGenerator.INSTANCE.getNext()) // This should be in-sync with events from fSource fTNum
.build();
logger.info(String.format("Enriched:{OrderId=%s, oTNum=%s, fTNum=%s, eTNum=%s}", enOrder.getOrderId(), enOrder.getOTNum(), enOrder.getFTNum(), enOrder.getETNum()));
return enOrder;
}
});
enrichedTable.toStream().to("enriched", Produced.with(Serdes.String(), new JSONSerdeComp<>()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}
}
2020-06-16 14:00:56,577 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=3, oTNum=3, fTNum=232, eTNum=232}
2020-06-16 14:00:56,578 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=4, oTNum=4, fTNum=233, eTNum=233}
2020-06-16 14:00:56,578 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=234, eTNum=234}
2020-06-16 14:18:54,979 WARN org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9560, partition 0.
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=9564, eTNum=15742}
2020-06-16 14:26:50,799 WARN org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9562, partition 0.
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=6, oTNum=6, fTNum=9565, eTNum=15743}
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=1, oTNum=1, fTNum=9566, eTNum=15744}
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=2, oTNum=2, fTNum=9567, eTNum=15745}
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=3, oTNum=3, fTNum=9568, eTNum=15746}
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=4, oTNum=4, fTNum=9569, eTNum=15747}
Слияние потока иногда выглядит нормально, но когда в течение этого периода я вижу повторяющиеся события, обрабатываемые функцией join
. В идеале одно событие fsource
должно приводить к одному событию в «объединенном потоке», тогда почему объединение обрабатывает больше событий, чем получено.
Это выглядит правильно
2020-06-16 14:00:56,578 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=234, eTNum=234}
Это выглядит НЕПРАВИЛЬНО
2020-06-16 14:18:54,979 WARN org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9560, partition 0.
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=9564, eTNum=15742}
2020-06-16 14:26:50,799 WARN org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9562, partition 0.
2020-06-16 14:26:50,799 INFO com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=6, oTNum=6, fTNum=9565, eTNum=15743}
Есть идеи, почему объединение не работает должным образом? эти предупреждения вызывают эту проблему? Как я могу это исправить?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.1</version>
</dependency>
Обновление:
Есть идеи, почему join
обрабатывает повторяющиеся события (см. Ниже, где я вижу несколько значений eTNum
для того же fTNum
)
2020-06-17 17:59:05,033 INFO com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=120749}
2020-06-17 17:59:19,194 INFO com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=139709}
2020-06-17 17:59:33,438 INFO com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=158669}