Я пишу тестовый код для функции processElement
в Apache Flink 1.4:
public class ProcessFunctionClass {
public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> collector) {
// if the state is empty, start a timer
if (listState.get().iterator().hasNext() == false)
context.timerService().registerEventTimeTimer(1000);
listState.add("someStringToBeStored");
// ...
}
}
public class ProcessFunctionClassTest {
private ProcessFunctionClass processFunctionClass;
@Mock
private ListState<String> listState;
@Before
public void setUp() throws Exception {
processFunctionClass = new ProcessFunctionClass();
}
@Test
public void testProcessElement() {
ListState mockListState = mock(ListState.class);
Iterable mockIterable = mock(Iterable.class);
Iterator mockIterator = mock(Iterator.class);
MockitoAnnotations.initMocks(this);
when(tDPListState.get()).thenReturn(mockIterable);
when(tDPListState.get().iterator()).thenReturn(mockIterator);
when(tDPListState.get().iterator().hasNext()).thenReturn(false);
processFunctionClass.processElement(tuple2, context, collector);
// verify(...)
}
}
Когда я отлаживаю с помощью своей IDE, непосредственно перед тем, как войти в processElement()
Метод listState
не null
и, похоже, был успешно издеваться, но как только я добираюсь до listState.get().iterator().hasNext()
, listState
становится null
, и я получаю исключение NullPointerException. Что я здесь делаю не так?