Чтение файла CSV построчно и создание потоков с некоторыми строками, отправленными со случайной задержкой - PullRequest
1 голос
/ 22 мая 2019

Я читаю одновременно 3 файла CSV построчно и отправляю их в очередь сообщений (Apache Kafka).Строки данных упорядочены по возрастанию значений меток времени.Я имитирую потоки, просматривая последнюю временную метку и новую временную метку и засыпая поток на разницу двух временных меток.Я делаю это для того, чтобы смоделировать создание сообщений.

Теперь я хочу, чтобы некоторые из этих сообщений были задержаны на случайное количество времени, и подготовил функцию, которая переводит поток в спящий режим на случайное количество.времени и выбирает случайным образом, когда выполнять эту операцию.

Когда я это делаю, я засыпаю все чтение файла CSV, задерживая все последующие сообщения, которые должны быть созданы.

Возможно, мне не хватает опыта в этом деле, но я не знаю, как произвольно уложить одно из сообщений, которые я создаю, без задержки всех сообщений, которые должны прийти?

//Read the CSV file line by line, serialize into object and put to sleep fo
public void readLikesEventStreamCSV(
        final BufferedReader bufferedReader, StreamproducerApplication.StreamProducer producer) throws IOException {
    String last_timestamp = "";
    StreamWaitSimulation sleep = new StreamWaitSimulation();
    try {
        String line;
        line = bufferedReader.readLine(); //read the first line. we do nothing with it.
        while ((line = bufferedReader.readLine()) != null) {
            final String[] lineArray = pattern.split(line);

            LikesEventStream value = new LikesEventStream
                    .Builder()
                    .personId(Integer.parseInt(lineArray[0]))
                    .postId(Integer.parseInt(lineArray[1].equals("") ? "-1":lineArray[1] )) //TODO: handle this empty string problem in a cleaner way.
                    .creationDate(lineArray[2])
                    .build();
            //Here the code will wait before sending the LikesEventStream value created above
            sleep.wait(last_timestamp, lineArray[2]);
            last_timestamp = lineArray[2];

            //This sends the object to a topic in Kafka
            send(value, producer, likesTopicName);
        }

    } finally {
        bufferedReader.close();
    }
}

Aсообщение, отправленное в тему, выглядит так:

{"personId":721,"postId":270250,"creationDate":"2012-02-02T01:09:00.000Z","sentAt":1328141340000}

1 Ответ

1 голос
/ 23 мая 2019

Вы можете отключить ветку для каждого отложенного сообщения.Это позволяет продолжить вашу основную обработку, пока один поток спит:

public class Demo {

    public static void main(String[] args) {

        long timeToWait = 2000L;
        Runnable runner = new Runnable() {
            @Override
            public void run() {
                try { Thread.sleep(timeToWait); } catch (InterruptedException e) { }
                System.out.println("Writing delayed message here");
            }

        };

        Thread thread = new Thread(runner);
        thread.start();

        System.out.println("Processing continues after forking off message delay");

    }

}
...