Я пытаюсь распространять сообщения kafka по моей теме, которая состоит из четырех разделов, но у моего потребителя используется только один поток.
Производитель Я проверяю, что его запись выполняется в другие разделы, но на стороне потребителя я получаютолько на одном потоке потока.
Мой код производителя.Здесь у меня есть код производителя и класс случайных разделителей.Producer отправляет в kafka значение ключа, мой ключ - это тип метода http, а значение - это сопоставленный с JSON объект модели pojo.
Producer<String, MyPojo> kafkaProducer = null;
/* My Single instance
final Producer<String, MyPojo> producer = producerInstance.createProducer();
/* here operations are GET/PUT/POST/DELETE
public void sendDataProducer(String operation, MyPojo message) throws Exception {
try {
final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", operation,message);
producer.send(record, new ProducerCallback());
producer.flush();
} finally {
//producer.close();
}
}
public Producer<String, MyPojo> createProducer() {
if (kafkaProducer == null) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my boot starpservers");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientnoveluser");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1500))
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
(new GenericSerializer<Domain>()).getClass().getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandom.class);
kafkaProducer = new KafkaProducer<String, myPojo>(props);
}
return kafkaProducer;
public class MyRandom implements Partitioner {
private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
logger.info(" Partition of Topic :" + numPartitions);
Random randomGenerator = new Random();
int randomInt = randomGenerator.nextInt(4) + 1;
logger.info(" selected Partition of Topic :" + randomInt);
return randomInt;
}
@Override
public void close() {
}
}
My Stream Consumer: здесь я настраиваюсь как задача с 4 потоками, так что думаю, что каждый мой раздел разделен на один поток, так как задача потока поддерживает поток, и мне не нужно обслуживать разных потребителей.
// stream Consumer Handler
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "500");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//consumer_timeout_ms
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
props.put("state.dir","/tmp/kakfak/stat));
userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
}
});
kafkaStreams.cleanUp();
kafkaStreams.start();
У меня 4 раздела в теме пользователя.
ниже - журналы моего производителя.
2019-07-08T00:47:02,480 INFO [http-nio-8185-exec-1] c.j.b.s.MyRandom : Partition of Topics :4
2019-07-08T00:47:02,481 INFO [http-nio-8185-exec-1] c.j.b.s.MyRandom : selected Partition of Topic :1
2019-07-08T00:47:02,480 INFO [http-nio-8185-exec-1] c.j.b.s.MyRandom : Partition of Topic :4
2019-07-08T00:47:02,481 INFO [http-nio-8185-exec-1] c.j.b.s.MyRandom : selected Partition of Topic :3
но мой потребитель всегда получает только в единственном потоке
2019-07-08T00:47:02,492 INFO [streams-userstream-ccfe874d-04ee-4f60-8f0e-7c9c380df712-StreamThread-4] c.j.m.s.ConsumerFactory: MyConmerHandler key POST value {"user:testuser"}
2019-07-08T00:47:02,492 INFO [streams-userstream-ccfe874d-04ee-4f60-8f0e-7c9c380df712-StreamThread-4] c.j.m.s.ConsumerFactory: MyConmerHandler key PUT value {"user:testuser"}