У меня есть класс ParDo, который проверяет, содержит ли сообщение pubsub определенные атрибуты, и возвращает действительный и недействительный TaggedOutput (который отлично работает в обычном потоке, используя yield для возврата значений), и я не могу выполнить модульное тестирование для этого класса я пытаюсь предоставить фиктивное сообщение (словарь, реплицирующий информацию о сообщении pubsub) и хочу проверить, содержат ли выходные данные класса другие атрибуты.
это то, что у меня так далеко:
class TestValidateMessage(unittest.TestCase):
def test_not_valid(self):
with TestPipeline() as p:
pcoll = (
p
| beam.Create([{'attributes':{"imageUrl":000}}])
| beam.ParDo(ValidateMessage()).with_outputs(
'invalid', main='valid'))
valid, _ = pcoll
invalid = pcoll['invalid']
print(invalid)
assert_that(invalid, {'failure_step':'Message validation'})
При этом я получаю сообщение об ошибке:
TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.
Когда я пытаюсь распечатать (недействительно), я получаю PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid]
Как получить доступ (для подтверждения целей) к содержимому PCollection?