Spark структурированный поток - постоянная метки времени - PullRequest
0 голосов
/ 15 октября 2018

Я работаю над небольшим проектом, где я делаю анализ настроений в твиттере.Я довольно новичок в потоковой передаче Kafka и Spark, однако мне было трудно найти причину моей проблемы, учитывая информацию в Интернете.Моя программа spark слушает тему kafka (v. 0.10) под названием «twitter», которая содержит твиты, упакованные как json.Я использую следующий код:

from pyspark.sql import SparkSession, udf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from afinn import Afinn


def afinn_score(row):
    afinn = Afinn(language='en', emoticons=True)
    return afinn.score(row)


def main():
    spark = SparkSession.builder.appName("TwitterSentiment").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    afinn_score_udf = udf(afinn_score, DoubleType())

    schema = StructType([
        StructField("text", StringType(), True)
    ])

    kafka_df = spark  \
        .readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("tweets"), "timestamp") \
        .select("tweets.*", "timestamp")

    sentiment_df = kafka_df \
        .withWatermark("timestamp", "15 seconds") \
        .withColumn("sentiment", afinn_score_udf(kafka_df.text))

    print_df = sentiment_df \
        .withWatermark("timestamp", "15 seconds") \
        .groupBy(sentiment_df.timestamp, window(sentiment_df.timestamp, "10 seconds")) \
        .agg(count(sentiment_df.sentiment).alias("tweet_count"), avg(sentiment_df.sentiment).alias("avg_sentiment"))

    query = print_df.writeStream \
        .outputMode('append') \
        .format("console") \
        .start()


if __name__ == "__main__":
    main()

Моя проблема в том, что метки времени, которые я получаю от брокера kafka, всегда одинаковы.Временная метка для всех партий: «1970-01-01 00: 59: 59.999», однако смещение корректно увеличивается.Это означает, что мой groupBy возвращает только одно поле.Обратите внимание, что я использую kafka 0.10 с пакетом spark sql maven: 2.11-2.3.2 Заранее спасибо.

Обновление 1: В итоге я использовал поле "timestamp_ms", которое включено вданные фида твиттера.Используя эту временную метку, я смог заставить свою структурированную потоковую передачу работать.Тем не менее, я пробовал код также на другой машине, и информация о метках времени, встроенная в Kafka, остается неизменной.

...