Я читаю одновременно 3 файла CSV построчно и отправляю их в очередь сообщений (Apache Kafka).Строки данных упорядочены по возрастанию значений меток времени.Я имитирую потоки, просматривая последнюю временную метку и новую временную метку и засыпая поток на разницу двух временных меток.Я делаю это для того, чтобы смоделировать создание сообщений.
Теперь я хочу, чтобы некоторые из этих сообщений были задержаны на случайное количество времени, и подготовил функцию, которая переводит поток в спящий режим на случайное количество.времени и выбирает случайным образом, когда выполнять эту операцию.
Когда я это делаю, я засыпаю все чтение файла CSV, задерживая все последующие сообщения, которые должны быть созданы.
Возможно, мне не хватает опыта в этом деле, но я не знаю, как произвольно уложить одно из сообщений, которые я создаю, без задержки всех сообщений, которые должны прийти?
//Read the CSV file line by line, serialize into object and put to sleep fo
public void readLikesEventStreamCSV(
final BufferedReader bufferedReader, StreamproducerApplication.StreamProducer producer) throws IOException {
String last_timestamp = "";
StreamWaitSimulation sleep = new StreamWaitSimulation();
try {
String line;
line = bufferedReader.readLine(); //read the first line. we do nothing with it.
while ((line = bufferedReader.readLine()) != null) {
final String[] lineArray = pattern.split(line);
LikesEventStream value = new LikesEventStream
.Builder()
.personId(Integer.parseInt(lineArray[0]))
.postId(Integer.parseInt(lineArray[1].equals("") ? "-1":lineArray[1] )) //TODO: handle this empty string problem in a cleaner way.
.creationDate(lineArray[2])
.build();
//Here the code will wait before sending the LikesEventStream value created above
sleep.wait(last_timestamp, lineArray[2]);
last_timestamp = lineArray[2];
//This sends the object to a topic in Kafka
send(value, producer, likesTopicName);
}
} finally {
bufferedReader.close();
}
}
Aсообщение, отправленное в тему, выглядит так:
{"personId":721,"postId":270250,"creationDate":"2012-02-02T01:09:00.000Z","sentAt":1328141340000}