I'm using the Docker version of Kafka from this repository, which works fine (I think?): https://github.com/wurstmeister/kafka-docker
The docker-compose.yml file looks like this:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "json:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
I'm using PySpark 2.4.4.
My Python file looks like this:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json, time, os.path, re, pytz
import pandas as pd
import numpy as np
from itertools import chain

os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 pyspark-shell"

conf = SparkConf().set("spark.executor.memory", "5g")\
    .set("spark.driver.memory", "5g")\
    .set("spark.driver.bindAddress", "0.0.0.0")

sc = SparkContext(master = "local[2]", appName = "Pipeline Spark", conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)

# createDirectStream expects the topics as a list
kvs = KafkaUtils.createDirectStream(ssc, ["json"], {"metadata.broker.list": "127.0.0.1:9092"})

# Parse the JSON
parsed = kvs.map(lambda x: json.loads(x[1]))

def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf = sparkConf)\
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

def process(time, rdd):
    print("========= %s =========" % str(time))
    spark = getSparkSessionInstance(rdd.context.getConf())
    data = spark.read.json(rdd)#, schema = schema)

    if len(data.head(1)) == 0:
        print("Empty data", end = "\n\n")
        return False

    # Write raw data
    if os.path.isdir("rawData.parquet"):
        data.write.mode("append").parquet("rawData.parquet")
    else:
        data.write.parquet("rawData.parquet")

    print("Batch pipeline starting on", data.count(), "data")

    # Filtering first
    data = filter_data(data)
    data = data.withColumn("test_col", test_udf(data["test_col"]))

    # Write modified data
    if os.path.isdir("treatedData.parquet"):
        data.write.mode("append").parquet("treatedData.parquet")
    else:
        data.write.parquet("treatedData.parquet")

    print("Batch pipeline done :", data.count(), "remaining data", end = "\n\n")

# LAUNCH THE SPARK PROCESS
parsed.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
I get an error when I run the script (python script.py).
The full stack trace is:
Ivy Default Cache set to: /home/alexandre/.ivy2/cache
The jars for the packages stored in: /home/alexandre/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.4.4/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a02b9c34-c09d-4f85-9ea4-cbd9def59f84;1.0
confs: [default]
found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.4.4 in central
found org.apache.kafka#kafka_2.11;0.8.2.1 in central
found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
found com.yammer.metrics#metrics-core;2.2.0 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.scala-lang.modules#scala-parser-combinators_2.11;1.1.0 in central
found com.101tec#zkclient;0.3 in central
found log4j#log4j;1.2.17 in central
found org.apache.kafka#kafka-clients;0.8.2.1 in central
found net.jpountz.lz4#lz4;1.2.0 in central
found org.xerial.snappy#snappy-java;1.1.7.3 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 437ms :: artifacts dl 8ms
:: modules in use:
com.101tec#zkclient;0.3 from central in [default]
com.yammer.metrics#metrics-core;2.2.0 from central in [default]
log4j#log4j;1.2.17 from central in [default]
net.jpountz.lz4#lz4;1.2.0 from central in [default]
org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
org.apache.spark#spark-streaming-kafka-0-8_2.11;2.4.4 from central in [default]
org.scala-lang.modules#scala-parser-combinators_2.11;1.1.0 from central in [default]
org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 12 | 0 | 0 | 0 || 12 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a02b9c34-c09d-4f85-9ea4-cbd9def59f84
confs: [default]
0 artifacts copied, 12 already retrieved (0kB/8ms)
19/09/27 12:08:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/27 12:08:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
before
------
KVS
<pyspark.streaming.kafka.KafkaDStream object at 0x7fcd59381908>
------
========= 2019-09-27 12:09:10 =========
<pyspark.conf.SparkConf object at 0x7fcd593905c0>
<pyspark.sql.session.SparkSession object at 0x7fcd59381208>
[Stage 0:> (0 + 1) / 1]
[Stage 0:> (0 + 1) / 1]19/09/27 12:11:12 ERROR Utils: Aborting task
java.io.IOException: Connecting to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:35713 timed out (120000 ms)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:243)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$downloadClient(NettyRpcEnv.scala:368)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply$mcV$sp(NettyRpcEnv.scala:336)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:339)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:693)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:509)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:811)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:803)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/09/27 12:11:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Connecting to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:35713 timed out (120000 ms)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:243)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$downloadClient(NettyRpcEnv.scala:368)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply$mcV$sp(NettyRpcEnv.scala:336)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:339)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:693)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:509)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:811)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:803)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1402)
... 17 more
19/09/27 12:11:12 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
19/09/27 12:11:12 ERROR JobScheduler: Error running job streaming job 1569586150000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "pipeline_spark.py", line 58, in process
data.write.parquet(DATA_RAW)
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 286, in json
return self._df(self._jreader.json(jrdd))
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o253.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: Connecting to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:35713 timed out (120000 ms)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:243)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$downloadClient(NettyRpcEnv.scala:368)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply$mcV$sp(NettyRpcEnv.scala:336)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
========= 2019-09-27 12:09:20 =========
<pyspark.conf.SparkConf object at 0x7fcd593a4a58>
<pyspark.sql.session.SparkSession object at 0x7fcd59381208>
Traceback (most recent call last):
File "pipeline_spark.py", line 122, in <module>
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/streaming/context.py", line 192, in awaitTermination
self._jssc.awaitTermination()
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o33.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "pipeline_spark.py", line 58, in process
data.write.parquet(DATA_RAW)
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 286, in json
return self._df(self._jreader.json(jrdd))
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/alexandre/.local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/alexandre/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o253.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: Connecting to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:35713 timed out (120000 ms)
at apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p3
call
L1569586160000
lo958
e
at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
19/09/27 12:11:13 ERROR JobScheduler: Error running job streaming job 1569586170000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at o
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p3
call
L1569586170000
lo1083
e
at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
19/09/27 12:11:13 ERROR JobScheduler: Error running job streaming job 1569586180000 ms.0
py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
at py4j.CallbackClient.sendCommand(CallbackClient.java:377)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apaor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
... 25 more
19/09/27 12:11:13 ERROR PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
at py4j.CallbackClient.sendCommand(CallbackClient.java:377)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
ava:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
... 25 more
19/09/27 12:11:13 ERROR PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
... 25 more
19/09/27 12:11:13 ERROR JobScheduler: Error running job streaming job 1569586200000 ms.0
py4j.Py4JException: Error while obtaining a new communication channel
... 25 more
19/09/27 12:11:13 ERROR PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:250)
... 25 more
19/09/27 12:11:13 ERROR Utils: Aborting task
java.io.IOException: Failed to connect to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:35713
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1402)
... 17 more
Caused by: java.nio.channels.ClosedChannelException
at io.netty.channel.nio.AbstractNioChannel.doClose()(Unknown Source)
I don't know what the problem is, but I suspect it has something to do with Failed to connect to 130.91.90.34.bc.googleusercontent.com/34.90.91.130:41177
Or could it be related to the PySpark / Spark configuration, or to the kafka-docker configuration?
Edit: I'm running the Docker container on GCP and the PySpark script on the very same virtual machine.
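Since the address in the error looks like a Google Cloud external name rather than localhost, one check that can be run on the VM is what the machine's own hostname resolves to and whether the relevant ports accept connections. This is only a minimal diagnostic sketch using the Python standard library; the helper names can_connect and resolve are made up for illustration, and 127.0.0.1:9092 is simply the broker address from the script above.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


def resolve(hostname):
    """Return the IPv4 address the hostname resolves to, or None on failure."""
    try:
        return socket.gethostbyname(hostname)
    except OSError:
        return None


if __name__ == "__main__":
    name = socket.gethostname()
    print("hostname:", name)
    print("resolves to:", resolve(name))
    # Broker address taken from the script above.
    print("Kafka 127.0.0.1:9092 reachable:", can_connect("127.0.0.1", 9092))
```

If the hostname resolves to the external IP (34.90.91.130 in the log) instead of a local or internal address, that would at least be consistent with the timeout in the trace, although it does not prove that this is the cause.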