У меня есть поток с несколькими различными типами json, каждый из которых связан с пользовательскими событиями, что является наиболее эффективным способом разделения и объединения - PullRequest
1 голос
/ 18 марта 2019

У меня есть один поток с несколькими различными типами сообщений json. Всего существует 65 типов событий json, все с разными схемами. Все они имеют общий идентификатор пользователя.

{'id': 123, 'event': 'clicked', 'target': 'my_button'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz1...'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz2...'}
{'id': 123, 'event': 'login', 'username': 'Bob'}
{'id': 456, 'event': 'viewed', 'website': 'http://xyz3...'}
{'id': 456, 'event': 'login', 'username': 'Susy'}

Я хотел бы обработать все типы событий, каждый с настраиваемыми полями, а затем объединить все данные по всем типам фильтров.

{'id': 123, 'page_view_cnt': 100, 'user': 'Bob', 'click_cnt': 20}
{'id': 456, 'page_view_cnt': 14, 'user': 'Susy'}

Кто-нибудь знает эффективный способ сделать это? Вот текущий мыслительный процесс

  • Начать с потока строк
  • Используйте GSON для разбора json вместо встроенного парсера json, который может попытаться определить типы.
  • Создание 65 фильтров операторов на основе каждого типа. У json будет событие = xyz, по которому я могу различить.
  • Объединить пользовательские свойства каждого фильтра в отображение идентификатора пользователя -> свойства
  • Объединить все карты из всех фильтров

Звучит нормально или есть лучший способ сделать это?

1 Ответ

1 голос
/ 22 марта 2019

Вот что я придумал, используя RDD API и Джексона. Я выбрал низкоуровневый API Spark, так как он не имеет схемы и не уверен, как структурированный API будет соответствовать переменным типам событий ввода. Если упомянутый Gson поддерживает полиморфную десериализацию, его можно использовать вместо Джексона, я просто выбрал Джексона, так как я более знаком с ним.

Проблема может быть разбита на этапы:

  1. Десериализация ввода в объекты по типу события.
  2. Уменьшить по идентификатору и типу. Снижение должно вести себя по-разному для разных типов, например, представления просто сводятся к сумме, в то время как имя пользователя должно обрабатываться по-другому. В этом примере давайте просто предположим, что имя пользователя уникально в id и выберем первое.
  3. Соберите уменьшенные предметы на id.

Шаг 2 требует наибольшего внимания, поскольку в Spark API такой функциональности нет, и должна быть какая-то проверка во время выполнения, если десериализованные события относятся к другому классу. Чтобы преодолеть это, давайте введем общую черту Reducible, которая может инкапсулировать различные типы:

trait Reducible[T] {
    def reduce(that: Reducible[_]): this.type

    def value: T
}

// simply reduces to sum
case class Sum(var value: Int) extends Reducible[Int] {
    override def reduce(that: Reducible[_]): Sum.this.type = that match {
        case Sum(thatValue) =>
            value += thatValue
            this
    }
}

// for picking the first element, i.e. username
case class First(value: String) extends Reducible[String] {
    override def reduce(that: Reducible[_]): First.this.type = this
}

Проверка времени выполнения обрабатывается в этих классах, например, Sum завершится неудачей, если правый объект не того же типа.

Далее давайте определим модели для событий и расскажем Джексону, как обращаться с полиморфизмом:

@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, include=JsonTypeInfo.As.PROPERTY, property="event", visible=true)
sealed trait Event[T] {
    val id: Int
    val event: String

    def value: Reducible[T]
}

abstract class CountingEvent extends Event[Int] {
    override def value: Reducible[Int] = Sum(1)
}

@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
    override def value: Reducible[String] = First(username)
}

object EventMapper {
    private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // the list of classes could be auto-generated, see
    // https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
    mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])

    def apply(v1: String): Event[_] = mapper.readValue(v1, classOf[Event[_]])
}

Все события должны иметь поля id и event. Последний используется для определения того, в какой класс десериализоваться, Джексону необходимо знать все классы заранее. Черта Event объявлена ​​как запечатанная черта, поэтому все реализующие классы могут быть определены во время компиляции. Я пропускаю этот рефлексивный шаг и просто жестко программирую список классов, здесь есть хороший ответ, как сделать это автоматически Получение подклассов запечатанной черты

Теперь мы готовы написать логику приложения. Для простоты sc.parallelize используется для загрузки данных примера. Также можно использовать потоковое искрение.

val in = List(
    "{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
    "{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
    "{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)

// partition (id, event) pairs only by id to minimize shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {

    override def getPartition(key: Any): Int = key match {
        case (id: Int, _) => super.getPartition(id)
        case id: Int => super.getPartition(id)
    }
}

sc.parallelize(in)
    .map(EventMapper.apply)
    .keyBy(e => (e.id, e.event))
    .mapValues(_.value)
    .reduceByKey(partitioner, (left, right) => left.reduce(right))
    .map {
        case ((id, key), wrapper) => (id, (key, wrapper.value))
    }
    .groupByKey(partitioner)
    .mapValues(_.toMap)
    .foreach(println)

Выход:

(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...