Я использую Rabbitmq в двух приложениях для Android, одно приложение предназначено для потребителя, а другое - для производителя. Как сервер Rabbitmq я использую локальное приложение на своей машине.
Похоже, что приложение-производитель Android правильно подключено к серверу.Но продюсер этого не делает.Я запускаю приложения на своем мобильном телефоне.
Приложение My Producer имеет следующую структуру:
Класс с именем Функции:
public class Functions {
public static String EXCHANGE_NAME = "exchangename";
private static ConnectionFactory factory;
/**
* Initialization for ConnectionFactory
*/
static {
factory = new ConnectionFactory();
factory.setHost("172.22.72.179");//the address of my machine
}
/**
* Creates a connection to the broker
*
* @return A connection if it succedes, <i>null</i> if fails
*/
public static Connection createConnection() {
try {
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}}
Класс с именем Producer:
public class MyProducer {
private Connection connection;
private Channel channel;
public MyProducer() {
}
/**
* Makes a connection to the broker, creates a channel and a queue. It also
* declares an exchange
*
* @throws Exception
*/
public void connect() throws Exception {
connection = Functions.createConnection();
channel = connection.createChannel();
channel.exchangeDeclare(Functions.EXCHANGE_NAME, "topic");
}
/**
* Sends a message with a routing key to the broker
*
* @param routingKey
* Routing key of the message
* @param message
* Message content
* @throws IOException
*/
public void sendMessage(String routingKey, String message) throws IOException {
channel.basicPublish(Functions.EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
/**
* Disconnects from the broker
*
* @throws IOException
* @throws TimeoutException
*/
public void disconnect() throws IOException, TimeoutException {
channel.close();
connection.close();
}}
Тогда Наконецосновной класс:
public class MainActivity extends AppCompatActivity {
MyProducer producer;
RadioGroup rgroup;
EditText inputProduct;
EditText inputOffer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
StrictMode.ThreadPolicy policy = new StrictMode.ThreadPolicy.Builder().permitNetwork().build();
StrictMode.setThreadPolicy(policy);
rgroup = (RadioGroup) findViewById(R.id.rgroup);
rgroup.check(R.id.rbBeauty);
inputProduct = (EditText) findViewById(R.id.textProduct);
inputOffer = (EditText) findViewById(R.id.textOffer);
producer = new MyProducer();
try {
producer.connect();
} catch (Exception e) {
// createDialog("ERROR", e.getMessage());Log.d("myTag", "MAL" + e.getCause().getMessage());
}
}
protected void onDestroy() {
super.onDestroy();
try {
producer.disconnect();
} catch (Exception ex) {
// createDialog("ERROR", ex.getMessage());
// Log.d("myTag", "MAL" + ex.getCause().getMessage());
}
}
public void sendOffer(View v) {
String[] data = getDataFromGUI();
if (data != null) {
try {
sendMessage(data[0], data[1], data[2]);
inputProduct.setText("");
inputOffer.setText("");
createDialog("Ok", "Message sent ");
} catch (IOException e) {
// createDialog("ERROR", e.getMessage());
// Log.d("myTag", "MAL" + e.getCause().getMessage());
}
} else
createDialog("Error", "You must introduce some text in textfields");
}
private void sendMessage(String category, String product, String mensaje) throws IOException {
String routingKey = "offers." + category + "." + product;
producer.sendMessage(routingKey, mensaje);
}
public void createDialog(String title, String msg) {
AlertDialog.Builder builder = new AlertDialog.Builder(this);
builder.setMessage(msg).setTitle(title);
AlertDialog dialog = builder.create();
dialog.show();
}
public String[] getDataFromGUI() {
String category = getSelectedRadioButton();
String product = "";
String offer = "";
product = inputProduct.getText().toString();
offer = inputOffer.getText().toString();
if (validString(product) && validString(offer) && validString(category))
return new String[]{category, product, offer};
else
return null;
}
public String getSelectedRadioButton() {
RadioButton button = (RadioButton) findViewById(rgroup.getCheckedRadioButtonId());
return button.getText().toString();
}
private boolean validString(String string) {
if (string == null)
return false;
if (string.length() == 0)
return false;
return true;
}}
Кажется, что производитель связан с брокером, проблема в том, что я не знаю, почему потребитель не может подключиться к брокеру.
Структура двух приложений одинакова:
Функция, Myconsumer, Mainacctivity.
Класс MyConsumer:
public class MyConsumer {
private Connection connection;
private Channel channel;
private String queueName = "consumer";
public MyConsumer() {
}
/**
* Makes a connection to the broker, creates a channel and a queue. It also
* declares an exchange
*
* @throws IOException
*/
public void connect() throws IOException {
connection = Functions.createConnection();
Log.d("myTag", "IS null connetion " + (connection == null));
channel = connection.createChannel();
channel.exchangeDeclare(Functions.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(queueName, true, true, false, new HashMap<String, Object>()).getQueue();
}
/**
* Once you've made a connection you can set the routing keys of the
* messages you're willing to receive
*
* @param keys
* Array of Routing keys
* @throws IOException
*/
public void setRoutingKeys(String... keys) throws IOException {
for (String bindingKey : keys) {
channel.queueBind(queueName, Functions.EXCHANGE_NAME, bindingKey);
}
}
/**
* Once you've made a connection you can clear the routing keys of the
* messages you're not willing to receive
*
* @param keys
* Array of Routing keys
* @throws IOException
*/
public void clearRoutingKeys(String... keys) throws IOException {
for (String bindingKey : keys) {
channel.queueUnbind(queueName, Functions.EXCHANGE_NAME, bindingKey);
}
}
/**
* Creates a default consumer and it starts receiving messages in the queue.
* It won't stop until his connection with the broker is finished
*
* @throws IOException
*/
public void recieveMessages(Consumer consumer) throws IOException {
channel.basicConsume(queueName, true, consumer);
}
/**
* Disconnects from the broker
*
* @throws IOException
* @throws TimeoutException
*/
public void disconnect() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public Channel getChannel(){
return this.channel;
}}