Присоединитесь к набору данных Spark, используя java - PullRequest
0 голосов
/ 06 мая 2020

У меня есть 2 набора данных, которые я пытаюсь объединить:

Dataset1 (machine):

 String machineID:
 List<Integer> machineCat;(100,200,300)

Dataset2 (car):

 String carID:
 List<Integer> carCat;(30,200,100,300)

Мне нужно чтобы получить каждый элемент List machineCat из набора данных1 и проверить, содержится ли он в List carCat набора данных2. Если это совпадает, объедините 2 набора данных, как показано ниже:

окончательный набор данных:

machineID,machineCat(100),carID,carCat(100)
machineID,machineCat(200),carID,carCat(200)
machineID,machineCat(300),carID,carCat(400)

любая помощь о том, как это сделать, используя соединение набора данных в java.

Рассматриваем вариант с array_contain (что-то вроде ниже)

  machine.foreachPartition((ForeachPartitionFunction<Machine>) iterator -> {

    while (iterator.hasNext()) {

        Machine machine = iterator.next();
        machine.getmachineCat().stream().filter(cat -> {

            LOG.info("matched");
            spark.sql(
                    "select * from machineDataset m"
                            + " join"
                            + " carDataset c "
                            + "where array_contains(m.machineCat,cat)");
            return true;
        });

    }
});

1 Ответ

1 голос
/ 07 мая 2020
import static org.apache.spark.sql.functions.*; // before main class

Machine machine = new Machine("m1",Arrays.asList(100,200,300));
Car car = new Car("c1", Arrays.asList(30,200,100,300));

Dataset<Row> mDF= spark.createDataFrame(Arrays.asList(machine), Machine.class);
mDF.show();
Dataset<Row> cDF= spark.createDataFrame(Arrays.asList(car), Car.class);
cDF.show();

вывод:

+---------------+---------+
|     machineCat|machineId|
+---------------+---------+
|[100, 200, 300]|       m1|
+---------------+---------+

+-------------------+-----+
|             carCat|catId|
+-------------------+-----+
|[30, 200, 100, 300]|   c1|
+-------------------+-----+

затем

Dataset<Row> mDF2 = mDF.select(col("machineId"),explode(col("machineCat")).as("machineCat"));
Dataset<Row> cDF2 = cDF.select(col("catId"),explode(col("carCat")).as("carCat"));
Dataset<Row> joinedDF = mDF2.join(cDF2).where(mDF2.col("machineCat").equalTo(cDF2.col("carCat")));
Dataset<Row> finalDF = joinedDF.select(col("machineId"),array(col("machineCat")), col("catId"),array(col("carCat")) );
finalDF.show();

и наконец:

+---------+-----------------+-----+-------------+
|machineId|array(machineCat)|catId|array(carCat)|
+---------+-----------------+-----+-------------+
|       m1|            [100]|   c1|        [100]|
|       m1|            [200]|   c1|        [200]|
|       m1|            [300]|   c1|        [300]|
+---------+-----------------+-----+-------------+

root
 |-- machineId: string (nullable = true)
 |-- array(machineCat): array (nullable = false)
 |    |-- element: integer (containsNull = true)
 |-- catId: string (nullable = true)
 |-- array(carCat): array (nullable = false)
 |    |-- element: integer (containsNull = true)
...