COLLECT_SET () в Hive, сохранить дубликаты? - PullRequest
36 голосов
/ 22 июня 2011

Есть ли способ сохранить дубликаты в собранном наборе в Hive или смоделировать тот тип совокупного сбора, который Hive предоставляет, используя другой метод? Я хочу объединить все элементы в столбце с одинаковым ключом в массив с дубликатами.

т.е:.

hash_id | num_of_cats
=====================
ad3jkfk            4
ad3jkfk            4
ad3jkfk            2
fkjh43f            1
fkjh43f            8
fkjh43f            8
rjkhd93            7
rjkhd93            4
rjkhd93            7

должен вернуть:

hash_agg | cats_aggregate
===========================
ad3jkfk   Array<int>(4,4,2)
fkjh43f   Array<int>(1,8,8)
rjkhd93   Array<int>(7,4,7)

Ответы [ 9 ]

28 голосов
/ 11 ноября 2014

Попробуйте использовать COLLECT_LIST (col) после Hive 0.13.0

SELECT
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id
;
22 голосов
/ 02 июля 2011

Ничего не встроено, но создание пользовательских функций, в том числе агрегатов, не так уж плохо.Единственная грубая часть - попытаться сделать их типовыми, но вот пример сбора.

package com.example;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class CollectAll extends AbstractGenericUDAFResolver
{
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
            throws SemanticException
    {
        if (tis.length != 1)
        {
            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
        }
        if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)
        {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1.");
        }
        return new CollectAllEvaluator();
    }

    public static class CollectAllEvaluator extends GenericUDAFEvaluator
    {
        private PrimitiveObjectInspector inputOI;
        private StandardListObjectInspector loi;
        private StandardListObjectInspector internalMergeOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException
        {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1)
            {
                inputOI = (PrimitiveObjectInspector) parameters[0];
                return ObjectInspectorFactory
                        .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
                        .getStandardObjectInspector(inputOI));
            }
            else
            {
                if (!(parameters[0] instanceof StandardListObjectInspector))
                {
                    inputOI = (PrimitiveObjectInspector)  ObjectInspectorUtils
                            .getStandardObjectInspector(parameters[0]);
                    return (StandardListObjectInspector) ObjectInspectorFactory
                            .getStandardListObjectInspector(inputOI);
                }
                else
                {
                    internalMergeOI = (StandardListObjectInspector) parameters[0];
                    inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                    return loi;
                }
            }
        }

        static class ArrayAggregationBuffer implements AggregationBuffer
        {
            ArrayList<Object> container;
        }

        @Override
        public void reset(AggregationBuffer ab)
                throws HiveException
        {
            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer()
                throws HiveException
        {
            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        @Override
        public void iterate(AggregationBuffer ab, Object[] parameters)
                throws HiveException
        {
            assert (parameters.length == 1);
            Object p = parameters[0];
            if (p != null)
            {
                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }

        @Override
        public void merge(AggregationBuffer ab, Object o)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
            for(Object i : partial)
            {
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
            }
        }

        @Override
        public Object terminate(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }
    }
}

Затем в улье просто введите add jar Whatever.jar; и CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';. Вы должны использовать его какожидается.

hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
OK
ad3jkfk [4,4,2]
fkjh43f [1,8,8]
rjkhd93 [7,4,7]

Стоит отметить, что порядок элементов следует считать неопределенным, поэтому, если вы намереваетесь использовать это для подачи информации в n_grams, вам может понадобиться немного ее расширить, чтобы отсортировать данные какнеобходимо.

12 голосов
/ 14 декабря 2012

Модифицированный код Джеффа Макса, чтобы убрать ограничение (предположительно унаследованное от collect_set), что входные данные должны быть примитивными типами.Эта версия может собирать структуры, карты и массивы, а также примитивы.

package com.example;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class CollectAll extends AbstractGenericUDAFResolver
{
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
            throws SemanticException
    {
        if (tis.length != 1)
        {
            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
        }
        return new CollectAllEvaluator();
    }

    public static class CollectAllEvaluator extends GenericUDAFEvaluator
    {
        private ObjectInspector inputOI;
        private StandardListObjectInspector loi;
        private StandardListObjectInspector internalMergeOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException
        {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1)
            {
                inputOI = parameters[0];
                return ObjectInspectorFactory
                        .getStandardListObjectInspector(ObjectInspectorUtils
                        .getStandardObjectInspector(inputOI));
            }
            else
            {
                if (!(parameters[0] instanceof StandardListObjectInspector))
                {
                    inputOI = ObjectInspectorUtils
                            .getStandardObjectInspector(parameters[0]);
                    return (StandardListObjectInspector) ObjectInspectorFactory
                            .getStandardListObjectInspector(inputOI);
                }
                else
                {
                    internalMergeOI = (StandardListObjectInspector) parameters[0];
                    inputOI = internalMergeOI.getListElementObjectInspector();
                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                    return loi;
                }
            }
        }

        static class ArrayAggregationBuffer implements AggregationBuffer
        {
            ArrayList<Object> container;
        }

        @Override
        public void reset(AggregationBuffer ab)
                throws HiveException
        {
            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer()
                throws HiveException
        {
            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        @Override
        public void iterate(AggregationBuffer ab, Object[] parameters)
                throws HiveException
        {
            assert (parameters.length == 1);
            Object p = parameters[0];
            if (p != null)
            {
                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }

        @Override
        public void merge(AggregationBuffer ab, Object o)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
            for(Object i : partial)
            {
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
            }
        }

        @Override
        public Object terminate(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }
    }
}
11 голосов
/ 28 декабря 2013

Начиная с улья 0.13, есть встроенный UDAF под названием collect_list(), который достигает этого. Смотри здесь .

2 голосов
/ 10 июля 2014

Проверьте, что кирпичный дом собирает UDAF (http://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/collect/CollectUDAF.java)

Также поддерживает сбор в карту.Brickhouse также содержит много полезных UDF, не входящих в стандартный дистрибутив Hive.

1 голос
/ 09 июля 2014

Для чего стоит (хотя я знаю, что это более старая статья), Hive 0.13.0 имеет новую функцию collect_list () , которая не дедуплицирует.

1 голос
/ 10 июня 2014

Вот точный запрос улья, который выполняет эту работу (работает только в кусте> 0,13):

SELECT hash_id, collect_set (num_of_cats) FROM GROUP BY hash_id;

0 голосов
/ 17 декабря 2015

Просто интересно - если бы государство -

SELECT
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id
;

мы хотим иметь сортировку и ограничить элементы для num_of_cats - как это сделать? COS в больших данных мы имеем дело с PB данных ... нам может не понадобиться все это в таких случаях, но топ-10 или ограничить его.

0 голосов
/ 05 июня 2015

Обходной путь для сбора структуры

предположим, что у вас есть таблица

tableWithStruct(
id string,
obj struct <a:string,b:string>)

, теперь создайте другую таблицу как

CREATE EXTERNAL TABLE tablename (
id string,
temp array<string>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'

запрос вставки

insert into table tablename select id,collect(concat_ws('|',cast(obj.a as string),cast(obj.b as string)) from tableWithStruct group by id;

теперь создайте другую таблицу в том же месте, что и tablename

CREATE EXTERNAL TABLE tablename_final (
id string,
array_list array<struct<a:string,b:string>>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'

, когда вы выбираете из tablename_final , вы получите желаемый результат

...