Распределение сокета данных между узлами кластера kafka - PullRequest
0 голосов
/ 02 января 2019

Я хочу получить данные из сокета и поместить их в тему kafka, чтобы моя программа flink могла читать данные из темы и обрабатывать их. Я могу сделать это на одном узле. Но я хочу иметь кластер kafka, по крайней мере, с тремя разными узлами (разными IP-адресами) и опрашивать данные из сокета, чтобы распределить их по узлам. Я не знаю, как это сделать и изменить этот код. Моя простая программа в следующем:

public class WordCount {

   public static void main(String[] args) throws Exception {

    kafka_test objKafka=new kafka_test();
  // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    int myport = 9999;
    String hostname = "localhost";
 // set up the execution environment
    final StreamExecutionEnvironment env = 
  StreamExecutionEnvironment.getExecutionEnvironment();


 // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    DataStream<String> stream = env.socketTextStream(hostname,myport);

    stream.addSink(objKafka.createStringProducer("testFlink", 
    "localhost:9092"));

    DataStream<String> text = 
    env.addSource(objKafka.createStringConsumerForTopic("testFlink", 
    "localhost:9092", "test"));
    DataStream<Tuple2<String, Long>> counts = text
     .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) 
   {
          // normalize and split the line
             String[] words = value.toLowerCase().split("\\W+");

                    // emit the pairs
             for (String word : words) {
                  if (!word.isEmpty()) {
                     out.collect(new Tuple2<String, Long>(word, 1L));
                        }
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);
          // emit result
        if (params.has("output")) {
           counts.writeAsText(params.get("output"));
          } else {
          System.out.println("Printing result to stdout. Use --output 
          to specify output path.");
          counts.print();
         }
    // execute program
    env.execute("Streaming WordCount");

    }//main
   }

  public class kafka_test {
  public FlinkKafkaConsumer<String> createStringConsumerForTopic(
        String topic, String kafkaAddress, String kafkaGroup) {
  //        ************************** KAFKA Properties ******        
     Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
            topic, new SimpleStringSchema(), props);
    myconsumer.setStartFromLatest();     

    return myconsumer;
  }

  public FlinkKafkaProducer<String> createStringProducer(
        String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<>(kafkaAddress,
            topic, new SimpleStringSchema());
     }
  }

Подскажите, пожалуйста, как транслировать данные потокового сокета между различными узлами kafka?

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 02 января 2019

Я думаю, что ваш код правильный. Кафка позаботится о «распространении» данных. Как данные будут распределены среди брокеров Kafka, будет зависеть от конфигурации темы.

Проверьте ответ здесь , чтобы лучше понять темы и разделы Кафки.

Допустим, у вас есть 3 брокера Kafka. Тогда, если вы создаете свою тему с 3 репликами и 3 разделами

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic

Это приведет к тому, что ваша тема будет иметь 3 раздела, и каждый раздел будет храниться 3 раза в вашем кластере. С 3 брокерами вы получите 1 раздел и 2 реплики на каждого брокера.

Тогда вам просто нужно создать раковину Кафки

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);
...