Spark Structured Streaming редис-приемник выполнять не желательно - PullRequest
1 голос
/ 13 января 2020

Я использовал сообщения со структурой потоковой передачи с использованием искры и сохранял данные в Redis. Расширяя ForeachWriter [org. apache .spark. sql .Row], я использовал приемник redis для сохранения данных. Код работает хорошо, но в redis в секунду можно сохранить чуть более 100 данных. Есть ли лучший способ ускорить процедуру? В то время как код, подобный приведенному ниже, будет подключаться и отключаться к серверу redis в каждом пакете mico, есть ли способ просто подключиться один раз и сохранить подключения, чтобы минимизировать стоимость подключения, которая, как я полагаю, является основной причиной затрат времени? Я пытался транслировать джедаев, но ни джедай, ни джедиспул не могут быть сериализуемы, поэтому он не работал.

Мой код ниже:

class StreamDataSink extends ForeachWriter[org.apache.spark.sql.Row]{

  var jedis:Jedis = _

  override def open(partitionId:Long,version:Long):Boolean={
    if(null == jedis){
      jedis = FPCRedisUtils.getPool.getResource
    }
    true
  }

  override def process(record: Row): Unit = {

    if(0 == record(3)){
      jedis.select(Constants.REDIS_DATABASE_INDEX)
      if(jedis.exists("counter")){
        jedis.incr("counter")
      }else{
        jedis.set("counter",1.toString)
      }
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if(null != jedis){
      jedis.close()
      jedis.disconnect()
    }
  }

Любые предложения будут оценены.

1 Ответ

0 голосов
/ 13 января 2020

Не делай jedis.disconnect(). Это фактически закроет сокет, вызывая новое соединение в следующий раз. Используйте только jedis.close(), это вернет соединение с пулом.

Когда вы вызываете INCR для несуществующего ключа, он создается автоматически, по умолчанию устанавливается на ноль, а затем увеличивается, в результате чего новый ключ с значение 1.

Это упрощает ваше if-else до простого jedis.incr("counter").

. При этом вы получаете:

jedis.select(Constants.REDIS_DATABASE_INDEX)
jedis.incr("counter")

Просмотрите, действительно ли вам нужен SELECT . Это для каждого соединения, и все соединения по умолчанию имеют значение DB 0. Если все рабочие нагрузки, совместно использующие один и тот же пул jedis, используют DB 0. Не нужно вызывать select.

Если вам нужны и select, и incr, тогда конвейер их:

Pipeline pipelined = jedis.pipelined()
pipelined.select(Constants.REDIS_DATABASE_INDEX)
pipelined.incr("counter")
pipelined.sync()

Это отправит две команды в одном сетевом сообщении, что повысит вашу производительность.

...