Проблемы с инициализацией Кафки в Сиддхи - PullRequest
0 голосов
/ 26 февраля 2020

Невозможно создать поток из Kafka topi c, используя сиддхи. Даже если я создаю строку с представлением дизайна. Я скопировал все необходимые банки в папки lib и bundle. Даже запустил Kafka с Zookeeper локально (не знаю, зачем он мне нужен локально, но nwm).

На оснастке. sh start У меня следующая ошибка:

[2020-02-26 22:15:43,041]  WARNING {org.wso2.carbon.launcher.extensions.OSGiLibBundleDeployerUtils lambda$getBundlesInfo$1} - Error when loading the OSGi bundle information from /home/Hed/StreamProcessor/siddhi-tooling-5.1.2/lib/kafka-clients-2.3.0.jar 
java.io.IOException: Required bundle manifest headers do not exist
    at org.wso2.carbon.launcher.extensions.OSGiLibBundleDeployerUtils.getBundleInfo(OSGiLibBundleDeployerUtils.java:183)
    at org.wso2.carbon.launcher.extensions.OSGiLibBundleDeployerUtils.lambda$getBundlesInfo$1(OSGiLibBundleDeployerUtils.java:135)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:313)
    at java.util.stream.StreamSpliterators$DistinctSpliterator.forEachRemaining(StreamSpliterators.java:1291)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Для этого сценария:

@App:name("HelloKafka")
@App:description('Consume events from a Kafka Topic and publish to a different Kafka Topic')

@source(type='kafka',
        topic.list='kafka_topic',
        partition.no.list='0',
        threading.option='single.thread',
        group.id="group",
        bootstrap.servers='localhost:9092',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);

Я вижу ошибку при выполнении команды:

io.siddhi.core.exception.SiddhiAppCreationException: Error on 'HelloKafka' @ Line: 10. Position: 26, near '@source(type='kafka',
        topic.list='kafka_topic',
        partition.no.list='0',
        threading.option='single.thread',
        group.id="group",
        bootstrap.servers='localhost:9092',
        @map(type='json'))'. org/apache/kafka/clients/producer/Producer
    at io.siddhi.core.util.ExceptionUtil.populateQueryContext(ExceptionUtil.java:43)
    at io.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventSource(DefinitionParserHelper.java:388)
    at io.siddhi.core.util.SiddhiAppRuntimeBuilder.defineStream(SiddhiAppRuntimeBuilder.java:117)
    at io.siddhi.core.util.parser.SiddhiAppParser.defineStreamDefinitions(SiddhiAppParser.java:374)
    at io.siddhi.core.util.parser.SiddhiAppParser.parse(SiddhiAppParser.java:230)
    at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:85)
    at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:95)
    at io.siddhi.distribution.editor.core.internal.DebugRuntime.createRuntime(DebugRuntime.java:201)
    at io.siddhi.distribution.editor.core.internal.DebugRuntime.(DebugRuntime.java:56)
    at io.siddhi.distribution.editor.core.internal.DebugProcessorService.start(DebugProcessorService.java:38)
    at io.siddhi.distribution.editor.core.internal.EditorMicroservice.start(EditorMicroservice.java:761)
    at io.siddhi.distribution.editor.core.internal.EditorMicroservice.startWithVariables(EditorMicroservice.java:781)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.wso2.msf4j.internal.router.HttpMethodInfo.invokeResource(HttpMethodInfo.java:187)
    at org.wso2.msf4j.internal.router.HttpMethodInfo.invoke(HttpMethodInfo.java:143)
    at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.dispatchMethod(MSF4JHttpConnectorListener.java:218)
    at org.wso2.msf4j.internal.MSF4JHttpConnectorListener.lambda$onMessage$58(MSF4JHttpConnectorListener.java:129)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at io.siddhi.core.util.SiddhiClassLoader.loadClass(SiddhiClassLoader.java:32)
    at io.siddhi.core.util.SiddhiClassLoader.loadExtensionImplementation(SiddhiClassLoader.java:48)
    at io.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventSource(DefinitionParserHelper.java:346)
    ... 21 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Producer cannot be found by siddhi-io-kafka_5.0.7
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:448)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:361)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:353)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:161)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    ... 28 more

Может кто-нибудь сказать мне, что я делаю не так? (

1 Ответ

0 голосов
/ 23 марта 2020

Убедитесь, что у вас есть jar-файлы, преобразованные в OSGi, в "C: \ Program Files \ WSO2 \ Enterprise Integrator \ 7.0.2 \ streaming-integrator \ lib" .

Список jar-файлов, преобразованных в OSGi:

kafka_2.12_2.3.0_1.0.0
kafka_clients_2.3.0_1.0.0
metrics_core_2.2.0_1.0.0
scala_library_2.12.8_1.0.0
zkclient_0.11_1.0.0
zookeeper_3.4.14_1.0.0

, скопируйте оригинальные файлы jar в "C: \ Program Files \ WSO2 \ Enterprise Integrator \ 7.0. 2 \ streaming-integrator \ samples \ sample-clients \ lib "

Список оригинальных jar-файлов:

kafka_2.12-2.3.0
kafka-clients-2.3.0
metrics-core-2.2.0
scala-library-2.12.8
zkclient-0.11
zookeeper-3.4.14

Чтобы создать файлы, преобразованные в OSGi, скопируйте все Оригинальные баночки в папку с именем «источник» и создать пустую папку с именем «место назначения». Затем выполните в терминале следующую команду:

MINGW32 /c/Program Files/WSO2/Enterprise Integrator/7.0.2/streaming-integrator/bin
$ ./jartobundle.sh C:/DevTools/source C:/DevTools/destination  

Наконец, раздайте OSGis и оригинал в соответствии с указанными выше каталогами.

PS1: в моем случае я использую kafka_2.12- 2.4.1, но базовое имя банок не меняется. PS2: адаптируйте каталоги в соответствии с вашим установочным путем

Для получения более подробной информации обратитесь к документации WSO2: Кафка транспорт

...