DataSet / DataStream типа интерфейс класса - PullRequest
2 голосов
/ 13 апреля 2020

Я просто экспериментирую с использованием Scala классов классов в Flink. Я определил следующий интерфейс класса типов:

trait LikeEvent[T] {
    def timestamp(payload: T): Int
}

Теперь я хочу рассмотреть DataSet из LikeEvent[_] следующим образом:

// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

    implicit val logEvent = new LikeEvent[Log] {
        def timestamp(log: Log): Int = log.ts
    }

    implicit val metricEvent = new LikeEvent[Metric] {
        def timestamp(metric: Metric): Int = metric.ts
    }
}

// add ops to the raw event classes (regular class)
object EventSyntax {

    implicit class Event[T: LikeEvent](val payload: T) {
        val le = implicitly[LikeEvent[T]]
        def timestamp: Int = le.timestamp(payload)
    }
}

Следующее приложение работает нормально :

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (hour, count) = (gl.head._1, gl.size)
    (hour, count)
  }

eventsPerHour.print()

Печать ожидаемого вывода

(0,5)
(1,1)
(2,1)

Однако, если я изменяю объект синтаксиса следующим образом:

// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)  
}

Я получаю следующую ошибку:

type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]

Итак, руководствуясь сообщением, я делаю следующее изменение:

val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)

После этого ошибка меняется на:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]

Не могу понять почему EventSyntax2 приводит к этим ошибкам, тогда как EventSyntax компилируется и работает хорошо. Почему использование оболочки класса case в EventSyntax2 более проблематично c, чем использование обычного класса, как в EventSyntax?

В любом случае, мой вопрос состоит из двух частей:

  • Как можно решить мою проблему с помощью EventSyntax2?
  • Каким будет самый простой способ достижения моих целей? Здесь я просто экспериментирую с шаблоном класса типов ради обучения, но, безусловно, более объектно-ориентированный подход (основанный на подтипах) выглядит мне проще. Примерно так:
// Define trait
trait Event {
    def timestamp: Int
    def payload: Product with Serializable // Any case class
}

// Metric adapter (similar for Log)
object MetricAdapter {

    implicit class MetricEvent(val payload: Metric) extends Event {
        def timestamp: Int = payload.ts
    }
}

А затем просто используйте val events: DataSet[Event] = env.fromElements(...) в основном.

Обратите внимание, что Список классов, реализующих определенный класс типов ставит аналогичный вопрос, но рассматривает простой Scala List вместо Flink DataSet (или DataStream). Основное внимание в моем вопросе уделяется использованию шаблона класса типов в Flink для того, чтобы как-то рассмотреть гетерогенные потоков / наборов данных, и понять, действительно ли это имеет смысл, или в этом случае следует просто отдавать предпочтение обычной характеристике и наследовать ее как указано выше.

Кстати, вы можете найти код здесь: https://github.com/salvalcantara/flink-events-and-polymorphism.

1 Ответ

3 голосов
/ 18 апреля 2020

Краткий ответ: Flink не может получить TypeInformation в scala для типов подстановочных знаков

Длинный ответ: оба ваших вопроса действительно спрашивают, что такое TypeInformation, как оно используется и как это выведено.

TypeInformation - это внутренняя система типов Flink, которую она использует для сериализации данных, когда они перетасовываются по сети и сохраняются в backbend-состоянии (при использовании API DataStream).

Сериализация является основной проблемой производительности при обработке данных, поэтому Flink содержит специализированные сериализаторы для общих типов данных и шаблонов. В своем стеке Java он поддерживает все примитивы JVM, кортежи Pojo, Flink, некоторые распространенные типы коллекций и avro. Тип вашего класса определяется с помощью отражения, и если он не соответствует известному типу, он вернется к Kryo.

В API scala информация о типе получается с помощью имплицитов. Все методы в scala DataSet и DataStream api имеют свои обобщенные параметры c, аннотированные для неявного класса классов.

def map[T: TypeInformation] 

Этот TypeInformation может быть предоставлен вручную, как любой класс типов, или получен с использованием макроса, импортированного из flink.

import org.apache.flink.api.scala._

Этот макрос украшает стек типов java с поддержкой кортежей scala, классов падежей scala и некоторых распространенных типов библиотек scala std. Я говорю декоратор, потому что он может и будет возвращаться к стеку java, если ваш класс не относится к таким типам.

Так почему же работает версия 1?

Поскольку это обычный класс, которому не может соответствовать стек типов, он разрешил его типу generi c и возвратил сериализатор на основе kryo. Вы можете проверить это с консоли и увидеть, что он возвращает тип generi c.

> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>

Версия 2 не работает, поскольку она распознала тип как класс case, а затем рекурсивно получает TypeInformation экземпляров для каждого из его членов. Это невозможно для подстановочных типов, которые отличаются от Any, поэтому деривация завершается неудачно.

В общем, вы не должны использовать Flink с разнородными типами, потому что он не сможет получить эффективные сериализаторы для вашей рабочей нагрузки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...