Clickhouse не использует сообщения Kafka через сложный материализованный вид - PullRequest
0 голосов
/ 14 января 2019

Сводка TLDR: движок Clickhouse Kafka, материализованное представление не будет работать со сложным оператором выбора.

Более длинная версия:

Я пытаюсь отправить большое количество точек данных JSON в Clickhouse через его движок Kafka, используя JSONEachRow. Но материализованное представление не будет правильно потреблять поток. У меня есть производитель kafka, написанный на go, который берет данные из нескольких потоков tcp и асинхронно записывает в очередь kafka.

Данные передаются таким образом:

Источники TCP -> Производитель -> Kafka -> Clickhouse (Kafka Engine) -> Материализованный вид -> Таблица назначения

Все это работает, пока все хорошо.

Я впервые попал в узкое место, когда увеличил скорость ввода данных (400 000 точек / сек), мой продюсер не смог написать достаточно быстро kafka, и соединения накапливались. Поэтому я надеялся попытаться пакетировать данные, но кажется, что Clickhouse не может принять массив json в качестве входных данных (https://clickhouse.yandex/docs/en/interfaces/formats/)

Итак, я натолкнулся на идею пакетирования точек данных в их источнике и преобразования сообщений в материализованном представлении, поэтому где раньше у меня было много отдельных сообщений:

{"t": 1547457441651445401, "i": "device_2", "c": 20001, "v": 56454654} "}

Теперь у меня есть сообщение, кратное приведенному выше и приведенное к строковому формату, с разделителями новой строки между точками.

{ "realtimes": "{\" т \ ": 1547458266855015791, \" я \ ": \" device_2 \», \ "С \": 20001, \ "v \": 56454654} \ п {\ "т \": 1547458266855015791, \ "я \": \ "device_2 \", \ "с \": 20001, \ "v \": 56454654} "}

Намерение здесь состоит в том, чтобы проанализировать и преобразовать строку в несколько значений, используя visitParamExtract в операторе select материализованного представления.

Материализованный вид:

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT 
    visitParamExtractInt(x, 't') AS timestamp, 
    visitParamExtractString(x, 'i') AS device_id, 
    visitParamExtractInt(x, 'v') AS value FROM  (
    SELECT arrayJoin(*) AS x
    FROM 
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes 
    )  )

Кажется, он что-то делает, так как при его запуске kafka_stream_realtimes очищается, и я не могу запросить его вручную, получая ошибку «DB :: Exception: Failed to Потребитель::». но данные никогда не попадают в финальную таблицу.

Резюме:

  • Данные достигают Clickhouse, они просто исчезают и, кажется, никогда не прийти к финальному столу.
  • Если я отброшу материализованное представление, я смогу увидеть, как данные накапливаются в kafka_stream_realtimes
  • Если я запускаю материализованный запрос представления как оператор INSERT INTO затем выберите, он будет принимать данные из потока в финальный стол.
  • Я понимаю, что, может быть, просто пробиваю узкое место в кликхаус и это может никогда не сработать, но я хочу пройти через это полнота

Для полноты: kafka_stream_realimes:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
  ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');

ltdb:

CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;

1 Ответ

0 голосов
/ 14 января 2019

но, похоже, Clickhouse не может принять массив json в качестве ввода

Кажется, мотивация состоит в том, чтобы сделать пакетный коммит на стороне производителя. Почему бы просто не сгруппировать несколько строк JSON и зафиксировать их за один раз? ClickHouse получит эти многострочные сообщения и проанализирует их для вас. Вам также может понадобиться указать kafka_row_delimiter настроек для Kafka Engine, так как большинство производителей kafka не добавляют разделитель строк в конце каждого сообщения.

Так одно сообщение становится

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...