Кафка createDirectStream с использованием PySpark - PullRequest
1 голос
/ 12 февраля 2020

Моя главная цель - подключить Kafka, создать DStream, сохранить его в локальной переменной как строку и записать в mon go db, а также создать сквозной поток в PySpark.

Но я сталкиваюсь с проблемой на самом первом шаге при создании DStream, и ошибка «java .util.ArrayList не может быть приведена к java .lang.String». Можете ли вы помочь мне определить исправление? подробности ниже,

Я пытаюсь подключить kafka, используя pyspark, как показано ниже,

kafkaParams = {"metadata.broker.list": ['host1:port','host2:port','host3:port'],
"security.protocol":"ssl",
"ssl.key.password":"***",
"ssl.keystore.location":"/path1/file.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"/path1/file2.jks",
"ssl.truststore.password":"***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc,["lac.mx.digitalchannel.raw.s015-txn-qrdc"],kafkaParams)

, но получаю ошибку, Не знаю, как с этим справиться,

py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
        at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Кроме того, чтобы открыть PySpark CLI, который я использую,

pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar  --files file.jks,file2.jks

1 Ответ

1 голос
/ 12 февраля 2020

metadata.broker.list должна быть строкой, разделенной запятыми, а не списком

главная цель - подключить Kafka, создать DStream, сохранить его в локальной переменной как строку и записать в мон go

  1. Пн go поддерживает структурированную потоковую запись

  2. Пн go также имеет плагин Kafka Connect, для которого требуется просто файл конфигурации, без кластера Spark или кода

...