Почему мое потоковое задание Flink на AWS EMR выполняется в локальном мини-кластере? - PullRequest
0 голосов
/ 16 января 2020

Я совершенно новичок во Flink и Kafka, и я написал простую работу по потоковой передаче Flink для проверки Flink на EMR. Код Scala выглядит следующим образом

object KafkaStreamingJob {
  def main(args: Array[String]) {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bootstrapServers = ... // Kafka bootstrap servers
    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema, properties)
    val kafkaDataStream = senv.addSource(consumer).print

    senv.execute("Demo Flink App")
  }
}

pom. xml

...
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
...

Затем я отправил задание в EMR с предложенным аргументом flink-yarn-session -n 2 -d, но по какой-то причине моя работа всегда запускается на "локальном встроенном мини-кластере Flink"

org.apache.flink.streaming.api.environment.LocalStreamEnvironment (main): Running job on local embedded Flink mini cluster

, а в журнале ошибок указано:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 6282"...

Я думаю, что ошибка OOM может быть как-то связана с фактом Я работаю на "мини кластере". Но я не смог найти ничего, связанного с отсутствием «мини-кластера» в документах AWS. Кто-нибудь может мне помочь, пожалуйста?

...