Чтение потоковых данных Kafka с помощью sparklyr - PullRequest
0 голосов
/ 17 июня 2020

Я использую пакет sparklyr R для получения потоковых данных из Kafka. Одно из основных ограничений заключается в том, что пользователь должен создавать конвейеры чтения / записи. Хотя я могу частично записать потоковые данные в csv, я не могу найти способ записать их в памяти в табличной форме, чтобы обработать и немного поиграть. Какие-либо предложения? Вот часть моего кода, с которым я работаю.

library(sparklyr)
library(dplyr)
# spark_disconnect(sc)
setwd(paste0(Sys.getenv("SPARK_HOME"),"/"))
sc <- spark_connect(master = "local",
                    spark_home = paste0(Sys.getenv("SPARK_HOME"),"/"),
                    config = list(sparklyr.shell.packages = c("org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.4","org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4")))
spark_log(sc, n = NULL)
read_options <- list(kafka.bootstrap.servers = 'localhost:9093',subscribe = 'mytopic')
stream_read_kafka(sc,options = read_options) %>% mutate_all(.funs = as.character) %>% spark_dataframe() %>% stream_write_csv(path = "file3")
...