Я пытаюсь создать элементы, используя FlinkKafkaProducer010, однако, когда я открываю окно консоли потребителя, элементы появляются не в порядке.
Я создал тему, используя: kafka-themes.bat --create --topic mytopic --zookeeper localhost: 2181 --partitions 1 --replication-factor 1
Потребитель создан с помощью: kafka-console-consumer.bat --zookeeperlocalhost: 2181 --topic mytopic
Код Kafka Producer, который я использую:
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
return;
}
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
DataStream<String> messageStream = env.addSource(getSourceFunction());
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
messageStream.addSink(producer);
env.execute("Kafka Producer");
}
public static SourceFunction<String> getSourceFunction() {
return new SourceFunction<String>() {
private static final long serialVersionUID = 6369260225318862378L;
public boolean running = true;
@Override
public void run(SourceContext<String> ctx) {
int counter = 0;
while (this.running && counter < 500) {
String data = "item " + Integer.toString(counter);
ctx.collect(data);
counter++;
}
}
@Override
public void cancel() {
running = false;
}
};
}
Когда я просматриваю файлы журнала Kafka, я вижу файл .log сэлементы вышли из строя.Упорядочение элементов делает скачки около 10 значений.В моем случае использования важно иметь правильный порядок.Я искал, как обеспечить порядок элементов, но пока безуспешно.Я что-то упустил, чтобы исправить порядок?
Заранее спасибо за любую помощь!