Мне нужно написать тесты для метода, который получает блокировку через ZooKeeper и обрабатывает данные с помощью CompletableFuture.
Ниже приведён упрощённый код:
import lombok.Data;
/**
 * Configuration record describing one integration: the kind of interaction it
 * belongs to and the name of the distributed lock that guards it.
 * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class ConfigurationsIntegrationModel {

    /** Interaction kind this configuration applies to. */
    private InteractionType interactionType;

    /** Name of the lock associated with this configuration. */
    private String lockName;

    /** Supported interaction kinds. */
    public enum InteractionType {
        TEST,
        DEV
    }
}
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
/**
 * Source of integration configuration records.
 * The default implementation returns a single hard-coded mock record.
 */
@Service("configurationsIntegrationService")
public interface ConfigurationsIntegrationService {

    /**
     * Returns the configuration records for the given interaction type.
     *
     * @param integrationType interaction type to stamp on the returned record
     * @return a fixed-size list holding exactly one mock record
     */
    default List<ConfigurationsIntegrationModel> getRecords(ConfigurationsIntegrationModel.InteractionType integrationType) {
        ConfigurationsIntegrationModel mockRecord = getDynamicIntegrationConfigurationMock(integrationType);
        return Arrays.asList(mockRecord);
    }

    /**
     * Builds the mock record: lock name {@code "Test_Lock"} and the supplied interaction type.
     */
    private static ConfigurationsIntegrationModel getDynamicIntegrationConfigurationMock(ConfigurationsIntegrationModel.InteractionType integrationType) {
        ConfigurationsIntegrationModel model = new ConfigurationsIntegrationModel();
        model.setInteractionType(integrationType);
        model.setLockName("Test_Lock");
        return model;
    }
}
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
 * Thin wrapper around a Curator client that hands out non-blocking
 * ZooKeeper locks located under a common base path.
 */
public class DistributedLockProcessor {

    private final CuratorFramework client;
    private final String path;

    /**
     * Creates and starts a Curator client for the given connection string.
     *
     * @param host ZooKeeper connection string handed to {@code CuratorFrameworkFactory.newClient}
     * @param path base node under which lock nodes are created; {@link #acquire(String)}
     *             appends {@code "/" + lockName} to it
     */
    public DistributedLockProcessor(String host, String path) {
        // Retry a failed ZooKeeper operation up to 5 times, sleeping 90 ms between attempts.
        RetryPolicy retryPolicy = new RetryNTimes(5, 90);
        this.client = CuratorFrameworkFactory.newClient(host, retryPolicy);
        this.client.start();
        // The base path must be stored: acquire() derives every lock node from it.
        // Without this assignment the lock path becomes "null/<lockName>".
        this.path = path;
    }

    /**
     * Tries to take the lock {@code <path>/<lockName>} with a zero-second timeout.
     *
     * @param lockName name of the lock node below the base path
     * @return the held lock, or {@code null} when it could not be acquired within the timeout
     * @throws Exception on ZooKeeper or connection errors reported by Curator
     */
    public InterProcessLock acquire(String lockName) throws Exception {
        InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(client, path + "/" + lockName);
        if (!sharedLock.acquire(0, TimeUnit.SECONDS)) {
            return null;
        }
        return sharedLock;
    }

    /**
     * Releases a lock previously returned by {@link #acquire(String)}.
     *
     * @param sharedLock lock to release; {@code null} (the "not acquired" result) is tolerated
     * @return {@code true} when a lock was released, {@code false} when {@code sharedLock} was {@code null}
     * @throws Exception on ZooKeeper or connection errors reported by Curator
     */
    public boolean release(InterProcessLock sharedLock) throws Exception {
        if (sharedLock == null) {
            // acquire() signals "not acquired" with null; there is nothing to release.
            return false;
        }
        sharedLock.release();
        return true;
    }
}
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
/**
 * Runs a consumer over a list of items in parallel, processing each item only
 * when the distributed lock with the matching index could be acquired.
 *
 * @param <T> type of the processed items
 */
public class LockingExecutorProcessor<T> {

    private final Executor executor;
    private final DistributedLockProcessor distributedLock;

    /**
     * @param host          ZooKeeper connection string passed to {@link DistributedLockProcessor}
     * @param path          base lock path passed to {@link DistributedLockProcessor}
     * @param executorCount size of the fixed thread pool that runs the per-item tasks
     */
    public LockingExecutorProcessor(String host, String path, int executorCount) {
        this.executor = Executors.newFixedThreadPool(executorCount);
        this.distributedLock = new DistributedLockProcessor(host, path);
    }

    /**
     * Submits one task per entry of {@code locks} and blocks until all tasks have finished.
     * Task {@code i} tries to acquire {@code locks.get(i)} and, on success, passes
     * {@code items.get(i)} to {@code consumer}; the lock is released afterwards.
     *
     * @param locks    lock names, one per item
     * @param items    items to process; indexed in parallel with {@code locks}
     * @param consumer action applied to each item whose lock was acquired
     * @throws ExecutionException   if any task completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void process(List<String> locks, List<T> items, Consumer<T> consumer) throws ExecutionException, InterruptedException {
        final List<CompletableFuture<Void>> completableFutures = new ArrayList<>(locks.size());
        for (int i = 0; i < locks.size(); i++) {
            final int record = i;
            completableFutures.add(
                    CompletableFuture.runAsync(() -> processUnderLock(locks, items, record, consumer), executor));
        }
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
    }

    /** Acquires the lock at {@code index}, processes the matching item and always releases the lock. */
    private void processUnderLock(List<String> locks, List<T> items, int index, Consumer<T> consumer) {
        InterProcessLock interProcessLock = null;
        try {
            interProcessLock = distributedLock.acquire(locks.get(index));
        } catch (Exception e) {
            // Best effort: a failed acquisition only skips this item.
            e.printStackTrace();
        }
        if (interProcessLock == null) {
            return;
        }
        try {
            consumer.accept(items.get(index));
        } finally {
            // Release even when the consumer throws; otherwise the lock node stays held
            // and later runs can no longer acquire it.
            try {
                distributedLock.release(interProcessLock);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
/**
 * Source of the messages handled for an integration.
 * The default implementation ignores its argument and returns four fixed messages.
 */
@Service("messageService")
public interface MessageService {

    /**
     * Returns the messages for the given interaction type.
     *
     * @param integrationType interaction type (not used by the default implementation)
     * @return fixed-size list {@code ["Message1", "Message2", "Message3", "Message4"]}
     */
    default List<String> getMessagesList(ConfigurationsIntegrationModel.InteractionType integrationType) {
        List<String> fixedMessages = Arrays.asList(
                "Message1",
                "Message2",
                "Message3",
                "Message4");
        return fixedMessages;
    }
}
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Scheduled job that loads the TEST integration records and, for every record
 * whose distributed lock can be acquired, prints that record's messages in parallel.
 */
@Component("sampleJob")
public class SampleJob {

    /** Number of worker threads used to print the messages of one record. */
    private static final int MESSAGE_THREAD_COUNT = 10;

    @Autowired
    private LockingExecutorProcessor<ConfigurationsIntegrationModel> lockingExecutorProcessor;

    @Autowired
    private ConfigurationsIntegrationService configurationsIntegrationService;

    @Autowired
    private RateLimiter rateLimiter;

    @Autowired
    private MessageService messageService;

    /** Collects the lock name of every configuration, preserving list order. */
    private List<String> getLockNames(List<ConfigurationsIntegrationModel> integrationConfigurations) {
        List<String> lockNames = new ArrayList<>(integrationConfigurations.size());
        for (ConfigurationsIntegrationModel integrationConfiguration : integrationConfigurations) {
            lockNames.add(integrationConfiguration.getLockName());
        }
        return lockNames;
    }

    /**
     * Entry point invoked by the scheduler at a fixed rate of 100 ms.
     *
     * @throws ExecutionException   if processing of any record failed
     * @throws InterruptedException if the thread is interrupted while waiting for completion
     */
    @Scheduled(fixedRateString = "100")
    public void executeJob() throws ExecutionException, InterruptedException {
        List<ConfigurationsIntegrationModel> testRecords = configurationsIntegrationService.getRecords(ConfigurationsIntegrationModel.InteractionType.TEST);
        List<String> lockNames = getLockNames(testRecords);
        lockingExecutorProcessor.process(lockNames, testRecords, this::publishMessages);
    }

    /**
     * Prints every message of one record on a small thread pool and waits for completion.
     * A single pool is shared by all messages of the record and is shut down afterwards,
     * so no worker threads are left behind between scheduler runs.
     */
    private void publishMessages(ConfigurationsIntegrationModel recordsConfig) {
        List<String> msgList = messageService.getMessagesList(recordsConfig.getInteractionType());
        ExecutorService messageExecutor = Executors.newFixedThreadPool(MESSAGE_THREAD_COUNT);
        try {
            List<CompletableFuture<Void>> completableFutures = new ArrayList<>(msgList.size());
            for (String message : msgList) {
                completableFutures.add(
                        CompletableFuture.runAsync(
                                () -> System.out.println("Message is @@@@ " + message),
                                messageExecutor));
            }
            // Wait here so the work is finished before the caller considers the record processed.
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
        } finally {
            messageExecutor.shutdown();
        }
    }
}
Ниже — тестовый пример, который я пробовал до сих пор:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.concurrent.ExecutionException;
@RunWith(MockitoJUnitRunner.class)
public class SampleJobTest {
@InjectMocks
private SampleJob sampleJob = new SampleJob();
@Mock
private ConfigurationsIntegrationService configurationsIntegrationService;
@Mock
private MessageService messageService;
@Mock
private LockingExecutorProcessor<ConfigurationsIntegrationModel> lockingExecutorProcessor;
@Test
public void testSampleJob() throws ExecutionException, InterruptedException {
Mockito.doCallRealMethod().when(lockingExecutorProcessor).process(Mockito.any(), Mockito.any(), Mockito.any());
Mockito.doCallRealMethod().when(configurationsIntegrationService).getRecords(Mockito.any());
Mockito.doCallRealMethod().when(messageService).getMessagesList(Mockito.any());
sampleJob.executeJob();
}
}
Когда я отлаживаю код, он падает на строке CompletableFuture.runAsync в LockingExecutorProcessor с NullPointerException; причина в том, что объект распределённой блокировки (distributedLock) равен null.
Как это исправить и как подключиться к тестовому серверу ZooKeeper вместо настоящего, чтобы убедиться, что блокировка работает корректно?