Можем ли мы использовать сообщения JMS из темы через Spark Streaming? - PullRequest
1 голос
/ 30 мая 2019

Проблема: существует тема JMS, и у меня есть следующие сведения о соединении: URL: xyz Фабрика соединений: jms.xyz Имя темы: jms.xyz имя пользователя: пароль:

Есть ли рабочий код, который создаетподписчик в Spark Scala, который принимает сообщения JMS из темы?

Я пытался использовать функцию socketTextStream для потоковой передачи искры, но у нее есть только параметр URL.Я ищу функцию потокового воспроизведения, которая может иметь все мои 5 параметров: 1) URL 2) Фабрика соединений 3) Имя темы 4) Имя пользователя 5) Пароль

Я пробовал работать в Spark-Shell

Я ищу функцию потокового воспроизведения, которая может иметь все мои 5 параметров и рабочий код spark-scala, который может принимать сообщения JMS из темы: 1) URL 2) Фабрика соединений 3) Название темы 4)Имя пользователя 5) Пароль

Я ищу базовые команды Spark-Shell, которые я могу выполнять построчно

1 Ответ

1 голос
/ 30 мая 2019

В: Можем ли мы использовать сообщения JMS из Темы через Spark Streaming?

Да.AFAIK для этого не существует решения «серебряной пули».

В зависимости от реализации поставщика сообщений может отличаться.для этого вам может понадобиться написать пользовательский приемник из spark docs ..

См. пример 2, в котором используется интеграция темы jms с потоковой передачей искры

Пример 1 ( источник ):

import org.apache.log4j.Logger;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import javax.jms.*;
import javax.naming.Context;
import java.util.Hashtable;

public class JMSReceiver extends Receiver<JMSEvent> implements MessageListener
{
    private static final Logger log = Logger.getLogger(JMSReceiver.class);

    private static final String JNDI_INITIAL_CONTEXT_FACTORY       = "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
    private static final String JNDI_CONNECTION_FACTORY_NAME       = "JMSReceiverConnectionFactory";
    private static final String JNDI_QUEUE_NAME                    = "JMSReceiverQueue";
    private static final String JNDI_CONNECTION_FACTORY_KEY_PREFIX = "connectionfactory.";
    private static final String JNDI_QUEUE_KEY_PREFIX              = "queue.";

    private StorageLevel _storageLevel;

    private String _brokerURL;
    private String _username;
    private String _password;
    private String _queueName;
    private String _selector;

    private Connection _connection;

    public JMSReceiver(String brokerURL, String username, String password, String queueName, String selector, StorageLevel storageLevel)
    {
        super(storageLevel);
        _storageLevel = storageLevel;
        _brokerURL = brokerURL;
        _username = username;
        _password = password;
        _queueName = queueName;
        _selector = selector;

        log.info("Constructed" + this);
    }

    @Override
    public void onMessage(Message message)
    {
        try
        {
            log.info("Received: " + message);
            JMSEvent jmsEvent = new JMSEvent(message);
            store(jmsEvent);
        } catch (Exception exp)
        {
            log.error("Caught exception converting JMS message to JMSEvent", exp);
        }
    }

    @Override
    public StorageLevel storageLevel()
    {
        return _storageLevel;
    }

    public void onStart()
    {

        log.info("Starting up...");

        try
        {

            Hashtable<Object, Object> env = new Hashtable<Object, Object>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_INITIAL_CONTEXT_FACTORY);
            env.put(JNDI_CONNECTION_FACTORY_KEY_PREFIX + JNDI_CONNECTION_FACTORY_NAME, _brokerURL);
            env.put(JNDI_QUEUE_KEY_PREFIX + JNDI_QUEUE_NAME, _queueName);
            javax.naming.Context context = new javax.naming.InitialContext(env);

            ConnectionFactory factory = (ConnectionFactory) context.lookup(JNDI_CONNECTION_FACTORY_NAME);
            Destination queue = (Destination) context.lookup(JNDI_QUEUE_NAME);

            if ((_username == null) || (_password == null))
            {
                _connection = factory.createConnection();
            } else
            {
                _connection = factory.createConnection(_username, _password);
            }
            _connection.setExceptionListener(new JMSReceiverExceptionListener());

            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageConsumer messageConsumer;

            if (_selector != null)
            {
                messageConsumer = session.createConsumer(queue, _selector);
            } else
            {
                messageConsumer = session.createConsumer(queue);
            }
            messageConsumer.setMessageListener(this);

            _connection.start();

            log.info("Completed startup.");
        } catch (Exception exp)
        {
            // Caught exception, try a restart
            log.error("Caught exception in startup", exp);
            restart("Caught exception, restarting.", exp);
        }
    }

    public void onStop()
    {
        // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data

        log.info("Stopping...");
        try
        {
            _connection.close();
        } catch (JMSException exp)
        {
            log.error("Caught exception stopping", exp);
        }
        log.info("Stopped.");
    }

    private class JMSReceiverExceptionListener implements ExceptionListener
    {
        @Override
        public void onException(JMSException exp)
        {
            log.error("Connection ExceptionListener fired, attempting restart.", exp);
            restart("Connection ExceptionListener fired, attempting restart.");
        }
    }

    @Override
    public String toString()
    {
        return "JMSReceiver{" +
                "brokerURL='" + _brokerURL + '\'' +
                ", username='" + _username + '\'' +
                ", password='" + _password + '\'' +
                ", queueName='" + _queueName + '\'' +
                ", selector='" + _selector + '\'' +
                '}';
    }
}

ваш JMSInputDstream будет выглядетькак

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class JMSInputDStream(
                       @transient ssc_ : StreamingContext,
                       brokerURL: String,
                       username: String,
                       password: String,
                       queuename: String,
                       selector: String,
                       storageLevel: StorageLevel
                       ) extends ReceiverInputDStream[JMSEvent](ssc_) {

  override def getReceiver(): Receiver[JMSEvent] = {
    new JMSReceiver(brokerURL, username, password, queuename, selector, storageLevel)
  }
}

Пример 2 с использованием activemq и JmsTopicReceiver.scala :

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import javax.{jms => jms}

/** Simple class of a receiver that can be run on worker nodes to receive the data from JMS Topic.
  *
  * In JMS a Topic implements publish and subscribe semantics.
  * When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message.
  * Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
  *
  * {{{
  *  val sc: SparkContext = SparkContext.getOrCreate(conf)
  *  val ssc: StreamingContext = new StreamingContext(sc, Seconds(...))
  *
  *  val stream: InputDStream[String] = ssc.receiverStream(new JmsTopicReceiver(
  *    topicName = "testTopic",
  *    transformer = { msg => msg.asInstanceOf[javax.jms.TextMessage].getText() },
  *    connectionProvider = { () => {
  *      val cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616")
  *      cf.setOptimizeAcknowledge(true)
  *      cf.createConnection("username", "password")
  *    }}
  *  ))
  *
  *  ...
  *
  *  ssc.start()
  *  ssc.awaitTermination()
  * }}}
  *
  * @param connectionProvider provides <CODE>javax.jms.Connection</CODE> for the receiver.
  * @param transformer (pre)transforms <CODE>javax.jms.Message</CODE> to appropriate class (it's required to do this before populate the result).
  * @param topicName the name of required <CODE>javax.jms.Topic</CODE>.
  * @param messageSelector only messages with properties matching the message selector expression are delivered.
  * @param storageLevel flags for controlling the storage of an RDD.
  * @tparam T RDD element type.
  */
class JmsTopicReceiver[T] (
  connectionProvider: (() => jms.Connection),
  transformer: (jms.Message => T),
  topicName: String,
  messageSelector: Option[String] = None,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
) extends AbstractJmsReceiver[T](
  messageSelector = messageSelector,
  storageLevel = storageLevel
) with Logging {

  override protected def buildConnection(): jms.Connection = connectionProvider()
  override protected def transform(message: jms.Message): T = transformer(message)
  override protected def buildDestination(session: jms.Session): jms.Destination = session.createTopic(topicName)

}

Пример 3: Утешение использовало пользовательский приемник искры: так я долго работал, когда была искра 1.3,

Solace-JMS-Integration-Spark-Streaming.pdf

Дополнительная информация: Обработка данных из MQ с Spark Streaming: Часть 1. Введение в обмен сообщениями, JMS & MQ

...