В: Можно ли получать (consume) сообщения JMS из топика (Topic) через Spark Streaming?
Да. Насколько мне известно, универсального решения («серебряной пули») для этого не существует.
Реализация может отличаться в зависимости от поставщика сообщений. Возможно, для этого вам понадобится написать пользовательский приёмник (custom receiver), как описано в документации Spark.
См. пример 2, в котором используется интеграция топика JMS со Spark Streaming.
Пример 1 (источник):
import org.apache.log4j.Logger;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import javax.jms.*;
import javax.naming.Context;
import java.util.Hashtable;
/**
 * Spark Streaming receiver that listens on a JMS destination and stores every
 * received message as a {@link JMSEvent}.
 *
 * <p>The connection factory and the destination are resolved through JNDI using
 * the Qpid JMS initial context factory. The receiver registers itself as the
 * {@link MessageListener} of the consumer it creates in {@link #onStart()}.
 *
 * <p>NOTE(review): the session is created with {@code AUTO_ACKNOWLEDGE}; whether a
 * message can be lost when {@code store(...)} fails depends on the provider and on
 * the Spark storage configuration - confirm before relying on delivery guarantees.
 */
public class JMSReceiver extends Receiver<JMSEvent> implements MessageListener
{
    private static final Logger log = Logger.getLogger(JMSReceiver.class);

    private static final String JNDI_INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
    private static final String JNDI_CONNECTION_FACTORY_NAME = "JMSReceiverConnectionFactory";
    private static final String JNDI_QUEUE_NAME = "JMSReceiverQueue";
    private static final String JNDI_CONNECTION_FACTORY_KEY_PREFIX = "connectionfactory.";
    private static final String JNDI_QUEUE_KEY_PREFIX = "queue.";

    // Configuration captured at construction time; never reassigned afterwards.
    private final StorageLevel _storageLevel;
    private final String _brokerURL;
    private final String _username;
    private final String _password;
    private final String _queueName;
    private final String _selector;

    // Live JMS connection; null until onStart() succeeds in creating it and
    // reset to null again by onStop().
    private Connection _connection;

    /**
     * @param brokerURL    value bound to the JNDI connection-factory entry
     * @param username     user name for the connection; may be null
     * @param password     password for the connection; may be null
     * @param queueName    value bound to the JNDI destination entry
     * @param selector     JMS message selector; null means no selector is applied
     * @param storageLevel storage level passed to the Spark {@code Receiver}
     */
    public JMSReceiver(String brokerURL, String username, String password, String queueName, String selector, StorageLevel storageLevel)
    {
        super(storageLevel);
        _storageLevel = storageLevel;
        _brokerURL = brokerURL;
        _username = username;
        _password = password;
        _queueName = queueName;
        _selector = selector;
        // toString() masks the password, so this line is safe to log.
        log.info("Constructed" + this);
    }

    /**
     * Wraps the incoming message in a {@link JMSEvent} and hands it to Spark via
     * {@code store(...)}. Any exception is logged and swallowed so that it does
     * not propagate back into the JMS provider's delivery thread.
     */
    @Override
    public void onMessage(Message message)
    {
        try
        {
            log.info("Received: " + message);
            JMSEvent jmsEvent = new JMSEvent(message);
            store(jmsEvent);
        }
        catch (Exception exp)
        {
            log.error("Caught exception converting JMS message to JMSEvent", exp);
        }
    }

    /** @return the storage level supplied to the constructor */
    @Override
    public StorageLevel storageLevel()
    {
        return _storageLevel;
    }

    /**
     * Resolves the connection factory and destination through JNDI, opens the
     * connection (with credentials only when both user name and password are
     * set), creates a non-transacted auto-acknowledge session, registers this
     * receiver as the message listener and starts the connection.
     *
     * <p>Any failure is logged and followed by a {@code restart(...)} request.
     */
    public void onStart()
    {
        log.info("Starting up...");
        try
        {
            Hashtable<Object, Object> env = new Hashtable<Object, Object>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_INITIAL_CONTEXT_FACTORY);
            env.put(JNDI_CONNECTION_FACTORY_KEY_PREFIX + JNDI_CONNECTION_FACTORY_NAME, _brokerURL);
            env.put(JNDI_QUEUE_KEY_PREFIX + JNDI_QUEUE_NAME, _queueName);

            javax.naming.Context context = new javax.naming.InitialContext(env);
            ConnectionFactory factory = (ConnectionFactory) context.lookup(JNDI_CONNECTION_FACTORY_NAME);
            Destination queue = (Destination) context.lookup(JNDI_QUEUE_NAME);

            // Credentials are used only when both parts are present.
            if ((_username == null) || (_password == null))
            {
                _connection = factory.createConnection();
            }
            else
            {
                _connection = factory.createConnection(_username, _password);
            }
            _connection.setExceptionListener(new JMSReceiverExceptionListener());

            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer messageConsumer;
            if (_selector != null)
            {
                messageConsumer = session.createConsumer(queue, _selector);
            }
            else
            {
                messageConsumer = session.createConsumer(queue);
            }
            messageConsumer.setMessageListener(this);

            // Delivery begins only once the connection is started.
            _connection.start();
            log.info("Completed startup.");
        }
        catch (Exception exp)
        {
            // Caught exception, try a restart
            log.error("Caught exception in startup", exp);
            restart("Caught exception, restarting.", exp);
        }
    }

    /**
     * Closes the JMS connection, if one was opened, to stop receiving data.
     *
     * <p>onStart() may fail before the connection is assigned, so the field can
     * still be null here; in that case there is nothing to close.
     */
    public void onStop()
    {
        // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data
        log.info("Stopping...");
        Connection connection = _connection;
        _connection = null;
        if (connection != null)
        {
            try
            {
                connection.close();
            }
            catch (JMSException exp)
            {
                log.error("Caught exception stopping", exp);
            }
        }
        log.info("Stopped.");
    }

    /** Requests a receiver restart whenever the JMS connection reports a failure. */
    private class JMSReceiverExceptionListener implements ExceptionListener
    {
        @Override
        public void onException(JMSException exp)
        {
            log.error("Connection ExceptionListener fired, attempting restart.", exp);
            restart("Connection ExceptionListener fired, attempting restart.");
        }
    }

    /**
     * Describes the receiver configuration. The password is never printed: a
     * non-null password is shown as a fixed mask because this string is written
     * to the log by the constructor.
     */
    @Override
    public String toString()
    {
        String maskedPassword = (_password == null) ? "null" : "******";
        return "JMSReceiver{" +
                "brokerURL='" + _brokerURL + '\'' +
                ", username='" + _username + '\'' +
                ", password='" + maskedPassword + '\'' +
                ", queueName='" + _queueName + '\'' +
                ", selector='" + _selector + '\'' +
                '}';
    }
}
Ваш JMSInputDStream будет выглядеть так:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
/** Receiver-based input stream of `JMSEvent`s.
 *
 * Holds the JMS connection settings and creates a `JMSReceiver` from them
 * each time `getReceiver()` is called.
 *
 * @param ssc_         streaming context passed to `ReceiverInputDStream`
 * @param brokerURL    broker URL passed to the receiver
 * @param username     user name passed to the receiver
 * @param password     password passed to the receiver
 * @param queuename    destination name passed to the receiver
 * @param selector     message selector passed to the receiver
 * @param storageLevel storage level passed to the receiver
 */
private[streaming]
class JMSInputDStream(
    @transient ssc_ : StreamingContext,
    brokerURL: String,
    username: String,
    password: String,
    queuename: String,
    selector: String,
    storageLevel: StorageLevel
) extends ReceiverInputDStream[JMSEvent](ssc_) {

  /** Creates the receiver from the settings held by this stream. */
  override def getReceiver(): Receiver[JMSEvent] = {
    val receiver = new JMSReceiver(
      brokerURL,
      username,
      password,
      queuename,
      selector,
      storageLevel
    )
    receiver
  }
}
Пример 2 с использованием ActiveMQ (JmsTopicReceiver.scala):
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import javax.{jms => jms}
/** Receiver that can run on worker nodes and reads its data from a JMS Topic.
 *
 * A JMS Topic has publish/subscribe semantics: a published message is copied to
 * every interested subscriber, so zero or more subscribers receive it. Only
 * subscribers with an active subscription at the moment the broker receives the
 * message get a copy.
 *
 * Usage:
 * {{{
 * val sc: SparkContext = SparkContext.getOrCreate(conf)
 * val ssc: StreamingContext = new StreamingContext(sc, Seconds(...))
 *
 * val stream: InputDStream[String] = ssc.receiverStream(new JmsTopicReceiver(
 *   topicName = "testTopic",
 *   transformer = { msg => msg.asInstanceOf[javax.jms.TextMessage].getText() },
 *   connectionProvider = { () => {
 *     val cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616")
 *     cf.setOptimizeAcknowledge(true)
 *     cf.createConnection("username", "password")
 *   }}
 * ))
 *
 * ...
 *
 * ssc.start()
 * ssc.awaitTermination()
 * }}}
 *
 * @param connectionProvider supplies the <CODE>javax.jms.Connection</CODE> used by the receiver.
 * @param transformer converts a <CODE>javax.jms.Message</CODE> into the element type; this must
 *                    happen before the result is populated.
 * @param topicName name of the <CODE>javax.jms.Topic</CODE> to read from.
 * @param messageSelector when set, only messages whose properties match the selector expression
 *                        are delivered.
 * @param storageLevel flags controlling the storage of an RDD.
 * @tparam T RDD element type.
 */
class JmsTopicReceiver[T](
    connectionProvider: () => jms.Connection,
    transformer: jms.Message => T,
    topicName: String,
    messageSelector: Option[String] = None,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
) extends AbstractJmsReceiver[T](
      messageSelector = messageSelector,
      storageLevel = storageLevel
    )
    with Logging {

  /** Obtains the connection from the supplied provider. */
  override protected def buildConnection(): jms.Connection =
    connectionProvider.apply()

  /** Converts a raw message with the supplied transformer. */
  override protected def transform(message: jms.Message): T =
    transformer.apply(message)

  /** Creates the topic named `topicName` on the given session. */
  override protected def buildDestination(session: jms.Session): jms.Destination = {
    val topic: jms.Destination = session.createTopic(topicName)
    topic
  }
}
Пример 3: в Solace использовался пользовательский приёмник Spark (custom receiver); так я работал довольно давно, ещё во времена Spark 1.3:
Solace-JMS-Integration-Spark-Streaming.pdf
Дополнительная информация: Обработка данных из MQ с Spark Streaming: Часть 1. Введение в обмен сообщениями, JMS & MQ