Kafka consumer.poll возвращается пустым - PullRequest
0 голосов
/ 06 мая 2018

Код производителя, который будет читать видеофайл .mp4 с диска и отправлять его на kafka, который, очевидно, работает, так как печатает "Message sent to the Kafka Topic java_in_use_topic Successfully", но consumer.poll пусто:

@RestController
@RequestMapping(value = "/javainuse-kafka/")
public class ApacheKafkaWebController {
@GetMapping(value = "/producer")
public String producer(@RequestParam("message") String message) {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
    props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    Producer<String, byte[]> producer = new KafkaProducer<>(props);
    Path path = Paths.get("C:/kafka-picture-consumer/SampleVideo_1280x720_1mb.mp4");
    ProducerRecord<String, byte[]> record = null;
    try {
        record = new ProducerRecord<>("topiccc", "keyyyyy", Files.readAllBytes(path));
    } catch (IOException e) {
        e.printStackTrace();
    }
    producer.send(record);
    producer.close();
    return "Message sent to the Kafka Topic java_in_use_topic Successfully";
}

Код потребителя, который будет использоваться в servlet:

public class ConsumerService {
    public byte[] consumer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topiccc"));
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
       System.out.println("ISSSSSSSSSSSSSSSSSSSSSSSSSSSSS EMPTYYYYYYYYYY:"+String.valueOf(records.isEmpty()));
        return records.iterator().next().value();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...