Pyspark Kafka структурированная трансляция: ошибка при записи - PullRequest
0 голосов
/ 07 октября 2018

Я могу прочитать поток из темы Кафки и записать (преобразованные) данные обратно в другую тему Кафки в два разных этапа в PySpark.Код для этого выглядит следующим образом:

# Define Stream:
df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("subscribe", "instream") \
     .load()

# Transform
matchdata = df.select(from_json(F.col("value").cast("string"),schema).alias("value"))\
          .select(F.col('value').cast("string"))

# Stream the data, from a Kafka topic to a Spark in-memory table
query = matchdata \
       .writeStream \
       .format("memory") \
       .queryName("PositionTable") \
       .outputMode("append") \
       .start()

query.awaitTermination(5)

# Create a new dataframe after stream completes:
tmp_df=spark.sql("select * from PositionTable")

# Write data to a different Kafka topic
tmp_df \
     .write \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("topic", "outstream") \
     .save()

Приведенный выше код работает должным образом: данные в теме Kafka «instream» читаются в PySpark, а затем PySpark может записывать данные в тему Kafka «outtream»Msgstr ".

Тем не менее, я хотел бы прочитать поток и немедленно записать преобразованные данные (поток будет неограниченным, и мы хотели бы получить информацию сразу, как только данные появятся).Следуя документации , я заменил приведенный выше запрос следующим:

query = matchdata \
       .writeStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", "localhost:9092") \
       .option("topic", "outstream") \
       .option("checkpointLocation", "/path/to/HDFS/dir") \
       .start()

Это не похоже на работу.Там нет сообщения об ошибке, поэтому я не знаю, что не так.Я также пробовал работать с окнами и агрегировать в Windows, но это также не работает.Любой совет будет оценен!

1 Ответ

0 голосов
/ 08 октября 2018

Хорошо, я нашел проблему.Основная причина заключалась в том, что подкаталог «путь / к / HDFS / dir» должен существовать.После создания этого каталога код запустился, как и ожидалось.Было бы хорошо, если бы сообщение об ошибке указывало что-то подобное.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...