в коде, если не df.head (1) .isEmpty: я получил исключение,
Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
Я не знаю, как использовать при потоковой передаче данных.когда я использую jupyter, чтобы выполнить каждую строку, код в порядке, и я могу получить свой результат.но использовать .py это не хорошо.
моя цель заключается в следующем: я хочу использовать потоковую передачу, чтобы получать данные от kafka каждую секунду, затем я преобразую каждую партию данных обработки паром (одна партия означает данные за одну секунду, которую я получаю) в pandas dataframe, а затем я используюФункция панды, чтобы сделать что-то с данными, наконец, я отправляю результат в другую тему Кафки.
Пожалуйста, помогите мне и простите мой пул на английском, Большое спасибо.
sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession(sparkContext=sc) \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "data") \
.load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))
if not df.head(1).isEmpty:
alertQuery = ds \
.writeStream \
.queryName("qalerts")\
.format("memory")\
.start()
alerts = spark.sql("select * from qalerts")
pdAlerts = alerts.toPandas()
a = pdAlerts['value'].tolist()
d = []
for i in a:
x = json.loads(i)
d.append(x)
df = pd.DataFrame(d)
print(df)
ds = df['jobID'].unique().tolist()
dics = {}
for source in ds:
ids = df.loc[df['jobID'] == source, 'id'].tolist()
dics[source]=ids
print(dics)
query = ds \
.writeStream \
.queryName("tableName") \
.format("console") \
.start()
query.awaitTermination()