Поддерживает ли Spark Streaming Kafka 1.1.0 сейчас? - PullRequest
0 голосов
/ 04 мая 2018

Теперь версия spark - 2.3. Я видел maven центральное хранилище: https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22

показанная банка - spark-streaming-kafka-0-10_2.11

так что kafka1.1.0 сейчас не поддерживается?

я все еще должен установить kafka 0.10.x

Ответы [ 2 ]

0 голосов
/ 01 июня 2018

Я прошел тест spark2.3 с kafka1.1.0, используя банки

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
     <version>${spark.version}</version>
</dependency>

хорошо работает.

пример кода:

    SparkConf conf = new SparkConf().setAppName("stream test").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "master:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("enable.auto.commit", false);

    List<String> topics = Arrays.asList("A29");

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    lines.print(30);

    streamingContext.start();
    streamingContext.awaitTermination();
0 голосов
/ 23 мая 2018

На основании по следующей ссылке : Вы должны использовать spark-streaming-kafka-0-10 для Кафки 0.10.0 или выше .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...