Я совершенно новичок во Flink и Kafka, и я написал простую работу по потоковой передаче Flink для проверки Flink на EMR. Код Scala выглядит следующим образом
object KafkaStreamingJob {
def main(args: Array[String]) {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val bootstrapServers = ... // Kafka bootstrap servers
val properties = new Properties
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema, properties)
val kafkaDataStream = senv.addSource(consumer).print
senv.execute("Demo Flink App")
}
}
pom. xml
...
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
...
Затем я отправил задание в EMR с предложенным аргументом flink-yarn-session -n 2 -d
, но по какой-то причине моя работа всегда запускается на "локальном встроенном мини-кластере Flink"
org.apache.flink.streaming.api.environment.LocalStreamEnvironment (main): Running job on local embedded Flink mini cluster
, а в журнале ошибок указано:
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 6282"...
Я думаю, что ошибка OOM может быть как-то связана с фактом Я работаю на "мини кластере". Но я не смог найти ничего, связанного с отсутствием «мини-кластера» в документах AWS. Кто-нибудь может мне помочь, пожалуйста?