Я пытаюсь выяснить, возможно ли отправить элементы PCollection в родительский процесс при использовании DirectRunner
из Python Apache Beam SDK.
Однако я столкнулся со странной ошибкой, когда всеКажется, работает нормально, когда создается очередь и конвейер вызывается внутри секции __main__
скрипта, но не тогда, когда тот же код вызывается внутри подфункции.Я предполагаю, что это происходит из-за некоторых травлений / укропов, которые происходят под сценой, но было бы желательно получить более конкретное объяснение.
Файл /tmp/inputs/winterstale.txt
, используемый ниже, можно загрузить с: https://storage.googleapis.com/apache-beam-samples/shakespeare/winterstale.txt
from __future__ import print_function
import atexit
import queue
import tempfile
import time
import unittest
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
from apache_beam.runners.interactive.cache_manager import ReadCache
from apache_beam.runners.interactive.cache_manager import WriteCache
def add_to_queue(element, queue):
queue.put(element)
def write_to_queue():
q = queue.Queue()
with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
_ = (
p
| "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
| "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
| "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
| "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
)
return list(q.queue)
if __name__ == "__main__":
cache_location = tempfile.mkdtemp()
atexit.register(FileSystems.delete, [cache_location])
# Using a function call
cache_manager = FileBasedCacheManager(cache_dir=cache_location)
result1 = write_to_queue()
print(len(result1)) # >>> prints "0" <<<
# Copy-pasing the code from "write_to_queue()"
q = queue.Queue()
with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
_ = (
p
| "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
| "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
| "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
| "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
)
result2 = list(q.queue) # >>> prints "3561" <<<
print(len(result2))