Зажечь кеш пуст после сохранения? - PullRequest
0 голосов
/ 13 мая 2019

Мой конвейер данных выглядит следующим образом: Kafka => выполнить некоторые вычисления => загрузить полученные пары в Ignite cache => распечатать его

 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
 JavaSparkContext sc = new JavaSparkContext(conf);
 JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
 JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);

 JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
         streamingContext,
         LocationStrategies.PreferConsistent(),
         ConsumerStrategies.<String, Message>
                 Subscribe(Collections.singletonList(TOPIC), kafkaParams)
 )
         .map(ConsumerRecord::value);

 JavaPairDStream<String, Message> pairDStream =
         dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));

 JavaPairDStream<String, Float> pairs = pairDStream
         .combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
         .mapToPair(new ToPairTransformer());

 JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());

  // I know that we put something here:
  pairDStream.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);

  // But I can't see anything here:
  myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));

  streamingContext.start();
  streamingContext.awaitTermination();
  streamingContext.stop();
  sc.stop();

Но этот код ничего не печатает .. Почему?

Почему Ignite cache пусто даже после savePairs?

Что здесь может быть не так?

Заранее спасибо!

1 Ответ

2 голосов
/ 14 мая 2019

Для меня это выглядит так, что pairDStream.foreachRDD(...) - это ленивая операция, которая никак не влияет, по крайней мере, до того, как вы начнете потоковый контекст streamingContext.start(). С другой стороны, myCache.foreach(...) - жадная операция, и вы выполняете ее на фактически пустом кеше. Итак, попробуйте поставить myCache.foreach(...) после запуска потокового контекста. Или даже после прекращения.

...