Как я могу выполнить денормализацию данных на GCP? - PullRequest
0 голосов
/ 24 июня 2019

Я пытаюсь денормализовать и объединить три таблицы с именами order, order_item и user в Bigquery. У меня уже есть конвейер, который используется для обновления этих таблиц с использованием pubsub и dataflow. Теперь я хочу создать конвейер, который обновляет новую денормализованную таблицу всякий раз, когда происходит обновление или вставка в любую из трех упомянутых таблиц. Эти обновления для order и order_item должны быть синхронизированы, и пользователь должен быть присоединен только тогда, когда вставка происходит по заказу (чтобы показать статус пользователя при создании нового заказа).

Пока я придумал два решения.

  • Один из них - фиксировать изменения в каждой таблице, когда сообщение из pubsub читается через поток данных, а затем обогащается последними соответствующими записями из денормализованной таблицы. Наконец, старая запись заменяется новой.
  • Другой запрос запросов к таблицам order и order_item для получения обновленных или вновь вставленных строк, а затем присоединение к ним с помощью bigquery sql, следовательно, добавление результатов в денормализованную таблицу. Эта работа выполняется регулярно с помощью Airflow.

заказ

id (первичный ключ) last_updated_at создан в user_id (внешний ключ)

=====================

Order_Item

id (первичный ключ) last_updated_at создан в order_id (внешний ключ)

=====================

Пользователь

id (первичный ключ) last_updated_at created_at

Я не очень хорошо знаком с потоком данных, и я не смог найти ни одного учебного пособия или примера, показывающего, как я могу выполнить работу с его использованием (хотя есть примеры кодов, показывающих, как можно выполнить операцию ETL, решения не существует к проблеме синхронизации). Есть ли примеры, которые я могу рассмотреть, и какая альтернатива кажется более эффективной?

1 Ответ

0 голосов
/ 11 июля 2019

Я хочу предвосхитить это тем, что BigQuery не является транзакционной базой данных, и, следовательно, пытаться работать согласованно после того, как факт будет очень трудным. В этом случае я бы предложил использовать Cloud Spanner или Cloud SQL (см. Эту статью Quora * , чтобы узнать разницу между ними). Например, в Cloud Spanner это очень просто. Существует понятие транзакций , в котором вы можете идеально синхронизировать денормализованную таблицу с другими таблицами в любой момент времени.

С другой стороны, если вы согласны с тем, что ваша денормализованная таблица потенциально не синхронизирована с другими таблицами, тогда есть более простые решения.

В этом случае я предполагаю, что переключение на другое предложение хранилища запрещено с точки зрения затрат и что таблицы могут быть потенциально не синхронизированы. Облачный поток данных является отличным продуктом, если вам требуется анализировать данные в пакетном или потоковом режиме, но API становится неудобным для использования в случаях, подобных вашему (обработка на основе событий). Ваше первое решение кажется лучшим, если вы хотите продолжать использовать Dataflow, но я бы действительно предложил использовать что-то вроде Cloud Functions .

Настройка будет выглядеть так:

  1. Pub / Sub записывает в поток данных
  2. Поток данных записывает обновленные строки в BigQuery
  3. Поток данных записывает сообщение Pub / Sub, содержащее дельту (например, вставьте строку X в порядок, обновите строку Y в элементе order_).
  4. Облачная функция, которая запускает подписку Pub / Sub с заданной логикой для чтения правильных строк из нормализованных таблиц и последующей записи в денормализованные таблицы.

Ваша облачная функция может выглядеть примерно так (написано на Javascript), вдохновленной здесь и здесь . :

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const dataset = bigquery.dataset('my-dataset');
const orders = dataset.table('orders');
const order_items = dataset.table('order_items');
const users = dataset.table('users');
const denorm = dataset.table('denormalized');

function GetOrder(order_id) {
  let [order] = await orders.row(order_id);
  return order;
}

function GetOrderItem(...) { ... }
function GetUser(...) { ... }

/**
 * HTTP Cloud Function.
 *
 * @param {Object} req Cloud Function request context.
 * @param {Object} res Cloud Function response context.
 */
exports.get = (req, res) => {
  const method = req.params.method;
  const table = req.params.table;

  let query = '';
  if (method === 'insert' && table === 'order') {
    let order = GetOrder(req.params.order_id);
    let order_item = GetOrderItem(order.id);
    let user = GetUser(order.user_id);
    denorm.insert({
      ORDER: order.my_data,
      ORDER_ITEM: order_item.my_data,
      USER: user.my_data
    });
  } else if ( ... ) { ... }
}

Как вставить Как запросить

...