Добавить схему в набор данных [Row] в Java - PullRequest
0 голосов
/ 12 апреля 2019

Я новичок в Spark и пытаюсь исследовать структурированную потоковую передачу Spark.Я буду принимать сообщения от Kafka (вложенный JSON), фильтровать эти сообщения на основе определенных условий атрибута JSON.Каждое сообщение, удовлетворяющее фильтру, должно быть отправлено Кассандре.

Я прочитал документацию по коннектору искры Cassandra https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load() 

df.selectExpr("CAST(value AS STRING)")

Мне нужны только некоторые из многих атрибутов, присутствующих в этом вложенном JSON.Как применить схему поверх нее, чтобы я мог использовать sparkSQL для фильтрации?

Для примера JSON мне нужно сохранить имя, возраст, опыт, hobby_name, hobby_experience для игроков, чья сумма игрычастота больше 5.

{
    "name": "Tom",
    "age": "24",
    "gender": "male",
    "hobbies": [{
        "name": "Tennis",
        "experience": 5,
        "places": [{
            "city": "London",
            "frequency": 4
        }, {
            "city": "Sydney",
            "frequency": 3
        }]
    }]
}

Я относительно новичок в Spark, пожалуйста, прости, если это повторение.Также я ищу решение в JAVA.

1 Ответ

0 голосов
/ 22 апреля 2019

Вы можете указать свою схему следующим образом:

import org.apache.spark.sql.types.{DataTypes, StructField, StructType};

StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("name",  DataTypes.StringType, true),
    DataTypes.createStructField("age", DataTypes.StringType, true),
    DataTypes.createStructField("gender", DataTypes.StringType, true),
    DataTypes.createStructField("hobbies", DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("experience", DataTypes.IntegerType, true),
        DataTypes.createStructField("places", DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("city", DataTypes.StringType, true),
            DataTypes.createStructField("frequency", DataTypes.IntegerType, true)
        }), true)
    }), true)
});

И затем использовать схему для создания вашего кадра данных при необходимости:

import org.apache.spark.sql.functions.{col, from_json};

df.select(from_json(col("value"), schema).as("data"))
  .select(
    col("data.name").as("name"),
    col("data.hobbies.name").as("hobbies_name"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...