Производитель Kafka с использованием HiveStorageHandler - PullRequest
0 голосов
/ 25 ноября 2018

Я относительно новичок в hive / hadoop

Я читал эти Обработчики хранилища Hive .

Теперь я пытаюсь написать собственную реализацию HiveStorageHandler длязапрашивать и отправлять сообщения в Kafka, используя Hive Table.

Я видел, что есть и другие реализации HiveStorageHandler, которые позволяют нам запрашивать и записывать в базы данных NoSQL, используя таблицы кустов.

Я пытаюсь повторить это для Кафки.Я нашел проект для него

HiveKa - запрос Kafka с использованием Hive

Здесь они пытаются прочитать данные из Kafka, используя запросы к таблице кустов.Я хочу написать тему кафки, используя вставку на столе.

Может кто-нибудь, пожалуйста, подскажите мне об этом?

Ответы [ 2 ]

0 голосов
/ 04 апреля 2019

Я хочу написать тему кафки, используя вставку в таблицу.

Это возможно при использовании Kafka HiveStorageHandler.Ниже приведены общие варианты использования этой функции

  1. Запрос тем Kafka
  2. Запрос данных из тем Kafka и вставка в управляемую / внешнюю таблицу куста
  3. Запрос данных из Kafkaразделы и вставьте их в другие разделы Kafka
  4. Запросите данные из внешней / управляемой таблицы куста и вставьте разделы в темы Kafka

Вы пытаетесь выполнить третий вариант использования.

Сначала создайте две внешние таблицы для исходной и целевой тем Kafka.

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

Затем используйте запрос объединения для вставки данных в целевую тему Kafka

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

Примечание:

  1. Используется внешняя ненативная таблица Hive

  2. Помимо пользовательской схемы полезной нагрузки, обработчик хранилища Kafka добавляет 4 дополнительных столбца (__key, __partition, __offset, __timestmap), которые пользователи могут использовать для запроса полей метаданных Kafka

  3. Пользователи должны установить свойство таблицы 'kafka.serde.class', если данные не находятся в csvformat

  4. Пользователи также могут установить свойство таблицы 'kafka.write.semantic', которое допускает значение NONE, AT_LEAST_ONCE или EXACTLY_ONCE.

0 голосов
/ 25 ноября 2018

Если я правильно понимаю, вы хотите прочитать события из Hive и нажать на Кафку.У меня нет опыта работы с обработчиками хранилищ, но я бы предпочел написать соответствующий код для вывода в Kafka, а затем передать эти события в Hadoop / Hive.

В Kafka есть инфраструктура, называемая Kafka Connect, которая выполняет запись во внешние системы. Confluent написал такой Connector для HDFS, который предлагает поддержку Hive, обновляя метасторское хранилище Hive всякий раз, когда файл записывается в HDFS.

Не записывая обработчик хранилища, вы можете попробовать использовать соединитель JDBC Source или иным образом Spark / Flink, чтобы прочитать эти данные из Hive и протолкнуть их в Kafka.

Обычно Hadoop является местом назначения событий CDC, а не его источником.Главным образом, потому что это просто медленный запрос ... Если вы хотите создавать события на вставках, обычно требуется некоторое сканирование таблицы, так что генерация событий из Cassandra / Hbase может быть лучшим вариантом

...