Вложенный конвейер в Apache Beam - PullRequest
0 голосов
/ 27 мая 2018

Мой конвейер Apache Beam принимает бесконечный поток сообщений.Каждое сообщение разветвляет N элементов (N равно ~ 1000 и отличается для каждого входа).Затем для каждого элемента, созданного на предыдущем этапе, существует операция map , которая создает новые N элементов, которые должны быть уменьшены с помощью операции top 1 (элементы группируются по исходному сообщению, котороебыл прочитан из очереди).Результаты top 1 сохраняются во внешнем хранилище.В Spark я могу легко сделать это, читая сообщения из потока и создавая RDD для каждого сообщения, которое отображает + уменьшает.Поскольку у Apache Beam нет вложенных конвейеров, я не вижу способа реализовать его в Beam с бесконечным потоком ввода.Пример:

Infinite stream elements: A, B

Step 1 (fan out, N = 3): A -> A1, A2, A3
                (N = 2): B -> B1, B2

Step 2 (map): A1, A2, A3 -> A1', A2', A3'
              B1, B2, B3 -> B1', B2'

Step 3 (top1): A1', A2', A3' -> A2'
               B1', B2' -> B3'

Output: A2', B2'

Нет зависимости между элементами A и B.A2 'и B2' являются главными элементами в своей группе.Поток бесконечен.Операция карты может занять от нескольких секунд до нескольких минут.Создание водяного знака окна в течение максимального времени, необходимого для выполнения операции с картой, значительно замедлит общее время конвейера для быстрых операций с картой.Вложенный конвейер помог бы, потому что таким образом я мог бы создать конвейер для сообщения.

Ответы [ 2 ]

0 голосов
/ 13 июня 2018

Итак, вот такое рабочее решение.Скорее всего, я буду редактировать его для любых ошибок, которые я могу сделать в понимании вопроса.(PS код шаблона находится в Java).Предполагая, что input является вашим источником потока

PCollection<Messages> msgs = input.apply(Window.<Messages>into(        
                                    FixedWindows.of(Duration.standardSeconds(1)) 
                                                .triggering(AfterWatermark.pastEndOfWindow()
                                         // fire the moment you see an element 
                                                   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
                                         //optional since you have small window 
                                                   .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
                                                .withAllowedLateness(Duration.standardMinutes(60))
                                                .discardingFiredPanes()); 

Это позволит вам прочитать поток Messages, который может быть либо строкой, либо HashMap, либо даже списком.Заметьте, что вы говорите лучу открывать окно для каждого элемента, который он получает, и вы установили максимальное значение окна в 1 секунду.Вы можете изменить это, если хотите запускать каждые 10 сообщений и минутное окно и т. Д.

После этого вам нужно написать 2 класса, которые расширяют DoFn в основном

PCollection<Element> top = msgs.apply(ParDo.of(new ExtractElements()))
                               .apply(ParDo.of(new TopElement()));

Где Element может быть String, int, double и т. Д.

Наконец, вы можете каждый Element на хранение с:

top.apply(ParDo.of(new ParsetoString()))
   .apply(TextIO.write().withWindowedWrites()
                        .withNumShards(1)
                        .to(filename));

Следовательно, у вас будет примерно 1 файл для каждого сообщения, которое может быть много.Но, к сожалению, вы не можете добавить в файл.Если вы не сделаете управление окнами, где вы группируете все элементы в один список и записываете в него.

Конечно, есть хакерский способ сделать это без окон, и я объясню, если этот вариант использования не работает с вами (или если вам любопытно)

Позвольтея знаю, если я что-то пропустил!:)

0 голосов
/ 27 мая 2018

Не похоже, что для этого вам понадобится «вложенный конвейер».Позвольте мне показать вам, как это выглядит в Beam Python SDK (это похоже на Java):

Например, попробуйте фиктивную операцию добавления числа и апострофа к строке (например, "A" => "A1'"), вы бы сделали что-то вроде этого:

def my_fn(value):
  def _inner(elm):
    return (elm, elm + str(value) + "'")  # A KV-pair
  return _inner

# my_stream has [A, B]
pcoll_1 = (my_stream
           | beam.Map(my_fn(1)))
pcoll_2 = (my_stream
           | beam.Map(my_fn(2)))
pcoll_3 = (my_stream
           | beam.Map(my_fn(3)))

def top_1(elms):
  ... # Some operation

result = ((pcoll_1, pcoll_2, pcoll_3)
          | beam.CoGroupByKey()
          | beam.Map(top_1))
...