Как использовать PySpark Structure Streaming + Kafka - PullRequest
2 голосов
/ 13 июля 2020

Я пытаюсь использовать потоковую передачу искровой структуры с kafka, и у меня возникла проблема при использовании искровой отправки. Потребитель по-прежнему получает данные из продукта, но структура Spark является ошибкой. Пожалуйста, помогите мне найти проблему в моем коде. Вот мой код в test.py:

from kafka import KafkaProducer
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('stream_test').getOrCreate()
import random

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
for i in range(0,100):
    lg_value = str(random.uniform(5000, 10000))
    producer.send(topic = 'test', value = bytes(lg_value, encoding='utf-8'))
    producer.flush()

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe","test").load()
df_to_string = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
print("done")

при запуске: spark-submit --packages org. apache .spark: spark- sql -kafka -0-10_2.12: 3.0.0 вывод терминала test.py:

> 20/07/12 19:39:09 INFO Executor: Starting executor ID driver on host
> 192.168.31.129 20/07/12 19:39:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on
> port 38885. 20/07/12 19:39:09 INFO NettyBlockTransferService: Server
> created on 192.168.31.129:38885 20/07/12 19:39:09 INFO BlockManager:
> Using org.apache.spark.storage.RandomBlockReplicationPolicy for block
> replication policy 20/07/12 19:39:09 INFO BlockManagerMaster:
> Registering BlockManager BlockManagerId(driver, 192.168.31.129, 38885,
> None) 20/07/12 19:39:09 INFO BlockManagerMasterEndpoint: Registering
> block manager 192.168.31.129:38885 with 413.9 MiB RAM,
> BlockManagerId(driver, 192.168.31.129, 38885, None) 20/07/12 19:39:09
> INFO BlockManagerMaster: Registered BlockManager
> BlockManagerId(driver, 192.168.31.129, 38885, None) 20/07/12 19:39:09
> INFO BlockManager: Initialized BlockManager: BlockManagerId(driver,
> 192.168.31.129, 38885, None) 20/07/12 19:39:11 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of
> spark.sql.warehouse.dir ('file:/home/thoaint2/spark-warehouse').
> 20/07/12 19:39:11 INFO SharedState: Warehouse path is
> 'file:/home/thoaint2/spark-warehouse'. Traceback (most recent call
> last):   File "/home/thoaint2/test.py", line 15, in <module>
>     df = spark.readStream.format("kafka").option('kafka.bootstrap.servers','localhost:9092')
> \   File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 420, in load   File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__   File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 131, in deco   File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error
> occurred while calling o31.load. : java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArraySerializer     at
> org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:557)
>   at
> org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
>   at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:325)
>   at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:70)
>   at
> org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:220)
>   at
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:112)
>   at
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:112)
>   at
> org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
>   at
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:205)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)     at
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)  at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)    at
> py4j.Gateway.invoke(Gateway.java:282)     at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)   at
> py4j.GatewayConnection.run(GatewayConnection.java:238)    at
> java.lang.Thread.run(Thread.java:748) Caused by:
> java.lang.ClassNotFoundException:
> org.apache.kafka.common.serialization.ByteArraySerializer     at
> java.net.URLClassLoader.findClass(URLClassLoader.java:382)

1 Ответ

0 голосов
/ 13 июля 2020

Вам необходимо добавить JAR-файл kafka-clients в ваш --packages

Также обратите внимание, что Spark также работает как производитель, поэтому вам не нужна другая Python библиотека Kafka.

Если вы просто хотите обрабатывать потоки Kafka без использования JVM, загляните в Faust

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...