Настройка ответа
Я помогу, написав примеры DoFns, которые делают то, что вы хотите. Я напишу их в Python, но код Java будет аналогичным.
Предположим, у нас есть две функции, которые внутренне выполняют RP C:
def perform_rpc(client, element):
.... # Some Code to run one RPC for the element using the client
def perform_batched_rpc(client, elements):
.... # Some code that runs a single RPC for a batch of elements using the client
Давайте Также предположим, что у вас есть функция create_client()
, которая возвращает клиента для вашей внешней системы. Мы предполагаем, что создание этого клиента довольно дорогое, и невозможно поддерживать много клиентов в одном воркере (например, из-за ограничений памяти)
Выполнение одного RP C на элемент
Обычно нормально выполнять блокирующую RP C для каждого элемента, но это может привести к низкой загрузке ЦП
class IndividualBlockingRpc(DoFn):
def setup(self):
# Create the client only once per Fn instance
self.client = create_client()
def process(self, element):
perform_rpc(self.client, element)
Если вы хотите быть более сложным, вы также можете попробовать запустить асинхронный RPC путем буферизации. Учтите, что в этом случае ваш клиент должен быть потокобезопасным:
class AsyncRpcs(DoFn):
def __init__(self):
self.buffer = None
self.client = None
def process(self, element):
self.buffer.append(element)
if len(self.buffer) > MAX_BUFFER_SIZE:
self._flush()
def finish(self):
self._flush()
def _flush(self):
if not self.client:
self.client = create_client()
if not self.executor:
self.executor = ThreadPoolExecutor() # Use a configured executor
futures = [self.executor.submit(perform_rpc, client, elm)
for elm in self.elements]
for f in futures:
f.result() # Finalize all the futures
self.buffer = []
Выполнение одного RP C для пакета элементов
Для большинства исполнителей пакетный конвейер имеет большой пучки . Это означает, что имеет смысл просто буферизовать элементы по мере их поступления в process
, и грипп sh их время от времени, например:
class BatchAndRpc(DoFn):
def __init__(self):
self.buffer = None
self.client = None
def process(self, element):
self.buffer.append(element)
if len(self.buffer) > MAX_BUFFER_SIZE:
self._flush()
def finish(self):
self._flush()
def _flush(self):
if not self.client:
self.client = create_client()
perform_batched_rpc(client, self.buffer)
self.buffer = []
Для потоковых конвейеров или для конвейеров, где вы пакеты недостаточно велики для того, чтобы эта стратегия работала хорошо, вам может потребоваться попробовать другие уловки, но этой стратегии должно быть достаточно для большинства сценариев ios.
Если эти стратегии не работают, позвольте мне знаю, и я подробно расскажу о других.