Как отправить данные из Lambda (Python) в Redshift через Kinesis - PullRequest
0 голосов
/ 02 июля 2019

У меня есть лямбда-функция в Python, которая делает некоторые вещи и возвращает некоторые значения, которые нужно вставить в Redshift. В лямбде я помещаю значения в Kinesis, которые копируют их в S3, а затем в Redshift.

Значения в лямбде получаются в виде строки:

final_string = 'a;b;d;c'

Каждая буква - это значение для другого столбца в таблице в Redshift, поэтому в качестве разделителя используется ';'. Затем я отправляю данные в Kinesis Stream с помощью:

put_response = kinesis_client.put_record(StreamName = 'PixelTrack',
                                            Data=json.dumps(final_string),
                                           PartitionKey='first')

Затем поток кинезиса подает поток пожарного шланга Kinesis. Файл, который генерируется в S3 с Kinesis Firehose, похож (включая кавычки в файле):

"a;b;c;d;c" 

И, наконец, я копирую данные в красное смещение с помощью следующего оператора (настроенного в пожарном шланге Kinesis):

copy table
from blabla
BLANKSASNULL 
DELIMITER ';' 
EMPTYASNULL 
NULL AS 'null' 
ESCAPE 
FILLRECORD;

Мне удалось заставить его работать и получать значения в Redshift, когда в Kinesis буферизуется только один результат (хотя создается новый столбец в Redshift). Таким образом, когда во время буфера была выполнена только одна лямбда, таблица Redshift выглядит так:

  A        B         C         D     no_info_column
  "a       b         c         d"        <null>

Проблема возникает, когда я выполняю лямбду несколько раз, потому что я получаю файл на S3 со следующим текстом:

"a,b,c,d" "a1,b1,c1,d1"

И я получаю в Redshift ошибку Extra column(s) found, потому что оператор копирования не может найти разделение строк.

Я безуспешно пробовал следующие вещи:

  • Возвращение строки в лямбде
  • Поиск, как установить разделитель строк в копии ( SO вопрос )
  • Преобразование списка в json вместо строки. Тогда у меня возникли проблемы с открытием списка скобок
  • Использование REMOVEQUOTES в операторе копирования

Мой первоначальный вопрос собирался быть следующим: «Как скопировать из s3 в красное смещение с разными строками, разделенными двойными кавычками», но, возможно, проблема в моем первом подходе или что-то в этом роде, поэтому я решил сделать вопрос немного немного шире.

Так, как я мог решить это?

1 Ответ

1 голос
/ 03 июля 2019

Если вы хотите отправить потоковые данные в Amazon Redshift, вы можете использовать Amazon Kinesis Data Firehose.Он имеет встроенную буферизацию данных, основанную на размере (МБ) или времени (секундах) для пакетной записи в Amazon Redshift.

Вы правы, что не идеально выполнять небольшие операции INSERT в Redshift.Гораздо лучше загружать данные оптом.Таким образом, если вам нужно загружать данные непрерывно, Kinesis Data Firehose обеспечивает наилучшее сочетание производительности.

Вы упомянули, что «поток kinesis передает поток Kinesis Firehose».Не стесняйтесь писать напрямую из функции AWS Lambda в Kinesis Data Firehose.

...