Параллельное выполнение запросов DynamoDB (BatchGetItems для глобального вторичного индекса) - PullRequest
1 голос
/ 05 марта 2019

Идея здесь заключается в параллельном выполнении нескольких запросов DynamoDB, поскольку запрос выполняется через GSI. На данный момент BatchGetItems не поддерживает запросы по индексам , и рекомендуемый подход заключается в параллельном запросе данных. Я использую подпрограммы go с wg, чтобы позаботиться о выполнении подпрограмм параллельно.

Входные данные для функции - это массив строк с идентификатором, выходные данные - это атрибуты идентификаторов.

Когда функция запускается локально, проблема не возникает, однако, когда функция запускается в AWS-Lambda, возвращаемые данные продолжают расти;

т; Ввод 2 элементов должен выводить 2 элемента. Если функция протестирована на AWS-Lambda,

  • 1 раз функция возвращает 2 элемента
  • 2-й раз возвращает 4 элемента (одни и те же элементы повторяются 2 раза)
  • 3-й раз возвращает 6 элементов (одни и те же элементы повторяются 4 раза)

и так далее. Вот фрагмент кода. Есть ли что-то неправильно обработанное, когда лямбда выводит дополнительный набор данных при каждом запуске лямбды?

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

//Final Output Interface
var bulkOutput []interface{}

func exitWithError(err error) {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
    Ids      []string `json:"ids,omitempty"`
}

//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
    return DynamoDBBatchGetRecords(data), nil
}

func main() {
    lambda.Start(HandleRequest)
}

func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

    var wg sync.WaitGroup
    var mutex = &sync.Mutex{}

    iterations := len(a.Ids)
    wg.Add(iterations)
    for i := 0; i < iterations; i++ {
        go QueryOutput(a.Ids[i], &wg, mutex)
    }

    wg.Wait()
    return bulkOutput

}

//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
    var outputData []interface{}
    defer wg.Done()
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("aws-region"),
    })
    if err != nil {
        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
    }
    ddb := dynamodb.New(sess)
    queryInput := &dynamodb.QueryInput{
        Limit:                aws.Int64(1),
        TableName:            aws.String("table-name"),
        IndexName:            aws.String("gsi-index"),
        ScanIndexForward:     aws.Bool(false),
        ConsistentRead:       aws.Bool(false),
        KeyConditions: map[string]*dynamodb.Condition{
            "column_name": {
                ComparisonOperator: aws.String("EQ"),
                AttributeValueList: []*dynamodb.AttributeValue{
                    {
                        S: aws.String(data),
                    },
                },
            },
        },
    }
    output, err := ddb.Query(queryInput)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
    }
    err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
    }
    mtx.Lock()
    bulkOutput = append(bulkOutput, outputData)
    mtx.Unlock()
}

1 Ответ

0 голосов
/ 07 марта 2019

Согласно документации , глобальные переменные не зависят от кода обработчика вашей лямбда-функции.Это приводило к тому, что буфер со временем рос.

Исправленная ссылка вставлена ​​ниже.

package main

import (
    "context"
    "fmt"
    "os"
    "sync"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

func exitWithError(err error) {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
    output := DynamoDBBatchGetRecords(data)
    return output, nil
}

func main() {
    lambda.Start(HandleRequest)
}

func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
    var dataOut []interface{}
    var wg = &sync.WaitGroup{}
    var mtx = &sync.Mutex{}

    iterations := len(a.Ids)
    wg.Add(iterations)
    for i := 0; i < i; i++ {
        go func(i int) {
            defer wg.Done()
            var outputData []interface{}
            sess, err := session.NewSession(&aws.Config{
                Region: aws.String("aws-region"),
            })
            if err != nil {
                exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
            }
            ddb := dynamodb.New(sess)
            queryInput := &dynamodb.QueryInput{
                Limit:            aws.Int64(1),
                TableName:        aws.String("table"),
                IndexName:        aws.String("index"),
                ScanIndexForward: aws.Bool(false),
                ConsistentRead: aws.Bool(false),
                KeyConditions: map[string]*dynamodb.Condition{
                    "index-column": {
                        ComparisonOperator: aws.String("EQ"),
                        AttributeValueList: []*dynamodb.AttributeValue{
                            {
                                S: aws.String(a.Ids[i]),
                            },
                        },
                    },
                },
            }
            output, err := ddb.Query(queryInput)

            if err != nil {
                exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
            }
            err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
            if err != nil {
                exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
            }

            mtx.Lock()
            dataOut = append(dataOut, outputData[0])
            mtx.Unlock()

        }(i)
    }
    wg.Wait()
    return dataOut
}
...