Я пытаюсь перенести данные CSV из S3 в DynamoDB, используя Data Pipeline. Данные не в формате экспорта DynamoDB, а вместо этого в обычном CSV.
Я понимаю, что конвейер данных чаще используется для импорта или экспорта формата DynamoDB, а не стандартного CSV. Я думаю Я прочитал в Google, что можно использовать обычный файл, но я не смог собрать что-то, что работает. Документация AWS также не очень полезна. Я не смог найти справочные посты, которые сравнительно недавно (<2 лет) </p>
Если это возможно, кто-нибудь может подсказать, почему мой конвейер может не работать? Я вставил конвейер и сообщение об ошибке ниже. Кажется, ошибка указывает на проблему с подключением данных к «Динамо», я полагаю, потому что они не в формате экспорта.
Я бы сделал это в Lambda, но загрузка данных занимает больше 15 минут.
Спасибо
{
"objects": [
{
"myComment": "Activity used to run the hive script to import CSV data",
"output": {
"ref": "dynamoDataTable"
},
"input": {
"ref": "s3csv"
},
"name": "S3toDynamoLoader",
"hiveScript": "DROP TABLE IF EXISTS tempHiveTable;\n\nDROP TABLE IF EXISTS s3TempTable;\n\nCREATE EXTERNAL TABLE tempHiveTable (#{myDDBColDef}) \nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{myDDBTableName}\", \"dynamodb.column.mapping\" = \"#{myDDBTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3TempTable (#{myS3ColDef}) \nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{myInputS3Loc}';\n \nINSERT OVERWRITE TABLE tempHiveTable SELECT * FROM s3TempTable;",
"id": "S3toDynamoLoader",
"runsOn": { "ref": "EmrCluster" },
"stage": "false",
"type": "HiveActivity"
},
{
"myComment": "The DynamoDB table that we are uploading to",
"name": "DynamoDB",
"id": "dynamoDataTable",
"type": "DynamoDBDataNode",
"tableName": "#{myDDBTableName}",
"writeThroughputPercent": "1.0",
"dataFormat": {
"ref": "DDBTableFormat"
}
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{myLogUri}",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"name": "EmrCluster",
"coreInstanceType": "m1.medium",
"coreInstanceCount": "1",
"masterInstanceType": "m1.medium",
"releaseLabel": "emr-5.29.0",
"id": "EmrCluster",
"type": "EmrCluster",
"terminateAfter": "2 Hours"
},
{
"myComment": "The S3 file that contains the data we're importing",
"directoryPath": "#{myInputS3Loc}",
"dataFormat": {
"ref": "csvFormat"
},
"name": "S3DataNode",
"id": "s3csv",
"type": "S3DataNode"
},
{
"myComment": "Format for the S3 Path",
"name": "S3ExportFormat",
"column": "not_used STRING",
"id": "csvFormat",
"type": "CSV"
},
{
"myComment": "Format for the DynamoDB table",
"name": "DDBTableFormat",
"id": "DDBTableFormat",
"column": "not_used STRING",
"type": "DynamoDBExportDataFormat"
}
],
"parameters": [
{
"description": "S3 Column Mappings",
"id": "myS3ColDef",
"default": "phoneNumber string,firstName string,lastName string, spend double",
"type": "String"
},
{
"description": "DynamoDB Column Mappings",
"id": "myDDBColDef",
"default": "phoneNumber String,firstName String,lastName String, spend double",
"type": "String"
},
{
"description": "Input S3 foder",
"id": "myInputS3Loc",
"default": "s3://POCproject-dev1-data/upload/",
"type": "AWS::S3::ObjectKey"
},
{
"description": "DynamoDB table name",
"id": "myDDBTableName",
"default": "POCproject-pipeline-data",
"type": "String"
},
{
"description": "S3 to DynamoDB Column Mapping",
"id": "myDDBTableColMapping",
"default": "phoneNumber:phoneNumber,firstName:firstName,lastName:lastName,spend:spend",
"type": "String"
},
{
"description": "DataPipeline Log Uri",
"id": "myLogUri",
"default": "s3://POCproject-dev1-data/",
"type": "AWS::S3::ObjectKey"
}
]
}
Ошибка
[INFO] (TaskRunnerService-df-09432511OLZUA8VN0NLE_@EmrCluster_2020-03-06T02:52:47-0) df-09432511OLZUA8VN0NLE amazonaws.datapipeline.taskrunner.LogMessageUtil: Returning tail errorMsg :Caused by: java.lang.RuntimeException: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.handleException(DynamoDBFibonacciRetryer.java:108)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:83)
at org.apache.hadoop.dynamodb.DynamoDBClient.writeBatch(DynamoDBClient.java:258)
at org.apache.hadoop.dynamodb.DynamoDBClient.putBatch(DynamoDBClient.java:215)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:112)
at org.apache.hadoop.hive.dynamodb.write.HiveDynamoDBRecordWriter.write(HiveDynamoDBRecordWriter.java:42)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:762)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
... 18 more
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)