Scala Spark создает дополнительный столбец с ACL, используя выражение фильтра - PullRequest
0 голосов
/ 27 сентября 2019

Добрый день, коллеги.

  • Это ограничение профиля
((col1 = valueA 3) or (col2 = ValueB 2)) - Common  sql expression
((NOT col1 = valueA N) and (col3 = ValueC 2)) - It could be with all kind of logic operators

.У меня есть источник данных, например:

+-----------+----------+----------+
|   Col1    |   Col2   |   Col3   |
+-----------+----------+----------+
| ValueA 1  | ValueB 2 | ValueC 3 |
| ValueA 1  | ValueB 3 | ValueC 4 |
+-----------+----------+----------+

И мне нужно получить следующий набор данных:

+-----------+----------+----------+----------+
|   Col1    |   Col2   |   Col3   | Profile1 |
+-----------+----------+----------+----------+
| ValueA 1  | ValueB 2 | ValueC 3 |        1 |
| ValueA 1  | ValueB 3 | ValueC 4 |        0 |
+-----------+----------+----------+----------+
  • 1 - означает, что функция фильтра вернула true
  • 0 - означает, что функция фильтра возвратила false

Я знаю, как это сделать с помощью соединения (выполнить фильтрацию исходного набора данных по sql_expr, соединение сColumn и т. Д.).Но у меня есть около 100 профилей, и я не буду делать 100 соединений.Я не ищу готового решения, но некоторые советы, как сделать это эффективно, будут точными.Я думаю, что я мог бы как-то создать коллекцию ограничений профилей (profile_id, sql_expression) и сделать карту для каждой строки, создать столбец с массивом, содержащий внутри правильные profile_ids, и в конце концов сделать flatmap.

UPDATE1: В настоящее время я использую это решение, но не могу проверить его, потому что локально оно никогда не заканчивается.

    @Override
    public <V extends SomeData, T extends ObjWithRestr> Dataset<Row> filterByMultipleRestrictionObjs(Dataset<V> source,
                                                                                                          List<T> objsWithRestr,
                                                                                                          Class<V> tClass) {
        Dataset<Row> resultDataset = source.as(Encoders.bean(Row.class));
        for (T objWithRestr : objsWithRestr) {
            Profile profile = (Profile) objWithRestr;
            String client_id = profile.getClient_id();
            ProfileRestrictions profileRestrictions = gsonAdapter
                    .fromJson(new StringReader(objWithRestr.getRestrictions()), ProfileRestrictions.class);

            String combinedFilter = getCombinedFilter(profileRestrictions.getDemoFilter(), profileRestrictions.getMediaFilter());
            Dataset<Row> filteredDataset = resultDataset.filter(combinedFilter);
            Dataset<Row> falseDataset = resultDataset.exceptAll(filteredDataset).withColumn(client_id, lit(0));
            Dataset<Row> trueDataset = resultDataset.intersectAll(filteredDataset).withColumn(client_id, lit(1));
            resultDataset = falseDataset.unionByName(trueDataset);

        }
        return resultDataset;
    }

1 Ответ

1 голос
/ 27 сентября 2019
# With the help of below approach you can be able to solve the isseue i believe

Your filter condition values
filter_col1|filter_col2
valueA 3|ValueB 2
valueA 4|ValueB 3
valueA 5|ValueB 4
valueA 6|ValueB 5

//read them and conver them into a dataframe - filter_cond_df
//Create temp table on top of filter_cond_df
filter_cond_df.createOrReplaceTempView("filter_temp")

Your input Data:
+-----------+----------+----------+
|   Col1    |   Col2   |   Col3   |
+-----------+----------+----------+
| ValueA 1  | ValueB 2 | ValueC 3 |
| ValueA 1  | ValueB 3 | ValueC 4 |
+-----------+----------+----------+

//consider this as input_df, create a temp table on top it
input_df.createOrReplaceTempView("input_temp")
//to get only the matching for your filter condition
val matching_df = spark.sql("""select * from input_temp where col1 in (select filtert_col1 from filter_temp) or col2 in (select filter_col2 from filter_temp)""")

//get the remaining or not matched from your input
val notmatching_df = input_df.except(matching_df)

//adding profile column with value 1 to matching_df
val result1 = matching_df.withColumn("profile"),lit(1))
//adding profile column with value 0 to notmatching_df
val result2 = notmatching_df.withColumn("profile",lit(0))

val final_result = result1.union(result2)

i hope this helps!
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...