Использование Python многопроцессорной очереди внутри AWS лямбда-функции - PullRequest
2 голосов
/ 08 января 2020

У меня есть python, который создает несколько процессов для выполнения задачи намного быстрее. Когда я создаю эти процессы, я перехожу в очередь. Внутри процессов я использую queue.put (data), поэтому я могу получать данные вне процессов. Он работает fantasti c на моем локальном компьютере, но когда я загружаю zip в функцию AWS Lambda (Python 3.8), он говорит, что функция Queue () не была реализована. Проект отлично работает в AWS Лямбда, когда я просто отключаю функциональность очереди, так что я знаю, что это единственное зависание, которое у меня есть на данный момент.

Я гарантировал установку многопроцессорного пакета непосредственно в мой python проект с помощью "pip install multiprocess - t ./ «а также» pip install boto3 -t ./".

Я новичок в python, а также AWS, но недавно проведенные мною исследования потенциально указывают на SQS .

Чтение этих SQS документов Я не уверен, что это именно то, что я ищу.

Вот код, который я запускаю в Lambda, который работает локально, но не на AWS. См. * Для важных частей:

from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math

sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]

results = []


def f(sites, category, search_keys, queue):
    local_results = []
    for site in sites:
        cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
        for result in cl_fs.get_results(sort_by='newest'):
            local_results.append(result)
    if len(local_results) > 0:
        print(local_results)
    queue.put(local_results) # Putting data *********************************


def scan_handler(event, context):
    started_at = time.monotonic()
    queue = Queue()
    print("Running...")
    amount_of_lists = int(event['amountOfLists'])
    list_length = int(len(sitesHold) / amount_of_lists)
    extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
    site_list = []
    list_creator_counter = 0
    site_counter = 0
    for i in range(amount_of_lists + extra_lists):
        site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
        list_creator_counter += list_length
    processes = []
    for i in range(len(site_list)):
        site_counter = site_counter + len(site_list[i])
        processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,))) # Creating processes and creating queues ***************************

    for process in processes:
        process.start() # Starting processes ***********************

    for process in processes:
        listings = queue.get() # Getting from queue ****************************
        if len(listings) > 0:
            for listing in listings:
                results.append(listing)

    print(f"Results: {results}")

    for process in processes:
        process.join()

    total_time_took = time.monotonic() - started_at
    print(f"Sites processed: {site_counter}")
    print(f'Took {total_time_took} seconds long')

Это ошибка, которую дает мне лямбда-функция:

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/main.py\", line 90, in scan_handler\n    queue = Queue()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 103, in Queue\n    return Queue(maxsize, ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/queues.py\", line 42, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}

Работает ли Queue () в AWS Lambda? Как лучше всего достичь 1028 * моей цели?

Ответы [ 2 ]

3 голосов
/ 08 января 2020
0 голосов
/ 17 апреля 2020

Из AWS документов

Если вы разрабатываете лямбда-функцию с Python, параллелизм не приходит по умолчанию. Lambda поддерживает Python 2.7 и Python 3.6, оба из которых имеют многопроцессорные и многопоточные модули.

Многопроцессорный модуль, поставляемый с Python, позволяет запускать несколько процессов параллельно. Поскольку среда выполнения Lambda не поддерживает / dev / shm (разделяемая память для процессов), вы не можете использовать multiprocessing.Queue или multiprocessing.Pool.

С другой стороны, вы можете использовать multiprocessing.Pipe вместо многопроцессорной обработки. Выполните sh то, что вам нужно, без каких-либо ошибок при выполнении функции Lambda.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...