Я просто экспериментирую с использованием Scala классов классов в Flink. Я определил следующий интерфейс класса типов:
trait LikeEvent[T] {
def timestamp(payload: T): Int
}
Теперь я хочу рассмотреть DataSet
из LikeEvent[_]
следующим образом:
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)
// create instances for the raw events
object EventInstance {
implicit val logEvent = new LikeEvent[Log] {
def timestamp(log: Log): Int = log.ts
}
implicit val metricEvent = new LikeEvent[Metric] {
def timestamp(metric: Metric): Int = metric.ts
}
}
// add ops to the raw event classes (regular class)
object EventSyntax {
implicit class Event[T: LikeEvent](val payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
}
Следующее приложение работает нормально :
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
Metric(1586736000, "cpu_usage", 0.2),
Log(1586736005, 1, "invalid login"),
Log(1586736010, 1, "invalid login"),
Log(1586736015, 1, "invalid login"),
Log(1586736030, 2, "valid login"),
Metric(1586736060, "cpu_usage", 0.8),
Log(1586736120, 0, "end of world"),
)
// count events per hour
val eventsPerHour = events
.map(new GetMinuteEventTuple())
.groupBy(0).reduceGroup { g =>
val gl = g.toList
val (hour, count) = (gl.head._1, gl.size)
(hour, count)
}
eventsPerHour.print()
Печать ожидаемого вывода
(0,5)
(1,1)
(2,1)
Однако, если я изменяю объект синтаксиса следующим образом:
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {
case class Event[T: LikeEvent](payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
Я получаю следующую ошибку:
type mismatch;
found : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
Итак, руководствуясь сообщением, я делаю следующее изменение:
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
После этого ошибка меняется на:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
Не могу понять почему EventSyntax2
приводит к этим ошибкам, тогда как EventSyntax
компилируется и работает хорошо. Почему использование оболочки класса case в EventSyntax2
более проблематично c, чем использование обычного класса, как в EventSyntax
?
В любом случае, мой вопрос состоит из двух частей:
- Как можно решить мою проблему с помощью
EventSyntax2
? - Каким будет самый простой способ достижения моих целей? Здесь я просто экспериментирую с шаблоном класса типов ради обучения, но, безусловно, более объектно-ориентированный подход (основанный на подтипах) выглядит мне проще. Примерно так:
// Define trait
trait Event {
def timestamp: Int
def payload: Product with Serializable // Any case class
}
// Metric adapter (similar for Log)
object MetricAdapter {
implicit class MetricEvent(val payload: Metric) extends Event {
def timestamp: Int = payload.ts
}
}
А затем просто используйте val events: DataSet[Event] = env.fromElements(...)
в основном.
Обратите внимание, что Список классов, реализующих определенный класс типов ставит аналогичный вопрос, но рассматривает простой Scala List
вместо Flink DataSet
(или DataStream
). Основное внимание в моем вопросе уделяется использованию шаблона класса типов в Flink для того, чтобы как-то рассмотреть гетерогенные потоков / наборов данных, и понять, действительно ли это имеет смысл, или в этом случае следует просто отдавать предпочтение обычной характеристике и наследовать ее как указано выше.
Кстати, вы можете найти код здесь: https://github.com/salvalcantara/flink-events-and-polymorphism.