Соберитесь в sparklyr - PullRequest
       25

Соберитесь в sparklyr

0 голосов
/ 22 мая 2018

Я использую sparklyr для манипулирования некоторыми данными.Учитывая,

a<-tibble(id = rep(c(1,10), each = 10),
          attribute1 = rep(c("This", "That", 'These', 'Those', "The", "Other", "Test", "End", "Start", 'Beginning'), 2),
          value = rep(seq(10,100, by = 10),2),
          average = rep(c(50,100),each = 10),
          upper_bound = rep(c(80, 130), each =10),
          lower_bound = rep(c(20, 70), each =10))

Я хотел бы использовать «сбор» для манипулирования данными, например так:

b<- a %>% 
     gather(key = type_data, value = value_data, -c(id:attribute1))

Однако «сбор» не доступен на sparklyr.Я видел некоторых людей, использующих sdf_pivot для имитации «сборки» (например, Как использовать sdf_pivot () в sparklyr и сцепленных строках? ), но я не вижу, как использовать его в этом случае.

У кого-нибудь есть идея?

Приветствия!

Ответы [ 3 ]

0 голосов
/ 23 мая 2018

Вот функция для имитации gather в sparklyr.Это будет собирать данные столбцы, оставляя все остальное нетронутым, но при необходимости его можно легко расширить.

# Function
sdf_gather <- function(tbl, gather_cols){

  other_cols <- colnames(tbl)[!colnames(tbl) %in% gather_cols]

  lapply(gather_cols, function(col_nm){
    tbl %>% 
      select(c(other_cols, col_nm)) %>% 
      mutate(key = col_nm) %>%
      rename(value = col_nm)  
  }) %>% 
    sdf_bind_rows() %>% 
    select(c(other_cols, 'key', 'value'))
}

# Example
spark_df %>% 
  select(col_1, col_2, col_3, col_4) %>% 
  sdf_gather(c('col_3', 'col_4'))
0 голосов
/ 01 июня 2018

Вы можете создать эквивалент, используя map / explode:

sdf_gather <- function(data, key = "key", value = "value", ...) {
  cols <- list(...) %>% unlist()

  # Explode with map (same as stack) requires multiple aliases so
  # dplyr mutate won't work for us here.
  expr <- list(paste(
    "explode(map(",
    paste("'", cols, "',`",  cols, "`", sep = "", collapse = ","),
    ")) as (", key, ",", value, ")", sep = ""))

  keys <- data %>% colnames() %>% setdiff(cols) %>% as.list()

  data %>%
    spark_dataframe() %>% 
    sparklyr::invoke("selectExpr", c(keys, expr)) %>% 
    sdf_register()
}

или функцию Hive stack :

sdf_gather <- function(data, key = "key", value = "value", ...) {
  cols <- list(...) %>% unlist()
  expr <- list(paste(
    "stack(", length(cols), ", ",
    paste("'", cols, "',`",  cols, "`", sep="", collapse=","),
    ") as (", key, ",", value, ")", sep=""))

  keys <- data %>% colnames() %>% setdiff(cols) %>% as.list()

  data %>%
    spark_dataframe() %>% 
    sparklyr::invoke("selectExpr", c(keys, expr)) %>% 
    sdf_register()
}

Оба должныдать тот же результат:

long <- sdf_gather(
  df, "my_key", "my_value",
  "value", "average", "upper_bound", "lower_bound")
long
# Source:   table<sparklyr_tmp_7b8f5989ba4d> [?? x 4]
# Database: spark_connection
      id attribute1 my_key      my_value
   <dbl> <chr>      <chr>          <dbl>
 1     1 This       value             10
 2     1 This       average           50
 3     1 This       upper_bound       80
 4     1 This       lower_bound       20
 5     1 That       value             20
 6     1 That       average           50
 7     1 That       upper_bound       80
 8     1 That       lower_bound       20
 9     1 These      value             30
10     1 These      average           50
# ... with more rows

и может быть изменен для поддержки нестандартной оценки.

Обратите внимание, что оба метода требуют однородных типов столбцов.

Примечания

explode версия генерирует следующий запрос:

SELECT id, attribute1, 
       explode(map(
         'value', `value`,
         'average', `average`,
         'upper_bound', `upper_bound`,
         'lower_bound', `lower_bound`)) as (my_key,my_value)

FROM df

и оптимизированный логический план выполнения

org.apache.spark.sql.catalyst.plans.logical.Generate
Generate explode(map(value, value#16, average, average#17, upper_bound, upper_bound#18, lower_bound, lower_bound#19)), [2, 3, 4, 5], false, [my_key#226, my_value#227]
+- InMemoryRelation [id#14, attribute1#15, value#16, average#17, upper_bound#18, lower_bound#19], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- Scan ExistingRDD[id#14,attribute1#15,value#16,average#17,upper_bound#18,lower_bound#19]

в то время как stack version генерирует

SELECT id, attribute1, 
       stack(4, 
             'value', `value`,
             'average', `average`,
             'upper_bound', `upper_bound`,
             'lower_bound', `lower_bound`) as (my_key,my_value)
FROM df

и

org.apache.spark.sql.catalyst.plans.logical.Generate
Generate stack(4, value, value#16, average, average#17, upper_bound, upper_bound#18, lower_bound, lower_bound#19), [2, 3, 4, 5], false, [my_key#323, my_value#324]
+- InMemoryRelation [id#14, attribute1#15, value#16, average#17, upper_bound#18, lower_bound#19], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- Scan ExistingRDD[id#14,attribute1#15,value#16,average#17,upper_bound#18,lower_bound#19]

Одинарные значения в кавычках (т. е. 'value'), в сгенерированном SQL являются литеральными строками, а значения в кавычках представляют ссылку на столбец.

0 голосов
/ 22 мая 2018

Нет, нет pivot ответ здесь.

Я также жду лучшего ответа.

library(sparklyr)
library(rlang)
library(dplyr)

#Given
sparkDf_a <- copy_to(dest = sc, df = a)

helper_fn <- function(df, key, val, ...){

    quo_col <- enquo(val)

    df %>% 
        dplyr::group_by(id, attribute1) %>% 
        dplyr::select(!!quo_col) %>% 
        mutate(type_data  = key, 
               value_data = !!quo_col) %>% 
        dplyr::select(-!!quo_col)
}

b <- sdf_bind_rows(
      helper_fn(df = sparkDf_a, key = 'value', val = value),
      helper_fn(df = sparkDf_a, key = 'average', val = average),
      helper_fn(df = sparkDf_a, key = 'upper_bound', val = upper_bound),
      helper_fn(df = sparkDf_a, key = 'lower_bound', val = lower_bound)
 )

Результат

collect(b)
# A tibble: 80 x 4
# Groups:   id, attribute1 [20]
      id attribute1   type_data value_data
   <dbl>      <chr>       <chr>      <dbl>
 1     1        End upper_bound         80
 2     1      Other lower_bound         20
 3     1      Start lower_bound         20
 4     1       Test     average         50
 5     1       Test upper_bound         80
 6     1       That     average         50
 7     1       That lower_bound         20
 8     1      Those       value         40
 9    10      Start lower_bound         70
10    10       That     average        100
# ... with 70 more rows
...