Как читать двоичные данные по темам Кафки в Spark - PullRequest
0 голосов
/ 26 апреля 2018

Мне нужно прочитать зашифрованное сообщение из темы Кафки.Мой текущий код, который читает строки из темы, выглядит следующим образом:

JavaPairReceiverInputDStream<String, String> pairrdd = 
            KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

Что я должен сделать, чтобы изменить этот код из очереди kafka, чтобы убедиться, что массив байтов прочитан, зашифрованные данные не повреждены

1 Ответ

0 голосов
/ 26 апреля 2018

Для чтения данных из Kafka в форме <byte[], byte[]> вы можете использовать KafkaUtils.Вот так -

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<byte[], byte[]>> pairrdd =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams)
  );

Надеюсь, это поможет!

...