В вашем случае вы должны передать динамодаб в красное смещение
DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.
Во-первых, вам нужна лямбда-функция для обработки DynamoDBStream.Для каждого события DynamoDBStream используйте firehose PutRecord
API для отправки данных в firehose.Из примера
var firehose = new AWS.Firehose();
firehose.putRecord({
DeliveryStreamName: 'STRING_VALUE', /* required */
Record: { /* required */
Data: new Buffer('...') || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
}
}, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});
Далее мы должны знать, как данные вставляются в RedShift.Из документа пожарный шланг ,
Для доставки данных в Amazon Redshift Kinesis Firehose сначала доставляет входящие данные в корзину S3 в формате, описанном ранее.Затем Kinesis Firehose выдает команду Amazon Redshift COPY для загрузки данных из вашей корзины S3 в кластер Amazon Redshift.
Итак, мы должны знать, какой формат данных позволяет команде COPY
отображать данныев схему RedShift.Мы должны следовать требованию формата данных для команды COPY redshift .
По умолчанию команда COPY ожидает, что исходные данные будут разделены символом UTF-8.По умолчанию в качестве разделителя используется символ канала (|).
Таким образом, вы можете запрограммировать лямбда-выражение, которое вводит событие потока DynamodB, преобразовать его в запись строки, разделенную по конвейеру (|), и записать ее в firehose.
var firehose = new AWS.Firehose();
firehose.putRecord({
DeliveryStreamName: 'YOUR_FIREHOSE_NAME',
Record: { /* required */
Data: "RED_SHIFT_COLUMN_1_DATA|RED_SHIFT_COLUMN_2_DATA\n"
}
}, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});
не забудьте добавить \n
, так как пожарный шланг не добавит вам новую строку.