Я использую Spark Streaming 2.3.0, который зависит от Jackson 2.6.7. Также я использую библиотеку MaxMind GeoIP2, которая зависит от Jackson 2.9.5.
Я пытаюсь получить геоданные по IP-адресу с помощью библиотеки MaxMind. Приведённый ниже код отлично работает в том же проекте и в том же пакете:
package org.apache.spark.examples.streaming
import java.io.File
import java.net.InetAddress
import com.maxmind.db.CHMCache
import com.maxmind.geoip2.DatabaseReader
//import com.tvid.converter.IpParser.ip2geo
/** Stand-alone check that MaxMind GeoIP2 city lookups work outside of Spark. */
object GeoIP2Test {

  /** Looks up `ip_address` in a GeoIP2 City database and renders the result as one CSV line.
    *
    * The fields are, in order:
    *  - country name
    *  - most specific subdivision name
    *  - city name
    *  - postal code
    *  - latitude
    *  - longitude
    *
    * A field the record has no value for is rendered as the literal text `null`. This is the
    * same text the previous string concatenation produced for missing names. Missing
    * coordinates used to throw a `NullPointerException`; they now render as `null` too.
    *
    * @param ip_address textual IP address. It is passed to `InetAddress.getByName`, so a host
    *                   name (rather than a literal address) would trigger a DNS lookup.
    * @param reader     an open `DatabaseReader`. It is not closed here.
    * @return the comma-separated geo details
    * @throws java.net.UnknownHostException if `ip_address` cannot be resolved
    *
    * Exceptions thrown by `reader.city` propagate unchanged. Presumably this includes an
    * address that is absent from the database; confirm against the GeoIP2 API docs.
    */
  def getGeoFromIP(ip_address: String, reader: DatabaseReader): String = {
    val response = reader.city(InetAddress.getByName(ip_address))

    val countryName = response.getCountry.getName
    val subdivisionName = response.getMostSpecificSubdivision.getName
    val cityName = response.getCity.getName
    val postalCode = response.getPostal.getCode

    // The coordinates are boxed java.lang.Double values and may be null. String interpolation
    // renders null as "null" instead of throwing, unlike calling `.toString` on them.
    val location = response.getLocation
    val latitude = location.getLatitude
    val longitude = location.getLongitude

    s"$countryName,$subdivisionName,$cityName,$postalCode,$latitude,$longitude"
  }

  /** Prints the geo details for one IP address.
    *
    * Optional arguments:
    *  - `args(0)` overrides the database path.
    *  - `args(1)` overrides the IP address.
    *
    * Without them, the previous hard-coded values are used.
    */
  def main(args: Array[String]): Unit = {
    val dbFile = args.headOption.getOrElse("/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb")
    val ipAddress = args.drop(1).headOption.getOrElse("123.123.123.123")
    val reader = new DatabaseReader.Builder(new File(dbFile)).withCache(new CHMCache()).build()
    // Release the reader's resources even if the lookup throws.
    try print(getGeoFromIP(ipAddress, reader))
    finally reader.close()
  }
}
Это прекрасно работает и выводит: Китай, Пекин, Пекин, null, 39.9289, 116.3883
Но когда я пытаюсь использовать этот метод в Spark Streaming в следующем фрагменте кода:
// GeoIP2's DatabaseReader does not implement java.io.Serializable, so broadcasting the reader
// itself fails as soon as Spark serializes the broadcast value. Instead, broadcast only the
// database path (a plain String) and build the reader where the lookups run.
// NOTE(review): inside RDD/DStream closures, build the reader once per executor or partition,
// not once per record. Options: a lazy val in a top-level object, or one reader per partition
// inside mapPartitions.
val db_file = ssc.sparkContext.broadcast("/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb")
val reader = new DatabaseReader.Builder(new File(db_file.value)).withCache(new CHMCache()).build()
val ip_address = "123.123.123.123"
val geo_details = getGeoFromIP(ip_address, reader)
print(geo_details)
возникает ошибка:
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
поскольку Spark использует Jackson версии 2.6.7.
Мой build.sbt:
name := "scala_spark_stream_metrices"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"
libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"
// Why the overrides below exist:
// geoip2 2.12.0 transitively brings Jackson 2.9.5, which evicts the jackson-core 2.6.7 that
// Spark 2.3.0 uses. Meanwhile jackson-dataformat-cbor, which the AWS SDK behind
// spark-streaming-kinesis-asl presumably pulls in, stays on 2.6.x. The AWS SDK rejects that
// mismatched jackson-core / jackson-dataformat-cbor pair at runtime.
//
// Fix: pin every Jackson artifact to the line Spark 2.3.0 is built against, so a single
// consistent version ends up on the classpath. Two versions of the same Jackson packages
// cannot coexist in one classloader without relocation.
//
// NOTE(review): confirm that geoip2 2.12.0 works against Jackson 2.6.7. If it does not,
// relocate (shade) com.fasterxml.jackson for geoip2 with sbt-assembly's assemblyShadeRules.
// That requires the sbt-assembly plugin in project/plugins.sbt.
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1"
dependencyOverrides += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.6.7"
Как сделать так, чтобы метод getGeoFromIP использовал Jackson 2.9.5 вместо Jackson 2.6.7, который поставляется со Spark?