Разбор записи (PCF) от Kafka с использованием Kafka Kusto Sink - PullRequest
0 голосов
/ 30 мая 2019

Я настроил свою среду с помощью докера на основе этого руководства .

На kafka-console-производители я отправлю эту строку:

Hazriq|27|Undegrad|UNITEN

Я хочу, чтобы эти данные передавались в Kusto следующим образом:

+--------+-----+----------------+------------+
| Name   | Age | EducationLevel | University |
+--------+-----+----------------+------------+
| Hazriq | 27  | Undegrad       | UNITEN     |
+--------+-----+----------------+------------+

Может ли это обрабатываться Kusto с помощью сопоставления (которое я все еще пытаюсь понять) или это должно обрабатываться Kafka?


Попробовал @daniel предложение:

.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)

.create table ParsedTable ingestion csv mapping 'ParsedTableMapping' '[{ "Name" : "name", "Ordinal" : 0},{ "Name" : "age", "Ordinal" : 1 },{ "Name" : "educationLevel", "Ordinal" : 2},{ "Name" : "univ", "Ordinal" : 3}]'

kusto.tables.topics_mapping=[{'topic': 'kafkatopiclugiaparser','db': 'kusto-test', 'table': 'ParsedTable','format': 'psv', 'mapping':'ParsedTableMapping'}]
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

, но получил вместо этого:

+----------------------------+-----+----------------+------+
| Name                       | Age | EducationLevel | Univ |
+----------------------------+-----+----------------+------+
| Hazriq|27|Undergrad|UNITEN |     |                |      |
+----------------------------+-----+----------------+------+

Ответы [ 2 ]

0 голосов
/ 30 мая 2019
Поддерживается

прием данных, как вы показали, используя формат psv (см. Ниже) - возможно, это всего лишь вопрос отладки, почему ваш вызов основных команд на стороне клиента не дает ожидаемого результата.если вы могли бы поделиться полным потоком и кодом, включая параметры, это может быть полезно.

.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)

.ingest inline into table ParsedTable with(format=psv) <| Hazriq|27|Undegrad|UNITEN


ParsedTable:

| name   | age | educationLevel | univ   |
|--------|-----|----------------|--------|
| Hazriq | 27  | Undegrad       | UNITEN |
0 голосов
/ 30 мая 2019

В настоящее время соединитель передает данные по мере поступления (никаких манипуляций с ними на стороне клиента), и любой разбор остается за Кусто.

Таким образом, формат psv поддерживается kusto, и это должно быть возможно, если установить формат на psv и указать ссылку на карту.

При добавлении плагина как , описанного , вы должны иметь возможность настроить его следующим образом:

kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'testDB', 'table': 'KafkaTest','format': 'psv', 'mapping':'KafkaMapping'}]

Отображение может быть определено в Кусто, как описано в документах Кусто , определено так

...