Я работаю над небольшим проектом, где я делаю анализ настроений в твиттере.Я довольно новичок в потоковой передаче Kafka и Spark, однако мне было трудно найти причину моей проблемы, учитывая информацию в Интернете.Моя программа spark слушает тему kafka (v. 0.10) под названием «twitter», которая содержит твиты, упакованные как json.Я использую следующий код:
from pyspark.sql import SparkSession, udf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from afinn import Afinn
def afinn_score(row):
afinn = Afinn(language='en', emoticons=True)
return afinn.score(row)
def main():
spark = SparkSession.builder.appName("TwitterSentiment").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
afinn_score_udf = udf(afinn_score, DoubleType())
schema = StructType([
StructField("text", StringType(), True)
])
kafka_df = spark \
.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "twitter") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("tweets"), "timestamp") \
.select("tweets.*", "timestamp")
sentiment_df = kafka_df \
.withWatermark("timestamp", "15 seconds") \
.withColumn("sentiment", afinn_score_udf(kafka_df.text))
print_df = sentiment_df \
.withWatermark("timestamp", "15 seconds") \
.groupBy(sentiment_df.timestamp, window(sentiment_df.timestamp, "10 seconds")) \
.agg(count(sentiment_df.sentiment).alias("tweet_count"), avg(sentiment_df.sentiment).alias("avg_sentiment"))
query = print_df.writeStream \
.outputMode('append') \
.format("console") \
.start()
if __name__ == "__main__":
main()
Моя проблема в том, что метки времени, которые я получаю от брокера kafka, всегда одинаковы.Временная метка для всех партий: «1970-01-01 00: 59: 59.999», однако смещение корректно увеличивается.Это означает, что мой groupBy возвращает только одно поле.Обратите внимание, что я использую kafka 0.10 с пакетом spark sql maven: 2.11-2.3.2 Заранее спасибо.
Обновление 1: В итоге я использовал поле "timestamp_ms", которое включено вданные фида твиттера.Используя эту временную метку, я смог заставить свою структурированную потоковую передачу работать.Тем не менее, я пробовал код также на другой машине, и информация о метках времени, встроенная в Kafka, остается неизменной.