Как вернуть ArrayList с миллиардами записей от Kafka Producer? - PullRequest
0 голосов
/ 10 декабря 2018

Я подготовил производителя кафки, который помещает Список в тему кафки.Отлично работает на 1 миллион строк / записей.Производственный файл, который я получил, состоит из 110 миллионов записей. Как лучше всего справляться с такими огромными данными на моем KafkaProducer?

Ниже приведен код, который я использовал для обработки 1 миллиона записей, и для его установки требуется около 4 минут.в тему кафки.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class KafkaSourceTask extends SourceTask {

    private String filename;

    private String topic;

    private RandomAccessFile raf;

    private long lastRecordedOffset = 0L;

    private BufferedReader bufferedReader = null;

    Schema schema = SchemaBuilder.struct().field("emp_id", 
            Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA)
            .field("last_name", Schema.STRING_SCHEMA).field("department", 
            Schema.STRING_SCHEMA).build();

public void start(Map<String, String> props) {
    filename = props.get("file");
    topic = props.get("topic");

}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    double startTime = System.nanoTime();
    try {
        bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File(filename)),
                StandardCharsets.UTF_8));
        raf = new RandomAccessFile(filename, "r");
        long filePointer = raf.getFilePointer();
        System.out.println(filePointer + " - " + lastRecordedOffset);
        if (bufferedReader.ready() && (filePointer > lastRecordedOffset || filePointer == 0)) {
            raf.seek(lastRecordedOffset);

            ArrayList<SourceRecord> records = new ArrayList<>();
            String line;
            while ((line = raf.readLine()) != null) {
                records.add(new SourceRecord(null, null, topic, schema, buildRecordValue(line)));
            }
            lastRecordedOffset = raf.getFilePointer();
            raf.close();
            bufferedReader.close();

            double endTime = System.nanoTime();
            return records;
        }
    }
    catch (IOException e) {

        e.printStackTrace();
    }

    return null;
}

@Override
public synchronized void stop() {
    try {
        raf.close();
    }
    catch (IOException e) {
        e.printStackTrace();
    }
}

private Struct buildRecordValue(String line) {
    String[] values = line.split(",");
    Struct value = new Struct(schema).put("emp_id", values[0]).put("name", values[1]).put("last_name", values[2])
            .put("department", values[3]);
    return value;
}

@Override
public String version() {
    // TODO Auto-generated method stub
    return null;
}
}

Буду признателен за любую помощь или предложение по этому вопросу. Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Для начала, пакетные записи производителя Kafka перед отправкой их брокерам, вы должны проверить и поиграть с двумя конфигами linger.ms и batch.record.size.

Теперь вы можете использовать другой поток для чтенияфайл (я думаю, что это одна запись на строку), поместите их в очередь Java и используйте поток, размещающий производителя kafka, для непрерывного чтения этой очереди.Тема Kafka, проверьте принцип Single Writer.

Хорошо, в любом случае вам придется немного настроить своего производителя kafka, но, как сказал @ cricket_007, вы должны рассмотреть возможность использования kafka connect с файловым соединителем csv, по крайней мереесли вы не найдете подходящий вам разъем, вы можете разработать его самостоятельно.

Надеюсь, это поможет.

0 голосов
/ 19 декабря 2018

ArrayList с миллиардами записей ?Подумайте об этом, если у вас есть хотя бы 1 миллиард, а размер каждой записи составляет всего 1 байт (смешная недооценка), у вас есть 1 гигабайт памяти SI.

Грубое и готовое определение «больших данных» означает, что данные, которые не помещаются в память на одном хосте, находятся на краю или после этой точки, и вам необходимо начать использовать большиеметоды данных.Во-первых, вы можете попробовать многопоточность, а затем вы можете попробовать многопоточность на нескольких машинах, что является преимуществом использования Kafka - клиентского API - как при потреблении, так и при производстве, - это легко.

...