распознаватель sbt jackson для проекта spark использует базу данных maxmind - PullRequest
0 голосов
/ 21 мая 2018

Я использую версию Spark Streaming 2.3.0, которая использует Jackson 2.6.7.Я использую библиотеку maxmind, которая использует версию 2.9.5 Джексона.

Я пытаюсь получить гео-данные по ip-адресу с помощью библиотеки maxmind. Ниже код отлично работает в том же проекте и в том же пакете.

package org.apache.spark.examples.streaming

import java.io.File
import java.net.InetAddress

import com.maxmind.db.CHMCache
import com.maxmind.geoip2.DatabaseReader
//import com.tvid.converter.IpParser.ip2geo


object GeoIP2Test {

  def getGeoFromIP( ip_address:String, reader:DatabaseReader) : String = {
    //val reader = new DatabaseReader.Builder(new File(db_file)).withCache(new CHMCache()).build()
    //DatabaseReader reader = new DatabaseReader.Builder(database).build();
    val ipAddress = InetAddress.getByName(ip_address)
    // Replace "city" with the appropriate method for your database, e.g.,
    // "country".
    val response = reader.city(ipAddress)
    val country = response.getCountry
    // String country_iso_code = country.getIsoCode();
    val country_name = country.getName
    val subdivision = response.getMostSpecificSubdivision
    val subdivision_name = subdivision.getName
    // String subdivision_iso_code = subdivision.getIsoCode();
    val city = response.getCity
    val city_name = city.getName
    val postal = response.getPostal
    val postal_code = postal.getCode
    val location = response.getLocation
    val latitude = location.getLatitude.toString
    val longitude = location.getLongitude.toString
    val res = Array(country_name, subdivision_name, city_name, postal_code, latitude, longitude)
    val geo_details = res(0) + "," + res(1) + "," + res(2) + "," + res(3) + "," + res(4) + "," + res(5)
    return geo_details
  }

  def main(args: Array[String]): Unit = {
    val db_file = "/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb"
    val ip_address = "123.123.123.123"
    val reader = new DatabaseReader.Builder(new File(db_file)).withCache(new CHMCache()).build()
    val geo_details = getGeoFromIP(ip_address,reader)
    print(geo_details)

    //try java method integration
    //val res = ip2geo(ip_address,db_file)
    //print(res)
  }
}

это прекрасно работает и дает мне о / п: Китай, Пекин, Пекин, ноль, 39.9289,116.3883

, но когда я пытаюсь использовать этот метод в потоковой передаче искры с использованием сниппета:

var db_file = ssc.sparkContext.broadcast(new DatabaseReader.Builder(new File("/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb")).withCache(new CHMCache()).build())
    val reader=db_file.value
    //val db_file = "/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb"
    val ip_address = "123.123.123.123"
    val geo_details = getGeoFromIP(ip_address,reader)
    print(geo_details)

выдает ошибку:

Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
        1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
        2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)

, поскольку версия спарк джексона - 2.6.7.

Мой build.sbt:

name := "scala_spark_stream_metrices"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"

libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"

Как убедиться, что метод getGeoFromIP использует Джексон 2.9.5, переопределив искры Джексона 2.6.7

Ответы [ 2 ]

0 голосов
/ 21 мая 2018

изменение build.sbt на фрагмент ниже решило мою проблему.

name := "scala_spark_stream_metrices"

version := "1.0"

scalaVersion := "2.11.8"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-cbor
dependencyOverrides += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.5"


libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"
// https://mvnrepository.com/artifact/com.github.seratch/awscala
libraryDependencies += "com.github.seratch" %% "awscala" % "0.6.3"
//geo
// https://mvnrepository.com/artifact/com.maxmind.geoip2/geoip2
//libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0" exclude ("com.fasterxml.jackson.core","jackson-annotations") exclude ("com.fasterxml.jackson.core","jackson-core") exclude ("com.fasterxml.jackson.core","jackson-databind")
libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"
0 голосов
/ 21 мая 2018

Чтобы использовать Jackson v2.9.5, вы должны переопределить его зависимость.Например:

name := "scala_spark_stream_metrices"

version := "1.0"

scalaVersion := "2.11.8"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"

libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"

Это обеспечит использование Jackson v2.9.5 вместо v2.6.7

Надеюсь, это поможет!

...