Я сделал UDF в Java, цель которого состоит в том, чтобы сгруппировать строки между началом работы компонента (1 рабочий-0 не работает). После этого он скажет, сколько длилось событие (помимо всего прочего). Кроме того, я могу установить время принятия для нулевых значений между '1', поэтому в случае потери некоторых значений в середине работы, я могу сказать, хочу ли я считать его работающим-неработающим.
Давайте предположим, что эти данные упорядочены во времени
Time | Component | Value
1 Hvac 0
2 Hvac 1
3 Hvac 1
4 Hvac 1
5 Hvac 0
6 Hvac 0
7 Hvac 1
8 Hvac 1
9 Hvac null
10 Hvac null
11 Hvac 1
12 Hvac 0
При времени принятия 3 он вернется:
Time | State | Group | Duration
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 2 null
5 End 2 3
6 Not working null null
7 Start 7 null
8 Working 7 null
9 No state 7 null
10 No state 7 null
11 Working 7 null
12 End 7 5
Принимая во внимание, что при времени принятия 1 он вернется:
Time | State | Group | Duration
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 2 null
5 End 2 3
6 Not working null null
7 Start 7 null
8 End 7 1
9 No state null null
10 No state null null
11 Start 11 null
12 End 11 1
Процедура вызова UDF на импале следующая:
select my_udf(actual_value, actual_time, next_time, next_value, time_acceptance, component)
We have to make this so that impala orders data in time. Limit clause is necessary in impala so that it does not ignore the order by
select time, component
from mytable
order by time
limit 9999999999
Я видел, что UDF работает правильно в 97% случаев более или менее, но иногда он ведет себя непредсказуемо, не соблюдая порядок по выражению.
Хотя я не думаю, что проблема связана с самим кодом, поскольку он немного сложен, я его вставлю.
public class GroupStates extends UDF
// Related to each component
HashMap<String, Long> last_time =
new HashMap<String, Long>();
HashMap<String, Integer> last_value =
new HashMap<String, Integer>();
HashMap<String, Long> time_last_start =
new HashMap<String, Long>();
HashMap<String, String> working_type =
new HashMap<String, String>();
public Text evaluate( Integer bit, Long time, Long next_time,Integer next_bit, Integer acceptance_time, String component)
Object[] result = new Object[4];
String estado = new String();
if (next_time==null || next_bit==null)
result[0]=new Text("No state");
if(bit==1 && (
( next_time == null || ((next_time-time)) > acceptance_time )
( last_time.get(component) == null || ((time-last_time.get(component))) > acceptance_time )
(last_value.get(component)!=null) &&
last_value.get(component)==0 && ((time-last_time.get(component))) <=acceptance_time &&
(next_time == null ||
(next_time-time) > acceptance_time)
(next_bit!=null) &&
( next_bit==0 && ((next_time-time)) <= acceptance_time &&
( (last_time.get(component) == null ||
((time-last_time.get(component))) > acceptance_time )
{ estado= "isolated";
result[0]=new Text("isolated");}
else if
bit==1 &&
last_value.get(component) != null &&
last_value.get(component)==0 && ((time-last_time.get(component)) ) <=acceptance_time)
){ estado= "Start";
result[0]=new Text("Start");}
else if
bit==0 &&
( last_value.get(component) != null &&
last_value.get(component)==1 && ((time-last_time.get(component)) ) <=acceptance_time )
){estado= "End";
result[0]=new Text("End");}
else if
bit==1 && (last_time.get(component)==null || ((time-last_time.get(component)) ) > acceptance_time )
){estado= "No info start";
result[0]=new Text("No info start");}
else if
bit==1 && (next_bit==null || ((next_time-time) ) > acceptance_time )
){estado= "No info End";
result[0]=new Text("No info End");}
else if
(bit==1 ){
result[0]=new Text("working");}
else if
(bit==0 ){
result[0]=new Text("not working");}
// Actualizar valores
if (estado.equals("isolated"))
{ result[1]= new LongWritable(1);
// Podria ser freq. muestreo, nuevo parametro
result[2]= new Text("isolated");
result[3]= new LongWritable(time);
else if (
estado.equals("Start") ||
estado.equals("No info start")
result[3]=new LongWritable(time);
else if (
estado.equals("End") ||
estado.equals("No info End")
if (time_last_start.get(component)!=null)
result[3]=new LongWritable(time_last_start.get(component));
result[1]= new LongWritable((time-time_last_start.get(component)));
result[2]= new Text(working_type.get(component)+"-"+estado); // Mark end type
if (time_last_start.get(component) == null)
result[3] =null;
result[3]=new LongWritable(time_last_start.get(component));
String resultado="";
for (int i=0;i<4;i++)
if (i==3)
return new Text(resultado);
В примере с 3-секундным временем приема я иногда случайно получаю что-то вроде этого:
Time | State | Group | Duration
1 Not working null null
2 Start 2 null
3 Working 2 null
4 Working 11 null
5 End 11 -7
6 Not working 7 null
7 Start 7 null
8 Working 7 null
9 No state 7 null
10 No state 7 null
11 Working 7 null
12 End 7 7
Как видите, кажется, что UDF не соблюдает порядок, а «распространение» значений производится случайным образом. Я не уверен, что это так, потому что impala неправильно распределяет данные между узлами при использовании java udf. Кроме того, я обычно вызываю udf в одном и том же списке несколько раз для разных компонентов:
my_udf(actual_value_comp1, actual_time_comp1, next_time_comp1, next_value_comp1, time_acceptance, component1),
my_udf(actual_value_comp2, actual_time_comp2, next_time_comp2, next_value_comp2, time_acceptance, component2),
my_udf(actual_value_comp3, actual_time_comp3, next_time_comp3, next_value_comp3, time_acceptance, component3)
select time, component1, component2, component3
from mytable
order by time
limit 9999999999
Может ли кто-нибудь объяснить мне, почему такое поведение происходит случайным образом?