Код производителя, который будет читать видеофайл .mp4
с диска и отправлять его на kafka, который, очевидно, работает, так как печатает "Message sent to the Kafka Topic java_in_use_topic Successfully"
, но consumer.poll
пусто:
@RestController
@RequestMapping(value = "/javainuse-kafka/")
public class ApacheKafkaWebController {
@GetMapping(value = "/producer")
public String producer(@RequestParam("message") String message) {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);
Path path = Paths.get("C:/kafka-picture-consumer/SampleVideo_1280x720_1mb.mp4");
ProducerRecord<String, byte[]> record = null;
try {
record = new ProducerRecord<>("topiccc", "keyyyyy", Files.readAllBytes(path));
} catch (IOException e) {
e.printStackTrace();
}
producer.send(record);
producer.close();
return "Message sent to the Kafka Topic java_in_use_topic Successfully";
}
Код потребителя, который будет использоваться в servlet
:
public class ConsumerService {
public byte[] consumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topiccc"));
ConsumerRecords<String, byte[]> records = consumer.poll(100);
System.out.println("ISSSSSSSSSSSSSSSSSSSSSSSSSSSSS EMPTYYYYYYYYYY:"+String.valueOf(records.isEmpty()));
return records.iterator().next().value();
}
}