функция процесса DoFn не выполняется - PullRequest
1 голос
/ 11 июня 2019

Я пытаюсь написать преобразование луча, например

util.py

class GroupIntoBatches(PTransform):
  def __init__(self, batch_size):
    self.batch_size = batch_size

  @staticmethod
  def of_size(batch_size):
    return GroupIntoBatches(batch_size)

  def expand(self, pcoll):
    input_coder = coders.registry.get_coder(pcoll)
    if not input_coder.is_kv_coder():
          raise ValueError(
            'coder specified in the input PCollection is not a KvCoder')
    key_coder = input_coder.key_coder()
    value_coder = input_coder.value_coder()

    return pcoll | ParDo(_GroupIntoBatchesDoFn(self.batch_size, key_coder, value_coder))


class _GroupIntoBatchesDoFn(DoFn):
    def __init__(self, batch_size, input_key_coder, input_value_coder):
      self.batch_size = batch_size
      self.batch_spec = BagStateSpec("GroupIntoBatches", input_value_coder)

    def process(self, element):
      raise Exception("Not getting to this point") # This is not working
      print element

Попытка выполнить это преобразование с помощью контрольного примера

util_test.py

class GroupIntoBatchesTest(unittest.TestCase):
  NUM_ELEMENTS = 10
  BATCH_SIZE = 5

  @staticmethod
  def _create_test_data():
    scientists = [
      "Einstein",
      "Darwin",
      "Copernicus",
      "Pasteur",
      "Curie",
      "Faraday",
      "Newton",
      "Bohr",
      "Galilei",
      "Maxwell"
    ]

    data = []
    for i in range(GroupIntoBatchesTest.NUM_ELEMENTS):
      index = i % len(scientists)
      data.append(("key", scientists[index]))
    return data

  def test_in_global_window(self):
    pipeline = TestPipeline()
    collection = pipeline | beam.Create(GroupIntoBatchesTest._create_test_data()) | util.GroupIntoBatches.of_size(GroupIntoBatchesTest.BATCH_SIZE)

Мой вопрос: по какой причине функция process не вызывается на моем _GroupIntoBatchesDoFn

Я получаю этот результат при запуске моего теста

test_in_global_window (apache_beam.transforms.util_test.GroupIntoBatchesTest) ... хорошо

1 Ответ

1 голос
/ 13 июня 2019

Ваш тест строит конвейер, но фактически не выполняет его.Вам нужно либо написать

pipeline = TestPipeline()
collection = pipeline | ...
pipeline.run()

, либо, альтернативно,

with TestPipeline() as pipeline:
    collection = pipeline | ...
# run is implicitly called on exit of the with block

(Вас также может заинтересовать преобразование BatchElements .)

...