Декартово произведение PCollection с самим собой - PullRequest
0 голосов
/ 19 февраля 2020

Скажем, у меня ограниченная PCollection p типа KV<String, Integer>. Предположим, что p не может поместиться в памяти и, следовательно, не может быть побочным вводом DoFn.

Пример p:

("foo", 0)
("bar", 1)
("baz", 2)

Как я могу взять декартово произведение p и самого себя?

Например, p x p может выглядеть следующим образом:

("foo+foo", [("foo", 0), ("foo", 0)])
("foo+bar", [("foo", 0), ("bar", 1)])
("foo+baz", [("foo", 0), ("baz", 2)])
("bar+foo", [("bar", 1), ("foo", 0)])
("bar+bar", [("bar", 1), ("bar", 1)])
("bar+baz", [("bar", 1), ("baz", 2)])
("baz+foo", [("baz", 2), ("foo", 0)])
("baz+bar", [("baz", 2), ("bar", 1)])
("baz+baz", [("baz", 2), ("baz", 2)])

Ответы [ 2 ]

0 голосов
/ 20 февраля 2020

Я предложу решение, используя Python.

Прежде всего, давайте реализуем алгоритм и после этого решим проблемы с ограничением памяти

import itertools

# Let's build a list with your pairs
collection_items = [("foo", 0), ("bar", 1), ("baz", 2)]

"""
A Python generator is a function that produces a sequence of results. 
It works by maintaining its local state, so that the function can resume again exactly where 
it left off when called subsequent times. Same generator can't be used twice.
I will explain a little later why I use generators
"""

collection_generator1 = (el for el in collection_items)  # Create the first generator
# For example; calling next(collection_generator1) => ("foo", 0); next(collection_generator1) => ("bar", 1),
# next(collection_generator1) => ("bar": 2)
collection_generator2 = (el for el in collection_items) # Create the second generator
cartesian_product = itertools.product(collection_generator1, collection_generator2) # Create the cartesian product

for pair in cartesian_product:
    first_el, second_el = pair
    str_pair1, val_pair1 = first_el
    str_pair2, val_pair2 = first_el

    name = "{str_pair1}+{str_pair2}".format(str_pair1=str_pair1, str_pair2=str_pair2)
    item = (name, [first_el, second_el]) # Compose the item
    print(item)

# OUTPUT

('foo+foo', [('foo', 0), ('foo', 0)])
('foo+foo', [('foo', 0), ('bar', 1)])
('foo+foo', [('foo', 0), ('baz', 2)])
('bar+bar', [('bar', 1), ('foo', 0)])
('bar+bar', [('bar', 1), ('bar', 1)])
('bar+bar', [('bar', 1), ('baz', 2)])
('baz+baz', [('baz', 2), ('foo', 0)])
('baz+baz', [('baz', 2), ('bar', 1)])
('baz+baz', [('baz', 2), ('baz', 2)])

Теперь давайте решим проблемы с памятью

Поскольку у вас много данных, хорошей идеей будет сохранить их в файле, записав пару в каждой строке (как в вашем примере) Теперь давайте прочитаем файл («input.txt») и создадим генератор с его данными.

file_generator_1 = (line.strip() for line in open("input.txt"))
file_generator_2 = (line.strip() for line in open("input.txt").readlines())

Теперь единственная модификация, которую вам нужно сделать, это заменить имя переменной collection_generator1, коллекционер_2 генератор_файл_1, генератор_2

0 голосов
/ 20 февраля 2020

Как вы уже догадались, самый простой способ сделать это - иметь DoFn, который обрабатывает вашу коллекцию PC как основной и побочный ввод.

Если это не сработает, потому что PCollection слишком велик для размещения в памяти, вы можете разбить его на N непересекающихся PCollections, пропустить через каждую из них, а затем сгладить результат. Например, вы могли бы написать что-то вроде

class CrossProduct(beam.PTransform):
  def expand(self, pcoll):
    N = 10
    parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
    cross_parts = [
        pcoll | str(ix) >> beam.FlatMap(
            lambda x, side: [(x, s) for s in side],
            beam.pvalue.AsIter(part))
        for ix, part in enumerate(parts)]
    return cross_parts | beam.Flatten()

output = input | CrossProduct()

. Обратите внимание, однако, что, если элементы вашей PCollection не являются особенно большими, если PCollection не может уместиться в память, его перекрестный продукт может быть слишком большим для производства ( и обрабатывать).

...