поток kafka не получает данные о потоке разделов - PullRequest
0 голосов
/ 07 июля 2019

Я пытаюсь распространять сообщения kafka по моей теме, которая состоит из четырех разделов, но у моего потребителя используется только один поток.

Производитель Я проверяю, что его запись выполняется в другие разделы, но на стороне потребителя я получаютолько на одном потоке потока.

Мой код производителя.Здесь у меня есть код производителя и класс случайных разделителей.Producer отправляет в kafka значение ключа, мой ключ - это тип метода http, а значение - это сопоставленный с JSON объект модели pojo.

Producer<String, MyPojo> kafkaProducer = null;

/* My Single instance 
final Producer<String, MyPojo> producer =  producerInstance.createProducer();

/* here operations are GET/PUT/POST/DELETE
        public void sendDataProducer(String operation, MyPojo message) throws Exception {
        try {
            final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", operation,message);
            producer.send(record, new ProducerCallback());
            producer.flush();
        } finally {

            //producer.close();

        }
        }

public Producer<String, MyPojo> createProducer() {
        if (kafkaProducer == null) {
                    Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my boot starpservers");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientnoveluser");

            props.put(ProducerConfig.ACKS_CONFIG, "all");
           props.put(ProducerConfig.RETRIES_CONFIG, 3);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1500))
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    (new GenericSerializer<Domain>()).getClass().getName());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandom.class); 
            kafkaProducer = new KafkaProducer<String, myPojo>(props);
        }
        return kafkaProducer;



public class MyRandom implements Partitioner {
    private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info(" Partition of Topic :" + numPartitions);
            Random randomGenerator = new Random();
            int randomInt = randomGenerator.nextInt(4) + 1;
            logger.info(" selected Partition of Topic :" + randomInt);
            return  randomInt;

    }

    @Override
    public void close() {
    }

}

My Stream Consumer: здесь я настраиваюсь как задача с 4 потоками, так что думаю, что каждый мой раздел разделен на один поток, так как задача потока поддерживает поток, и мне не нужно обслуживать разных потребителей.

// stream Consumer Handler 

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "500");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //consumer_timeout_ms
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

        props.put("state.dir","/tmp/kakfak/stat));

   userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);


kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });


        kafkaStreams.cleanUp();
        kafkaStreams.start();

У меня 4 раздела в теме пользователя.

ниже - журналы моего производителя.

2019-07-08T00:47:02,480 INFO  [http-nio-8185-exec-1] c.j.b.s.MyRandom :  Partition of Topics :4
2019-07-08T00:47:02,481 INFO  [http-nio-8185-exec-1] c.j.b.s.MyRandom :  selected Partition of Topic :1


2019-07-08T00:47:02,480 INFO  [http-nio-8185-exec-1] c.j.b.s.MyRandom :  Partition of Topic :4
2019-07-08T00:47:02,481 INFO  [http-nio-8185-exec-1] c.j.b.s.MyRandom :  selected Partition of Topic :3

но мой потребитель всегда получает только в единственном потоке

2019-07-08T00:47:02,492 INFO  [streams-userstream-ccfe874d-04ee-4f60-8f0e-7c9c380df712-StreamThread-4] c.j.m.s.ConsumerFactory: MyConmerHandler  key POST value {"user:testuser"}

2019-07-08T00:47:02,492 INFO  [streams-userstream-ccfe874d-04ee-4f60-8f0e-7c9c380df712-StreamThread-4] c.j.m.s.ConsumerFactory: MyConmerHandler  key PUT value {"user:testuser"}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...