Вот что я придумал, используя RDD API и Джексона. Я выбрал низкоуровневый API Spark, так как он не имеет схемы и не уверен, как структурированный API будет соответствовать переменным типам событий ввода. Если упомянутый Gson поддерживает полиморфную десериализацию, его можно использовать вместо Джексона, я просто выбрал Джексона, так как я более знаком с ним.
Проблема может быть разбита на этапы:
- Десериализация ввода в объекты по типу события.
- Уменьшить по идентификатору и типу. Снижение должно вести себя по-разному для разных типов, например, представления просто сводятся к сумме, в то время как имя пользователя должно обрабатываться по-другому. В этом примере давайте просто предположим, что имя пользователя уникально в
id
и выберем первое.
- Соберите уменьшенные предметы на
id
.
Шаг 2 требует наибольшего внимания, поскольку в Spark API такой функциональности нет, и должна быть какая-то проверка во время выполнения, если десериализованные события относятся к другому классу. Чтобы преодолеть это, давайте введем общую черту Reducible
, которая может инкапсулировать различные типы:
trait Reducible[T] {
def reduce(that: Reducible[_]): this.type
def value: T
}
// simply reduces to sum
case class Sum(var value: Int) extends Reducible[Int] {
override def reduce(that: Reducible[_]): Sum.this.type = that match {
case Sum(thatValue) =>
value += thatValue
this
}
}
// for picking the first element, i.e. username
case class First(value: String) extends Reducible[String] {
override def reduce(that: Reducible[_]): First.this.type = this
}
Проверка времени выполнения обрабатывается в этих классах, например, Sum
завершится неудачей, если правый объект не того же типа.
Далее давайте определим модели для событий и расскажем Джексону, как обращаться с полиморфизмом:
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, include=JsonTypeInfo.As.PROPERTY, property="event", visible=true)
sealed trait Event[T] {
val id: Int
val event: String
def value: Reducible[T]
}
abstract class CountingEvent extends Event[Int] {
override def value: Reducible[Int] = Sum(1)
}
@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
override def value: Reducible[String] = First(username)
}
object EventMapper {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
// the list of classes could be auto-generated, see
// https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])
def apply(v1: String): Event[_] = mapper.readValue(v1, classOf[Event[_]])
}
Все события должны иметь поля id
и event
. Последний используется для определения того, в какой класс десериализоваться, Джексону необходимо знать все классы заранее. Черта Event
объявлена как запечатанная черта, поэтому все реализующие классы могут быть определены во время компиляции. Я пропускаю этот рефлексивный шаг и просто жестко программирую список классов, здесь есть хороший ответ, как сделать это автоматически Получение подклассов запечатанной черты
Теперь мы готовы написать логику приложения. Для простоты sc.parallelize
используется для загрузки данных примера. Также можно использовать потоковое искрение.
val in = List(
"{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
"{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
"{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
"{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
"{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
"{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)
// partition (id, event) pairs only by id to minimize shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {
override def getPartition(key: Any): Int = key match {
case (id: Int, _) => super.getPartition(id)
case id: Int => super.getPartition(id)
}
}
sc.parallelize(in)
.map(EventMapper.apply)
.keyBy(e => (e.id, e.event))
.mapValues(_.value)
.reduceByKey(partitioner, (left, right) => left.reduce(right))
.map {
case ((id, key), wrapper) => (id, (key, wrapper.value))
}
.groupByKey(partitioner)
.mapValues(_.toMap)
.foreach(println)
Выход:
(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))