I believe pvalue.AsDict is what you need; it gives you a dictionary-style interface to the side input. You can find some examples by searching the Apache Beam GitHub repository.
Here is a simplified example I just wrote, but please see the vetted example below (although slightly more complex) in case I made a mistake in this one.
class ComputeHashes(beam.DoFn):
  def process(self, element):
    # Use the hash of the element as the key of a KV pair; the value is unused.
    yield (HashFunction(element), True)

initial_elements = p | beam.Create(['foo'])
computed_hashes = initial_elements | beam.ParDo(ComputeHashes())
class FilterIfAlreadyComputedHash(beam.DoFn):
  def process(self, element, hashes):
    # Drop the element if its hash already exists in the side-input dict.
    if not hashes.get(HashFunction(element)):
      yield element

more_elements = p | beam.Create(['foo', 'bar'])  # Read from your pipeline's source
small_words = more_elements | beam.ParDo(
    FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))
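For completeness, here is a self-contained version of the same sketch that should run locally on the DirectRunner. The snippet above leaves HashFunction undefined, so this version supplies a hypothetical implementation based on hashlib; that choice is an assumption on my part, not something the Beam API prescribes.

import hashlib

import apache_beam as beam


def HashFunction(element):
  # Hypothetical helper: any deterministic hash of the element would do.
  return hashlib.sha256(element.encode('utf-8')).hexdigest()


class ComputeHashes(beam.DoFn):
  def process(self, element):
    # Key the output by the element's hash; the value is unused.
    yield (HashFunction(element), True)


class FilterIfAlreadyComputedHash(beam.DoFn):
  def process(self, element, hashes):
    # 'hashes' arrives as a dict-like view thanks to beam.pvalue.AsDict.
    if not hashes.get(HashFunction(element)):
      yield element


with beam.Pipeline() as p:
  computed_hashes = (
      p
      | 'Initial' >> beam.Create(['foo'])
      | 'Hash' >> beam.ParDo(ComputeHashes()))
  filtered = (
      p
      | 'More' >> beam.Create(['foo', 'bar'])
      | 'Filter' >> beam.ParDo(
          FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes)))
  filtered | beam.Map(print)

Running this should print only 'bar', since 'foo' was already hashed in the first branch.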
In the vetted example from the Beam GitHub repository, in visionml_test.py, a PCollection is converted to a dictionary view using beam.pvalue.AsDict().
class VisionMlTestIT(unittest.TestCase):
  def test_text_detection_with_language_hint(self):
    IMAGES_TO_ANNOTATE = [
        'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
    ]
    IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]
    with TestPipeline(is_integration_test=True) as p:
      contexts = p | 'Create context' >> beam.Create(
          dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))
      output = (
          p
          | beam.Create(IMAGES_TO_ANNOTATE)
          | AnnotateImage(
              features=[vision.types.Feature(type='TEXT_DETECTION')],
              context_side_input=beam.pvalue.AsDict(contexts))
          | beam.ParDo(extract))
The side input is passed into the FlatMap (in visionml.py), and within the FlatMap function an entry is looked up in the dictionary with .get(). It could also be passed into a Map or a ParDo. See the Beam Python side input documentation (which uses .AsSingleton instead of .AsDict); there you can find an example of using it in the process call.
class AnnotateImage(PTransform):
  """A ``PTransform`` for annotating images using the GCP Vision API.
  ref: https://cloud.google.com/vision/docs/

  Batches elements together using ``util.BatchElements`` PTransform and sends
  each batch of elements to the GCP Vision API.
  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
  or binary_type base64-encoded image data.
  Accepts an `AsDict` side input that maps each image to an image context.
  """
  MAX_BATCH_SIZE = 5
  MIN_BATCH_SIZE = 1
  def __init__(
      self,
      features,
      retry=None,
      timeout=120,
      max_batch_size=None,
      min_batch_size=None,
      client_options=None,
      context_side_input=None,
      metadata=None):
    """
    Args:
      features: (List[``vision.types.Feature.enums.Feature``]) Required.
        The Vision API features to detect
      retry: (google.api_core.retry.Retry) Optional.
        A retry object used to retry requests.
        If None is specified (default), requests will not be retried.
      timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Optional.
        Maximum number of images to batch in the same request to the Vision API.
        Default is 5 (which is also the Vision API max).
        This parameter is primarily intended for testing.
      min_batch_size: (int) Optional.
        Minimum number of images to batch in the same request to the Vision API.
        Default is None. This parameter is primarily intended for testing.
      client_options:
        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.

        Example usage::

          image_contexts =
            [('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
              ``vision.types.ImageContext()``]),
             ('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
              ``vision.types.ImageContext()``]),]

          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )

          visionml.AnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(AnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than {}'.format(
              AnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata
  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _ImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))
  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None
    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allows text_type or binary_type
      image = vision.types.Image(content=element)
    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features, image_context=image_context)
    yield request
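As mentioned above, the same side input can just as well be passed to a Map or a plain ParDo, and .AsSingleton follows the same pattern when the side input is a single value rather than a mapping. Here is a minimal sketch of both forms together; the names and values are illustrative and not taken from visionml.py:

import apache_beam as beam

with beam.Pipeline() as p:
  contexts = p | 'Contexts' >> beam.Create([('img.jpg', 'en')])
  default_lang = p | 'Default' >> beam.Create(['und'])

  images = p | 'Images' >> beam.Create(['img.jpg', 'other.jpg'])
  tagged = images | 'Tag' >> beam.Map(
      # .get() falls back to the AsSingleton value when the key is missing.
      lambda img, ctx, default: (img, ctx.get(img, default)),
      ctx=beam.pvalue.AsDict(contexts),
      default=beam.pvalue.AsSingleton(default_lang))
  tagged | beam.Map(print)  # Expect ('img.jpg', 'en') and ('other.jpg', 'und')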
Note that in Java you use it as .asMap().