Я сталкиваюсь с вышеуказанной ошибкой при попытке получить журналы из kafka-topic
, обработать их и вставить их в solr
.
Проблема возникает, когда я добавляю часть публикации Solr, как явозможность использовать и распечатывать на hdfs или на консоли поток kafka.Для этой конкретной части я следовал этому простому примеру https://github.com/mganta/streaming-data/blob/master/src/main/java/com/example/streaming/CarEventsProcessor.java, так как не мог понять документацию библиотеки spark-solr
из lucidworks
.
val topics = //my topics
val kafkaParams = Map[String, Object](...)
val stream =
KafkaUtils.createDirectStream[String, String](
mySparkStreamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val processed = //process stream
def convert(field_to_process: ...): SolrInputDocument = {
//create document to push to solr
//test with basic document
val doc = SolrSupport.autoMapToSolrInputDoc("", null, Map())
doc.addField("left", "right")
doc
}
SolrSupport.indexDStreamOfDocs(brokers, "table", 1, processed.map(convert))
ssc.start()
ssc.awaitTermination()
ssc.stop()
Я подозреваю, что это будетошибка зависимости.Мой следующий pow.xml:
<spark.version>2.2.1</spark.version>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spark.solr.version>3.4.5</spark.solr.version>
<solr.version>7.3.0</solr.version>
<fasterxml.version>2.9.9</fasterxml.version>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.compat.version}</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
<version>${spark.solr.version}</version>
</dependency>
<!-- slf4j libraries -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fasterxml.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${fasterxml.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.compat.version}</artifactId>
<version>${fasterxml.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.8</version>
</dependency>