Как использовать функцию карты в моем коде ниже вместо функции сбора? - PullRequest
0 голосов
/ 25 июня 2019

Производитель Kafka пытается отправить массовое сообщение (10 КБ) за раз, но после передачи 3 КБ начинает выдавать ошибку.В приведенном ниже коде я использовал функцию сбора данных, которая может быть проблемой, поэтому я хочу заменить ее на карту.

//Memberjson is JavaRDD<String>

for (String outputMbrJson : memberjson.collect()) {

    try {

        Producer kafkaProducer = new Producer(configFile);

        kafkaProducer.runProducer(Arrays.asList(outputMbrJson).iterator());
        } 
        catch (Throwable e) {
            e.printStackTrace();
        }
    }
---------------------------------------
//runProducer method 

public Iterator<String> runProducer(Iterator<String> jsons) throws Exception {

        final Producer<Long, String> producer = createProducer();

        ArrayList<String> ret = new ArrayList<String>();

        try {
            while (jsons.hasNext()) {
                String jsonobj = jsons.next();

                ret.add(EMPTY_STRING);
                try {
                    ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(conf.getProperty("producer.topic"), jsonobj.toString());

                    producer.send(record);

                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
        } 
        catch (Exception e) {
            e.printStackTrace();
        } 
        finally {
            producer.flush();
            producer.close();
        }
        return ret.iterator();
}
...