Google Cloud BigTable: обновить значение столбца - PullRequest
0 голосов
/ 29 мая 2018

У меня есть следующая структура BigTable для примера:

Table1 : column_family_1 : column_1 : value

value здесь, скажем, число.Это управляется потоком данных, и я хочу обновлять значение каждый раз.

Это значение может быть суммой, и я хочу обновлять его каждый раз, когда пользователь совершает покупку (чтобы сохранить общие расходы до даты), поэтомуЯ делаю следующее в потоке данных прослушивателя событий покупки (при каждом событии покупки):

  • Сделать запрос BigTable, чтобы получить значение по id
  • Добавить сумму, потраченную в новомкупите тот, который присутствует в ответе поиска BigTable
  • Сделайте Put запрос на обновление значения

Хотя этот подход имеет некоторую задержку в сети, похоже, он работает.Сценарий, в котором это не удается, заключается в том, что, когда в потоке данных несколько рабочих, пользователь совершает более одной покупки, и события переходят к нескольким работникам, например:

  • Работник 1 получает событие 1, получает суммуи добавляет в него потраченную сумму
  • Работник 2 получает событие 2, получает старую сумму и добавляет в нее потраченную сумму
  • Оба работника делают Put запросов иони перезаписываются

Чтобы предотвратить это, я пытаюсь сделать запрос, который просто говорит в простом тексте, add 10 to the spent amount value.Это то, что мы можем сделать в потоке данных?

Ответы [ 2 ]

0 голосов
/ 06 июня 2018

Другим решением может быть следующее:

  • Чтобы иметь одну таблицу только для добавления с отдельными событиями и агрегированную таблицу с соответствующим ключом и значениями для агрегирования по.
  • Используйте AbstractCloudBigtableTableDoFn для вставки Put в таблицу только для добавления.
  • На последовательной стадии в одном и том же потоке данных события окна в течение приемлемого короткого периода (5 секунд?) Группируются поключ агрегации.
  • На следующем этапе (-ах) этого потока данных (или в этот момент он даже может быть перенаправлен в другой поток данных), выполните сканирование диапазона для сгруппированного ключа в таблице только для добавления, агрегируйтезначения и выполните вставку Put в эту агрегированную таблицу.

Таким образом:

  • Обеспечение того, чтобы события находились в Bigtable при запуске окна после AbstractCloudBigtableTableDoFn, можно получить согласованную совокупность.
  • В маловероятных обстоятельствах окончания одного окна после следующего запущенного окна (со старым представлением агрегации) метка времени запуска может использоваться для дифференциации последней версии ячеек.
  • Таблицу только для добавления можно также использовать для других типов / уровней агрегации, которые можно было бы решить после этого.
  • Обслуживание относительно простое, поскольку для него не требуется периодическая чистая работа.
0 голосов
/ 30 мая 2018

Bigtable имеет возможность Increment значений.Более подробную информацию вы можете найти в документации protobuf .

Идемпотентность играет важную роль в понимании счетчиков в Bigtable.В Bigtable Put обычно являются идемпотентными, что означает, что вы можете запускать их несколько раз и всегда получать один и тот же результат (a=2 будет давать один и тот же результат независимо от того, сколько раз вы его запускаете).Increment s не являются идемпотентами, поскольку их многократный запуск приведет к разным результатам (a++, a++ имеет результат, отличный от a++, a++, a++).

TransientСбои могут или не может выполнить Increment.Со стороны клиента никогда не ясно, будет ли Increment успешным во время этих временных ошибок.

Эта функция Increment сложна для построения в потоке данных из-за этой идемпотентности.У потока данных есть понятие «связки», которое представляет собой набор действий, которые действуют как единица работы.Эти пакеты повторяются для временных сбоев (вы можете узнать больше о повторных попытках сбоя потока данных здесь ).Поток данных обрабатывает этот «пакет» как единое целое, ошибка Cloud Bigtable должна обрабатывать каждый отдельный элемент «пакета» как отдельную транзакцию, поскольку Cloud Bigtable не поддерживает многострочные транзакции.

Учитывая несоответствие вОжидаемое поведение «комплектов», Cloud Bigtable не позволит вам запускать Increment s через поток данных.

Опции, которые вы имеете, заслуживают большего количества документации, чем я могу предоставить здесь, но я могу предоставить некоторые варианты навысокий уровень:

  1. Всегда используйте Put для любого нового события, которое вы найдете, и суммируйте значения в чтениях.Вы также можете написать другое задание, которое выполняет периодическую очистку строк, создав «транзакцию», которая удаляет все текущие значения и записывает новую ячейку с суммой

  2. Использование ОблакоФункции , которые прослушивают события Pub / Sub и выполняют Increment s.Вот пример Cloud Bigtable с использованием облачных функций .Вы также можете выполнить Get, выполнить сложение и CheckAndMutate с помощью алгоритма, который вы описали в своем посте (я лично выбрал бы CheckAndMutate для согласованности, если бы я выбрал этот вариант).

  3. Используйте AbstractCloudBigtableTableDoFn, чтобы написать свой DoFn, который выполняет Increment с или CheckAndMutate, но с пониманием, что это может вызвать проблемы с целостностью данных.

Если система достаточно велика, вариант № 1 является наиболее надежным вариантом, но за счет сложности системы.Если вам не нужна такая сложность, вариант №2 - это ваша следующая лучшая ставка (хотя я бы выбрал CheckAndMutate).Если вам не нужна целостность данных и вам нужна высокая пропускная способность (например, «подсчет страниц» или другая телеметрия, где допустимо ошибаться небольшую часть времени), тогда вариант № 3 будет вашим лучшим выбором.

...