Как AWS Lambda выполняет функцию? - PullRequest
1 голос
/ 07 марта 2019

У меня есть две функции в AWS Lambda, показанные ниже.Первый объединяет данные за каждый час и возвращает средние значения.Второй выполняет умножение матриц.

Когда я выполняю эти функции локально, умножение матриц выполняется в пятьдесят раз медленнее, поскольку он должен выполнять значительно больше операций, чем вычисление среднечасовых значений.Однако когда я запускаю их на AWS Lambda, функция, выполняющая умножение матриц, работает быстрее, иногда даже вдвое быстрее.Как это может быть?

Это импортер, вычисляющий среднечасовые значения:

import json
import time


def get_averages(event, context):
    """
    file is a JSON object containing sensor data for a whole day. 
    Retrieves
        data from input, transforms it and stores it in another JSON 
        object.
    Input file:
        json = [{
            "sensor_id": row[0],
            "sensor_type": row[1],
            "lat": row[3], lon: row[4],
            "timestamp": row[5],
            "P1": row[6], "durP1": row[7], "ratioP1": row[8],
            "P2": row[9], "durP2": row[10], "ratioP2": row[11]
        }]

    Output object (data aggregated by hour):
        data = {
            "sensor_id": 1,
            "sensor_type": "ABC",
            "location": [lat, lon]
            "date": "yyyy-mm-dd",
            "total_data-points": int,
            "data": [
                        {
                            "time": "hh:mm:ss",
                            "data-points": int,
                            "P1": [1, 2, 3],  # ([avg_val, avg_dur, avg_ratio])
                            "P2": [1, 2, 3],  # ([avg_val, avg_dur, avg_ratio])
                        },
                    ]
        }
    """
    start = time.perf_counter()
    first_row = True
    for row in json.loads(event['json_input'].replace('\'', '"')):
        # row[5][11:-6] to remove day and '+00'
        if (first_row):
            out = {
                'sensor_id': row['sensor_id'],
                'sensor_type': row['sensor_type'],
                'location': [row['lat'], row['lon']],
                'date': row['timestamp'][:10],
                "total_data-points": 0,
                'data': [{'hour': row['timestamp'][11:13]}]
            }
            p1 = ([0, 0, 0], 0)  # second element of the tuple counts
            p2 = ([0, 0, 0], 0)  # data points per hour
            first_row = False

        elif (row['timestamp'][11:13] != out['data'][-1]['hour']):
            # calculate averages
            avg_p1 = [x / p1[1] for x in p1[0]]
            avg_p2 = [y / p1[1] for y in p2[0]]

            # append averages and number of data points
            out['total_data-points'] += p1[1]
            out['data'][-1]['data-points'] = p1[1]
            out['data'][-1]['P1'] = avg_p1
            out['data'][-1]['P2'] = avg_p2

            # add object for new hour, reset p1, p2
            out['data'].append({'hour': row['timestamp'][11:13]})
            p1 = ([0, 0, 0], 0)
            p2 = ([0, 0, 0], 0)

        # add values to p1, p2, increment counters
        data_1 = [float(x) for x in [row['P1'], row['durP1'],
                  row['ratioP1']]]
        data_2 = [float(y) for y in [row['P2'], row['durP2'],
                  row['ratioP2']]]
        p1 = ([sum(x) for x in zip(p1[0], data_1)], p1[1] + 1)
        p2 = ([sum(x) for x in zip(p1[0], data_2)], p2[1] + 1)

    # add last elements to the json object when EOF is reached
    avg_p1 = [x / p1[1] for x in p1[0]]
    avg_p2 = [y / p1[1] for y in p2[0]]

    out['total_data-points'] += p1[1]
    out['data'][-1]['data-points'] = p1[1]
    out['data'][-1]['P1'] = avg_p1
    out['data'][-1]['P2'] = avg_p2

    perf = time.perf_counter() - start

    return {
        'statusCode': 200,
        'body': json.dumps({"output": out, "time": perf})
    }

Это функция, выполняющая умножение матриц:

import json
import time


def get_matrix(event, context):
    """
    file is a JSON object containing sensor data for a whole day. 
    Retrieves
        data from input, transforms it and stores it in another JSON 
    object.
    Input file:
        json = [{
            "sensor_id": row[0],
            "sensor_type": row[1],
            "lat": row[3], lon: row[4],
            "timestamp": row[5],
            "P1": row[6], "durP1": row[7], "ratioP1": row[8],
            "P2": row[9], "durP2": row[10], "ratioP2": row[11]
        }]

    Output object:
        data = {
            "sensor_id": 1,
            "sensor_type": "ABC",
            "location": [lat, lon]
            "date": "yyyy-mm-dd",
            "data": [
                        {
                            "start_time": "hh:mm:ss",
                            "end_time": "hh:mm:ss",
                            "result": 2D-array
                        },
                    ]
        }
    """
    start = time.perf_counter()
    first_row = True
    cnt_rows = 0
    cnt_cols = 0
    matrix = [[]]
    for row in json.loads(event['json_input'].replace('\'', '"')):
        # row[5][11:-6] to remove day and '+00'
        if (first_row):
            out = {
                'sensor_id': row['sensor_id'],
                'sensor_type': row['sensor_type'],
                'location': [row['lat'], row['lon']],
                'date': row['timestamp'][:10],
                'data': [{}]
            }
            first_row = False

        if cnt_rows == 1:
            out['data'][-1]['start_time']: row['timestamp'][11:13]

        if (cnt_cols == event["matrix_size"]):
            cnt_cols = 0
            cnt_rows += 1
            if (cnt_rows == event["matrix_size"]):
                out['data'][-1]['end_time'] = row['timestamp'][11:13]
                out['data'][-1]['result'] = matrix_square(matrix)
                out['data'].append({})
                matrix = [[]]
                cnt_rows = 0
            else:
                matrix.append([])

        matrix[-1].extend([float(row['P1']), float(row['durP1']),
                           float(row['ratioP1']), float(row['P2']),
                           float(row['durP2']), float(row['ratioP2'])])
        cnt_cols += 6

    perf = time.perf_counter() - start

    return {
        'statusCode': 200,
        'body': json.dumps({"output": out, "time": perf})
    }

def matrix_square(matrix):
    res = []
    for i in range(len(matrix)):
        res.append([])
        for j in range(len(matrix)):
            row = matrix[i]
            col = [r[j] for r in matrix]
            res[-1].append(0)
            for k in range(len(row)):
                res[i][j] += row[k] * col[k]
    return res

1 Ответ

0 голосов
/ 08 марта 2019

В лямбде всякий раз, когда вы пытаетесь запустить функцию. aws автоматически назначает ресурсы для вашей функции в первый раз в зависимости от ваших требований к функции, и это является причиной этого. И эти ресурсы остаются там в течение некоторого времени, поэтому может быть случай, когда вы запустите fuction во второй раз, это займет меньше времени, чем прежде.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...