Как использовать конвейер данных для копирования данных из таблицы DynamoDB в другую таблицу DynamoDB, если оба имеют емкость по требованию - PullRequest
0 голосов
/ 21 февраля 2020

Я использовал для копирования данных из одной DynamoDB в другую DynamoDB с использованием конвейера. json. Он работает, когда исходная таблица имеет выделенную емкость и не имеет значения, если для пункта назначения задано предоставление / по требованию. Я хочу, чтобы для обеих моих таблиц была установлена ​​емкость по требованию . Но когда я использую тот же шаблон, он не работает. Есть ли способ, которым мы можем это сделать, или он все еще находится в стадии разработки?

Вот мой оригинальный работающий скрипт:

{
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule",
            "occurrences": "1"
        },
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "schedule": {
                "ref": "DailySchedule"
            },
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "dependsOn": {
               "ref": "TableBackupActivity"
            },
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-emr-#{myDDBDestinationRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-emr-#{myDDBSourceRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForBackup"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}

А вот сообщение об ошибке при выполнении конвейера данных, когда исходная таблица DynamoDB установлена ​​на по требованию емкость:

at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.run(DynamoDbExport.java:79)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.main(DynamoDbExport.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

...