Что эквивалентно использованию Python Asyncio для генерации HTTP-запросов в Scala? - PullRequest
1 голос
/ 03 августа 2020

В настоящее время я пытаюсь создать асинхронный сборщик данных на стороне клиента (через HTTP), используя Scala. Полученные данные будут проанализированы и вставлены в базу данных, но это выходит за рамки этого вопроса.

Основные цели:

  1. Создание функции для извлечения данных из api асинхронно.
  2. Иметь контроль над частотой запросов (другими словами: иметь гибкость для обработки ограничений скорости).

Чтобы лучше понять, что я пытаюсь сделать, Я написал код в Python, чтобы лучше выразить свой вопрос.

Сначала я определяю функцию, которая создает asyn c HTTP-запросы (я использовал aiohttp , поскольку я нашел его выразительным для моего намерения):

from aiohttp import ClientResponse, ClientSession, ClientError, http_exceptions 


async def fetch_data(url: str, method: str, session: ClientSession, **kwargs) -> dict:
    """
    Request wrapper that fetches data. Passes kwargs to session.
    Args:
        :param url: api endpoint used (eg. https://foo.bar.com/api/v3/baz)
        :param method: HTTP method used (eg. GET, POST...etc.)
        :param session: aiohttp.ClientSession object. The main entry point for all client API operations.
    Kwargs:
        ___ : Includes other KWARGS that may be passed to a session (eg. Headers, SSL...etc.)
    Returns:
        Dict
     """
    response: ClientResponse = await session.request(url=url, method=method, **kwargs)
    response.raise_for_status()  # Raises an aiohttp.ClientResponseError if the response status is 400 or higher
    logger.info("Got response [%s] for request: %s", response.status, url)
    data: dict = await response.json()
    return data

Я позже использовал бы эту функцию для извлечения данных и реальной работы с ними (обратите внимание на ограничение на TCPConnector, которое ограничивает общее количество для одновременных подключений):

H = {"Authorization": "orgId={org_id}".format(org_id=config.account_id())}
C = config.cert_dir()

async def main():
    sslcontext = ssl.create_default_context(cafile=None)
    sslcontext.load_cert_chain(certfile=C + "Admin_API_access.pem", keyfile=C + "Admin_API_access.key")
    connector = aiohttp.TCPConnector(limit=0, ssl=sslcontext)  # limit=0 can be used to reduce the number of async requests

    async with aiohttp.ClientSession(connector=connector) as context:
        # Call API and do something with the data (eg. write to a file)

Насколько я понимаю (а также к моему замешательству), есть несколько способов сделать это в Scala. Что эквивалентно вышеупомянутому коду Python в Scala для генерации HTTP-запросов без блокировки. Поскольку Asyncio является однопоточным, я не особенно ищу однопоточное решение, а скорее асинхронное.

И, наконец, я не ищу кого-нибудь, кто бы написал код Что касается меня, я скорее ищу концептуальное понимание потенциальных решений, которые я могу go, которые предлагают ту же функциональность.

1 Ответ

1 голос
/ 03 августа 2020

По сути, в Scala асинхронность представлена ​​Future, который можно рассматривать как оболочку одного объекта, который:

  • может в какой-то момент времени (возможно, в прошлое) будет завершено
  • если оно завершено, оно либо завершено с помощью объекта, либо исключением

Можно зарегистрировать обратные вызовы, которые будут работать с Future в какой-то момент после него завершает; можно также обращаться с Future так же, как с коллекцией с map, flatMap, et c. чтобы преобразовать его (за кулисами эти операции регистрируют обратный вызов и возвращают новый Future, который завершается обратным вызовом: использование map, flatMap, recover и друзей обычно предпочтительнее ручной регистрации обратного вызова).

Это очень общий API, и есть много-много способов его реализовать. В общем, если библиотека Scala возвращает Future с, она поддерживает асинхронные операции из коробки. Нет недостатка в библиотеках, которые дадут Future HTTP-запроса, включая, но не ограничиваясь этим:

  • Akka HTTP
  • Dispatch
  • Gigahorse
  • Play WS

Выбор такой библиотеки в основном дело вкуса: другие используемые библиотеки могут направить вас к конкретной библиотеке (например, если вы используете другие библиотеки Akka, может иметь смысл использовать Akka HTTP), и вы можете обнаружить, что та или иная библиотека лучше поддерживает конкретный вариант использования.

Кроме того, если конкретная библиотека не дает вам Future, вы можете довольно легко обернуть его:

import some.blocking.http.request.library

import scala.concurrent.ExecutionContext

def asyncRequestUrl(url: String)(implicit ectx: ExecutionContext): Future[Response] =
  Future {
    library.request(url)
  }

scala.concurrent.ExecutionContext представляет пул потоков с невыполненными задачами для выполнения в пуле. Future { code... } (технически Future.apply) добавляет задачу для выполнения code... и завершения будущего (которое немедленно возвращается) с результатом выполнения. Обратите внимание: поскольку пул потоков почти всегда имеет верхний предел количества потоков в нем, задача будет оставаться в состоянии «запланировано, но не выполняется» до тех пор, пока все потоки в пуле заняты другими задачами.

Если вызывающий не предоставляет ExecutionContext, компилятор пометит это как ошибку и может предложить использовать scala.concurrent.ExecutionContext.Implicits.global. Использование этого ExecutionContext, вероятно, нормально в средах разработки / тестирования, но почти наверняка захочется создать собственный ExecutionContext с большим количеством потоков, чем глобальный контекст по умолчанию, по крайней мере, для выполнения HTTP-запросов. Использование библиотеки asyn c по индивидуальному проекту, подобной тем, которые я перечислил выше, вероятно, «просто сработает» с точки зрения выбора подходящего пула потоков (и, вероятно, будет настраиваемым, если обнаружится, что он «не просто работает»).

Если вы используете кодовую базу, ориентированную на FP, вы, вероятно, захотите использовать что-то вроде http4s , которое использует немного другую модель, чем «vanilla» Scala для асинхронности (например, ConcurrentEffect от Cats Effect). Есть способы заставить его работать с ванилью Scala, если так хочется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...