Боковой ввод размером около 50Мб вызывает длительную паузу G C - PullRequest
0 голосов
/ 28 марта 2020
  • Мы запускаем приложение Beam на кластере Flink с боковыми входами размером 50 Мб.
    • Боковой ввод refre sh (извлечение из внешнего источника данных) на основе уведомления, отправляемого уведомлению topi c в Кафке.
  • По мере продвижения приложения из-за бокового ввода Часто G C происходит часто, и каждый G C занимает ~ 30 сек c, что приостанавливает работу диспетчера задач, чтобы отправить сердцебиение Мастер.
  • После последовательной пропадания сердцебиения мастер, предполагая, что работник мертв, и начинает переназначение заданий, приводит к перезапуску приложения.
  • Мы попытались удалить боковой ввод, приложение работает нормально.

Вопросы:

  • Есть ли ограничения на размер бокового ввода в Apache Вход со стороны луча?

  • Я создал карту бокового ввода с помощью asSingleton (), будет ли создавать отдельную копию для каждой задачи? Я дал 15 параллелизма. Собирается ли создать 15 копий в JVM (при условии, что все задачи назначены одному и тому же работнику)?

  • Какая альтернатива для боковых входов?

Это пример конвейера:

public class BeamApplication {
public static final CloseableHttpClient httpClient = HttpClients.createDefault();

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.as(FlinkPipelineOptions.class).setRunner(FlinkRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollection<Map<String, Double>> sideInput = pipeline
            .apply(KafkaIO.<String, String>read().withBootstrapServers("localhost:9092")
                    .withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
                    .withTopic("testing"))
            .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Map<String, Double>>() {
                @ProcessElement
                public void processElement(ProcessContext processContext) {
                    KafkaRecord<String, String> record = processContext.element();
                    String message = record.getKV().getValue().split("@@")[0];
                    String change = record.getKV().getValue().split("@@")[1];
                    if (message.equals("START_REST")) {

                        Map<String, Double> map = new HashMap<>();
                        Map<String,Double> changeMap = new HashMap<>();

                        HttpGet request = new HttpGet("http://localhost:8080/config-service/currency");

                        try (CloseableHttpResponse response = httpClient.execute(request)) {
                            HttpEntity entity = response.getEntity();
                            String responseString = EntityUtils.toString(entity, "UTF-8");
                            ObjectMapper objectMapper = new ObjectMapper();
                            CurrencyDTO jsonObject = objectMapper.readValue(responseString, CurrencyDTO.class);
                            map.putAll(jsonObject.getQuotes());
                            System.out.println(change);
                            Random rand = new Random();
                            Double db = rand.nextDouble();
                            System.out.println(db);
                            changeMap.put(change,db);
                            entity.getContent();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        processContext.output(changeMap);
                    }
                }
            }));
    PCollection<Map<String, Double>> currency = sideInput
            .apply(Window.<Map<String, Double>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes());

    PCollectionView<Map<String, Double>> sideInputView = currency.apply(View.asSingleton());

    PCollection<KafkaRecord<Long, String>> kafkaEvents = pipeline
            .apply(KafkaIO.<Long, String>read().withBootstrapServers("localhost:9092")
                    .withKeyDeserializer(LongDeserializer.class).withValueDeserializer(StringDeserializer.class)
                    .withTopic("event_testing"));

    PCollection<String> output = kafkaEvents
            .apply("Extract lines", ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
                @ProcessElement
                public void processElement(ProcessContext processContext) {
                    String element = processContext.element().getKV().getValue();

                    Map<String, Double> map = processContext.sideInput(sideInputView);
                    System.out.println("This is it : " + map.entrySet());
                }
            }).withSideInputs(sideInputView));

    pipeline.run().waitUntilFinish();
}

}

1 Ответ

0 голосов
/ 20 апреля 2020

Какой бэкэнд состояния вы используете?

Если я не ошибаюсь, боковые входы реализованы как состояние в Flink. Если вы используете MemoryStateBackend в качестве бэкэнда состояния, вы действительно можете оказать давление на потребление памяти.

Кроме того, обработка событий будет блокироваться до тех пор, пока ввод с этой стороны не будет готов, буферизируя события , Если подготовка бокового ввода занимает много времени или частота входящих событий высока, это может привести к увеличению нагрузки на память.

Можно попробовать альтернативный бэкэнд состояния? Предпочтительно RocksDBStateBackend , он хранит данные в полете в базе данных RocksDB, а не в памяти.

Трудно догадаться, в чем проблема. Я бы порекомендовал отслеживать показатели, связанные с памятью - см. Хороший пост по этому вопросу здесь .

Вы также можете запустить профилирование на диспетчерах задач и проанализировать дампы - см. здесь

Увеличивается ли память также, если вы только опубликуете sh первое сообщение о «тестировании» topi c?

Возможно, чтобы изолировать проблему, я бы использовал более простой побочный ввод. Удалите HTTP-вызов и установите данные c. Может быть, периоди c сработала вместо Кафки:

GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L))

...