Я сделал UDF в Java, цель которого состоит в том, чтобы сгруппировать строки между началом работы компонента (1 рабочий-0 не работает). После этого он скажет, сколько длилось событие (помимо всего прочего). Кроме того, я могу установить время принятия для нулевых значений между '1', поэтому в случае потери некоторых значений в середине работы, я могу сказать, хочу ли я считать его работающим-неработающим.
Давайте предположим, что эти данные упорядочены во времени
Time | Component | Value
___________________________
1 Hvac 0
2 Hvac 1
3 Hvac 1
4 Hvac 1
5 Hvac 0
6 Hvac 0
7 Hvac 1
8 Hvac 1
9 Hvac null
10 Hvac null
11 Hvac 1
12 Hvac 0
При времени принятия 3 он вернется:
Time | State | Group | Duration
_________________________________________
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 2 null
5 End 2 3
6 Not working null null
7 Start 7 null
8 Working 7 null
9 No state 7 null
10 No state 7 null
11 Working 7 null
12 End 7 5
Принимая во внимание, что при времени принятия 1 он вернется:
Time | State | Group | Duration
_________________________________________
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 2 null
5 End 2 3
6 Not working null null
7 Start 7 null
8 End 7 1
9 No state null null
10 No state null null
11 Start 11 null
12 End 11 1
Процедура вызова UDF на импале следующая:
select my_udf(actual_value, actual_time, next_time, next_value, time_acceptance, component)
from(
/*
We have to make this so that impala orders data in time. Limit clause is necessary in impala so that it does not ignore the order by
*/
select time, component
from mytable
order by time
limit 9999999999
)a;
Я видел, что UDF работает правильно в 97% случаев более или менее, но иногда он ведет себя непредсказуемо, не соблюдая порядок по выражению.
Хотя я не думаю, что проблема связана с самим кодом, поскольку он немного сложен, я его вставлю.
public class GroupStates extends UDF
{
// Related to each component
HashMap<String, Long> last_time =
new HashMap<String, Long>();
HashMap<String, Integer> last_value =
new HashMap<String, Integer>();
HashMap<String, Long> time_last_start =
new HashMap<String, Long>();
HashMap<String, String> working_type =
new HashMap<String, String>();
public Text evaluate( Integer bit, Long time, Long next_time,Integer next_bit, Integer acceptance_time, String component)
{
Object[] result = new Object[4];
String estado = new String();
if (next_time==null || next_bit==null)
{
next_bit=null;
next_time=null;
}
if(bit==null)
{
result[0]=new Text("No state");
}
else
{
if(bit==1 && (
(
( next_time == null || ((next_time-time)) > acceptance_time )
&&
( last_time.get(component) == null || ((time-last_time.get(component))) > acceptance_time )
)
||
(
(last_value.get(component)!=null) &&
last_value.get(component)==0 && ((time-last_time.get(component))) <=acceptance_time &&
(next_time == null ||
(next_time-time) > acceptance_time)
)
||
(
(next_bit!=null) &&
( next_bit==0 && ((next_time-time)) <= acceptance_time &&
( (last_time.get(component) == null ||
((time-last_time.get(component))) > acceptance_time )
)
)
)
)
)
{ estado= "isolated";
result[0]=new Text("isolated");}
else if
(
bit==1 &&
(
last_value.get(component) != null &&
last_value.get(component)==0 && ((time-last_time.get(component)) ) <=acceptance_time)
){ estado= "Start";
result[0]=new Text("Start");}
else if
(
bit==0 &&
( last_value.get(component) != null &&
last_value.get(component)==1 && ((time-last_time.get(component)) ) <=acceptance_time )
){estado= "End";
result[0]=new Text("End");}
else if
(
bit==1 && (last_time.get(component)==null || ((time-last_time.get(component)) ) > acceptance_time )
){estado= "No info start";
result[0]=new Text("No info start");}
else if
(
bit==1 && (next_bit==null || ((next_time-time) ) > acceptance_time )
){estado= "No info End";
result[0]=new Text("No info End");}
else if
(bit==1 ){
result[0]=new Text("working");}
else if
(bit==0 ){
result[0]=new Text("not working");}
// Actualizar valores
last_value.pcomponent(component,bit);
last_time.pcomponent(component,time);
}
if (estado.equals("isolated"))
{ result[1]= new LongWritable(1);
// Podria ser freq. muestreo, nuevo parametro
result[2]= new Text("isolated");
result[3]= new LongWritable(time);
}
else if (
estado.equals("Start") ||
estado.equals("No info start")
){
time_last_start.pcomponent(component,time);
working_type.pcomponent(component,estado);
result[3]=new LongWritable(time);
}
else if (
estado.equals("End") ||
estado.equals("No info End")
){
if (time_last_start.get(component)!=null)
{
result[3]=new LongWritable(time_last_start.get(component));
result[1]= new LongWritable((time-time_last_start.get(component)));
result[2]= new Text(working_type.get(component)+"-"+estado); // Mark end type
time_last_start.pcomponent(component,null);
}
}
else
if (time_last_start.get(component) == null)
{
result[3] =null;
}
else
result[3]=new LongWritable(time_last_start.get(component));
String resultado="";
for (int i=0;i<4;i++)
{
if (i==3)
resultado=resultado+String.valueOf(result[i]);
else
resultado=resultado+String.valueOf(result[i])+";";
}
return new Text(resultado);
}}
В примере с 3-секундным временем приема я иногда случайно получаю что-то вроде этого:
Time | State | Group | Duration
_________________________________________
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 11 null
5 End 11 -7
6 Not working 7 null
7 Start 7 null
8 Working 7 null
9 No state 7 null
10 No state 7 null
11 Working 7 null
12 End 7 7
Как видите, кажется, что UDF не соблюдает порядок, а «распространение» значений производится случайным образом. Я не уверен, что это так, потому что impala неправильно распределяет данные между узлами при использовании java udf. Кроме того, я обычно вызываю udf в одном и том же списке несколько раз для разных компонентов:
select
my_udf(actual_value_comp1, actual_time_comp1, next_time_comp1, next_value_comp1, time_acceptance, component1),
my_udf(actual_value_comp2, actual_time_comp2, next_time_comp2, next_value_comp2, time_acceptance, component2),
my_udf(actual_value_comp3, actual_time_comp3, next_time_comp3, next_value_comp3, time_acceptance, component3)
from(
select time, component1, component2, component3
from mytable
order by time
limit 9999999999
)a;
Может ли кто-нибудь объяснить мне, почему такое поведение происходит случайным образом?