Сложности перевода кода Scala Spark-Streaming в Pyspark - PullRequest
0 голосов
/ 26 сентября 2019

Я пытаюсь перевести реализацию Spark в Pyspark, которая обсуждается в этом блоге:

https://dorianbg.wordpress.com/2017/11/11/building-the-speed-layer-of-lambda-architecture-using-structured-spark-streaming/

Однако у меня много проблем из-за некоторых методовв Spark Datafram недоступны или должны пройти через некоторые преобразования, чтобы заставить их работать.У меня есть определенные проблемы с этой частью:

var data_stream_cleaned = data_stream
.selectExpr("CAST(value AS STRING) as string_value")
.as[String]
.map(x => (x.split(";"))) //wrapped array
.map(x => tweet(x(0), x(1), x(2),  x(3), x(4), x(5)))
.selectExpr( "cast(id as long) id", "CAST(created_at as timestamp) created_at",  "cast(followers_count as int) followers_count", "location", "cast(favorite_count as int) favorite_count", "cast(retweet_count as int) retweet_count")
.toDF()
.filter(col("created_at").gt(current_date()))   // kafka will retain data for last 24 hours, this is needed because we are using complete mode as output
.groupBy("location")
.agg(count("id"), sum("followers_count"), sum("favorite_count"),  sum("retweet_count"))

Как бы вы сделали эту работу?Я успешно подключился к потоку Кафки.Я просто пытаюсь объединить данные, чтобы я мог загрузить их в Redshift.

Это то, что у меня есть до сих пор:

ds = data_stream.selectExpr("CAST(value AS STRING) as string_value").rdd.map(lambda x: x.split(";"))

Я получаю сообщение об ошибке

Queries with streaming sources must be executed with writeStream.start()

Что может быть не так?Я не пытаюсь запросить данные, просто преобразовать их.Любая помощь будет принята с благодарностью!

...