Я пытаюсь использовать библиотеку Scala XML во Flink для анализа XML, и я не могу заставить ее работать.Обратите внимание, что мне нужно использовать как сериализованную, так и не сериализованную (строковую) версию для моего кода в одной и той же функции обработки.
Я пробовал уже разные решения, они всегда работают в IntelliJ, но не когда я запускаю их наФлинк кластер.Они всегда возвращают разные java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser
;Я пробовал несколько вещей, но все равно получаю ошибку, похожую на эту.
Это пример того, как выглядит мое задание Flink:
object StreamingJob {
import org.apache.flink.streaming.api.scala._
val l = List(
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set up kafka section excluded
val stream = env.fromCollection(l)
.map(new Processor)
Это пример моей функции обработки:
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader
class Processor extends MapFunction[String, String] {
override def map(translatedMessage: String): String = {
val xml = Processor.xmlLoader.loadString(translatedMessage)
object Processor {
val factory: SAXParserFactory = SAXParserFactory.newInstance
val SAXParser: SAXParser = factory.newSAXParser
val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
и, наконец, это мой pom.xml, использующий плагин maven-shade для создания банки, которую я передаю моргать:
<!-- other sections of the pom are excluded -->
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<!-- Scala Library, provided by Flink as well. -->
<!-- other sections of the pom are excluded -->
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Run shade goal on package phase -->
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<!-- Java Compiler -->
<!-- Scala Compiler -->
<!-- Add src/main/scala to eclipse build path -->
<!-- Add src/test/scala to eclipse build path -->
<!-- other sections of the pom are excluded -->
Я считаю, что проблема как-то связана среализация, которая будет использоваться для SAXParser
, который Flink использует во время выполнения.Я также попытался использовать аннотацию @transient
, чтобы предотвратить сохранение полей в Flink, но безуспешно.
Однако я не совсем понимаю, что именно происходит, кто-нибудь знает, как предотвратить ошибку и что пошло не так?