Как обработать запись потока кинезиса?(несколько процессоров) - PullRequest
0 голосов
/ 02 мая 2019

Я работаю над проектом, который контролирует систему на основе микросервисов.Созданные мною микросервисы создают данные и загружают их в Amazon Kinesis, теперь я использую этот код здесь, из Amazon, для производства и использования Kinesis.Но я не смог понять, как я могу добавить больше процессоров (рабочих), которые будут работать с одним и тем же списком записей (возможно, одновременно), то есть я пытаюсь выяснить, где и как подключить свой код к добавленному коду AmazonЯ добавил здесь ниже.

В моей программе будет два процессора:

  1. Сохранит каждую запись в БД.
  2. Обновит графический интерфейсэто покажет мониторинг системы, учитывая, что она может сравнить текущую транзакцию с действительной транзакцией.Мои действительные транзакции также будут храниться в БД.Это означает, что мы сможем увидеть весь поток данных в системе и посмотреть, как обрабатывается каждый запрос от конца к концу.

Я был бы очень признателен за некоторые рекомендации, так как это мой первый отраслевой проекти я также немного новичок в AWS (хотя я много об этом читал).Спасибо!

Вот код от Amazon, взятый по этой ссылке: https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleConsumer.java

/*
 * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 * http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package com.amazonaws.services.kinesis.producer.sample;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

/**
 * If you haven't looked at {@link SampleProducer}, do so first.
 * 
 * <p>
 * As mentioned in SampleProducer, we will check that all records are received
 * correctly by the KCL by verifying that there are no gaps in the sequence
 * numbers.
 * 
 * <p>
 * As the consumer runs, it will periodically log a message indicating the
 * number of gaps it found in the sequence numbers. A gap is when the difference
 * between two consecutive elements in the sorted list of seen sequence numbers
 * is greater than 1.
 * 
 * <p>
 * Over time the number of gaps should converge to 0. You should also observe
 * that the range of sequence numbers seen is equal to the number of records put
 * by the SampleProducer.
 * 
 * <p>
 * If the stream contains data from multiple runs of SampleProducer, you should
 * observe the SampleConsumer detecting this and resetting state to only count
 * the latest run.
 * 
 * <p>
 * Note if you kill the SampleConsumer halfway and run it again, the number of
 * gaps may never converge to 0. This is because checkpoints may have been made
 * such that some records from the producer's latest run are not processed
 * again. If you observe this, simply run the producer to completion again
 * without terminating the consumer.
 * 
 * <p>
 * The consumer continues running until manually terminated, even if there are
 * no more records to consume.
 * 
 * @see SampleProducer
 * @author chaodeng
 *
 */
public class SampleConsumer implements IRecordProcessorFactory {
    private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);

    // All records from a run of the producer have the same timestamp in their
    // partition keys. Since this value increases for each run, we can use it
    // determine which run is the latest and disregard data from earlier runs.
    private final AtomicLong largestTimestamp = new AtomicLong(0);

    // List of record sequence numbers we have seen so far.
    private final List<Long> sequenceNumbers = new ArrayList<>();

    // A mutex for largestTimestamp and sequenceNumbers. largestTimestamp is
    // nevertheless an AtomicLong because we cannot capture non-final variables
    // in the child class.
    private final Object lock = new Object();

    /**
     * One instance of RecordProcessor is created for every shard in the stream.
     * All instances of RecordProcessor share state by capturing variables from
     * the enclosing SampleConsumer instance. This is a simple way to combine
     * the data from multiple shards.
     */
    private class RecordProcessor implements IRecordProcessor {
        @Override
        public void initialize(String shardId) {}

        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            long timestamp = 0;
            List<Long> seqNos = new ArrayList<>();

            for (Record r : records) {
                // Get the timestamp of this run from the partition key.
                timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));

                // Extract the sequence number. It's encoded as a decimal
                // string and placed at the beginning of the record data,
                // followed by a space. The rest of the record data is padding
                // that we will simply discard.
                try {
                    byte[] b = new byte[r.getData().remaining()];
                    r.getData().get(b);
                    seqNos.add(Long.parseLong(new String(b, "UTF-8").split(" ")[0]));
                } catch (Exception e) {
                    log.error("Error parsing record", e);
                    System.exit(1);
                }
            }

            synchronized (lock) {
                if (largestTimestamp.get() < timestamp) {
                    log.info(String.format(
                            "Found new larger timestamp: %d (was %d), clearing state",
                            timestamp, largestTimestamp.get()));
                    largestTimestamp.set(timestamp);
                    sequenceNumbers.clear();
                }

                // Only add to the shared list if our data is from the latest run.
                if (largestTimestamp.get() == timestamp) {
                    sequenceNumbers.addAll(seqNos);
                    Collections.sort(sequenceNumbers);
                }
            }

            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during ProcessRecords", e);
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            log.info("Shutting down, reason: " + reason);
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during Shutdown", e);
            }
        }
    }

    /**
     * Log a message indicating the current state.
     */
    public void logResults() {
        synchronized (lock) {
            if (largestTimestamp.get() == 0) {
                return;
            }

            if (sequenceNumbers.size() == 0) {
                log.info("No sequence numbers found for current run.");
                return;
            }

            // The producer assigns sequence numbers starting from 1, so we
            // start counting from one before that, i.e. 0.
            long last = 0;
            long gaps = 0;
            for (long sn : sequenceNumbers) {
                if (sn - last > 1) {
                    gaps++;
                }
                last = sn;
            }

            log.info(String.format(
                    "Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
                    gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
        }
    }

    @Override
    public IRecordProcessor createProcessor() {
        return this.new RecordProcessor();
    }

    public static void main(String[] args) {
        KinesisClientLibConfiguration config =
                new KinesisClientLibConfiguration(
                        "KinesisProducerLibSampleConsumer",
                        SampleProducer.STREAM_NAME,
                        new DefaultAWSCredentialsProviderChain(),
                        "KinesisProducerLibSampleConsumer")
                                .withRegionName(SampleProducer.REGION)
                                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        final SampleConsumer consumer = new SampleConsumer();

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                consumer.logResults();
            }
        }, 10, 1, TimeUnit.SECONDS);

        new Worker.Builder()
            .recordProcessorFactory(consumer)
            .config(config)
            .build()
            .run();
    }
}

1 Ответ

0 голосов
/ 02 мая 2019

Ваш вопрос очень широкий, но вот несколько советов для потребителей Kinesis, которые, как мы надеемся, имеют отношение к вашему варианту использования.

Каждый поток Kinesis разделен на один или несколько осколков. Существуют ограничения, накладываемые на каждый шард, например, вы не можете записать в шард больше данных, чем МБ данных в секунду, и вы не можете инициировать более 5 запросов GetRecords (которые вызывает сам процесс CustomerRecords) в секунду для одного осколок. (См. Полный список ограничений здесь .) Если вы работаете с объемами данных, которые приближаются к этим ограничениям или превосходят их, вам нужно увеличить количество сегментов в вашем потоке.

Когда у вас есть только одно потребительское приложение и один работник, он берет на себя ответственность за обработку всех сегментов соответствующего потока. Если имеется несколько работников, каждый из них берет на себя ответственность за некоторое подмножество сегментов, так что каждый шард назначается одному и только одному работнику (если вы просматриваете журналы потребителей, вы можете найти это как "получение аренды" на шардах).

Если вы хотите иметь несколько процессоров, которые независимо принимают трафик Kinesis и обрабатывают записи, вам нужно зарегистрировать два отдельных пользовательских приложения. В приведенном выше коде имя приложения является первым параметром конструктора KinesisClientLibConfiguration. Обратите внимание, что даже если они являются отдельными пользовательскими приложениями, все равно применяется ограничение в 5 GetRecords в секунду.

Другими словами, вам нужно иметь два отдельных процесса, один из которых будет создавать экземпляр потребителя, который обращается к БД, а другой будет создавать экземпляр клиента, который обновляет графический интерфейс:

KinesisClientLibConfiguration databaseSaverKclConfig =
  new KinesisClientLibConfiguration(
    "DatabaseSaverKclApp",
    "your-stream",
    new DefaultAWSCredentialsProviderChain(),
// I believe worker ids don't need to be unique, but it's a good practice to make them unique so you can easily identify the workers
    "unique-worker-id")
        .withRegionName(SampleProducer.REGION)
// this only matters the very first time your consumer is launched, subsequent launches will read the checkpoint from the previous runs
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

final IRecordProcessorFactory databaseSaverConsumer = new DatabaseSaverConsumer();
KinesisClientLibConfiguration guiUpdaterKclConfig =
  new KinesisClientLibConfiguration(
    "GuiUpdaterKclApp",
    "your-stream",
    new DefaultAWSCredentialsProviderChain(),
    "unique-worker-id")
.withRegionName(SampleProducer.REGION)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

final IRecordProcessorFactory guiUpdaterConsumer = new GuiUpdaterConsumer();

А как насчет реализации DatabaseSaverConsumer и GuiUpdaterConsumer? Каждый из них должен реализовать собственную логику в методе processRecords. Вы должны убедиться, что каждый из них выполняет правильную работу внутри этого метода, и что логика контрольных точек является надежной. Давайте расшифруем это:

  • Допустим, processRecords занимает 10 секунд на 100 записей, но соответствующий шард получает 500 записей за 10 секунд. Каждый последующий вызов processRecords будет отставать от сегмента. Это означает, что либо часть работы должна быть извлечена из processRecords, либо необходимо увеличить количество шардов.
  • И наоборот, если processRecords занимает всего 0,1 секунды, тогда processRecords будет вызываться 10 раз в секунду, что превышает выделенные 5 транзакций в секунду на шард. Если я правильно понимаю / помню, нет способа добавить паузу между последующими вызовами к processRecords в конфигурации KCL, поэтому вы должны добавить спящий код в ваш код.
  • Контрольная точка: каждому работнику необходимо отслеживать свой прогресс, чтобы в случае его неожиданного прерывания и получения другим работником того же осколка он знал, с чего продолжить. Обычно это делается двумя способами: в начале processRecords или в конце. В первом случае вы говорите: «Я могу прыгать через некоторые записи в потоке, но определенно не хочу обрабатывать их дважды»; в последнем случае вы говорите: «Я хорошо обрабатываю некоторые записи дважды, но определенно не могу потерять ни одну из них». (Когда вам нужно лучшее из обоих миров, то есть обрабатывать записи один раз и только один раз, вы должны сохранять состояние в каком-либо хранилище данных вне рабочих.) В вашем случае разработчику базы данных, скорее всего, понадобится контрольная точка после обработки; Я не так уверен в его GUI.

Говоря о графическом интерфейсе, что вы используете для отображения данных и почему потребителю Kinesis нужно его обновлять, а не сам графический интерфейс, запрашивающий базовые хранилища данных?

В любом случае, я надеюсь, что это поможет. Дайте мне знать, если у вас есть более конкретные вопросы.

...