Как использовать Scala XML с Apache Flink? - PullRequest
0 голосов
/ 31 января 2019

Я пытаюсь использовать библиотеку Scala XML во Flink для анализа XML, и я не могу заставить ее работать.Обратите внимание, что мне нужно использовать как сериализованную, так и не сериализованную (строковую) версию для моего кода в одной и той же функции обработки.

Я пробовал уже разные решения, они всегда работают в IntelliJ, но не когда я запускаю их наФлинк кластер.Они всегда возвращают разные java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser;Я пробовал несколько вещей, но все равно получаю ошибку, похожую на эту.

Это пример того, как выглядит мое задание Flink:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

Это пример моей функции обработки:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

и, наконец, это мой pom.xml, использующий плагин maven-shade для создания банки, которую я передаю моргать:

        <!-- other sections of the pom are excluded -->
        <properties>
            <flink.version>1.7.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.8</scala.version>
        </properties>
        <!-- other sections of the pom are excluded -->
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api-scala_2.12</artifactId>
            <version>11.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
        <!-- other sections of the pom are excluded -->
<build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        <!-- other sections of the pom are excluded -->

Я считаю, что проблема как-то связана среализация, которая будет использоваться для SAXParser, который Flink использует во время выполнения.Я также попытался использовать аннотацию @transient, чтобы предотвратить сохранение полей в Flink, но безуспешно.

Однако я не совсем понимаю, что именно происходит, кто-нибудь знает, как предотвратить ошибку и что пошло не так?

1 Ответ

0 голосов
/ 03 апреля 2019

Через некоторое время я выясняю, что с ним не так.

Документы Scala XML говорят:

В Scala 2.11 и более поздних версиях добавьте следующее в файл build.sbt:libraryDependencies:

"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

Что в Maven означает:

<dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

Похоже, что эта зависимость не нужна, хотя Flink 1.7.2, похоже, использует Scala 2.12.8 он по-прежнему сохраняет Scala XML внутри своего дистрибутива (следовательно, в пути к классам), я считаю, что это может вызвать проблемы, в которые класс фактически загружен и, если это правильно, однако, это может не быть решением ошибки компоновки.

Решение этой ошибки на самом деле заключается в использовании самого Флинка RichMapFunction[InputT, OutputT]:

class Processor extends RichMapFunction[String, String] {
  var factory: SAXParserFactory = _
  var SAXParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    SAXParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(SAXParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

Как сказано в JavaDoc:

Метод инициализации функции.

Вызывается перед фактическими методами работы (например, map или join ) и, таким образом, подходит для однократной настройки.Для функций, являющихся частью итерации, этот метод будет вызываться в начале каждого шага итерации.

К сожалению, использование var в этом случае является предпочтительным в качестве инициализации значений / переменных.Flink должен предотвратить ошибку связывания во время выполнения.

Некоторые примечания:

  • Я понял, что это может произойти только на DataStream[T], а не на DataSet[T].
  • Задание должно иметь параллелизм, установленный более чем на 1, чтобы несколько менеджеров задач загружали один и тот же класс. Если это делается в IDE, это может быть сложно, объяснил здесь .
  • ПослеЗаметив причину этой проблемы, кажется, что сопутствующие объекты могут быть не идеальными для использования Flink.
  • Эта последняя часть может быть хорошей запиской, которая будет помещена на странице Flink "Расширения API Scala", где она такжеобъясняет, как Flink обычно не поддерживает анонимные функции сопоставления с образцом для деконструкции кортежей, если не используется расширение API Flink Scala: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html
...