Улучшение производительности Apache Spark для Redis - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть приложение, которое записывает данные ключа, значения в Redis с помощью Apache Spark.Приложение работает без каких-либо проблем.Но приложение работает намного медленнее.Я ищу некоторые предложения по улучшению пропускной способности записи и увеличению параллелизма при записи данных в Redis.

Здесь приведен код

Dataset<Row> rowkeyMapping = services.select(regexp_replace(col("rowkey"), "_", "").as("rowkey"),struct(regexp_replace(col("name"), "\\[", ","), regexp_replace(col("oname"), "\\[", ","), col("cid")).as("detailsinfo"));

rowkeyMapping.foreach(obj -> {
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxTotal(5000);
    JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
    Jedis jedis = pool.getResource();
    ObjectMapper om = new ObjectMapper();
    String[] rowArray = obj.mkString()
        .replaceAll("[\\[]", ",")
        .split(",");
    String key = rowArray[0];
    DetailInfo detail = new DetailInfo();
    detail.setName(rowArray[1]);
    detail.setOName(rowArray[2]);
    detail.setCid(rowArray[3]);

    String value = om.writeValueAsString(detail);
    logger.info("writing key value pairs to Redis cache (Key) :: " + key);
    jedis.set(key, value);
    jedis.quit();
});

Я плохо понимаю RedisPipelining.Но я думаю, что Pipelining - это скорее пакетирование команд.Здесь, в моем случае, я имею дело с миллионами данных.Я не уверен, подходит ли конвейерная обработка.

Любая помощь приветствуется. Спасибо заранее.

Ответы [ 2 ]

0 голосов
/ 07 декабря 2018

Как отметил @Amir Kost в своем ответе, ваша проблема в том, что вы создаете новое соединение, когда устанавливаете одну пару ключ-значение.Чтобы повысить производительность, вы должны повторно использовать соединение для пакета пар ключ-значение.

Как вы упомянули в своем комментарии, вы должны создать соединение в исполнителе.Таким образом, чтобы повторно использовать соединение, вам нужно использовать foreachPartition метод Dataset<Row> вместо foreach.foreachPartition запускает данную функцию ForeachPartitionFunction<T> для всего раздела.Таким образом, вы можете создать соединение и повторно использовать его для всех элементов раздела.Проверьте подробности doc .

Кроме того, с помощью foreachPartition вы можете получить пакет элементов в разделе, а затем использовать Redis Pipline для повышения производительности.Проверьте конвейер документ для деталей

0 голосов
/ 06 декабря 2018

Я не эксперт ни по Spark, ни по Redis, но я думаю, что следующие строки должны быть вне цикла foreach:

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(5000);
JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...