Как сделать одиночный и пакетный вызов RP C в Apache Beam - PullRequest
2 голосов
/ 19 июня 2020

В моем конвейере я должен сделать одиночный RP C вызов , а также пакетный вызов RPC для выборки данных для обогащения. Я не смог найти никаких ссылок на то, как сделать эти вызовы в вашем конвейере. Я все еще занимаюсь своим делом в Apache Beam и был бы признателен, если бы кто-нибудь сделал это и мог бы поделиться образцом кода или подробностями о том, как это сделать.

Спасибо.

1 Ответ

0 голосов
/ 24 июня 2020

Настройка ответа

Я помогу, написав примеры DoFns, которые делают то, что вы хотите. Я напишу их в Python, но код Java будет аналогичным.

Предположим, у нас есть две функции, которые внутренне выполняют RP C:

def perform_rpc(client, element):
  .... # Some Code to run one RPC for the element using the client

def perform_batched_rpc(client, elements):
  .... # Some code that runs a single RPC for a batch of elements using the client

Давайте Также предположим, что у вас есть функция create_client(), которая возвращает клиента для вашей внешней системы. Мы предполагаем, что создание этого клиента довольно дорогое, и невозможно поддерживать много клиентов в одном воркере (например, из-за ограничений памяти)

Выполнение одного RP C на элемент

Обычно нормально выполнять блокирующую RP C для каждого элемента, но это может привести к низкой загрузке ЦП

class IndividualBlockingRpc(DoFn):

  def setup(self):
    # Create the client only once per Fn instance
    self.client = create_client()

  def process(self, element):
    perform_rpc(self.client, element)

Если вы хотите быть более сложным, вы также можете попробовать запустить асинхронный RPC путем буферизации. Учтите, что в этом случае ваш клиент должен быть потокобезопасным:

class AsyncRpcs(DoFn):
  def __init__(self):
    self.buffer = None
    self.client = None

  def process(self, element):
    self.buffer.append(element)
    if len(self.buffer) > MAX_BUFFER_SIZE:
      self._flush()

  def finish(self):
    self._flush()

  def _flush(self):
    if not self.client:
      self.client = create_client()
    if not self.executor:
      self.executor = ThreadPoolExecutor() # Use a configured executor 

    futures = [self.executor.submit(perform_rpc, client, elm)
               for elm in self.elements]
    for f in futures:
      f.result()  # Finalize all the futures
    self.buffer = []

Выполнение одного RP C для пакета элементов

Для большинства исполнителей пакетный конвейер имеет большой пучки . Это означает, что имеет смысл просто буферизовать элементы по мере их поступления в process, и грипп sh их время от времени, например:

class BatchAndRpc(DoFn):
  def __init__(self):
    self.buffer = None
    self.client = None

  def process(self, element):
    self.buffer.append(element)
    if len(self.buffer) > MAX_BUFFER_SIZE:
      self._flush()

  def finish(self):
    self._flush()

  def _flush(self):
    if not self.client:
      self.client = create_client()
    perform_batched_rpc(client, self.buffer)
    self.buffer = []

Для потоковых конвейеров или для конвейеров, где вы пакеты недостаточно велики для того, чтобы эта стратегия работала хорошо, вам может потребоваться попробовать другие уловки, но этой стратегии должно быть достаточно для большинства сценариев ios.

Если эти стратегии не работают, позвольте мне знаю, и я подробно расскажу о других.

...