Я пишу демонстрацию структурированной потоковой передачи с использованием pyspark, но она пошла не так. Я использую Кафку в качестве потокового источника данных, код как показано ниже:
def produce():
p = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode("utf-8"))
for i in range(100):
p.send(topic="test", value={"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "word": "spark" + str(random.randint(1, 3))})
time.sleep(0.5)
p.flush()
Код потокового зажигания:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("deserializer", lambda x: json.loads(x.decode("utf-8"))) \
.load()
df = df.select(F.get_json_object(df.value.cast("string"), "$.timestamp").alias("timestamp"),
F.get_json_object(df.value.cast("string"), "$.word").alias("word"))
df = df \
.withWatermark("timestamp", "5 seconds") \
.groupBy(
F.window("timestamp", "10 seconds")
).count()
df = df.select(df.window.start.cast("string").alias("start"), df.window.end.cast("string").alias("end"),
"count")
q = df.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime='20 seconds') \
.option("checkpointLocation", "hdfs://127.0.0.1:9000/offsets_d") \
.start()
q.awaitTermination()
, и он вызывает pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type string;;\nEventTimeWatermark timestamp#21: string, interval 5 seconds\n+- Project [get_json_object(cast(value#8 as string), $.timestamp) AS timestamp#21, get_json_object(cast(value#8 as string), $.word) AS word#22]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@435b0386, kafka, Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3546fa34,kafka,List(),None,List(),None,Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n
Я следовал официальный документ, но я не знаю, где это не так.