обработка сетевых пакетов в искре с сохранением состояния - PullRequest
0 голосов
/ 12 февраля 2019

Я хотел бы использовать Spark для разбора сетевых сообщений и группировки их в логические объекты с сохранением состояния.

Описание проблемы

Предположим, что каждое сообщение находится в одной строке входного фрейма данных, изображенного ниже.

| row   | time | raw payload   |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;TEXT3;  |
|  3    | 30   | LONG-         |
|  4    | 40   | TEXT1;        |
|  5    | 50   | TEXT4;TEXT5;L |
|  6    | 60   | ONG           |
|  7    | 70   | -TEX          |
|  8    | 80   | T2;           | 

Задача состоит в разборелогические сообщения в необработанной полезной нагрузке и предоставляют их в новом выходном кадре данных.В этом примере каждое логическое сообщение в полезной нагрузке заканчивается точкой с запятой (разделителем).

Тогда требуемый выходной кадр данных может выглядеть следующим образом:

| row   | time | message       |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;        |
|  3    | 20   | TEXT3;        |
|  4    | 30   | LONG-TEXT1;   |
|  5    | 50   | TEXT4;        |
|  6    | 50   | TEXT5;        |
|  7    | 50   | LONG-TEXT2;   |

Обратите внимание, что некоторые строки сообщений не приводят к новой строке в результате (например, строки 4, 6,7,8)и некоторые дают даже несколько строк (например, строки 2, 5)

Мои вопросы:

  • это вариант использования для UDAF?Если да, то как, например, я должен реализовать функцию merge?я понятия не имею, какова его цель.
  • , поскольку порядок сообщений имеет значение (я не могу правильно обработать LONGTEXT-1, LONGTEXT-2 без соблюдения порядка сообщений), могу ли я сказать, что спарк должен распараллеливаться, возможно, на более высоком уровне(например, за календарный день сообщений), но не следует распараллеливать в течение дня (например, необходимо обрабатывать события во время 50, 60, 70, 80).
  • следующий вопрос: возможно ли, что решениебудет использоваться не только в традиционной искровой, но и в искровой структурированной трансляции?Или последний требует своего собственного метода обработки с сохранением состояния?

Ответы [ 2 ]

0 голосов
/ 22 февраля 2019

Хорошо, пока я понял, как это сделать с UDAF.

class TagParser extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("parsed", ArrayType(StringType)) ::
      StructField("rest", StringType)
      :: Nil)

  override def dataType: DataType = ArrayType(StringType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = IndexedSeq[String]()
    buffer(1) = null
  }

  def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {

    buffer(0) = IndexedSeq[String]()

    val prevRest = buffer(1)
    var idx = -1
    val strToParse = if (prevRest != null) prevRest + str else str

    do {
      val oldIdx = idx;
      idx = strToParse.indexOf(';', oldIdx + 1)

      if (idx == -1) {
        buffer(1) = strToParse.substring(oldIdx + 1)
      } else {
        val newlyParsed = strToParse.substring(oldIdx + 1, idx)
        buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
        buffer(1) = null
      }

    } while (idx != -1)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    if (buffer == null) {
      return
    }

    doParse(input.getAs[String](0), buffer)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException

  override def evaluate(buffer: Row): Any = buffer(0)
}

Здесь демонстрационное приложение использует вышеуказанный UDAF для решения проблемы сверху:

case class Packet(time: Int, payload: String)

object TagParserApp extends App {

  val spark, sc = ... // kept out for brevity

  val df = sc.parallelize(List(
    Packet(10, "TEXT1;"),
    Packet(20, "TEXT2;TEXT3;"),
    Packet(30, "LONG-"),
    Packet(40, "TEXT1;"),
    Packet(50, "TEXT4;TEXT5;L"),
    Packet(60, "ONG"),
    Packet(70, "-TEX"),
    Packet(80, "T2;")
  )).toDF()

  val tp = new TagParser
  val window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
  val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
  df2.show()
}

это дает:

+----+-------------+--------------+
|time|      payload|           msg|
+----+-------------+--------------+
|  10|       TEXT1;|       [TEXT1]|
|  20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
|  30|        LONG-|            []|
|  40|       TEXT1;|  [LONG-TEXT1]|
|  50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
|  60|          ONG|            []|
|  70|         -TEX|            []|
|  80|          T2;|  [LONG-TEXT2]|
+----+-------------+--------------+

главная проблема для меня состояла в том, чтобы выяснить, как на самом деле применить этот UDAF, а именно с помощью этого:

df.withColumn("msg", tp.apply(df.col("payload")).over(window))

единственное, что мне нужно сейчасВыяснить аспекты параллелизации (что я хочу сделать только тогда, когда мы не полагаемся на порядок), но это отдельная проблема для меня.

0 голосов
/ 12 февраля 2019

Как правило, вы можете запускать произвольные агрегаты с отслеживанием состояния при потоковой передаче с использованием mapGroupsWithState из flatMapGroupsWithState.Вы можете найти несколько примеров здесь .Ни один из них, тем не менее, не гарантирует, что обработка потока будет упорядочена по времени события.

Если вам нужно принудительно упорядочить данные, вы должны попытаться использовать оконные операции для времени события .В этом случае вам нужно вместо этого запускать операции без сохранения состояния, но если количество элементов в каждой группе окон достаточно мало, вы можете использовать, например, collectList, а затем применить UDF (где вы можете управлять состоянием для каждой группы окон) в каждом списке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...