R Spark читает по одному файлу за раз, интегрирует с Shiny - PullRequest
0 голосов
/ 20 декабря 2018

У меня есть папка на HDFS, которая содержит 10 файлов CSV.Каждый файл CSV содержит 10000 строк и 17 столбцов.

Цель

Реактивное чтение папки в HDFS.

Если в папке содержатся файлы, считывайте по одному файлу за раз (от самого старого до самого нового) из папки.

Изобразите некоторые параметры в Shiny.

Обновите график как новые файлыдобавляются в папку или считываются из папки.

Статус В настоящее время с помощью SparklyR я могу реактивно читать все файлы одновременно и генерировать график, содержащий 100000 точек (ggplot ).Если я добавлю 11-й файл (содержащий 10000 строк) после запуска приложения, график обновится с 110000 точками.

library(sparklyr)

conf = spark_config()
conf$spark.driver.memory="50g"
sc <- spark_connect(master = "local[*]", config = conf)
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/")

ui <- function(){
  plotOutput("plot")
}

server <- function(input, output, session){

  ps <- reactiveSpark(read_folder, intervalMillis = 10)
  output$plot <- renderPlot({
    df2 = ps()
    # str(df2)
    ggplot(data = df2, aes(x=Time, y=outletN2)) + geom_point() + ggtitle(nrow(df2)) + theme_bw()
  })
}
shinyApp(ui, server)


SessionInfo()

# R version 3.5.1 (2018-07-02)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows Server >= 2012 x64 (build 9200)
# 
# Matrix products: default
# 
# locale:
# [1] LC_COLLATE=English_United States.1252  LC_CTYPE=English_United States.1252   
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C                          
# [5] LC_TIME=English_United States.1252    
# 
# attached base packages:
#   [1] stats     graphics  grDevices utils     datasets  methods   base     
# 
# other attached packages:
#   [1] shinyFiles_0.7.2    bindrcpp_0.2.2      dplyr_0.7.8         shiny_1.2.0         ggplot2_3.1.0      
# [6] future_1.10.0       sparklyr_0.9.3.9000
# 
# loaded via a namespace (and not attached):
#   [1] tidyselect_0.2.5 forge_0.1.9002   purrr_0.2.5      listenv_0.7.0    lattice_0.20-38  colorspace_1.3-2
# [7] generics_0.0.2   htmltools_0.3.6  yaml_2.2.0       base64enc_0.1-3  rlang_0.3.0.1    later_0.7.5     
# [13] pillar_1.3.0     glue_1.3.0       withr_2.1.2      DBI_1.0.0        dbplyr_1.2.2     bindr_0.1.1     
# [19] plyr_1.8.4       munsell_0.5.0    gtable_0.2.0     htmlwidgets_1.3  codetools_0.2-15 labeling_0.3    
# [25] httpuv_1.4.5     parallel_3.5.1   broom_0.5.1      r2d3_0.2.2       Rcpp_1.0.0       xtable_1.8-3    
# [31] openssl_1.1      promises_1.0.1   backports_1.1.2  scales_1.0.0     jsonlite_1.6     config_0.3      
# [37] fs_1.2.6         mime_0.6         digest_0.6.18    grid_3.5.1       rprojroot_1.3-2  tools_3.5.1     
# [43] magrittr_1.5     lazyeval_0.2.1   tibble_1.4.2     crayon_1.3.4     tidyr_0.8.2      pkgconfig_2.0.2 
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0       rstudioapi_0.8   R6_2.3.0         globals_0.12.4  
# [55] nlme_3.1-137     compiler_3.5.1  

Но что я действительно хочу, так это реактивно читать по одному файлу за раз и делатьggplot.Это похоже на Spark Streaming, но Spark Streaming (из того, что я понимаю) считывает ВСЕ текстовые файлы в один RDD.Из документации Spark в Python есть функция с именем SparkContext.wholeTextFiles , которая позволяет читать каталог, содержащий несколько небольших текстовых файлов, и возвращает каждый из них в виде пар (имя файла, содержимое) ( ссылка *).1025 *).Я не проверял это, поскольку я хочу сохранить все в R прямо сейчас.Я посмотрел на предоставляет отличные файлы, но не смог найти ни одной функции, которая бы это делала (https://github.com/thomasp85/shinyFiles).

Есть ли что-то похожее в R / Sparklyr? То, что я пытаюсь сделать, звучит глупо? Если вы считаете, чтоэффективный способ достижения этого в R, я все уши!

Спасибо.

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Я нашел способ с некоторой помощью @tricky.Полное решение ниже.Грязный, но работает на данный момент.

# Get list of current files in HDFS
files <- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
# Extract file names
fileNames <- na.omit(str_extract(files, "(?<=/)[^/]*$"))

# CheckFunc for reactivePoll, checks for changes in fileNames
listFiles <- function(){
  files <<- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
  fileNames <<- na.omit(str_extract(files, "(?<=/)[^/]*$"))
  fileNames
}

# ValueFunc for reactivePoll. Returns a vector of HDFS filepaths
ReadHdfsData=function(){ 
  path <- paste0("hdfs://localhost:9000/nik_ml/", fileNames)
  return(path)
}

ui3 <- function(){
  plotOutput("plot")
}

server3 <- function(input, output, session){

  output$plot <- renderPlot({
    allFiles <- reactivePoll(5 * 1000, session, listFiles, ReadHdfsData)
    # Find filepaths which are added to HDFS
    newFile <<- setdiff(allFiles(), newFile)
    # print(newFile)
    # Do something with each new file. 
    # I am plotting currently, but I will end up using it for ML predictions.
    for(temp in newFile){
    df <- spark_read_csv(sc, "name", temp) %>%
      select(Time, outletN2) %>%
      collect()
    # print(head(df))
    p1 <- ggplot(data = df, aes(x=Time, y=outletN2)) + 
      geom_point() + 
      ggtitle(paste("File =",temp)) + 
      theme_bw()
    print(p1)
    }
  })
}
# Initialise newFile to "" before running the app
newFile <- character(0)
shinyApp(ui3, server3)
0 голосов
/ 20 декабря 2018

У меня была ваша проблема во время одного из моих проектов.То, что я в итоге использовал, - это реактивная функция для обновления моего графика.

Таким образом, у вас есть два варианта: либо вы обновляете график каждые x секунд, не зная, есть ли новые файлы.В этом примере 120 секунд, так что две минуты: Вы инициализируете свой аккумулятор b в начале кода приложения.

b <- 0

IsThereNew = function(){
  b <<- b+1
  b
}

ReadHdfsData=function(){ # A function that calculates the underlying value
  path <- paste0("/your/path/to/data.json")
  df <- sc %>%
    spark_read_json("name", path) %>%
    collect()
  return(df)
}

df <- reactivePoll(120 * 1000, session, IsThereNew, ReadHdfsData)

Так что в этом случае, тупо, вы обновляете свой график каждые 2 минуты, дажеесли нет новых данных.

Другой способ, который вы можете сделать, - перечислять количество файлов в каталоге hdfs каждые x секунд, и если счетчик будет изменен, график будет обновлен.Вам нужно будет определить функцию listNumberOfFiles, которая возвращает количество файлов, и заменить функцию isThereNew.

...