Прозрачная потоковая передача и пакетная обработка - PullRequest
0 голосов
/ 13 марта 2019

Я все еще довольно новичок в мире потоковой и пакетной обработки и пытаюсь понять концепции и речи.Весьма возможно, что ответ на мой вопрос хорошо известен, его легко найти или даже сто раз ответили здесь, в SO, но я не смог его найти.

Справочная информация:

Я работаю в большом научном проекте (исследование ядерного синтеза), и мы производим тонны данных измерений во время экспериментов.Эти данные в основном представляют собой потоки сэмплов, помеченных наносекундной временной меткой, где сэмплы могут быть чем угодно, от одного значения АЦП, через массив таких данных, до глубоко структурированных данных (до сотен записей от 1-битного логического значения до 64-битной двойной точностиплавает) на необработанные HD видеокадры или даже текстовые сообщения.Если я правильно понимаю общие термины, я буду считать наши данные «табличными данными», по большей части.

Мы работаем в основном с самодельными программными решениями от сбора данных до простого онлайн (потокового) анализа (например,масштабирование, субдискретизация и т. д.) к нашим собственным средствам хранения, управления и доступа к данным.

Учитывая масштаб операции и усилия по поддержке всех этих реализаций, мы изучаем возможности использования стандартных структур иинструменты для более наших задач.

Мой вопрос:

В частности, на этом этапе мы сталкиваемся с необходимостью все более и более изощренной (автоматизированной и ручной) аналитики данных для данных в реальном времени / онлайн / в реальном времени, а также «после»факт "автономной / пакетной аналитики" исторических "данных.С этой целью я пытаюсь понять, могут ли и как существующие аналитические структуры, такие как Spark, Flink, Storm и т. Д. (Возможно, поддерживаемые очередями сообщений, такими как Kafka, Pulsar, ...), поддерживать сценарий, в котором

  • данные передаются / передаются в платформу / платформу, прикрепляются идентификаторы, такие как URL или идентификатор или тому подобное
  • платформа взаимодействует со встроенным или внешним хранилищем для сохранения потоковых данных (в течение многих лет), связанныхс идентификатором
  • аналитические процессы теперь могут прозрачно запрашивать / анализировать данные, адресованные идентификатором и произвольным (открытым или закрытым) временным окном, а инфраструктура предоставляет пакеты / выборки данных для анализа либо из внутреннего хранилища, либо из поступающегов режиме реального времени из сбора данных

Простая потоковая передача онлайн-данных в хранилище и выполнение запросов оттуда не представляется возможным, поскольку нам необходимы как необработанные, так и проанализированные данные для мониторинга в реальном времени и контроля обратной связи в реальном времени эксперимента.Кроме того, предоставление пользователю возможности запрашивать либо живой входной сигнал, либо историческую партию из хранилища по-другому не было бы идеальным решением, поскольку наши физики в большинстве своем не являются специалистами по данным, и мы хотели бы держать в стороне такие «технические детали» и в идеале использовать те же алгоритмы.следует использовать для анализа новых данных в реальном времени и старых хранимых данных из предыдущих экспериментов.

Sitenotes:

  • речь идет о загрузке данных в режиме просмотра в диапазоне 10-й гигабит в секундуприходя пакетами продолжительностью от секунд до минут - могут ли это быть обработаны кандидатами?
  • мы используем временные метки в наносекундном разрешении, даже думая о пико - это накладывает некоторые ограничения на список возможных кандидатов, еслиЯ правильно понимаю?

Я был бы очень рад, если бы кто-нибудь смог понять мой вопрос и пролить некоторый свет на эту тему для меня: -)

Большое спасибо и добрыйС уважением, Беппо

Ответы [ 2 ]

2 голосов
/ 14 марта 2019

Не думаю, что кто-то может сказать: «Да, Framework X определенно может справиться с вашей рабочей нагрузкой», потому что это во многом зависит от того, что вам нужно от обработки сообщений, например относительно надежности обмена сообщениями и того, как ваши потоки данных могут быть разделены.

Вы можете быть заинтересованы в BenchmarkingDistributedStreamProcessingEngines . В статье используются версии Storm / Flink / Spark, которым несколько лет (похоже, они были выпущены в 2016 году), но, может быть, авторы захотят позволить вам использовать их тест для оценки более новых версий трех платформ?

Очень распространенная установка для потоковой аналитики - использовать источник данных -> Kafka / Pulsar -> аналитическая среда -> долговременное хранилище данных. Это позволяет отделить обработку от загрузки данных и позволяет выполнять такие вещи, как повторная обработка исторических данных, как если бы они были новыми.

Я думаю, что первым шагом для вас должно быть выяснить, сможете ли вы получить необходимый объем данных через Kafka / Pulsar. Либо создайте набор тестов вручную, либо извлеките некоторые данные, которые, по вашему мнению, могут быть репрезентативными для вашей производственной среды, и посмотрите, сможете ли вы передать их через Kafka / Pulsar с необходимой вам пропускной способностью / задержкой.

Не забудьте рассмотреть возможность разделения ваших данных. Если некоторые из ваших потоков данных могут быть обработаны независимо (т.е. порядок не имеет значения), вам не следует помещать их в одни и те же разделы. Например, вероятно, нет причин смешивать измерения датчиков и потоки видеопотока. Если вы можете разделить ваши данные на независимые потоки, вы с меньшей вероятностью столкнетесь с узкими местами как в Kafka / Pulsar, так и в аналитической среде. Отдельные потоки данных также позволят вам намного лучше распараллелить обработку в аналитической среде, например, вы можете запустить, например, подача видео и обработка сенсора на разных машинах.

Как только вы узнаете, можете ли вы получить достаточную пропускную способность через Kafka / Pulsar, вы должны написать небольшой пример для каждой из 3 платформ. Для начала я бы просто получил и отбросил данные из Kafka / Pulsar, которые должны сообщить вам заранее, есть ли узкое место на пути аналитики Kafka / Pulsar ->. После этого вы можете расширить пример, чтобы сделать что-то интересное с данными примера, например, сделайте немного обработки, как то, что вы, возможно, захотите сделать в производстве.

Вам также необходимо учитывать, какие виды обработки вам необходимы для ваших потоков данных. Как правило, вы платите штраф за производительность за гарантированную обработку по крайней мере один раз или ровно один раз. Для некоторых типов данных (например, видеопотока) иногда может быть уместно терять сообщения. Выбрав необходимую гарантию, вы можете соответствующим образом настроить аналитические структуры (например, отключить взлом в Storm) и попробовать сравнительный анализ своих тестовых данных.

Просто, чтобы ответить на некоторые ваши вопросы более четко:

Случай использования анализа / мониторинга данных в реальном времени звучит так, как будто он достаточно хорошо подходит для систем Storm / Flink. Подключите его непосредственно к Kafka / Pulsar, а затем сделайте любую аналитику, которая вам нужна, звучит так, как будто она может работать для вас.

Повторная обработка исторических данных будет зависеть от того, какие запросы вам нужно делать. Если вам просто нужен временной интервал + идентификатор, вы, вероятно, можете сделать это с помощью Kafka плюс фильтр или соответствующее разбиение. Kafka позволяет начать обработку с определенной временной отметки, и если ваши данные разделены по идентификатору или вы фильтруете их в качестве первого шага в своей аналитике, вы можете начать с предоставленной временной отметки и прекратить обработку, когда нажмете сообщение вне временного окна. Это применимо только в том случае, если временная метка, которая вас интересует, - это когда сообщение было добавлено в Kafka. Я также не верю, что Kafka поддерживает разрешение менее миллисекунды для генерируемых временных отметок.

Если вам нужно выполнить более сложные запросы (например, вам нужно посмотреть на временные метки, сгенерированные вашими датчиками), вы можете посмотреть, используя Cassandra или Elasticsearch или Solr как ваше постоянное хранилище данных.Вы также захотите выяснить, как вернуть данные из этих систем обратно в вашу аналитическую систему.Например, я считаю, что Spark поставляется с соединителем для чтения из Elasticsearch, в то время как Elasticsearch предоставляет соединитель для Storm.Вы должны проверить, существует ли такой соединитель для вашей комбинации хранилища данных и аналитической системы, или быть готовым написать свой собственный.

Редактировать: Обрабатывать, чтобы ответить на ваш комментарий.

Я не знал, чтоKafka или Pulsar поддерживают временные метки, указанные пользователем, но, несомненно, они оба do .Я не вижу, что Pulsar поддерживает временные метки менее миллисекунды, хотя?

Идея, которую вы описываете, определенно может быть поддержана Кафкой.

Вам нужна возможность запустить клиента Kafka / Pulsar в определенное время и читать вперед.Похоже, Pulsar пока не поддерживает это, но Кафка поддерживает.

Вы должны гарантировать, что когда вы записываете данные в раздел, они поступают в порядке отметки времени.Это означает, что вам не разрешено, например, написать первое сообщение 1 с отметкой времени 10, а затем сообщение 2 с отметкой времени 5.

Если вы можете быть уверены, что пишете сообщения для Кафки, описанный вами пример будет работать,Затем вы можете сказать: «Начните с метки времени« прошлой ночью в полночь », и Кафка начнет там.Когда поступают живые данные, они получат их и добавят в конец своего журнала.Когда потребительская / аналитическая инфраструктура прочитает все данные с последней полуночи до текущего времени, она начнет ждать поступления новых (живых) данных и обрабатывает их по мере поступления. Затем вы можете написать собственный код в своей аналитической среде дляубедитесь, что он останавливает обработку, когда достигает первого сообщения с отметкой времени «завтра вечером».

Что касается поддержки меток времени менее миллисекунды, я не думаю, что Кафка или Пульсар будут поддерживать их из коробки, но вы можете обойти это достаточно легко.Просто добавьте метку времени менее миллисекунды в сообщение в качестве настраиваемого поля.Если вы хотите начать, например, с отметки времени 9 мс 10 нс, вы просите Kafka начать с 9 мс и использовать фильтр в среде аналитики, чтобы отбрасывать все сообщения между 9 мс и 9 мс 10 нс.

0 голосов
/ 26 апреля 2019

Позвольте мне добавить следующие предложения о том, как Apache Pulsar может помочь удовлетворить некоторые ваши требования.Пища для размышления.

"данные перетекают / передаются в платформу / фреймворк, прикрепляется идентификатор, такой как URL или идентификатор или что-то подобное"

Возможно, вы захотитепосмотрите на Функции Pulsar , которые позволяют вам писать простые функции (в Java или Python), которые выполняются для каждого отдельного сообщения, публикуемого в теме.Они идеально подходят для этого варианта использования дополнения данных.

платформа взаимодействует со встроенным или внешним хранилищем для сохранения потоковых данных (в течение многих лет), связанных с идентификатором

Pulsar недавно добавил многоуровневое хранилище , которое позволяет вам сохранять потоки событий в S3, хранилище BLOB-объектов Azure или облачном хранилище Google.Это позволило бы вам годами хранить данные в дешевом и надежном хранилище данных.

аналитические процессы теперь могут прозрачно запрашивать / анализировать данные, адресованные идентификатором и произвольным (открытым или закрытым) временным окном,и инфраструктура предоставляет пакеты данных / выборки для анализа либо из внутреннего хранилища, либо в режиме реального времени после получения данных

Apache Pulsar также добавил интеграцию с механизмом запросов Presto, которыйпозволит вам запросить данные за определенный период времени (включая данные из многоуровневого хранилища) и поместить их в тему для обработки.

...