Как читать сообщения в MQ с использованием потоковой передачи искры, т.е. ZeroMQ M RabbitMQ? - PullRequest
0 голосов
/ 09 ноября 2018

Как говорит spark docs, он поддерживает kafka как источник потоковой передачи данных. Но я использую ZeroMQ, а ZeroMQUtil нет. Как я могу его использовать? и вообще, как насчет других MQ. Я совершенно новичок в разжигании и искровом потоке, поэтому прошу прощения, если вопрос глупый. Может ли кто-нибудь дать мне решение. Спасибо Кстати, я использую Python.

Обновление, я наконец сделал это в Java с Custom Receiver. Ниже мое решение

public class ZeroMQReceiver extends Receiver<T> {

    private static final ObjectMapper mapper = new ObjectMapper();

    public ZeroMQReceiver() {

        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself if isStopped() returns false
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        String message = null;

        try {

            ZMQ.Context context = ZMQ.context(1); 
            ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     
            subscriber.connect("tcp://ip:port");    
            subscriber.subscribe("".getBytes());  

            // Until stopped or connection broken continue reading
            while (!isStopped() && (message = subscriber.recvStr()) != null) {
                List<T> results = mapper.readValue(message,
                        new TypeReference<List<T>>(){} );
                for (T item : results) {
                    store(item);
                }
            }
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch(Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }
}

1 Ответ

0 голосов
/ 09 ноября 2018

Полагаю, вы говорите о структурированной потоковой передаче.

Я не знаком с ZeroMQ, но важным моментом в источниках структурированной потоковой передачи Spark является возможность воспроизведения (для обеспечения отказоустойчивости), которая, если я понимаю,правильно, ZeroMQ не доставляет из коробки.

Практическим подходом будет буферизация данных либо в Kafka с использованием KafkaSource, либо в виде файлов в (локальная FS / NFS, HDFS, S3) и использование FileSource для чтения.Ср Spark Docs .Если вы используете FileSource, убедитесь, что вы ничего не добавляете к существующему файлу во входном каталоге FileSource, а перемещаете их в каталог атомарно.

...