Найти динамические интервалы для группы с помощью Sparklyr - PullRequest
0 голосов
/ 04 марта 2019

У меня есть огромная (~ 10 миллиардов строк) data.frame, которая выглядит примерно так:

data <- data.frame(Person = c(rep("John", 9), rep("Steve", 7), rep("Jane", 4)),
Year = c(1900:1908, 1902:1908, 1905:1908),
Grade = c(c(6,3,4,4,8,5,2,9,7), c(4,3,5,5,6,4,7), c(3,7,2,9)) )

Это набор из 3 человек, наблюдавшихся в разные годы, и у нас есть их Оценка заГод под вопросом.Я хотел бы создать переменную, которая для каждой оценки возвращает «упрощенную оценку».Упрощенная оценка - это просто Оценка, разрезанная через различные интервалы.Сложность в том, что интервалы у человека разные.Чтобы получить пороговые значения интервалов по Персоне, у меня есть следующий список:

list.threshold <- list(John = c(5,7), Steve = 4, Jane = c(3,5,8))

Таким образом, оценки Стива будут сокращены в 2 интервала, а оценки Джейн - в 4 интервала.Вот требуемые результаты (SimpleGrade):

    Person  Year  Grade  SimpleGrade
1:   John   1900    6        1
2:   John   1901    3        0
3:   John   1902    4        0
4:   John   1903    4        0
5:   John   1904    8        2
6:   John   1905    5        1
7:   John   1906    2        0
8:   John   1907    9        2
9:   John   1908    7        2
10:  Steve  1902    4        1
11:  Steve  1903    3        0
12:  Steve  1904    5        1
13:  Steve  1905    5        1
14:  Steve  1906    6        1
15:  Steve  1907    4        1
16:  Steve  1908    7        1
17:  Jane   1905    3        1
18:  Jane   1906    7        2
19:  Jane   1907    2        0
20:  Jane   1908    9        3

Мне нужно будет найти решение в sparklyr, потому что я работаю с огромной таблицей искр.

В dplyr я бы сделал что-нибудькак это:

dplyr

data <- group_by(data, Person) %>% 
mutate(SimpleGrade = cut(Grade, breaks = c(-Inf, list.threshold[[unique(Person)]], Inf), labels = FALSE, right = TRUE, include.lowest = TRUE) - 1)

Это работает, но у меня возникают проблемы с преобразованием этого решения в sparklyr из-за того, что пороги для каждого человека различны.Я думаю, что мне придется использовать функцию ft_bucketizer.Где я до сих пор с sparklyr:

sparklyr

spark_tbl <- group_by(spark_tbl, Person) %>%
ft_bucketizer(input_col  = "Grade",
            output_col = "SimpleGrade",
            splits     = c(-Inf, list.threshold[["John"]], Inf))

spark_tbl - только эквивалент таблицы искровых данных.Это работает, если я не изменяю пороги и использую, например, только значения Джона.

Большое спасибо, Том С.

1 Ответ

0 голосов
/ 05 марта 2019

Spark ML Bucketizer можно использовать только для глобальных операций, поэтому он не будет работать для вас.Вместо этого вы можете создать справочную таблицу

ref <- purrr::map2(names(list.threshold), 
   list.threshold, 
   function(name, brks) purrr::map2(
     c("-Infinity", brks), c(brks, "Infinity"),
     function(low, high) list(
       name = name, 
       low = low,
       high = high))) %>%
   purrr::flatten() %>% 
   bind_rows() %>% 
   group_by(name) %>%
   arrange(low, .by_group = TRUE) %>%
   mutate(simple_grade = row_number() - 1) %>%
   copy_to(sc, .) %>%
   mutate_at(vars(one_of("low", "high")), as.numeric)
# Source: spark<?> [?? x 4]
  name    low  high simple_grade
  <chr> <dbl> <dbl>        <dbl>
1 Jane   -Inf     3            0
2 Jane      3     5            1
3 Jane      5     8            2
4 Jane      8   Inf            3
5 John   -Inf     5            0
6 John      5     7            1
7 John      7   Inf            2
8 Steve  -Inf     4            0
9 Steve     4   Inf            1

, а затем left_join с таблицей данных:

sdf <- copy_to(sc, data)

simplified <- left_join(sdf, ref, by=c("Person" = "name")) %>%
  filter(Grade >= low & Grade < High) %>%
  select(-low, -high)
simplified
# Source: spark<?> [?? x 4]
   Person  Year Grade simple_grade
   <chr>  <int> <dbl>        <dbl>
 1 John    1900     6            1
 2 John    1901     3            0
 3 John    1902     4            0
 4 John    1903     4            0
 5 John    1904     8            2
 6 John    1905     5            1
 7 John    1906     2            0
 8 John    1907     9            2
 9 John    1908     7            2
10 Steve   1902     4            1
# … with more rows
simplified %>% dbplyr::remote_query_plan()
== Physical Plan ==
*(2) Project [Person#132, Year#133, Grade#134, simple_grade#15]
+- *(2) BroadcastHashJoin [Person#132], [name#12], Inner, BuildRight, ((Grade#134 >= low#445) && (Grade#134 < high#446))
   :- *(2) Filter (isnotnull(Grade#134) && isnotnull(Person#132))
   :  +- InMemoryTableScan [Person#132, Year#133, Grade#134], [isnotnull(Grade#134), isnotnull(Person#132)]
   :        +- InMemoryRelation [Person#132, Year#133, Grade#134], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- Scan ExistingRDD[Person#132,Year#133,Grade#134]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [name#12, cast(low#13 as double) AS low#445, cast(high#14 as double) AS high#446, simple_grade#15]
         +- *(1) Filter ((isnotnull(name#12) && isnotnull(cast(high#14 as double))) && isnotnull(cast(low#13 as double)))
            +- InMemoryTableScan [high#14, low#13, name#12, simple_grade#15], [isnotnull(name#12), isnotnull(cast(high#14 as double)), isnotnull(cast(low#13 as double))]
                  +- InMemoryRelation [name#12, low#13, high#14, simple_grade#15], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- Scan ExistingRDD[name#12,low#13,high#14,simple_grade#15]
...