как использовать неявный набор данных кодировщика - PullRequest
0 голосов
/ 02 июля 2019

Я переключаю приложение с потоковой передачи на структурированную. Это приложение для чтения логов из кафки, чтобы разобрать и сохранить его на Кассандре.

C:x\x\\CassandraHelper.scala:425:122: Unable to find encoder for type com.xx.dtl.business.cassandra.ConnectionCassDto. An implicit Encoder[com.xx.dtl.business.cassandra.ConnectionCassDto] is needed to store com.xx.dtl.business.cassandra.ConnectionCassDto instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.[error] val successConnectionDS= connectionDS.filter(x => x.libelleOperation.equals(xx_SUCCESSFULL_IDENTIFICATION)).flatMap(connection => mapToDto(connection)) Здесь я получаю сообщение об ошибке:

def persisteConnection(connectionDS: Dataset[Connection]): Unit = { val successConnectionDS= connectionDS.filter(x => x.libelleOperation.equals(ATOS_SUCCESSFULL_IDENTIFICATION)).flatMap(connection => mapToDto(connection)) val faliedConnectionDS = connectionDS.filter(x => x.libelleOperation.equals(ATOS_FAILURE_IDENTIFICATION)).flatMap(connection => mapToDto(connection)) successConnectionDS.saveToCassandra(AppConf.CassandraReferentielValorizationKeySpace, "connexion_reussie", SomeColumns( "identifiant_web", "date_connexion", "code_pays", "coords", "city_name", "region_name", "isp", "asn", "id_personne", "id_dim_temps", "ip", "pays", "session_id", "client_media_id", "brs_session_id")) faliedConnectionDS.saveToCassandra(AppConf.CassandraReferentielValorizationKeySpace, "connexion_echouee", SomeColumns( "identifiant_web", "date_connexion", "code_pays", "coords", "city_name", "region_name", "isp", "asn", "id_personne", "id_dim_temps", "ip", "pays", "session_id", "client_media_id", "brs_session_id")) }

def mapToDto(connection: Connection): Option[ConnectionCassDto] = {
Some(new ConnectionCassDto(
  connection.id_web,
  connection.id_dim_temps,
  connection.timestamp,
  connection.contact_id,
  EmptyStringField,
  connection.code_pays,
  connection.coords.mkString(", "),
  connection.city_name,
  connection.region_name,
  connection.isp,
  connection.asn,
  connection.ip,
  connection.sessionID,
  connection.client_media_id,
  connection.brsSessionId))

} в основном я изменил весь DSTREAM с набором данных и способом чтения из kafka. Я ничего не изменил на этапе разбора. иметь дело с наборами данных, как я сделал с DSTREAM.

Есть какие-нибудь подсказки?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...