Как использовать Apache Flink с поисковыми данными? - PullRequest
1 голос
/ 16 апреля 2019

.

Привет

с использованием Apache Flink 1.8. У меня есть поток записей, поступающих из Kafka в формате JSON и фильтрующих их, и все это прекрасно работает.

Теперь я хотел бы обогатить данные из Kafka поисковым значением из таблицы базы данных.

Это просто случай создания 2 потоков, загрузки таблицы во 2-й поток и последующего объединения данных?

Таблица базы данных обновляется, но не часто, и я хотел бы не искать БД по каждой записи, поступающей через поток.

1 Ответ

1 голос
/ 16 апреля 2019

Флинк имеет состояние, которым вы можете воспользоваться здесь.Я сделал что-то похожее, где я взял ежедневный запрос из моей таблицы поиска (в моем случае это был массовый вызов веб-службы) и через результаты превратился в тему кафки.Эта тема кафки использовалась той же задачей, что и данные, необходимые для поиска.Обе темы были обозначены одним и тем же значением, но я использовал тему поиска для хранения данных в состоянии с ключами, и при обработке другой темы я бы вытащил данные из состояния.

У меня было несколькодополнительная логика, чтобы проверить, было ли еще состояние NO для данного ключа.Если бы это было так, я бы сделал асинхронный запрос к веб-сервису.Однако вам может и не понадобиться делать это.

Предостережение в том, что у меня была память для управления состоянием, и моя таблица поиска насчитывала всего около 30 миллионов записей, около 100 гигабайт распределялись по 45 слотам на 15 узлах.

[В ответ на вопрос в комментариях] Извините, но мой ответ был слишком длинным, поэтому пришлось редактировать мой пост:

У меня была работа на python, которая загружала данные с помощью массового вызова REST(ваш может просто сделать поиск данных).Затем он преобразовал данные в правильный формат и сбросил их в Kafka.Тогда у моего потока мерцаний было два источника, один был темой «реальных данных», другой - темой «данных поиска».Данные, поступающие из темы поиска данных, были сохранены в состоянии (я использовал ValueState, потому что каждый ключ сопоставлен с одним возможным значением, но есть других типов состояний . У меня также было время истечения 24 часа для каждой записи, но это был мой вариант использования.

Хитрость в том, что та же операция, которая сохраняет значение в состоянии из темы поиска, должна быть операцией, которая возвращает значение из состояния «реальная» тема обратно.Это потому, что состояние мигания (даже состояния клавиш) связаны с оператором, который их создал.

...