Mon go Spark Java Соединитель Группа по - PullRequest
0 голосов
/ 06 марта 2020

Я храню события на моем сервере из клиентского мобильного приложения, хранилище событий - mongodb. У меня есть mon go -spark разъем, который выбирает список этих событий и должен отображать их с помощью rest api. Это должно быть потоковое на потом, но сейчас я пытаюсь отобразить его как один вызов.

Пока я написал свой контроллер, как указано ниже:

@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
                                             val javaSparkContext: JavaSparkContext) {

    @GetMapping("/event")
    fun getEvent(): ResponseEntity<EventResponse> {
        val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
        val toDF = customRdd.toDF()
    }
}

Пожалуйста, помогите мне Отфильтруйте эти результаты, приведенные ниже для остальных API:

 [
      {
        "key": "Event A",
        "description": "Event A Description",
        "count": 3
      },
      {
        "key": "Event B",
        "description": "Event B Description",
        "count": 0
      }
    ]

У меня есть набор данных, как указано ниже:

/* 1 */
{
    "_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 2 */
{
    "_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 3 */
{
    "_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

1 Ответ

0 голосов
/ 06 марта 2020

Вы должны иметь возможность на фрейме данных сделать что-то вроде этого

val aggDf = toDf
 .groupBy("name")
 .agg(count("name"), max("description"))

Теперь на новом фрейме данных aggDf вы можете сделать aggDf.toJson и получить результаты. Если столбцы не соответствуют выводу, вы можете настроить их с помощью withColumnRenamed

...