Я изучаю DDS, используя RTI (все еще очень плохо знакомый с этой топикой c). Я создаю издателя, который пишет подписчику, и подписчик выводит сообщение. Одна вещь, которую я хотел бы смоделировать, - это отброшенные пакеты. Например, допустим, что издатель пишет подписчику 4 раза в секунду, но подписчик может читать только одну секунду (самое последнее сообщение).
На данный момент я могу создать издателя & Подписчик без каких-либо пакетов, которые были сброшены.
Я прочитал некоторую документацию и нашел HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS.
Поправьте меня, если я ошибаюсь, но у меня сложилось впечатление, что это, по сути, сохранит самое последнее сообщение, полученное от издателя. Вместо этого Абонент получает все сообщения, но задерживается на 1 секунду.
Я не хочу кэшировать сообщения, но отбрасываю сообщения. Как я могу смоделировать «отброшенный» пакет?
Кстати: я не хочу ничего менять в файле. xml. Я хочу сделать это программно.
Вот некоторые фрагменты моего кода.
//Publisher.java
//writer = (MsgDataWriter)publisher.create_datawriter(topic, Publisher.DATAWRITER_QOS_DEFAULT,null /* listener */, StatusKind.STATUS_MASK_NONE);
writer = (MsgDataWriter)publisher.create_datawriter(topic, write, null,
StatusKind.STATUS_MASK_ALL);
if (writer == null) {
System.err.println("create_datawriter error\n");
return;
}
// --- Write --- //
String[] messages= {"1", "2", "test", "3"};
/* Create data sample for writing */
Msg instance = new Msg();
InstanceHandle_t instance_handle = InstanceHandle_t.HANDLE_NIL;
/* For a data type that has a key, if the same instance is going to be
written multiple times, initialize the key here
and register the keyed instance prior to writing */
//instance_handle = writer.register_instance(instance);
final long sendPeriodMillis = (long) (.25 * 1000); // 4 per second
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
if (count == 11)
{
return;
}
System.out.println("Writing Msg, count " + count);
/* Modify the instance to be written here */
instance.message =words[count];
instance.sender = "some user";
/* Write data */
writer.write(instance, instance_handle);
try {
Thread.sleep(sendPeriodMillis);
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
//writer.unregister_instance(instance, instance_handle);
} finally {
// --- Shutdown --- //
if(participant != null) {
participant.delete_contained_entities();
DomainParticipantFactory.TheParticipantFactory.
delete_participant(participant);
}
//Subscriber
// Customize time & Qos for receiving info
DataReaderQos readerQ = new DataReaderQos();
subscriber.get_default_datareader_qos(readerQ);
Duration_t minTime = new Duration_t(1,0);
readerQ.time_based_filter.minimum_separation.sec = minTime.sec;
readerQ.time_based_filter.minimum_separation.nanosec = minTime.nanosec;
readerQ.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
readerQ.reliability.kind = ReliabilityQosPolicyKind.BEST_EFFORT_RELIABILITY_QOS;
reader = (MsgDataReader)subscriber.create_datareader(topic, readerQ, listener, StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
return;
}
// --- Wait for data --- //
final long receivePeriodSec = 1;
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
//System.out.println("Msg subscriber sleeping for "+ receivePeriodSec + " sec...");
try {
Thread.sleep(receivePeriodSec * 1000); // in millisec
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
} finally {
// --- Shutdown --- //