Не удается получить сообщение через общие подписки с VerneMQ / MQTT - PullRequest
0 голосов
/ 28 мая 2019

Проблема

Я хочу использовать общие подписки, но не могу получать сообщения при подписке на общие темы.

C:\Program Files\mosquitto>mosquitto_sub -h localhost -p 1883 -q 2 -t \$share/group/audit/#

Однако при подписке на темубез ключевого слова share все отлично работает

C:\Program Files\mosquitto>mosquitto_sub -h localhost -p 1883 -q 2 -t /audit/#
{"objectUuid":"3749003c-5302-4b1f-a111-4a42c5f08f14","trackingId":"155049F9-D8B7-467A-B775-5E9D82E94B26","system":"core","market":"Germany","user":"3749003c-5302-4b1f-a111-4a42c5f08f13","type":"Customer","action":"UNLOCKED","severity":"INFO","content":"testN480","createdAt":null}

Что я пробовал

Я уже пробовал разные версии названия темы (с косой чертой, без экранирования ключевого слова share, без подстановочных знаков ...)

При просмотре тем в экземплярах брокера (брокер mqtt кластеризован) подписка выглядит успешной:

vernemq@mqtt-broker-1:~$ vmq-admin session show --topic
+----------------------------+
|           topic            |
+----------------------------+
|$share/audit-service/audit/#|
|          /audit/#          |
|$share/audit-service/audit/#|
|    $share/group/audit/#    |
+----------------------------+

Конфигурация выглядит следующим образом

[{eleveldb,
     [{compression,true},
      {fadvise_willneed,false},
      {eleveldb_threads,71},
      {verify_compaction,true},
      {verify_checksums,true},
      {block_size_steps,16},
      {block_restart_interval,16},
      {sst_block_size,4096},
      {block_cache_threshold,33554432},
      {use_bloomfilter,true},
      {write_buffer_size_max,62914560},
      {write_buffer_size_min,31457280},
      {limited_developer_mem,false},
      {sync,false},
      {total_leveldb_mem_percent,70},
      {data_root,"./data/leveldb"},
      {data,[{dir,"./data"}]},
      {delete_threshold,1000},
      {tiered_slow_level,0}]},
 {riak_sysmon,
     [{busy_dist_port,true},
      {busy_port,true},
      {port_limit,2},
      {process_limit,30},
      {gc_ms_limit,0},
      {schedule_ms_limit,0},
      {heap_word_limit,20055500}]},
 {kernel,[{inet_dist_listen_max,9109},{inet_dist_listen_min,9100}]},
 {lager,
     [{error_logger_hwm,100},
      {error_logger_redirect,true},
      {crash_log_date,"$D0"},
      {crash_log_size,10485760},
      {crash_log_msg_size,65536},
      {handlers,
          [{lager_console_backend,debug},
           {lager_file_backend,
               [{file,"./log/error.log"},
                {level,error},
                {size,10485760},
                {date,"$D0"},
                {count,5}]}]},
      {crash_log,"./log/crash.log"},
      {crash_log_count,5}]},
 {sasl,[{sasl_error_logger,false}]},
 {vmq_swc,[{db_backend,leveldb},{data_dir,"./data/swc_meta"}]},
 {vmq_bridge,
     [{clique_lead_line,"    bridge      Manage MQTT bridges\n"},
      {registry_mfa,{vmq_reg,direct_plugin_exports,[vmq_bridge]}},
      {config,{[],[]}}]},
 {vmq_webhooks,
     [{pool_timeout,60000},
      {pool_max_connections,100},
      {clique_lead_line,"    webhooks    Manage webhooks\n"},
      {user_webhooks,[]}]},
 {vmq_diversity,
     [{clique_lead_line,"    script      Manage lua scripts\n"},
      {db_config,
          [{memcache,[{port,11211},{host,"localhost"}]},
           {redis,
               [{pool_size,5},
                {database,0},
                {password,[]},
                {port,6379},
                {host,"localhost"}]},
           {mongodb,
               [{w_mode,safe},
                {r_mode,master},
                {pool_size,5},
                {port,27017},
                {host,"localhost"}]},
           {mysql,
               [{password_hash_method,password},
                {pool_size,5},
                {database,"vernemq_db"},
                {password,"password"},
                {user,"root"},
                {port,3306},
                {host,"localhost"}]},
           {postgres,
               [{pool_size,5},
                {database,"vernemq_db"},
                {password,"password"},
                {user,"root"},
                {port,5432},
                {host,"localhost"}]}]},
      {auth_cache,
          [{redis,[{file,"./share/lua/auth/redis.lua"},{enabled,false}]},
           {mongodb,[{file,"./share/lua/auth/mongodb.lua"},{enabled,false}]},
           {mysql,[{file,"./share/lua/auth/mysql.lua"},{enabled,false}]},
           {postgres,
               [{file,"./share/lua/auth/postgres.lua"},{enabled,false}]}]},
      {script_dir,"./share/lua"},
      {nr_lua_states,1},
      {keep_state,false}]},
 {vmq_passwd,[{interval,10},{file,"./etc/vmq.passwd"}]},
 {vmq_acl,[{interval,10},{file,"./etc/vmq.acl"}]},
 {vmq_plugin,
     [{wait_for_proc,vmq_server_sup},
      {default_schema_dir,["/vernemq/lib/vmq_server-1.7.0/priv"]}]},
 {vmq_server,
     [{metadata_impl,vmq_plumtree},
      {receive_max_broker,65535},
      {receive_max_client,65535},
      {topic_alias_max_broker,0},
      {topic_alias_max_client,0},
      {suppress_lwt_on_session_takeover,false},
      {shared_subscription_timeout_action,ignore},
      {shared_subscription_policy,prefer_local},
      {msg_store_opts,[{store_dir,"./data/msgstore"}]},
      {graphite_include_labels,false},
      {graphite_api_key,[]},
      {graphite_prefix,[]},
      {graphite_reconnect_timeout,15000},
      {graphite_connect_timeout,5000},
      {graphite_interval,20000},
      {graphite_port,2003},
      {graphite_host,"localhost"},
      {graphite_enabled,false},
      {systree_reg_view,vmq_reg_trie},
      {systree_retain,false},
      {systree_qos,0},
      {systree_mountpoint,[]},
      {systree_interval,20000},
      {systree_enabled,true},
      {crl_refresh_interval,60000},
      {remote_enqueue_timeout,5000},
      {outgoing_clustering_buffer_size,10000},
      {upgrade_outgoing_qos,false},
      {max_message_size,0},
      {max_message_rate,0},
      {max_offline_messages,1000},
      {max_online_messages,1000},
      {max_inflight_messages,20},
      {max_client_id_size,100},
      {retry_interval,20},
      {default_reg_view,vmq_reg_trie},
      {queue_sup_sup_children,50},
      {max_msgs_per_drain_step,100},
      {max_drain_time,500},
      {allow_multiple_sessions,false},
      {allow_unsubscribe_during_netsplit,false},
      {allow_subscribe_during_netsplit,false},
      {allow_publish_during_netsplit,false},
      {allow_register_during_netsplit,false},
      {allow_anonymous,true},
      {queue_deliver_mode,fanout},
      {queue_type,fifo},
      {reg_views,[vmq_reg_trie]},
      {persistent_client_expiration,0},
      {max_last_will_delay,0},
      {message_size_limit,0},
      {http_modules,
          [vmq_metrics_http,vmq_http_mgmt_api,vmq_status_http,
           vmq_health_http]},
      {listeners,
          [{mqtt,
               [{{{0,0,0,0},1883},
                 [{max_connections,10000},
                  {nr_of_acceptors,10},
                  {mountpoint,[]},
                  {proxy_protocol,false},
                  {allowed_protocol_versions,[3,4,131]}]}]},
           {mqtts,[]},
           {mqttws,
               [{{{10,6,19,176},8080},
                 [{max_connections,10000},
                  {nr_of_acceptors,10},
                  {mountpoint,[]},
                  {proxy_protocol,false},
                  {allowed_protocol_versions,[3,4,131]}]}]},
           {mqttwss,[]},
           {vmq,
               [{{{10,6,19,176},44053},
                 [{max_connections,10000},
                  {nr_of_acceptors,10},
                  {mountpoint,[]}]}]},
           {vmqs,[]},
           {http,
               [{{{10,6,19,176},8888},
                 [{max_connections,10000},
                  {nr_of_acceptors,10},
                  {config_mod,vmq_http_config},
                  {config_fun,config},
                  {proxy_protocol,false}]},
                {{{127,0,0,1},8888},
                 [{max_connections,10000},
                  {nr_of_acceptors,10},
                  {config_mod,vmq_http_config},
                  {config_fun,config},
                  {proxy_protocol,false}]}]},
           {https,[]}]},
      {tcp_listen_options,
          [{nodelay,true},
           {linger,{true,0}},
           {send_timeout,30000},
           {send_timeout_close,true}]},
      {user_plugins,
          [{"vmq_acl",#{name => vmq_acl,path => undefined}},
           {"vmq_passwd",#{name => vmq_passwd,path => undefined}}]}]},
 {setup,[{data_dir,"./data/"},{log_dir,"./log/"}]},
 {plumtree,
     [{drop_i_have_threshold,1000000},
      {outstanding_limit,250000},
      {plumtree_data_dir,"./data/meta"}]}]

Есть ли неправильная конфигурация?Нужно ли определять какие-либо значения, чтобы заставить работать общие подписки?Я использую verneMq 1.7.0.Журнал ошибок пуст.Как можно отладить, почему сообщения не отправляются подписчикам?

...