Я перевожу приложение с потоковой передачи (Spark Streaming) на структурированную (Structured Streaming). Это приложение читает логи из Kafka, разбирает их и сохраняет в Cassandra.
C:x\x\\CassandraHelper.scala:425:122: Unable to find encoder for type com.xx.dtl.business.cassandra.ConnectionCassDto. An implicit Encoder[com.xx.dtl.business.cassandra.ConnectionCassDto] is needed to store com.xx.dtl.business.cassandra.ConnectionCassDto instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.[error] val successConnectionDS= connectionDS.filter(x => x.libelleOperation.equals(xx_SUCCESSFULL_IDENTIFICATION)).flatMap(connection => mapToDto(connection))
Здесь я получаю сообщение об ошибке:
/**
 * Persists parsed connection records into Cassandra, routing successful
 * identifications to `connexion_reussie` and failed ones to `connexion_echouee`.
 *
 * @param connectionDS parsed connection log records read from Kafka
 */
def persisteConnection(connectionDS: Dataset[Connection]): Unit = {
  // Both target tables share the exact same column mapping, so declare it once
  // instead of duplicating the 15-entry list (the two copies had already been
  // kept in sync by hand — an error waiting to happen).
  val connectionColumns = SomeColumns(
    "identifiant_web",
    "date_connexion",
    "code_pays",
    "coords",
    "city_name",
    "region_name",
    "isp",
    "asn",
    "id_personne",
    "id_dim_temps",
    "ip",
    "pays",
    "session_id",
    "client_media_id",
    "brs_session_id")

  // NOTE(review): `flatMap` on a Dataset needs an implicit Encoder[ConnectionCassDto]
  // in scope — this is the cause of the "Unable to find encoder" compile error quoted
  // above. Make ConnectionCassDto a case class and have `import spark.implicits._`
  // in scope at this call site, or provide Encoders.kryo[ConnectionCassDto] explicitly.
  def dtosFor(libelle: String) =
    connectionDS
      .filter(_.libelleOperation == libelle)
      .flatMap(mapToDto)

  dtosFor(ATOS_SUCCESSFULL_IDENTIFICATION)
    .saveToCassandra(AppConf.CassandraReferentielValorizationKeySpace, "connexion_reussie", connectionColumns)

  dtosFor(ATOS_FAILURE_IDENTIFICATION)
    .saveToCassandra(AppConf.CassandraReferentielValorizationKeySpace, "connexion_echouee", connectionColumns)
}
/**
 * Maps a parsed [[Connection]] log record to its Cassandra persistence DTO.
 *
 * The mapping is a straight field-by-field copy; `coords` is flattened into a
 * single comma-separated string and the contact-channel slot is filled with
 * [[EmptyStringField]]. Always yields `Some` today — the `Option` return type
 * exists so callers can `flatMap` and drop unmappable records.
 *
 * @param connection the parsed connection record
 * @return the DTO wrapped in `Some`
 */
def mapToDto(connection: Connection): Option[ConnectionCassDto] = {
  val dto = new ConnectionCassDto(
    connection.id_web,
    connection.id_dim_temps,
    connection.timestamp,
    connection.contact_id,
    EmptyStringField,
    connection.code_pays,
    connection.coords.mkString(", "),
    connection.city_name,
    connection.region_name,
    connection.isp,
    connection.asn,
    connection.ip,
    connection.sessionID,
    connection.client_media_id,
    connection.brsSessionId)
  Option(dto)
}
По сути, я заменил DStream на Dataset и изменил способ чтения из Kafka.
На этапе разбора я ничего не менял: с Dataset я работаю так же, как раньше работал с DStream.
Есть какие-нибудь подсказки?