Читайте, обновляйте и сохраняйте кэшированное значение атомарно - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть несколько потоков (N), которые должны обновить один и тот же кеш.Итак, предположим, что существует как минимум N потоков.Каждый поток может обрабатывать значения с одинаковыми ключами.Проблема заключается в том, что если я обновлюсь следующим образом:

1. Read old value from cache (multiple threads get the same old value)
2. Merge new value with old value (each thread update old value)
3. Save updated value back to the cache (only the last update was saved, another one is lost)

я могу потерять некоторые обновления, если несколько потоков будут пытаться обновить одну и ту же запись одновременно.На первый взгляд, существует решение сделать все обновления атомарными: например, использовать мутацию Increment в hbase или add в аэроспайке (в настоящее время я рассматриваю эти кеши для своего случая).Если значение состоит только из числовых типов примитивов, то это нормально, потому что обе реализации кэша поддерживают атомарный inc / dec.

1. Inc/dec each value (cache will resolve sequence of this ops by it's self)

Но что если значение состоит не только из примитивов?Затем я должен прочитать значение и обновить его в моем коде.В этом случае я все еще могу потерять некоторые обновления.

Как я уже писал, в настоящее время я рассматриваю hbase и aerospike, но оба они не совсем подходят для моего случая.В hbase, как я знаю, нет способа заблокировать строку со стороны клиента (> ~ 0,98), поэтому я должен использовать операцию checkAndPut для каждого сложного типа.В аэроспайке я могу добиться чего-то вроде блокировки на основе строк, используя lua udfs, но я хочу их избежать.Redis позволяет watch записывать, и если произошло обновление из другого потока, транзакция завершится неудачно, и я могу перехватить эту ошибку и повторить попытку.

Итак, мой вопрос, как добиться чего-то вроде блокировки на основе строкдля таких обновлений и будет ли блокировка на основе строки будет правильным способом?Может быть, есть другой подход?


  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sample")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Duration(500))

    val source = Source()
    val stream = source.stream(ssc)

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(partition => {
          if (partition.nonEmpty) {
            val cache = Cache()

            partition.foreach(entity=> {
// in this block if 2 distributed workers (in case of apache spark, for example) 
//will process entities with the same keys i can lose one of this update
// worker1 and worker2 will get the same value
               val value = cache.get(entity.key)
// both workers will update this value but may get different results
               val updatedValue = ??? // some non-trivial update depends on entity
// for example, worker1 put new value, then worker2 put new value. In this case only updates from worker2 are visible and updates from worker1 are lost
               cache.put(entity.key, updatedValue)
            })
          }
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

Итак, если я использую kafka в качестве источника, я могу обойти это, если сообщения разбиты по ключам.В этом случае я могу положиться на тот факт, что только один работник будет обрабатывать конкретную запись в любой момент времени.Но как справиться с той же ситуацией, когда сообщения разбиты случайным образом (ключ находится внутри тела сообщения)?

...