SparkR::gapply returns fewer rows than expected
09 January 2019

See the example below. I have a DataFrame with 2 columns and 1000 rows. Z simply adds 10 to one of the columns using gapply, and the output is another SparkDataFrame with 1000 rows - that is fine. newZ does the same thing, but returns NULL when key==10.

I would therefore expect 999 rows in the output. Why do I get fewer than that?

library(SparkR)
SparkR::sparkR.session()

# 1000-row SparkDataFrame with two columns, spread over 10 partitions
sdf <- as.DataFrame(data.frame(x = 1:1000, y = 1), numPartitions = 10)

# Z: for every group of x, add 10 to y
Z <- gapply(sdf, 'x', function(key, d) {
  data.frame(x = key[[1]], newy = d$y + 10)
}, schema = "x int, newy int")

count(Z)
# [1] 1000

# newZ: same as Z, but return NULL for the single group where key == 10
newZ <- gapply(sdf, 'x', function(key, d) {
  if (as.integer(key[[1]]) == 10) return(NULL)
  data.frame(x = key[[1]], newy = d$y + 10)
}, schema = "x int, newy int")

count(newZ)
# [1] 993
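
Since 1000 - 993 = 7, it looks like six more groups besides key == 10 went missing. Below is a small diagnostic sketch I would run to list exactly which x values disappeared (it reuses sdf and newZ from above; the data is small enough that collect() on the driver is safe here):

surviving_x <- collect(select(newZ, "x"))$x  # pull the surviving keys back to the driver
missing_x <- setdiff(1:1000, surviving_x)    # groups that produced no output row
missing_x                                    # I expected only 10 here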

Some of the Spark configuration:

> sparkR.conf()
$eventLog.rolloverIntervalSeconds
[1] "3600"

$spark.akka.frameSize
[1] "256"

$spark.app.name
[1] "Databricks Shell"

$spark.databricks.cloudProvider
[1] "Azure"

$spark.databricks.clusterUsageTags.clusterMaxWorkers
[1] "12"

$spark.databricks.clusterUsageTags.clusterMetastoreAccessType
[1] "RDS_DIRECT"

$spark.databricks.clusterUsageTags.clusterMinWorkers
[1] "2"

$spark.databricks.clusterUsageTags.clusterPythonVersion
[1] "3"

$spark.databricks.clusterUsageTags.clusterResourceClass
[1] "Serverless"

$spark.databricks.clusterUsageTags.clusterScalingType
[1] "autoscaling"

$spark.databricks.clusterUsageTags.clusterTargetWorkers
[1] "2"

$spark.databricks.clusterUsageTags.clusterWorkers
[1] "2"

$spark.databricks.clusterUsageTags.driverNodeType
[1] "Standard_E8s_v3"

$spark.databricks.clusterUsageTags.enableElasticDisk
[1] "true"

$spark.databricks.clusterUsageTags.numPerClusterInitScriptsV2
[1] "1"

$spark.databricks.clusterUsageTags.sparkVersion
[1] "latest-stable-scala2.11"

$spark.databricks.clusterUsageTags.userProvidedRemoteVolumeCount
[1] "0"

$spark.databricks.clusterUsageTags.userProvidedRemoteVolumeSizeGb
[1] "0"   

$spark.databricks.delta.multiClusterWrites.enabled
[1] "true"

$spark.databricks.driverNodeTypeId
[1] "Standard_E8s_v3"

$spark.databricks.r.cleanWorkspace
[1] "true"   

$spark.databricks.workerNodeTypeId
[1] "Standard_DS13_v2"

$spark.driver.maxResultSize
[1] "4g"

$spark.eventLog.enabled
[1] "false"

$spark.executor.id
[1] "driver"

$spark.executor.memory
[1] "40658m"

$spark.hadoop.databricks.dbfs.client.version
[1] "v2"

$spark.hadoop.fs.s3a.connection.maximum
[1] "200"

$spark.hadoop.fs.s3a.multipart.size
[1] "10485760"

$spark.hadoop.fs.s3a.multipart.threshold
[1] "104857600"

$spark.hadoop.fs.s3a.threads.max
[1] "136"

$spark.hadoop.fs.wasb.impl.disable.cache
[1] "true"

$spark.hadoop.fs.wasbs.impl
[1] "shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem"

$spark.hadoop.fs.wasbs.impl.disable.cache
[1] "true"

$spark.hadoop.hive.server2.idle.operation.timeout
[1] "7200000"

$spark.hadoop.hive.server2.idle.session.timeout
[1] "900000"

$spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
[1] "2"

$spark.hadoop.parquet.memory.pool.ratio
[1] "0.5"

$spark.home
[1] "/databricks/spark"

$spark.logConf
[1] "true"

$spark.r.numRBackendThreads
[1] "1"

$spark.rdd.compress
[1] "true"

$spark.scheduler.mode
[1] "FAIR"

$spark.serializer.objectStreamReset
[1] "100"

$spark.shuffle.manager
[1] "SORT"

$spark.shuffle.memoryFraction
[1] "0.2"

$spark.shuffle.reduceLocality.enabled
[1] "false"

$spark.shuffle.service.enabled
[1] "true"

$spark.sql.catalogImplementation
[1] "hive"

$spark.sql.hive.convertCTAS
[1] "true"

$spark.sql.hive.convertMetastoreParquet
[1] "true"

$spark.sql.hive.metastore.jars
[1] "/databricks/hive/*"

$spark.sql.hive.metastore.version
[1] "0.13.0"

$spark.sql.parquet.cacheMetadata
[1] "true"

$spark.sql.parquet.compression.codec
[1] "snappy"

$spark.sql.ui.retainedExecutions
[1] "100"

$spark.sql.warehouse.dir
[1] "/user/hive/warehouse"

$spark.storage.blockManagerTimeoutIntervalMs
[1] "300000"

$spark.storage.memoryFraction
[1] "0.5"

$spark.streaming.driver.writeAheadLog.allowBatching
[1] "true"

$spark.task.reaper.enabled
[1] "true"

$spark.task.reaper.killTimeout
[1] "60s"

$spark.worker.cleanup.enabled
[1] "false"
...