Сообщение коммит в Spark Структурированная Потоковая передача - PullRequest
0 голосов
/ 20 марта 2020

Я использую потоковую обработку с искрой (2.3) и версию kafka 2.4.

Я хочу узнать, как я могу использовать ASync and Sync свойство смещения коммита.

Если я установлю enable.auto.commit как верно, это Sync or ASync?

Как я могу определить обратный вызов в потоковой структурированной искре? Или как я могу использовать Sync or ASync в структурированной потоковой передаче Spark?

Заранее спасибо

Мой код

package sparkProject;


import java.io.StringReader;
import java.util.*;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class XMLSparkStreamEntry {

    static StructType structType = new StructType();

    static {
        structType = structType.add("FirstName", DataTypes.StringType, false);
        structType = structType.add("LastName", DataTypes.StringType, false);
        structType = structType.add("Title", DataTypes.StringType, false);
        structType = structType.add("ID", DataTypes.StringType, false);
        structType = structType.add("Division", DataTypes.StringType, false);
        structType = structType.add("Supervisor", DataTypes.StringType, false);

    }

    static ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    public static void main(String[] args) throws StreamingQueryException {

        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().config(conf).appName("Spark Program").master("local[*]")
                .getOrCreate();

        Dataset<Row> ds1 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Kafkademo").load();

        Dataset<Row> ss = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        Dataset<Row> finalOP = ss.flatMap(new FlatMapFunction<Row, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Row> call(Row t) throws Exception {

                JAXBContext jaxbContext = JAXBContext.newInstance(FileWrapper.class);
                Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(t.getAs("value"));
                FileWrapper person = (FileWrapper) unmarshaller.unmarshal(reader);

                List<Employee> emp = new ArrayList<Employee>(person.getEmployees());
                List<Row> rows = new ArrayList<Row>();
                for (Employee e : emp) {

                    rows.add(RowFactory.create(e.getFirstname(), e.getLastname(), e.getTitle(), e.getId(),
                            e.getDivision(), e.getSupervisor()));

                }
                return rows.iterator();
            }
        }, encoder);


        Dataset<Row> wordCounts = finalOP.groupBy("firstname").count();

        StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();
        System.out.println("SHOW SCHEMA");
        query.awaitTermination();

    }

}

Могу ли я кого-нибудь проверить, где и как я могу реализовать ASyn c и Syn c смещение коммит в моем коде выше?

Заранее спасибо ..!

Ответы [ 2 ]

0 голосов
/ 24 марта 2020

Пожалуйста, прочитайте https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read Это превосходный источник, хотя немного читается между строк.

Короче:

Структурированная потоковая передача игнорируется смещения фиксируются в Apache Кафке. Вместо этого он опирается на собственное управление смещениями на стороне водителя, которое отвечает за распределение смещений исполнителям и за контрольные точки их в конце цикла обработки (эпоха или микропакет).

Batck Spark Structured Streaming & KAFKA Integration снова работает иначе.

0 голосов
/ 24 марта 2020

Spark Structured Streaming не поддерживает функцию смещения фиксации Kafka. Рекомендуемый вариант из официальных документов - включить контрольные точки.
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Другое предложение - изменить его на Spark Streaming, который поддерживает Kafka commitAsyn c API. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

...