Я использую kafka connect 1.0.0 с kafka версии 1.0 для моего kafka-cassandra-sink. Создала тестовую тему IndiaDataEngineerTest и таблицу cassandra, имеющую структуру как
IndiaTest (
datetime timeuuid,
city text,
phone_number text,
PRIMARY KEY (datetime,phone_number)
);
В моей теме данные записываются как json от производителя, как показано ниже:
{"datetime":"c8ba2390-4783-11e8-af08-d9271f3c3d48","city":"Santa Monica","phone_number":"+131000000001"}
{"datetime":"42485b20-44f3-11e8-b543-77b9a2d3bc92","city":"Anaheim","phone_number":"+131000000002"}
{"datetime":"53b5c980-41c7-11e8-8689-7983010e6437","city":"Not Found","phone_number":"+131000000004"}
{"datetime":"d43b3860-419a-11e8-a2eb-59f9c7b79f07","city":"Anaheim","phone_number":"+131000000003"}
Я пытаюсь записать вышеуказанные данные в мою таблицу кассандры, используя опору ниже, как показано ниже:
{
"name": "cassandra-sink",
"config": {
"tasks.max":"1",
"connector.class":"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"topics":"IndiaDataEngineerTest",
"connect.cassandra.contact.points": "www.abc.com",
"connect.cassandra.port": 9042,
"connect.cassandra.username": "jay",
"connect.cassandra.password": "jaytest",
"connect.cassandra.key.space": "message",
"connect.cassandra.kcql":"INSERT INTO IndiaTest SELECT datetime,city,phone_number FROM IndiaDataEngineerTest"
}
}
Теперь, после начала подключения, я отправляю файл конфигурации после ошибки.
[2018-11-13 01:39:55,166] INFO Preparing statements for IndiaDataEngineerTest->IndiaTest (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:83)
[2018-11-13 01:39:55,178] ERROR Encountered error line 1:33 missing "(" at (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:62)
com.datastax.driver.core.exceptions.SyntaxError: line 1:33 missing "(" at missing at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:104)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3.apply(CassandraJsonWriter.scala:108)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3.apply(CassandraJsonWriter.scala:104)
at scala.util.Try$.apply(Try.scala:192)
Кто-нибудь может мне помочь или подсказать, что пошло не так ... ??
Заранее спасибо.