Операция подсчета не работает на агрегированном IgniteDataFrame - PullRequest
1 голос
/ 22 мая 2019

Я работаю с Apache Spark и Apache Ignite. У меня есть набор данных spark, который я написал в Ignite, используя следующий код

dataset.write()
                .mode(SaveMode.Overwrite)
                .format(FORMAT_IGNITE())
                .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
                .option(OPTION_TABLE(), "CUSTOM_VALUES")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "ID")
                .save();

И я читаю это снова, чтобы выполнить групповое действие, которое будет переведено в Ignite.

  Dataset igniteDataset = sparkSession.read()
                .format(FORMAT_IGNITE())
                .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
                .option(OPTION_TABLE(), "CUSTOM_VALUES")
                .load();


        RelationalGroupedDataset idGroupedData = igniteDataset.groupBy(customized_id);

        Dataset<Row> result = idGroupedData.agg(count(id).as("count_id"),
                count(fid).as("count_custom_field_id"),
                count(type).as("count_customized_type"),
                count(val).as("count_value"), count(customized_id).as("groupCount"));

Теперь я хочу получить количество строк, возвращаемых действием groupby. Итак, я вызываю count () для набора данных как result.count();

Когда я это делаю, я получаю следующее исключение.

Caused by: org.h2.jdbc.JdbcSQLException: Syntax error in SQL statement "SELECT COUNT(1) AS COUNT FROM (SELECT  FROM CUSTOM_VALUES GROUP[*] BY CUSTOMIZED_ID) TABLE1 "; expected "., (, USE, AS, RIGHT, LEFT, FULL, INNER, JOIN, CROSS, NATURAL, ,, SELECT"; SQL statement:
SELECT COUNT(1) AS count FROM (SELECT  FROM CUSTOM_VALUES GROUP BY CUSTOMIZED_ID) table1 [42001-197]
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:357)
    at org.h2.message.DbException.getSyntaxError(DbException.java:217)

Другие функции, такие как show(), collectAsList().size();, работают.

Что мне здесь не хватает?

1 Ответ

0 голосов
/ 31 мая 2019

Я протестировал ваш пример на последней версии сообщества 7.7.5 GridGain, которая является версией Gridgain с открытым исходным кодом, основанной на источниках Ignite 2.7.0, с подмножеством дополнительных исправлений (https://www.gridgain.com/resources/download).

Вот код:

public class Main {
    public static void main(String[] args) {
        if (args.length < 1)
            throw new IllegalArgumentException("You should set the path to client configuration file.");

        String configPath = args[0];

        SparkSession session = SparkSession.builder()
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> igniteDataset = session.read()
                .format(IgniteDataFrameSettings.FORMAT_IGNITE())                  //Data source
                .option(IgniteDataFrameSettings.OPTION_TABLE(), "Person")         //Table to read.
                .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), configPath) //Ignite config.
                .load();

        RelationalGroupedDataset idGroupedData = igniteDataset.groupBy("CITY_ID");

        Dataset<Row> result = idGroupedData.agg(count("id").as("count_id"),
                count("city_id").as("count_city_id"),
                count("name").as("count_name"),
                count("age").as("count_age"),
                count("company").as("count_company"));

        result.show();

        session.close();
    }
} 

Вот maven зависимости:

<dependencies>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>gridgain-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>8.7.5</version>
    </dependency>
</dependencies>

Вот конфигурация кеша:

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Person"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="ATOMIC"/>
                <property name="sqlSchema" value="PUBLIC"/>

                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="PersonKey"/>
                            <property name="valueType" value="PersonValue"/>
                            <property name="tableName" value="Person"/>

                            <property name="keyFields">
                                <list>
                                    <value>id</value>
                                    <value>city_id</value>
                                </list>
                            </property>

                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Integer"/>
                                    <entry key="city_id" value="java.lang.Integer"/>
                                    <entry key="name" value="java.lang.String"/>
                                    <entry key="age" value="java.lang.Integer"/>
                                    <entry key="company" value="java.lang.String"/>
                                </map>
                            </property>

                            <property name="aliases">
                                <map>
                                    <entry key="id" value="id"/>
                                    <entry key="city_id" value="city_id"/>
                                    <entry key="name" value="name"/>
                                    <entry key="age" value="age"/>
                                    <entry key="company" value="company"/>
                                </map>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
        </list>
    </property>

Используя Spark 2.3.0, который поддерживается только для зависимости воспламенения от искры, у меня есть следующий результат на моих тестовых данных:

Данные:

ID,CITY_ID,NAME,AGE,COMPANY,
4,1,Justin Bronte,23,bank,
3,1,Helen Richard,49,bank,

Результат:

+-------+--------+-------------+----------+---------+-------------+
|CITY_ID|count_id|count_city_id|count_name|count_age|count_company|
+-------+--------+-------------+----------+---------+-------------+
|      1|       2|            2|         2|        2|            2|
+-------+--------+-------------+----------+---------+-------------+

Кроме того, этот код может быть полностью применен к Ignite 2.7.0.

...