Сводка TLDR: движок Clickhouse Kafka, материализованное представление не будет работать со сложным оператором выбора.
Более длинная версия:
Я пытаюсь отправить большое количество точек данных JSON в Clickhouse через его движок Kafka, используя JSONEachRow. Но материализованное представление не будет правильно потреблять поток.
У меня есть производитель kafka, написанный на go, который берет данные из нескольких потоков tcp и асинхронно записывает в очередь kafka.
Данные передаются таким образом:
Источники TCP -> Производитель -> Kafka -> Clickhouse (Kafka Engine) -> Материализованный вид ->
Таблица назначения
Все это работает, пока все хорошо.
Я впервые попал в узкое место, когда увеличил скорость ввода данных (400 000 точек / сек), мой продюсер не смог написать достаточно быстро kafka, и соединения накапливались. Поэтому я надеялся попытаться пакетировать данные, но кажется, что Clickhouse не может принять массив json в качестве входных данных (https://clickhouse.yandex/docs/en/interfaces/formats/)
Итак, я натолкнулся на идею пакетирования точек данных в их источнике и преобразования сообщений в материализованном представлении, поэтому где раньше у меня было много отдельных сообщений:
{"t": 1547457441651445401, "i": "device_2", "c": 20001, "v": 56454654} "}
Теперь у меня есть сообщение, кратное приведенному выше и приведенное к строковому формату, с разделителями новой строки между точками.
{ "realtimes": "{\" т \ ": 1547458266855015791, \" я \ ": \" device_2 \», \ "С \": 20001, \ "v \": 56454654} \ п {\ "т \": 1547458266855015791, \ "я \": \ "device_2 \", \ "с \": 20001, \ "v \": 56454654} "}
Намерение здесь состоит в том, чтобы проанализировать и преобразовать строку в несколько значений, используя visitParamExtract в операторе select материализованного представления.
Материализованный вид:
CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT
visitParamExtractInt(x, 't') AS timestamp,
visitParamExtractString(x, 'i') AS device_id,
visitParamExtractInt(x, 'v') AS value FROM (
SELECT arrayJoin(*) AS x
FROM
(
SELECT splitByChar('\n', realtimes)
FROM kafka_stream_realtimes
) )
Кажется, он что-то делает, так как при его запуске kafka_stream_realtimes очищается, и я не могу запросить его вручную, получая ошибку «DB :: Exception: Failed to Потребитель::». но данные никогда не попадают в финальную таблицу.
Резюме:
- Данные достигают Clickhouse, они просто исчезают и, кажется, никогда не
прийти к финальному столу.
- Если я отброшу материализованное представление, я смогу увидеть, как данные накапливаются в
kafka_stream_realtimes
- Если я запускаю материализованный запрос представления как оператор INSERT INTO
затем выберите, он будет принимать данные из потока в
финальный стол.
- Я понимаю, что, может быть, просто пробиваю узкое место в кликхаус
и это может никогда не сработать, но я хочу пройти через это
полнота
Для полноты:
kafka_stream_realimes:
CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');
ltdb:
CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;