Разделить набор данных на основе значения столбца - PullRequest
0 голосов
/ 24 мая 2018

У меня есть Dataset<Row>, который является результатом Kafka readStream, как показано ниже в фрагменте кода Java.

m_oKafkaEvents = getSparkSession().readStream().format("kafka")  
  .option("kafka.bootstrap.servers", strKafkaAddress)  
  .option("subscribe", getInsightEvent().getTopic())  
  .option("maxOffsetsPerTrigger", "100000")  
  .option("startingOffsets", "latest")  
  .option("failOnDataLoss", false)  
  .load()  
  .select(functions.from_json(functions.col("value").cast("string"), oSchema).as("events"))  
  .select("events.*");  

m_oKafkaEvents  
{  
    {"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}  
}  

Мне нужно разделить этот набор данных на основе столбца «Модель», что приведет к двум наборам данных, как показано ниже;

 m_oKafkaEvents_for_Opportunity_1_topic 
   {  
       {"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
       {"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"}   
   }  

   m_oKafkaEvents_for_Opportunity_2_topic  
   {  
      {"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},  
      {"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}  
   }  

Эти наборы данных будут опубликованы в приемнике Кафки.Название темы будет значением модели.то есть Opportunity_1 и Opportunity_2.
Следовательно, мне нужно иметь значение столбца дескриптора «Модель» и список соответствующих событий.
Поскольку я новичок в Spark, ищу помощь в том, как этого можно добиться с помощью Java.код.
Ценю любую помощь.

1 Ответ

0 голосов
/ 25 мая 2018

Самое простое решение будет выглядеть так:

allEvents.selectExpr("topic", "CONCAT('m_oKafkaEvents_for_', Model, '_topic')")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .save();

Пример вы можете увидеть здесь https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka.Но после просмотра кода Spark кажется, что у нас может быть только 1 тема / запись, т.е. он выберет в качестве темы первую найденную строку:

def write(
  sparkSession: SparkSession,
  queryExecution: QueryExecution,
  kafkaParameters: ju.Map[String, Object],
  topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
validateQuery(schema, kafkaParameters, topic)
queryExecution.toRdd.foreachPartition { iter =>
  val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
  Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
    finallyBlock = writeTask.close())
}

Вы можете попробовать этот подход и сказать здесь, еслиэто работает как сказано выше?Если это не работает, у вас есть альтернативные решения, такие как:

  1. Кэшируйте основной DataFrame и создайте 2 других DataFrames, отфильтрованных по атрибуту Model
  2. Используйте foreachPartition и средство записи Kafka для отправкисообщения без разделения основного набора данных

Первое решение довольно просто реализовать, и вы используете для этого все возможности Spark.С другой стороны, и, по крайней мере, теоретически, разделение набора данных должно быть немного медленнее, чем второе предложение.Но попробуйте измерить, прежде чем выбрать тот или иной вариант, возможно, разница будет очень мала, и всегда лучше использовать понятный и одобренный сообществом подход.

Ниже вы можете найти код, показывающий обе ситуации:

SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordCount")
            .getOrCreate();
    Dataset<Row> allEvents = spark.readStream().format("kafka")
            .option("kafka.bootstrap.servers", "")
            .option("subscribe", "event")
            .option("maxOffsetsPerTrigger", "100000")
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), null).as("events"))
            .select("events.*");


    // First solution
    Dataset<Row> opportunity1Events = allEvents.filter("Model = 'Opportunity_1'");
    opportunity1Events.write().format("kafka").option("kafka.bootstrap.servers", "")
            .option("topic", "m_oKafkaEvents_for_Opportunity_1_topic").save();
    Dataset<Row> opportunity2Events = allEvents.filter("Model = 'Opportunity_2'");
    opportunity2Events.write().format("kafka").option("kafka.bootstrap.servers", "")
            .option("topic", "m_oKafkaEvents_for_Opportunity_2_topic").save();
    // Note: Kafka writer was added in 2.2.0 https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c

    // Another approach with iteration throughout messages accumulated within each partition
    allEvents.foreachPartition(new ForeachPartitionFunction<Row>() {
        private KafkaProducer<String, Row> localProducer = new KafkaProducer<>(new HashMap<>());

        private final Map<String, String> modelsToTopics = new HashMap<>();
        {
            modelsToTopics.put("Opportunity_1", "m_oKafkaEvents_for_Opportunity_1_topic");
            modelsToTopics.put("Opportunity_2", "m_oKafkaEvents_for_Opportunity_2_topic");
        }

        @Override
        public void call(Iterator<Row> rows) throws Exception {
            // If your message is Opportunity1 => add to messagesOpportunity1
            // otherwise it goes to Opportunity2
            while (rows.hasNext()) {
                Row currentRow = rows.next();
                // you can reformat your row here or directly in Spark's map transformation
                localProducer.send(new ProducerRecord<>(modelsToTopics.get(currentRow.getAs("Model")),
                        "some_message_key", currentRow));
            }
            // KafkaProducer accumulates messages in a in-memory buffer and sends when a threshold was reached
            // Flush them synchronously here to be sure that every stored message was correctly
            // delivered
            // You can also play with features added in Kafka 0.11: the idempotent producer and the transactional producer
            localProducer.flush();
        }
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...