jmeter kafka: ошибка выброса потребителя как [ClassCastException: [Ljava.lang.String; не может быть преобразован в java.util.List] - PullRequest
0 голосов
/ 03 мая 2018

Я пытаюсь читать сообщения Кафки, используя потребителя Кафки в jmeter, используя сэмплер jsr223. я не могу понять ошибку

[Ответное сообщение: javax.script.ScriptException: javax.script.ScriptException: java.lang.ClassCastException: [Ljava.lang.String; не может быть преобразовано в java.util.List]

Пожалуйста, помогите мне решить эту проблему, чтобы я мог подписываться и получать сообщения с помощью потребителя kafka.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;


Properties props = new Properties();
String groupID = "REQUEST_RESPONSE_JOB_GROUP";
String clientID =  "REQUEST_RESPONSE_JOB_CLIENT";
String BSID = "kafka:9092";
String topic = "PROC_REST_EVENTS";
props.put("bootstrap.servers", BSID);
props.put("group.id", groupID);
props.put("client.id", clientID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.RangeAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topic));
//print the topic name
System.out.println("Subscribed to topic " + topic);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        // print the offset,key and value for the consumer records.
        System.out.printf("offset = %d, key = %s, value = %s\n", 
    record.offset(), record.key(), record.value());
    return records;
}

1 Ответ

0 голосов
/ 04 мая 2018

Скорее всего, вы получаете Список из темы Kafka, в то время как ваш потребитель ожидает String , вам нужно изменить конфигурацию потребителя, чтобы она соответствовала типам, полученным из этой темы.

Попробуйте следующий код Groovy, который отправляет 3 сообщения в тему test (если он не существует, вам нужно создать его ) и читает их после этого.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.LongDeserializer
import org.apache.kafka.common.serialization.LongSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer

def BOOTSTRAP_SERVERS = 'localhost:9092'
def TOPIC = 'test'
Properties kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, 'KafkaExampleProducer')
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName())
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 'KafkaExampleConsumer')
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
def producer = new KafkaProducer<>(kafkaProps)
def consumer = new KafkaConsumer<>(kafkaProps)
1.upto(3) {
    def record = new ProducerRecord<>(TOPIC, it as long, 'Hello from JMeter ' + it)
    producer.send(record)
    log.info('Sent record(key=' + record.key() + 'value=' + record.value() + ')')
}

consumer.subscribe(Collections.singletonList(TOPIC))
final int giveUp = 100
int noRecordsCount = 0
while (true) {
    def consumerRecords = consumer.poll(1000)
    if (consumerRecords.count() == 0) {
        noRecordsCount++
        if (noRecordsCount > giveUp) break
        else continue
    }
    consumerRecords.each { record ->
        log.info('Received Record:(' + record.key() + ', ' + record.value() + ')')
    }
    consumer.commitAsync()

}
consumer.close()

Вы должны увидеть вывод как:

JMeter Kafka Produce And Consume Message

После этого вы сможете использовать приведенный выше код в качестве основы для собственного тестирования потребления сообщений Kafka. См. Статью Apache Kafka - Как загрузить тест с помощью JMeter для получения дополнительной информации о нагрузочном тестировании Kafka с помощью JMeter.

...