Удаление дублированных строк в sparklyr - PullRequest
1 голос
/ 12 января 2020

Мне нужно удалить строки, которые дублируются в одном столбце на основе дубликатов в другом столбце, используя sparklyr.

Набор данных iris имеет ряд наблюдений, для которых 4 функции идентичны Значения Sepal.Width, Petal.Length, Petal.Width и Species похожи (строки различаются только для столбца Sepal.Length).

Давайте создадим копию iris в spark

library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3") 

iris_spark <- copy_to(sc, iris)

Базовый метод R

Это базовый метод R, который удаляет дублирующиеся строки, сохраняя только строку с наибольшим значением для Sepal.Length:

iris_order = iris[order(iris[,'Sepal.Length'],-iris[,'Sepal.Length']),] ### sort first
iris_subset = iris_order[!duplicated(iris_order$Sepal.Length),] ### Keep highest
dim(iris_subset) # 35 5

, но это не работает для tbl_spark объекта:

iris_spark_order = iris_spark[order(iris_spark[,'Sepal.Length'],-iris_spark[,'Sepal.Length']),]

Ошибка в iris_spark [, "Sepal.Length"]: неверное количество измерений

Tidyverse

Существует два возможных dplyr решения, которые я могу придумать, которые подходят для data.frame, но не tbl_spark:

1)

library(dplyr)
iris %>% distinct()
iris_spark %>% distinct()

Error: org.apache.spark.sql.AnalysisException: cannot resolve '`Sepal.Length`' given input columns: [iris.Sepal_Length, iris.Sepal_Width, iris.Petal_Width, iris.Petal_Length, iris.Species]; line 1 pos 16;
'Distinct
+- 'Project ['Sepal.Length]
   +- SubqueryAlias iris
      +- LogicalRDD [Sepal_Length#13, Sepal_Width#14, Petal_Length#15, Petal_Width#16, Species#17], false

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Unknown Source)

2)

iris_order <- arrange(iris, Sepal.Length)
iris_subset <- iris_order [!duplicated(iris_order $Sepal.Length),]

, но не работает на tbl_spark объекте:

library(dplyr)
iris_order <- arrange(iris_spark, Sepal.Length)
iris_subset <- iris_order [!duplicated(iris_order$Sepal.Length),]

Ошибка в iris_order [! Duplicated (iris_order $ Sepal.Length) ),]: неверное число измерений

data.table

Решение DT для data.frame

library(data.table)
df <- iris # iris resides in package that is locked so copy to new object
unique(setDT(df)[order(Sepal.Length, -Species)], by = "Sepal.Length")

, но не работает на tbl_spark объект:

unique(setDT(iris_spark)[order(Sepal.Length)], by = "Sepal.Length")

Ошибка в с etDT (iris_spark): Все элементы в аргументе от 'x' до 'setDT' должны иметь одинаковую длину, но профиль входных длин (длина: частота): [1: 1, 2: 1] Первая запись с меньшим, чем 2 записи - это 1

Так как же один на самом деле завершил sh эту задачу в Spark с sparklyr?

Ответы [ 2 ]

1 голос
/ 16 января 2020

Если бы проблема была такой же простой, как указано в вопросе, где вы хотите получить наибольшее значение одного столбца, учитывая n - 1 столбцов группировки, простого агрегирования будет достаточно:

iris_spark %>% 
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>% 
  summarise(Sepal_Length=max(Sepal_Length))

Если вы не важно, какое значение вы получите *, а количество столбцов будет отличаться, вы можете удалить дубликаты (это внутренне использует first, который нельзя использовать в dplyr без окна):

iris_spark %>% 
  spark_dataframe() %>% 
  invoke(
    "dropDuplicates",
    list("Sepal_Width", "Petal_Length" ,"Petal_Width", "Species")) %>% 
  sdf_register()

Если вы заботитесь о заказе, решение Аркуна технически правильно, но не очень масштабируемо. Вместо этого вы можете объединить оставшиеся столбцы в struct и взять его max (structs использовать лексикографический c порядок).

iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>% 
  # You can put additional values in the struct
  summarise(values=max(struct(Sepal_Length))) %>% 
  mutate(Sepal_Length=values.Sepal_Length) 

* Важно подчеркнуть, что любой предыдущий порядок игнорируется, даже если в игрушечных примерах может быть указано иное.

1 голос
/ 12 января 2020

filter будет работать с sparklyr

library(dplyr)
library(sparklyr)
iris_spark %>% 
    group_by(Sepal.Length) %>% 
    filter(n() ==1)
...