Как использовать Redis в Spark Streaming? - PullRequest
0 голосов
/ 20 сентября 2018

Я использую потоковую трансляцию в java. Я настроил sparkconfig obj как SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]") .set("spark.streaming.stopGracefullyOnShutdown","true") .set("redis.host", "localhost") .set("redis.port", "6379"); и передал config obj в JavastreamingContext.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

как я могу получить доступ к redis с помощью объекта jssc.заранее спасибо.

1 Ответ

0 голосов
/ 25 сентября 2018

Это создаст поток из списка Redis

  SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
            .set("spark.streaming.stopGracefullyOnShutdown", "true")
            .set("redis.host", "localhost")
            .set("redis.port", "6379");


    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

    RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(sparkConf));

    RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());
    String[] keys = new String[]{"myList"};
    RedisInputDStream<Tuple2<String, String>> redisStream =
            redisStreamingContext.createRedisStream(keys, StorageLevel.MEMORY_ONLY(), redisConfig);

    redisStream.print();

    jssc.start();
    jssc.awaitTermination();

Нажмите некоторые данные в списке:

LPUSH "myList" "aaaa"
LPUSH "myList" "bbbb"
...