ошибка kafka-cassandra-sink для вставки данных json в cassandra - PullRequest
0 голосов
/ 13 ноября 2018

Я использую kafka connect 1.0.0 с kafka версии 1.0 для моего kafka-cassandra-sink. Создала тестовую тему IndiaDataEngineerTest и таблицу cassandra, имеющую структуру как

IndiaTest (
    datetime timeuuid, 
    city text, 
    phone_number text,
    PRIMARY KEY (datetime,phone_number)
);

В моей теме данные записываются как json от производителя, как показано ниже:

{"datetime":"c8ba2390-4783-11e8-af08-d9271f3c3d48","city":"Santa Monica","phone_number":"+131000000001"}
{"datetime":"42485b20-44f3-11e8-b543-77b9a2d3bc92","city":"Anaheim","phone_number":"+131000000002"}
{"datetime":"53b5c980-41c7-11e8-8689-7983010e6437","city":"Not Found","phone_number":"+131000000004"}
{"datetime":"d43b3860-419a-11e8-a2eb-59f9c7b79f07","city":"Anaheim","phone_number":"+131000000003"}

Я пытаюсь записать вышеуказанные данные в мою таблицу кассандры, используя опору ниже, как показано ниже:

{

"name": "cassandra-sink",
"config": {
"tasks.max":"1",
"connector.class":"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "topics":"IndiaDataEngineerTest",
    "connect.cassandra.contact.points": "www.abc.com",
    "connect.cassandra.port": 9042,
    "connect.cassandra.username": "jay",
    "connect.cassandra.password": "jaytest",
    "connect.cassandra.key.space": "message",
    "connect.cassandra.kcql":"INSERT INTO IndiaTest SELECT datetime,city,phone_number FROM IndiaDataEngineerTest"
    }
}

Теперь, после начала подключения, я отправляю файл конфигурации после ошибки.

[2018-11-13 01:39:55,166] INFO Preparing statements for IndiaDataEngineerTest->IndiaTest (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:83)
[2018-11-13 01:39:55,178] ERROR Encountered error line 1:33 missing "(" at (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:62)
com.datastax.driver.core.exceptions.SyntaxError: line 1:33 missing "(" at missing at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:104)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3.apply(CassandraJsonWriter.scala:108)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3.apply(CassandraJsonWriter.scala:104)
    at scala.util.Try$.apply(Try.scala:192)

Кто-нибудь может мне помочь или подсказать, что пошло не так ... ?? Заранее спасибо.

1 Ответ

0 голосов
/ 19 декабря 2018

Проблема была решена, так как я не использую правильную версию Cassandra. Для kafka connect 1.0.0 с kafka версии 1.0 требуется Cassandra версии 3.0.9 или выше.

...