Я создал функцию облачной платформы Google, которая слушает тему публикации / подраздела и вставляет данные в BigQuery.
Мне удалось получить некоторый код, почти работающий.
Почти: инструкция вставки не сообщает об ошибке, но строка, вставленная в BigQuery, имеет все столбцы с нулем.
Вот код Облачной функции, работающей на NodeJs 6, 128 Мб памяти, запущенной Pub / Sub
Я перепробовал все комбинации следующих двух переменных, с двумя разными сообщениями об ошибках, для параметра игнорирования установлено значение false (см. Внизу поста)
'ignoreUnknownValues':true, 'raw':false
package.json
{
"name": "sample-pubsub",
"version": "0.0.1",
"dependencies": {
"@google-cloud/bigquery": "^1.3.0"
}
}
Функция Body
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {!Object} event Event payload and metadata.
* @param {!Function} callback Callback function to signal completion.
*/
exports.helloPubSub = (event, callback) => {
const pubsubMessage = event.data;
console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
const BigQuery = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
bigquery
.dataset("init_data")
.table ("tronc_queteur")
.insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log(`Inserted 1 rows`);
console.log(data);
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:');
err.errors.forEach(err => console.error(err));
}
} else {
console.error('ERROR:', err);
}
});
callback();
};
Данные, передаваемые в функцию, следующие (как видно из 1-го console.log () функции)
** ДАННЫЕ **
{
"id":9999,
"queteur_id":552,
"point_quete_id":49,
"tronc_id":281,
"depart_theorique":"2018-06-17 08:09:33",
"depart":"2018-06-17 08:09:33",
"retour":"2018-06-17 10:26:20",
"comptage":"2018-11-08 21:23:02",
"last_update":"2018-11-08 21:23:02",
"last_update_user_id":1,
"euro500":0,
"euro200":0,
"euro100":0,
"euro50":0,
"euro20":1,
"euro10":3,
"euro5":1,
"euro2":0,
"euro1":37,
"cents50":12,
"cents20":0,
"cents10":0,
"cents5":0,
"cents2":0,
"cent1":93,
"don_cheque":0,
"don_creditcard":0,
"foreign_coins":null,
"foreign_banknote":null,
"notes_depart_theorique":null,
"notes_retour":null,
"notes_retour_comptage_pieces":null,
"notes_update":null,
"deleted":false,
"coins_money_bag_id":"2018-PIECE-059",
"bills_money_bag_id":"2018-BILLET-013",
"don_cb_sans_contact_amount":0,
"don_cb_sans_contact_number":0,
"don_cb_total_number":0,
"don_cheque_number":0
}
А вот схема таблицы, которую я использовал для создания таблицы и загрузки данных в BigQuery:
** Определение таблицы BigQuery **
[
{"name": "id","type":"INTEGER"},
{"name": "queteur_id","type":"INTEGER"},
{"name": "point_quete_id","type":"INTEGER"},
{"name": "tronc_id","type":"INTEGER"},
{"name": "depart_theorique","type":"STRING"},
{"name": "depart","type":"STRING"},
{"name": "retour","type":"STRING"},
{"name": "comptage","type":"STRING"},
{"name": "last_update","type":"STRING"},
{"name": "last_update_user_id","type":"INTEGER"},
{"name": "euro500","type":"INTEGER"},
{"name": "euro200","type":"INTEGER"},
{"name": "euro100","type":"INTEGER"},
{"name": "euro50","type":"INTEGER"},
{"name": "euro20","type":"INTEGER"},
{"name": "euro10","type":"INTEGER"},
{"name": "euro5","type":"INTEGER"},
{"name": "euro2","type":"INTEGER"},
{"name": "euro1","type":"INTEGER"},
{"name": "cents50","type":"INTEGER"},
{"name": "cents20","type":"INTEGER"},
{"name": "cents10","type":"INTEGER"},
{"name": "cents5","type":"INTEGER"},
{"name": "cents2","type":"INTEGER"},
{"name": "cent1","type":"INTEGER"},
{"name": "foreign_coins","type":"INTEGER"},
{"name": "foreign_banknote","type":"INTEGER"},
{"name": "notes_depart_theorique","type":"STRING"},
{"name": "notes_retour","type":"STRING"},
{"name": "notes_retour_comptage_pieces","type":"STRING"},
{"name": "notes_update","type":"STRING"},
{"name": "deleted","type":"INTEGER"},
{"name": "don_creditcard","type":"FLOAT"},
{"name": "don_cheque","type":"FLOAT"},
{"name": "coins_money_bag_id","type":"STRING"},
{"name": "bills_money_bag_id","type":"STRING"},
{"name": "don_cb_sans_contact_amount","type":"FLOAT"},
{"name": "don_cb_sans_contact_number","type":"INTEGER"},
{"name": "don_cb_total_number","type":"INTEGER"},
{"name": "don_cheque_number","type":"INTEGER"}
]
со значением ignoreUnknownValues: false, необработанным: false
severity: "ERROR"
textPayload: "{ errors: [ { message: 'no such field.', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6OT...'
данные в кодировке base64:
{"id":9999,"queteur_id":552,"point_quete_id":49,"tronc_id":281,"depart_theorique":"2018-06-17 08:09:33","depart":"2018-06-17 08:09:33","retour":"2018-06-17 10:26:20","comptage":"2018-11-08 22:18:59","last_update":"2018-11-08 22:18:59","last_update_user_id":1,"euro500":0,"euro200":0,"euro100":0,"euro50":0,"euro20":1,"euro10":3,"euro5":1,"euro2":0,"euro1":37,"cents50":12,"cents20":0,"cents10":0,"cents5":0,"cents2":0,"cent1":93,"don_cheque":0,"don_creditcard":0,"foreign_coins":null,"foreign_banknote":null,"notes_depart_theorique":null,"notes_retour":null,"notes_retour_comptage_pieces":null,"notes_update":null,"deleted":false,"coins_money_bag_id":"2018-PIECE-059","bills_money_bag_id":"2018-BILLET-013","don_cb_sans_contact_amount":0,"don_cb_sans_contact_number":0,"don_cb_total_number":0,"don_cheque_number":0}
с «ignoreUnknownValues»: false, «raw»: true
Сообщение: ''
textPayload: "{ errors: [ { message: '', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6O...'
данные точно такой же полезной нагрузки (тот же экран base64, что и выше)
** на стороне Bigquery **
следующий запрос возвращает число строк, непрерывно увеличивающееся, так как я тестирую только нулевые значения в каждом столбце
выбор:
select *
from `init_data.tronc_queteur` as tq
where tq.id is null
результат выглядит следующим образом:
Row id queteur_id point_quete_id tronc_id depart_theorique depart retour comptage last_update last_update_user_id euro500 euro200 euro100 euro50 euro20 euro10 euro5 euro2 euro1 cents50 cents20 cents10 cents5 cents2 cent1 foreign_coins foreign_banknote notes_depart_theorique notes_retour notes_retour_comptage_pieces notes_update deleted don_creditcard don_cheque coins_money_bag_id bills_money_bag_id don_cb_sans_contact_amount don_cb_sans_contact_number don_cb_total_number don_cheque_number
1 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
2 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
3 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null