Атомный уровень MERGE REPLACE на уровне строк в BigQuery - PullRequest
0 голосов
/ 04 июля 2018

В моем случае я работаю с данными, которые можно идентифицировать по уникальному ключу в источнике, разбитому на n (недетерминированное) количество целевых записей, загруженных в таблицы BigQuery для аналитических целей.

Создание этого ETL для использования функции недавнего изменения потока в Mongo. Я хотел бы удалить все записи в BigQuery, а затем атомарно загрузить новые записи.

Изучение BigQuery DML Я вижу, что MERGE поддерживается, но возможна только WHEN MATCHED THEN DELETE или WHEN MATCHED THEN UPDATE.

Я заинтересован в , КОГДА СОПРОВОЖДАЕТСЯ ТОГО, ЧТО УДАЛИТЬ, И СЛЕДУЕТ ЗА ВСТАВКОЙ .

Как бы я внедрил такой ETL в BigQuery, оставаясь при этом атомарным или, в конечном итоге, непротиворечивым с точки зрения доступности и правильности данных.


РЕДАКТИРОВАТЬ 1: Я хотел бы привести конкретный пример для разработки.

Самая низкая степень детализации уникальности, которую я имею в этом наборе данных, составляет user_id. Строки не являются однозначно идентифицируемыми.

Пример

1

Обновленный пользовательский объект, полученный из потока изменений Монго:

user={_id: "3", name="max", registered="2018-07-05" q=["a", "b", "c"]}

2

Текущий BigQuery.user_q содержит

| user_id | q |
...
|    3    | a |
|    3    | b |
...

3.

Преобразование кода загружает измененный объект пользователя в BigQuery.user_q_incoming

| user_id | q |
|    3    | a |
|    3    | b |
|    3    | c |

4

MERGE между user_q и user_q_incoming:

  1. 2 строки в user_q, принадлежащих user_id 3, УДАЛЕНЫ
  2. 3 строки в user_q_incoming, которые принадлежат user_id 3, вставлены.
  3. Остальные данные (...) в user_q остаются без изменений.

5

BigQuery.user_q содержит

| user_id | q |
...
|    3    | a |
|    3    | b |
|    3    | c |
...

Например, пользователь может удалить вопрос из своего профиля. Оставляя оставшиеся строки равными q=["a", "c"]. Мне нужно это также перевести на мировоззрение BigQuery.

Ответы [ 2 ]

0 голосов
/ 08 июля 2018

Есть аналогичный вопрос и один обход, чтобы заставить MERGE работать (https://issuetracker.google.com/issues/35905927#comment9).

В принципе, что-то вроде следующего должно работать,

MERGE `project.dataset.user_q` T
USING (
  SELECT *, false AS is_insert FROM `project.dataset.user_q_incoming`
UNION ALL
  SELECT *, true AS is_insert FROM `project.dataset.user_q_incoming`
) S
ON T.user_id = S.user_id and NOT is_insert
WHEN MATCHED THEN
  DELETE
WHEN NOT MATCHED AND is_insert THEN
  INSERT(user_id, q) VALUES(user_id, q)

В идеале, вам нужно следующее, но оно пока не поддерживается.

MERGE `project.dataset.user_q`
USING `project.dataset.user_q_incoming`
ON FALSE
WHEN NOT MATCHED BY TARGET THEN
  INSERT(user_id, q) VALUES(user_id, q)
WHEN NOT MATCHED BY SOURCE AND user_id in (SELECT user_id FROM `project.dataset.user_q_incoming`) THEN
  DELETE
0 голосов
/ 04 июля 2018

INSERT поддерживается BigQuery DML

Оператор MERGE - это оператор DML, который может объединять операции INSERT, UPDATE и DELETE в один оператор и выполнять операции атомарно.

например

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON FALSE
WHEN NOT MATCHED AND product LIKE '%washer%' THEN
  INSERT (product, quantity) VALUES(product, quantity)
WHEN NOT MATCHED BY SOURCE AND product LIKE '%washer%' THEN
  DELETE   

Итак, вы должны хорошо идти со своим ETL

РЕДАКТИРОВАТЬ на основе более подробной информации, добавленной к вопросу

Хорошо, я вижу - я думаю, что в этом случае MERGE не будет применяться, поскольку INSERT может быть сделано ТОЛЬКО для предложения NOT MATCH. Кто-то может придумать, как обмануть MERGE для работы в этом случае, но тем временем приведенное ниже решение делает то, чего вы хотите достичь - я так думаю: o)

CREATE OR REPLACE TABLE `project.dataset.user_q` (user_id INT64, q STRING) AS
SELECT * FROM `project.dataset.user_q`
WHERE NOT user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q_incoming`)
UNION ALL
SELECT * FROM `project.dataset.user_q_incoming`
WHERE user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q`)
...