Spark Scala - проверить поле вложенного класса дел - PullRequest
0 голосов
/ 17 ноября 2018

У меня есть три класса дел, как показано ниже:

case class Result(
   result: Seq[Signal],
   hop:    Int)

case class Signal(
   rtt:  Double,
   from: String)

case class Traceroute(
  dst_name:  String,
  from:      String,
  prb_id:    BigInt,
  msm_id:    BigInt,
  timestamp: BigInt,
  result:    Seq[Result])

A Traceroute имеет поле result, которое представляет собой последовательность Результат .Каждый Результат представляет собой последовательность Сигнал .

Я пытаюсь проверить, не является ли поле Result отрицательным.Моя json-запись выглядит следующим образом:

{"prb_id": 4247, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}, {"rtt": 1.7, "ttl": 255, "from": "10.10.0.5", "size": 28}, {"rtt": 1.709, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}

Для ясности я опускаю некоторые атрибуты в json-записи.Атрибут result - это поле результата в классе case Traceroute.

Я использовал фильтр, чтобы проверить, является ли сигнал rtt в ноте отрицательным, используя фильтр, но я не получил ожидаемого.

val checkrtts = checkError.filter(x => x.result.foreach(p => p.result.foreach(f => checkSignal(f))))

Функция checkSignal выглядит следующим образом:

def checkSignal(signal: Signal): Signal = {
  if (signal.rtt > 0) {
    return signal
  } else {
    return null
  }

}

Приведен пример двух экземпляров Traceroute:

{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": -2.5, "ttl": 255, "from": "89.105.200.57", "size": 28},{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}

Для первого Traceroute без измененийбыть примененнымДля второго Traceroute поле result.result имеет два элемента (тип Signal), первый сигнал имеет отрицательное значение rtt, поэтому я должен удалить этот сигнал из result.result.Но второй сигнал не должен быть удален.

В результате выходной сигнал должен быть следующим:

{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}

Любая помощь, пожалуйста.Я новичок в искре и скале.Я пробовал много способов, но результат не такой, как ожидалось.

1 Ответ

0 голосов
/ 17 ноября 2018

Кажется, у вас есть небольшое недопонимание относительно того, что должна делать функция фильтра. Он фильтрует весь объект Traceroute из набора данных, который возвращает false. Что вам нужно сделать, это написать функцию карты, которая преобразует ваш исходный объект Traceroute в нужный. Ниже приведен пример того, как это сделать для Dataset[Traceroute]

Сначала вам нужно немного изменить классы дел, как показано ниже.

case class Result(var result: Seq[Signal],
                   hop:    Int)

case class Signal(rtt:  Double,
                   from: String)

case class Traceroute( dst_name:  String,
                       from:      String,
                       prb_id:    BigInt,
                       msm_id:    BigInt,
                       timestamp: BigInt,
                       result:    Seq[Result])

Как видите, я добавил var в поле result класса Result. Это поможет нам позже изменить поле result в вашей пользовательской функции, которое мы передадим операции карты

Затем определите следующие две функции, как показано ниже:

def checkSignal(signal: Signal): Boolean = {
    if (signal.rtt > 0) {
      return true
    } else {
      return false
    }

  }

 def removeNegative(traceroute: Traceroute): Traceroute = {

    val outerList = traceroute.result
    for( temp <- outerList){

      val innerList = temp.result
      //here we are filtering the list to only contain nonnegative elements
      val newinnerList = innerList.filter(checkSignal(_))
      //here we are reassigning the newlist to result
      temp.result = newinnerList

    }

    traceroute
  }

Теперь мы сопоставим исходный набор данных с преобразованным, где мы правильно получим отфильтрованный список.

val dataPath = "hdfs/path/to/traceroute.json"
val tracerouteSchema = ScalaReflection.schemaFor[Traceroute].dataType.asInstanceOf[StructType]
val dataset = spark.read.schema(tracerouteSchema).json(dataPath).as[Traceroute]

println("Showing 10 rows of original Dataset")
dataset.show(10, truncate = false)

val maprtts = dataset.map(x => removeNegative(x))


println("Showing 10 rows of transformed dataset")
maprtts.show(10, truncate = false)

Следующий вывод:

Showing 10 rows of original dataset
+--------+----+------+------+----------+-------------------------------------------------------+
|dst_name|from|prb_id|msm_id|timestamp |result                                                 |
+--------+----+------+------+----------+-------------------------------------------------------+
|null    |null|null  |null  |1514768409|[[[[1.955, 89.105.200.57]], 1]]                        |
|null    |null|null  |null  |1514768402|[[[[-2.5, 89.105.200.57], [19.955, 89.105.200.57]], 2]]|
+--------+----+------+------+----------+-------------------------------------------------------+

Showing 10 rows of transformed dataset
+--------+----+------+------+----------+--------------------------------+
|dst_name|from|prb_id|msm_id|timestamp |result                          |
+--------+----+------+------+----------+--------------------------------+
|null    |null|null  |null  |1514768409|[[[[1.955, 89.105.200.57]], 1]] |
|null    |null|null  |null  |1514768402|[[[[19.955, 89.105.200.57]], 2]]|
+--------+----+------+------+----------+--------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...