весенняя загрузка обрабатывает большое тело запроса - обрабатывает большую часть через многопоточность - PullRequest
2 голосов
/ 18 июня 2019

У меня есть один из контроллеров пружинной загрузки, который принимает многих пользователей, как показано ниже

пример json

{
  "users": [
    { "name":"john", "age":18, "type":"1"},
    { "name":"kim", , "age":18, "type":"2"},
    { "name":"Fits", "age":18, "type","3"},
  ]
 }

обработчик запросов

@RequestMapping(value = "/users", method = RequestMethod.POST, headers = "Accept=application/json")
public void Add(@RequestBody List<user> users) throws Exception {

 // Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB


}

, он хорошо работает, когда у меня есть некоторые100 пользователей в списке пользователей, но если список большой, например 1000 и т. Д., Это займет слишком много времени.так есть ли весеннее пакетное задание, где я могу назначить это для выполнения этого?

Я просто хочу вернуть HTTP-код ответа 202 для запроса и назначить эту полезную нагрузку весеннему пакетному заданию

Ответы [ 2 ]

1 голос
/ 19 июня 2019

Один из вариантов - использовать Spring Async Task для длительно запущенных процессов в отдельном потоке, поэтому не будет ждать выполнения всего запроса и отправки ответа обратно.

Сначала настройте асинхронную задачу следующим образом.

@Configuration
@EnableAsync
public class AsynchTaskConfiguration{

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("ProcessUsers-");
        executor.initialize();
        return executor;
    }
}

И здесь вы можете использовать асинхронную задачу в вашем сервисе для обработки пользователей

@Service
public class UserProcessingService {


    private final AnotherUserProcessingService service;
    @Autowired
    public UserProcessingService (AnotherUserProcessingService service) {
        this.service= service;
    }

    @Async
    public CompletableFuture<List<User>> processUser(List<User> users) throws InterruptedException {

        users.forEach(user -> logger.info("Processing " + user));
        List<User> usersListResult = service.process(users);
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(usersListResult);
    }

}

processUser(User user) помечено @Async, указывающим, что метод будет выполняться в отдельном потоке в соответствии с конфигурацией taskExecutor, представленной выше. @EnableAsync включить Spring для запуска любого метода в фоновом потоке с аннотацией @Async. И убедитесь, что ваша служба, использующая асинхронную задачу для обработки пользователей, должна быть создана внутри класса @Configuration или забрана @ComponentScan. Вы можете настроить taskExecutor в соответствии со своими потребностями.

Здесь вы можете найти, как ThreadPoolTaskExecutor работает.

0 голосов
/ 20 июня 2019

https://github.com/softnrajkumar1994/multithreading-example

Важно

1) Вместо 1000 пользователей в одном запросе, пожалуйста, отправьте список этих пользователей в виде маленьких кусочков

public class ThreadManager {

    private static ThreadPoolExecutor stpe = null;


    static {
         /**
         *
         *  corePoolSize --->the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         *        
         *  maximumPoolSize --- >the maximum number of threads to allow in the
         *        pool
         *        
         *  keepAliveTime---> when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         *        
         *  unit the time unit for the {@code keepAliveTime} argument
         *  
         *  workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.

         */
        stpe = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1090));
        System.out.println("THREAD MANAGER INTIALIZED SUCCESSFULLY");
    }

    public static void execute(Runnable task) {
        stpe.execute(task);
    }
}

Приведенный выше класс получит выполняемую задачу и выполнится с незанятыми потоками пула потоков.

Пример класса пользователя:

 public class User {

    private String name;
    private String mobile;
    private String email;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

}

@RestController
public class UserController {

    @PostMapping("/users")
    public void Add(@RequestBody List<User> users) throws Exception {
        /**
         * Here we are rotating user's list and assigning each and every user into a
         * separate worker thread, so that work will be done parallely
         */
        for (User user : users) {
            try {
                ThreadManager.execute(new UserWork(user));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

Пользовательский исполняемый рабочий класс для работы с пользовательским объектом. Реализация бизнес-процесса в управляемом методе.

  public class UserWork implements Runnable {



    private User user;

    public UserWork(User user) {
        this.user = user;
    }

    @Override
    public void run() {
        // Please add your businees logic here
// Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB
    }

}
...