Java UDF, используемый в Impala, не работает правильно.Это случайно смешивает значения, не соблюдая порядок строк - PullRequest
0 голосов
/ 26 марта 2019

Я сделал UDF в Java, цель которого состоит в том, чтобы сгруппировать строки между началом работы компонента (1 рабочий-0 не работает). После этого он скажет, сколько длилось событие (помимо всего прочего). Кроме того, я могу установить время принятия для нулевых значений между '1', поэтому в случае потери некоторых значений в середине работы, я могу сказать, хочу ли я считать его работающим-неработающим.

Давайте предположим, что эти данные упорядочены во времени

  Time | Component  | Value
___________________________
  1        Hvac        0
  2        Hvac        1
  3        Hvac        1
  4        Hvac        1
  5        Hvac        0
  6        Hvac        0
  7        Hvac        1
  8        Hvac        1
  9        Hvac        null
  10       Hvac        null
  11       Hvac        1
  12       Hvac        0

При времени принятия 3 он вернется:

Time | State      | Group    | Duration
_________________________________________
  1    Not working  null        null
  2    Start        2           null
  3    Working      2           null
  4    Working      2           null
  5    End          2           3
  6    Not working  null        null
  7    Start        7           null
  8    Working      7           null
  9    No state     7           null
  10   No state     7           null
  11   Working      7           null
  12   End          7           5

Принимая во внимание, что при времени принятия 1 он вернется:

  Time | State      | Group    | Duration
_________________________________________
  1    Not working  null        null
  2    Start        2           null
  3    Working      2           null
  4    Working      2           null
  5    End          2           3
  6    Not working  null        null
  7    Start        7           null
  8    End          7           1
  9    No state     null        null
  10   No state     null        null
  11   Start        11          null
  12   End          11          1

Процедура вызова UDF на импале следующая: select my_udf(actual_value, actual_time, next_time, next_value, time_acceptance, component) from( /* We have to make this so that impala orders data in time. Limit clause is necessary in impala so that it does not ignore the order by */ select time, component from mytable order by time limit 9999999999 )a;

Я видел, что UDF работает правильно в 97% случаев более или менее, но иногда он ведет себя непредсказуемо, не соблюдая порядок по выражению.

Хотя я не думаю, что проблема связана с самим кодом, поскольку он немного сложен, я его вставлю.

public class GroupStates extends UDF
{
// Related to each component
HashMap<String, Long> last_time = 
new HashMap<String, Long>();

HashMap<String, Integer> last_value = 
new HashMap<String, Integer>();

HashMap<String, Long> time_last_start = 
new HashMap<String, Long>();

HashMap<String, String> working_type = 
new HashMap<String, String>();
public  Text evaluate( Integer bit,  Long time,  Long next_time,Integer next_bit, Integer acceptance_time, String component)
{  
Object[] result = new Object[4];
String estado = new String();

if (next_time==null || next_bit==null)
{
    next_bit=null;
    next_time=null;
}

if(bit==null)
{
    result[0]=new Text("No state");

}
else 
{
    if(bit==1 && (

                (
                ( next_time == null ||  ((next_time-time)) > acceptance_time )
                &&
                ( last_time.get(component) == null  || ((time-last_time.get(component))) > acceptance_time  ) 
                )

                ||
                (
                (last_value.get(component)!=null) && 
                last_value.get(component)==0 && ((time-last_time.get(component))) <=acceptance_time && 
                    (next_time == null ||
                    (next_time-time) > acceptance_time)
                )
                ||
                (
                        (next_bit!=null) && 
                        (       next_bit==0 && ((next_time-time)) <= acceptance_time && 
                        (   (last_time.get(component) == null ||
                            ((time-last_time.get(component))) > acceptance_time )

                        )
                        )
                )
                )
            )
         { estado= "isolated";
        result[0]=new Text("isolated");}
    else if 
(

        bit==1 && 
        (
        last_value.get(component) != null &&    
        last_value.get(component)==0 && ((time-last_time.get(component)) ) <=acceptance_time)
){  estado= "Start";
    result[0]=new Text("Start");}
else if 
(
        bit==0 && 
        ( last_value.get(component) != null && 
        last_value.get(component)==1 && ((time-last_time.get(component)) ) <=acceptance_time )
){estado= "End";
    result[0]=new Text("End");} 
else if 
(
        bit==1 && (last_time.get(component)==null ||  ((time-last_time.get(component)) ) > acceptance_time  )
){estado= "No info start";
    result[0]=new Text("No info start");}
 else if 
(
        bit==1 && (next_bit==null || ((next_time-time) ) > acceptance_time  )
){estado= "No info End";
    result[0]=new Text("No info End");} 
else if
(bit==1 ){
    result[0]=new Text("working");}
else if
(bit==0 ){
    result[0]=new Text("not working");}
    // Actualizar valores
    last_value.pcomponent(component,bit);
    last_time.pcomponent(component,time);
}

if (estado.equals("isolated"))
{ result[1]= new LongWritable(1);
// Podria ser freq. muestreo, nuevo parametro
result[2]= new Text("isolated");
  result[3]= new LongWritable(time);
} 

else if ( 
 estado.equals("Start") || 
 estado.equals("No info start")
  ){
    time_last_start.pcomponent(component,time);
    working_type.pcomponent(component,estado);
    result[3]=new LongWritable(time);
    }
else if ( 
        estado.equals("End") || 
        estado.equals("No info End")
          ){
    if (time_last_start.get(component)!=null) 
    {
        result[3]=new LongWritable(time_last_start.get(component));
        result[1]= new LongWritable((time-time_last_start.get(component)));
        result[2]= new Text(working_type.get(component)+"-"+estado); // Mark end type
        time_last_start.pcomponent(component,null);
    }
            }
else 
    if (time_last_start.get(component) == null)
    {
        result[3] =null;
    }
    else
    result[3]=new LongWritable(time_last_start.get(component));

String resultado="";
for (int i=0;i<4;i++)
{
if (i==3)
resultado=resultado+String.valueOf(result[i]);
else
    resultado=resultado+String.valueOf(result[i])+";";
}
return new Text(resultado);
}}

В примере с 3-секундным временем приема я иногда случайно получаю что-то вроде этого:

Time | State      | Group    | Duration
_________________________________________
  1    Not working  null        null
  2    Start        2           null
  3    Working      2           null
  4    Working      11          null
  5    End          11          -7
  6    Not working  7           null
  7    Start        7           null
  8    Working      7           null
  9    No state     7           null
  10   No state     7           null
  11   Working      7           null
  12   End          7           7

Как видите, кажется, что UDF не соблюдает порядок, а «распространение» значений производится случайным образом. Я не уверен, что это так, потому что impala неправильно распределяет данные между узлами при использовании java udf. Кроме того, я обычно вызываю udf в одном и том же списке несколько раз для разных компонентов:

select my_udf(actual_value_comp1, actual_time_comp1, next_time_comp1, next_value_comp1, time_acceptance, component1), my_udf(actual_value_comp2, actual_time_comp2, next_time_comp2, next_value_comp2, time_acceptance, component2), my_udf(actual_value_comp3, actual_time_comp3, next_time_comp3, next_value_comp3, time_acceptance, component3) from( select time, component1, component2, component3 from mytable order by time limit 9999999999 )a;

Может ли кто-нибудь объяснить мне, почему такое поведение происходит случайным образом?

...