DDS Reader не сбрасывает сообщения - PullRequest
1 голос
/ 05 марта 2020

Я изучаю DDS, используя RTI (все еще очень плохо знакомый с этой топикой c). Я создаю издателя, который пишет подписчику, и подписчик выводит сообщение. Одна вещь, которую я хотел бы смоделировать, - это отброшенные пакеты. Например, допустим, что издатель пишет подписчику 4 раза в секунду, но подписчик может читать только одну секунду (самое последнее сообщение).

На данный момент я могу создать издателя & Подписчик без каких-либо пакетов, которые были сброшены.

Я прочитал некоторую документацию и нашел HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS.

Поправьте меня, если я ошибаюсь, но у меня сложилось впечатление, что это, по сути, сохранит самое последнее сообщение, полученное от издателя. Вместо этого Абонент получает все сообщения, но задерживается на 1 секунду.

Я не хочу кэшировать сообщения, но отбрасываю сообщения. Как я могу смоделировать «отброшенный» пакет?

Кстати: я не хочу ничего менять в файле. xml. Я хочу сделать это программно.

Вот некоторые фрагменты моего кода.

//Publisher.java

            //writer = (MsgDataWriter)publisher.create_datawriter(topic, Publisher.DATAWRITER_QOS_DEFAULT,null /* listener */, StatusKind.STATUS_MASK_NONE);
            writer = (MsgDataWriter)publisher.create_datawriter(topic, write, null,        
            StatusKind.STATUS_MASK_ALL);
            if (writer == null) {
                System.err.println("create_datawriter error\n");
                return;
            }           

            // --- Write --- //
            String[] messages= {"1", "2", "test", "3"};

            /* Create data sample for writing */

            Msg instance = new Msg();


            InstanceHandle_t instance_handle = InstanceHandle_t.HANDLE_NIL;
            /* For a data type that has a key, if the same instance is going to be
            written multiple times, initialize the key here
            and register the keyed instance prior to writing */
            //instance_handle = writer.register_instance(instance);

            final long sendPeriodMillis = (long) (.25 * 1000); // 4 per second

            for (int count = 0;
            (sampleCount == 0) || (count < sampleCount);
            ++count) {
                if (count == 11)
                {
                    return;
                }
                System.out.println("Writing Msg, count " + count);

                /* Modify the instance to be written here */
                instance.message =words[count];
                instance.sender = "some user";
                /* Write data */
                writer.write(instance, instance_handle);
                try {
                    Thread.sleep(sendPeriodMillis);
                } catch (InterruptedException ix) {
                    System.err.println("INTERRUPTED");
                    break;
                }
            }

            //writer.unregister_instance(instance, instance_handle);

        } finally {

            // --- Shutdown --- //

            if(participant != null) {
                participant.delete_contained_entities();

                DomainParticipantFactory.TheParticipantFactory.
                delete_participant(participant);
            }

//Subscriber
// Customize time & Qos for receiving info 
            DataReaderQos readerQ = new DataReaderQos();
            subscriber.get_default_datareader_qos(readerQ);
            Duration_t minTime = new Duration_t(1,0);
            readerQ.time_based_filter.minimum_separation.sec = minTime.sec;
            readerQ.time_based_filter.minimum_separation.nanosec = minTime.nanosec;

            readerQ.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;

            readerQ.reliability.kind = ReliabilityQosPolicyKind.BEST_EFFORT_RELIABILITY_QOS;

            reader = (MsgDataReader)subscriber.create_datareader(topic, readerQ, listener, StatusKind.STATUS_MASK_ALL);
            if (reader == null) {
                System.err.println("create_datareader error\n");
                return;
            }


            // --- Wait for data --- //

            final long receivePeriodSec = 1;

            for (int count = 0;
            (sampleCount == 0) || (count < sampleCount);
            ++count) {
                //System.out.println("Msg subscriber sleeping for "+ receivePeriodSec + " sec...");

                try {
                    Thread.sleep(receivePeriodSec * 1000);  // in millisec
                } catch (InterruptedException ix) {
                    System.err.println("INTERRUPTED");
                    break;
                }
            }
        } finally {

            // --- Shutdown --- //

1 Ответ

1 голос
/ 06 марта 2020

На стороне подписчика полезно различать guish три различных типа взаимодействия между вашим приложением и доменом DDS: опрос, прослушиватели и WaitSets

Опрос означает, что приложение решает, когда он читает доступные данные. Часто это механизм, управляемый временем.

Слушатели - это, в основном, функции обратного вызова, которые вызываются, как только поток инфраструктуры становится доступным для чтения этих данных.

WaitSets реализует механизм аналогично механизму сокета select: поток приложения ожидает (блокирует) доступ к данным и после разблокировки считывает новые данные.

Ваше приложение использует механизм прослушивателя. Вы не опубликовали реализацию функции обратного вызова, но из общей картины вполне вероятно, что реализация прослушивателя немедленно попытается прочитать данные в момент вызова обратного вызова. Нет времени для того, чтобы данные были «вытолкнуты» или «отброшены», как вы их назвали. Это чтение происходит в другом потоке, чем ваш основной поток, который спит большую часть времени. Вы можете найти статью базы знаний об этом здесь .

Единственное, что не ясно, это влияние настройки time_based_filter QoS. Вы не упомянули об этом в своем вопросе, но он обнаруживается в коде. Я ожидаю, что это отфильтрует некоторые из ваших образцов. Это иной механизм, чем выталкивание из истории. Поведение для основанного на времени фильтра может быть реализовано по-разному для разных реализаций DDS. Какой продукт вы используете?

...