Эффективная настройка ParDo или start_bundle для бокового ввода - PullRequest
1 голос
/ 17 марта 2020

список A: 25M хэшей
список B: 175K хэшей

Я хочу проверить каждый ha sh в списке B на наличие в списке A. Для этого у меня есть функция ParDo, и я получаю когда это не соответствует. Это процесс дедупликации.

Как эффективно настроить этот ParDo, теперь я выполняю боковой ввод списка A при обработке списка B. Но не следует ли вводить сбоку go в setup () или start_bundle ( ) ParDo, поэтому я храню список поиска (A) на рабочем месте всего один раз?

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
        else:
            pass

Если у вас есть ответ, пожалуйста, включите ссылку на документацию, потому что я не нашел хорошей документации для Python версия.

  • transformed_records является коллекцией PC из предыдущего преобразования
  • current_data является коллекцией PC из BigQuery.read

    new_records = transformed_records | 'Checknewrecords' >> beam.ParDo (Checknewrecords (), pvalue.AsList (current_data))

Ответы [ 2 ]

2 голосов
/ 18 марта 2020

Извините, я изначально неправильно понял вопрос. На самом деле я не думаю, что в start_bundle возможно иметь боковой ввод. Это доступно только в process_bundle. Но вместо этого вы могли бы выполнить работу при первом вызове пакета процессов и получить аналогичный результат.

class DoFnMethods(beam.DoFn):
  def __init__(self):
    self.first_element_processed = False
    self.once_retrieved_side_input_data = None

  def called_once(self, side_input):
    if self.first_element_processed:
      return
    self.once_retrieved_side_input_data = side_input.get(...)
    self.first_element_processed = True

  def process(self, element, side_input):
    self.called_once(side_input)
    ...

Примечание: вам нужно знать, что начальный пакет и конечный sh пакет будут вызывается один раз для пакета по всем windows, и для каждого вычисляемого окна предоставляется боковой ввод для обработки. Поэтому, если вы работаете с windows, вам может понадобиться использовать dict (с ключом в окне) для переменных self.first_element_processed и self.once_retrieved_side_input_data, чтобы вы могли вызывать _on c один раз для каждого окна.

2 голосов
/ 17 марта 2020

Я считаю, что pvalue.AsDict - это то, что вам нужно, что даст вам интерфейс в стиле словаря для бокового ввода. Вы можете найти некоторые примеры в поиске Apache Beam Github .

Вот упрощенный пример, который я только что написал, но, пожалуйста, посмотрите проверенный пример ниже (хотя и немного более сложный) В случае, если я допустил ошибку.

class ComputeHashes(beam.DoFn):
  def process(self, element):
      # use the element as a key to produce a KV, value is not used
      yield (HashFunction(element), true) 

initial_elements = beam.Create("foo")
computed_hashes = initial_elements | beam.ParDo(ComputeHashes())

class FilterIfAlreadyComputedHash(beam.DoFn):
  def process(self, element, hashes):
    # Filter if it already exists in hashes
    if not hashes.get(element):
      yield element

more_elements = beam.Create("foo", "bar") # Read from your pipeline's source
small_words = more_elements | beam.ParDo(FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))

В проверенном примере из репозитория луча github в visionml_test.py коллекция PC преобразуется в представление словаря с использованием луча. PValue.AsDict ().

class VisionMlTestIT(unittest.TestCase):
  def test_text_detection_with_language_hint(self):
    IMAGES_TO_ANNOTATE = [
        'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
    ]
    IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]

    with TestPipeline(is_integration_test=True) as p:
      contexts = p | 'Create context' >> beam.Create(
          dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))

      output = (
          p
          | beam.Create(IMAGES_TO_ANNOTATE)
          | AnnotateImage(
              features=[vision.types.Feature(type='TEXT_DETECTION')],
              context_side_input=beam.pvalue.AsDict(contexts))
          | beam.ParDo(extract))

Боковой ввод передается в FlatMap (в visionml.py ), а в функции FlatMap запись извлекается из словаря с .get (). Это также может быть передано в карту или ParDo. См. beam python боковая входная документация (здесь вместо них используется .AsSingleton .AsDict). Здесь вы можете найти пример использования его в вызове процесса.

class AnnotateImage(PTransform):
  """A ``PTransform`` for annotating images using the GCP Vision API.
  ref: https://cloud.google.com/vision/docs/
  Batches elements together using ``util.BatchElements`` PTransform and sends
  each batch of elements to the GCP Vision API.
  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
  or binary_type base64-encoded image data.
  Accepts an `AsDict` side input that maps each image to an image context.
  """

  MAX_BATCH_SIZE = 5
  MIN_BATCH_SIZE = 1

  def __init__(
      self,
      features,
      retry=None,
      timeout=120,
      max_batch_size=None,
      min_batch_size=None,
      client_options=None,
      context_side_input=None,
      metadata=None):
    """
    Args:
      features: (List[``vision.types.Feature.enums.Feature``]) Required.
        The Vision API features to detect
      retry: (google.api_core.retry.Retry) Optional.
        A retry object used to retry requests.
        If None is specified (default), requests will not be retried.
      timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Optional.
        Maximum number of images to batch in the same request to the Vision API.
        Default is 5 (which is also the Vision API max).
        This parameter is primarily intended for testing.
      min_batch_size: (int) Optional.
        Minimum number of images to batch in the same request to the Vision API.
        Default is None. This parameter is primarily intended for testing.
      client_options:
        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.
        Example usage::
          image_contexts =
            [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),
            (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),]
          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )
          visionml.AnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input)))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(AnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than {}'.format(
              AnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata

  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _ImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))

  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None

    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allows text_type or binary_type
      image = vision.types.Image(content=element)

    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features, image_context=image_context)
    yield request

Обратите внимание, что в Java вы используете его как .asMap () .

...