pyspark. sql .utils.AnalysisException: 'Время события должно быть определено в окне или отметке времени, но отметка времени имеет тип string - PullRequest
0 голосов
/ 09 апреля 2020

Я пишу демонстрацию структурированной потоковой передачи с использованием pyspark, но она пошла не так. Я использую Кафку в качестве потокового источника данных, код как показано ниже:

def produce():
    p = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode("utf-8"))

    for i in range(100):
        p.send(topic="test", value={"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "word": "spark" + str(random.randint(1, 3))})
        time.sleep(0.5)
    p.flush()

Код потокового зажигания:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("deserializer", lambda x: json.loads(x.decode("utf-8"))) \
    .load()
df = df.select(F.get_json_object(df.value.cast("string"), "$.timestamp").alias("timestamp"),
               F.get_json_object(df.value.cast("string"), "$.word").alias("word"))

df = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
    F.window("timestamp", "10 seconds")
).count()
df = df.select(df.window.start.cast("string").alias("start"), df.window.end.cast("string").alias("end"),
          "count")
q = df.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .option("checkpointLocation", "hdfs://127.0.0.1:9000/offsets_d") \
    .start()
q.awaitTermination()

, и он вызывает pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type string;;\nEventTimeWatermark timestamp#21: string, interval 5 seconds\n+- Project [get_json_object(cast(value#8 as string), $.timestamp) AS timestamp#21, get_json_object(cast(value#8 as string), $.word) AS word#22]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@435b0386, kafka, Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3546fa34,kafka,List(),None,List(),None,Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n

Я следовал официальный документ, но я не знаю, где это не так.

1 Ответ

0 голосов
/ 09 апреля 2020

Эта часть выглядит так, как будто она вызывает исключение:

df = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
    F.window("timestamp", "10 seconds")
).count()

Попробуйте следовать примеру, приведенному в официальной документации

windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

и убедитесь, что Ваш столбец timestamp имеет тип отметки времени, а не строку .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...