не могу скопировать JSON - PullRequest
       48

не могу скопировать JSON

0 голосов
/ 29 августа 2018

Ниже приведен вариант использования, над которым я работаю: Я настроил enable Streams при создании DynamoDB с new and old Image. Я создал Kinesis Firehose delivery stream с пунктом назначения как Redshift (Intermediate s3).

Из Dynamodb мой поток достигает Firhose, а оттуда в Bucket, как JSON (S3 Bucket -Gzip), приведенный ниже. Моя проблема в том, что я cannot COPY this JSON to redshift.

Вещи, которые я не могу получить:

    1. Не уверен, каким должен быть оператор Создать таблицу в Redshift
    1. Каким должен быть синтаксис COPY в Kinesis firhose.
    1. Как мне использовать JsonPaths здесь. Пожарная служба Kinesis Data настроена на возврат только json в мое ведро s3.
    1. Как упомянуть Maniphest в команде COPY

Загрузка JSON в S3 показана ниже:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": "x"
        },
        "d_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "z_id": {
            "N": "1267"
        },
        "last_end_trip_dttm": {
            "S": "11/08/2018 1:43:46 PM"
        },
        "land_ind": {
            "N": "1"
        },
        "s_ind": {
            "N": "1"
        },
        "status_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "case_ind": {
            "N": "1"
        },
        "last_po_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "violated_duration": {
            "N": "20"
        },
        "vehicle_id": {
            "S": "x011"
        },
        "longitude": {
            "N": "103.7818"
        },
        "file_status": {
            "S": "Trip_Start"
        },
        "unhired_duration": {
            "N": "10"
        },
        "eo_lat": {
            "N": "1.2345"
        },
        "reply_eo_ind": {
            "N": "1"
        },
        "license_ind": {
            "N": "0"
        },
        "indiscriminately_parked_ind": {
            "N": "0"
        },
        "eo_lng": {
            "N": "102.8978"
        },
        "officer_id": {
            "S": "xxxx@gmail.com"
        },
        "case_status": {
            "N": "0"
        },
        "color_status_cd": {
            "N": "0"
        },
        "parking_id": {
            "N": "2345"
        },
        "ttr_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "deployed_ind": {
            "N": "1"
        },
        "status": {
            "S": "PI"
        }
    },
    "SequenceNumber": "1200000000000956615967",
    "SizeBytes": 570,
    "ApproximateCreationDateTime": 1535513040,
    "eventName": "INSERT"
}

Моя таблица создания Заявление:

create table vehicle_status(
    heart_beat integer,
    cdc_id integer,
    latitude integer,   
    not_deployed_counter integer,
    reg_ind integer,
    operator varchar(10),
    d_dttm varchar(30),
    z_id integer,
    last_end_trip_dttm varchar(30),
    land_ind integer,
    s_ind integer,
    status_change_dttm varchar(30), 
    case_ind integer,
    last_po_change_dttm varchar(30),    
    violated_duration integer,
    vehicle_id varchar(8),
    longitude integer,  
    file_status varchar(30),
    unhired_duration integer,
    eo_lat integer,                     
    reply_eo_ind integer,
    license_ind integer,    
    indiscriminately_parked_ind integer,
    eo_lng integer,
    officer_id varchar(50),
    case_status integer,
    color_status_cd integer,
    parking_id integer,
    ttr_dttm varchar(30),
    deployed_ind varchar(3),
  status varchar(8));

и My Copy Statement (вручную пытаюсь восстановить это из Redshift):

COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) 
FROM 's3://<my-bucket>/2018/08/29/05/vehicle_status_change-2-2018-08-29-05-24-42-092c330b-e14a-4133-bf4a-5982f2e1f49e.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::<accountnum>:role/<RedshiftRole>' GZIP json 'auto';

Когда я пытаюсь выполнить описанную выше процедуру - я получаю Вставить записи - но все столбцы и строки равны нулю.

Как мне скопировать этот формат json в redhsift. Застряли здесь последние 3 дня. Поможет любая помощь.

S3 Bucket:

Amazon S3/<My-bucket>/2018/08/29/05
Amazon S3/<My-bucket>/manifests/2018/08/29/05

1 Ответ

0 голосов
/ 31 августа 2018

Я не очень хорошо знаком с Amazon, но позвольте мне попытаться ответить на большинство ваших вопросов, чтобы вы могли двигаться дальше. Другие люди могут редактировать этот ответ или дополнительную информацию. Спасибо!

Не уверен, каким должен быть оператор Создать таблицу в Redshift

Ваш оператор создания create table vehicle_status(...) не имеет проблем, хотя вы можете добавить distribution key, sort key и encoding в зависимости от ваших требований, см. Дополнительные здесь и здесь

Согласно документам AWS Kenesis ваша таблица должна присутствовать в Redshift, следовательно, вы можете подключиться к Redshift с помощью команды psql и запустить create statement вручную.

Каким должен быть синтаксис COPY в Kinesis firhose.

Синтаксис Copy останется неизменным, если вы запустите его через psql или firhose, к счастью, созданный вами скрипт копирования работает без ошибок, я попробовал его в моем случае с небольшой модификацией direct AWS/SECRET ключ снабжен, а не работает нормально, здесь sql, который я запускаю, работал нормально и скопировал 1 запись данных в таблицу vehicle_status.

На самом деле ваша структура пути json сложна, поэтому json 'auto' не будет работать. Вот рабочая команда, я создал для вас пример файла jsonpath с 4 примерами полей, и вы можете следовать той же структуре, чтобы создать файл jsonpath со всеми точками данных.

 COPY vehicle_status (heart_beat, cdc_id, operator, status) FROM 's3://XXX/development/test_file.json' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://XXX/development/yourjsonpathfile';

И ваш json path file должен иметь содержимое, аналогичное приведенному ниже.

{
  "jsonpaths": [
    "$['NewImage']['heart_beat']['N']",
    "$['NewImage']['cdc_id']['N']",
    "$['NewImage']['operator']['S']",
    "$['NewImage']['status']['S']"
  ]
}

Я проверил это, и оно работает.

Как мне использовать JsonPaths здесь. Пожарная служба Kinesis Data настроена на возврат только json в мое ведро s3.

Я использовал только данные вашего примера json, и это работает, поэтому здесь я не вижу проблем.

Как упомянуть Maniphest в команде COPY

Это хороший вопрос, я мог бы попытаться объяснить его, я надеюсь, здесь вы имеете в виду menifest.

Если вы видите выше команду копирования, она отлично работает для одного файла или пары файлов, но подумайте, что у вас много файлов, вот вам и концепция menifest. Прямо из документов Amazon, "Вместо указания пути к объекту для команды COPY вы указываете имя текстового файла в формате JSON, в котором явно перечисляются загружаемые файлы."

Короче говоря, если вы хотите загрузить несколько файлов за один снимок, что также является предпочтительным способом Redshift, вы можете создать простой menifest с json и предоставить его в команде копирования.

{ "entries": [ {"url":"s3://mybucket-alpha/2013-10-04-custdata", "mandatory":true}, {"url":"s3://mybucket-alpha/2013-10-05-custdata", "mandatory":true},.... ] }

загрузите манифест в S3 и используйте его в вашей команде копирования, как показано ниже.

 COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) FROM 's3://XXX/development/test.menifest' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://yourbucket/jsonpath' menifest;

Вот подробный справочник для menifest .

Я надеюсь, что это даст вам некоторые идеи, как двигаться дальше, и если вы заметите конкретную ошибку, я был бы рад перефокусироваться на ответ.

...