Можно ли передавать данные таблицы базы данных в поток с помощью потоковой передачи искр - PullRequest
0 голосов
/ 07 мая 2020

Попытка передать данные таблицы SQLServer в поток. Итак, создали простую java программу с основным классом. Создал sparkconf и, используя его, инициировал JavaStreamingContext и извлек из него SparkContext. Используя JdbcRDD и JavaRDD API Spark, получили данные из базы данных и инициировали inputQueue, а затем подготовили JavaInputDStream. Итак, предварительные требования выполнены и запущен JavaStreamingContext. Итак, я получаю первый набор данных, который я получил при подготовке inputQueue, но не получаю данные для дальнейших потоков.

package com.ApacheSparkConnection.ApacheSparkConnection;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import com.infosys.himi.maskit.algorithms.encryptiondecryption.EncryptionARC4;
import com.infosys.maskit.common.util.ConfigParams;

import scala.Tuple2;
import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

public class MainSparkConnector {

    public static void main(String[] args) throws Exception {

        String dbtableQuery = "SELECT TOP 10 AGENT_CODE,AGENT_NAME,WORKING_AREA,COMMISSION,PHONE_NO,COUNTRY FROM dbo.AGENTS where AGENT_CODE >= ? and AGENT_CODE <= ?";

        String host = "XXXXXXXXX";
        String databaseName = "YYYY";
        String user = "sa";
        String password = "XXXXXX@123";

        long previewSize = 0; 

        Instant start = Instant.now();

        SparkConf sparkConf = new SparkConf().setAppName("SparkJdbcDs")
                .setMaster("local[4]")
                .set("spark.driver.allowMultipleContexts", "true");

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        JavaSparkContext javaSparkContext  =  javaStreamingContext.sparkContext();
        SparkContext sparkContext = javaSparkContext.sc(); 

        String url = "jdbc:sqlserver://" + host + ":1433;databaseName=" + databaseName;
        String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; 

        DbConnection dbConnection = new DbConnection(driver, url, user, password);

        JdbcRDD<Object[]> jdbcRDD =
                new JdbcRDD<Object[]>(sparkContext, dbConnection, dbtableQuery, 0,
                              100000, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() {
            @Override
            public String call(final Object[] record) throws Exception {
                String rec = "";
                for(Object ob : record) {
                    rec = rec + " " + ob;
                }
                return rec;
            }
        }).collect();

        JavaRDD<String> javaRDD1 = javaStreamingContext.sparkContext().parallelize(employeeFullNameList);
        Queue<JavaRDD<String>> inputQueue = new LinkedList<JavaRDD<String>>();

        inputQueue.add(javaRDD1);

        JavaInputDStream<String> javaDStream = javaStreamingContext.queueStream(inputQueue, true);
        System.out.println("javaDStream.print()");
        javaDStream.print();
        javaDStream.foreachRDD( rdd-> {
            System.out.println("rdd.count() : "+ rdd.count());
            rdd.collect().stream().forEach(n-> System.out.println("item of list: "+n));
        });
        javaStreamingContext.start();

        System.out.println("employeeFullNameList.size() : "+employeeFullNameList.size());

        javaStreamingContext.awaitTermination();
    }

    static class DbConnection extends AbstractFunction0<Connection> implements Serializable {

        private String driverClassName;
        private String connectionUrl;
        private String userName;
        private String password;

        public DbConnection(String driverClassName, String connectionUrl, String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        public Connection apply() {
            try {
                Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                System.out.println("Failed to load driver class" +e);
            }

            Properties properties = new Properties();
            properties.setProperty("user", userName);
            properties.setProperty("password", password);

            Connection connection = null;
            try {
                connection = DriverManager.getConnection(connectionUrl, properties);
            } catch (SQLException e) {
                System.out.println("Connection failed"+ e);
            }

            return connection;
        }
    }

    static class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

        public Object[] apply(ResultSet row) {
            return JdbcRDD.resultSetToObjectArray(row);
        }
    }
}````
Please let me know if am in wrong direction

1 Ответ

0 голосов
/ 07 мая 2020

Потоковая передача Реляционная СУБД мгновенная съемка исходных данных через Spark Streaming проста, но нет прямого способа получить конечные изменения, происходящие в БД.

Лучшее решение - go через Debezium SQL Server Connector

Debezium SQL Server Connector может отслеживать и записывать на уровне строк изменения в схемах базы данных SQL сервера.

  • Вам потребуется настроить кластер Kafka
  • Включить CD C для SQL сервера

SQL Серверный компакт-диск C не предназначен для хранения полной истории изменений базы данных. Таким образом, необходимо, чтобы Debezium установил базовый уровень текущего контента базы данных и направил его в Kafka. Это достигается с помощью процесса, называемого моментальным снимком.

По умолчанию (начальный режим моментального снимка) коннектор при первом запуске выполняет начальный согласованный моментальный снимок базы данных (что означает, что структура и данные в любых таблицах будут записаны как согласно конфигурации фильтра коннектора).

Каждый моментальный снимок состоит из следующих шагов:

Determine the tables to be captured

Obtain a lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables. The level of the lock is determined by snapshot.isolation.mode configuration option.

Read the maximum LSN ("log sequence number") position in the server’s transaction log.

Capture the structure of all relevant tables.

Optionally release the locks obtained in step 2, i.e. the locks are held usually only for a short period of time.

Scan all of the relevant database tables and schemas as valid at the LSN position read in step 3, and generate a READ event for each row and write that event to the appropriate table-specific Kafka topic.

Record the successful completion of the snapshot in the connector offsets.

Чтение таблиц измененных данных

При первом запуске коннектор принимает структурный снимок структуры захваченных таблиц и сохраняет эту информацию в своей внутренней истории базы данных topi c. Затем коннектор идентифицирует таблицу изменений для каждой из исходных таблиц и выполняет основную l oop

For each change table read all changes that were created between last stored maximum LSN and current maximum LSN

Order the read changes incrementally according to commit LSN and change LSN. This ensures that the changes are replayed by Debezium in the same order as were made to the database.

Pass commit and change LSNs as offsets to Kafka Connect.

Store the maximum LSN and repeat the loop.

После перезапуска коннектор возобновит работу со смещения (фиксация и изменение номеров LSN), с которого он оставался. выключен раньше.

Коннектор может определять, включен ли CD C для исходной таблицы из белого списка во время выполнения, и изменять его поведение.

Серверный коннектор SQL записывает события для всех операций вставки, обновления и удаления в одной таблице в один Kafka topi c. Имя тем Kafka всегда имеет вид serverName.schemaName.tableName, где serverName - это логическое имя коннектора, указанное в свойстве конфигурации database.server.name, schemaName - это имя схемы, в которой произошла операция, и tableName - это имя таблицы базы данных, в которой произошла операция.

Например, рассмотрим установку сервера SQL с базой данных инвентаризации, содержащей четыре таблицы: products, products_on_hand, customers, and orders в схеме dbo. Если соединителю, контролирующему эту базу данных, было присвоено логическое имя сервера выполнения, то соединитель будет генерировать события по этим четырем темам Kafka:

    fulfillment.dbo.products
    fulfillment.dbo.products_on_hand
    fulfillment.dbo.customers

    fulfillment.dbo.orders
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...