Я хочу предвосхитить это тем, что BigQuery не является транзакционной базой данных, и, следовательно, пытаться работать согласованно после того, как факт будет очень трудным. В этом случае я бы предложил использовать Cloud Spanner или Cloud SQL (см. Эту статью Quora * , чтобы узнать разницу между ними). Например, в Cloud Spanner это очень просто. Существует понятие транзакций , в котором вы можете идеально синхронизировать денормализованную таблицу с другими таблицами в любой момент времени.
С другой стороны, если вы согласны с тем, что ваша денормализованная таблица потенциально не синхронизирована с другими таблицами, тогда есть более простые решения.
В этом случае я предполагаю, что переключение на другое предложение хранилища запрещено с точки зрения затрат и что таблицы могут быть потенциально не синхронизированы. Облачный поток данных является отличным продуктом, если вам требуется анализировать данные в пакетном или потоковом режиме, но API становится неудобным для использования в случаях, подобных вашему (обработка на основе событий). Ваше первое решение кажется лучшим, если вы хотите продолжать использовать Dataflow, но я бы действительно предложил использовать что-то вроде Cloud Functions .
Настройка будет выглядеть так:
- Pub / Sub записывает в поток данных
- Поток данных записывает обновленные строки в BigQuery
- Поток данных записывает сообщение Pub / Sub, содержащее дельту (например, вставьте строку X в порядок, обновите строку Y в элементе order_).
- Облачная функция, которая запускает подписку Pub / Sub с заданной логикой для чтения правильных строк из нормализованных таблиц и последующей записи в денормализованные таблицы.
Ваша облачная функция может выглядеть примерно так (написано на Javascript), вдохновленной здесь и здесь . :
// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const dataset = bigquery.dataset('my-dataset');
const orders = dataset.table('orders');
const order_items = dataset.table('order_items');
const users = dataset.table('users');
const denorm = dataset.table('denormalized');
function GetOrder(order_id) {
let [order] = await orders.row(order_id);
return order;
}
function GetOrderItem(...) { ... }
function GetUser(...) { ... }
/**
* HTTP Cloud Function.
*
* @param {Object} req Cloud Function request context.
* @param {Object} res Cloud Function response context.
*/
exports.get = (req, res) => {
const method = req.params.method;
const table = req.params.table;
let query = '';
if (method === 'insert' && table === 'order') {
let order = GetOrder(req.params.order_id);
let order_item = GetOrderItem(order.id);
let user = GetUser(order.user_id);
denorm.insert({
ORDER: order.my_data,
ORDER_ITEM: order_item.my_data,
USER: user.my_data
});
} else if ( ... ) { ... }
}
Как вставить
Как запросить