Apache Spark (V. 2.1) потоковое Java - PullRequest
0 голосов
/ 03 июня 2018

В приведенном ниже коде используются исходные кафки, стоковые hdfs, с использованием потоковой передачи искры.Любой способ переместить commitAsync в конец?

SparkSession spark = new SparkSession.Builder().appName(consumer_group).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.milliseconds(batchDurationLong));

stream =  KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, Object>Subscribe(topics, kafkaParams));

stream.transform(rddT  ->{
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rddT.rdd()).offsetRanges();
   ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    return rddT;
     }).map(
            rdd -> {
              return rdd.value().toString();
         }).foreachRDD(invrdd ->
                {
                   if (!invrdd.isEmpty()) {
                     SparkSession sparkWorker = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
                     Dataset<Row> invDataset = sparkWorker.read().option("mode", "PERMISSIVE").json(invrdd);
                     invDataset.write().mode(SaveMode.Append).parquet(storageDir + dateFormat.format(new Date()));
}
});
...