Прежде всего, вы должны придерживаться HTTP Protocol и запускать фоновые задачи только с помощью POST
операций.Вы, вероятно, не хотите, чтобы сканер запускал длительный фоновый процесс с помощью GET
запросов, не так ли?
Таким образом, вы также должны использовать HTTP-заголовок Location
для возврата URIИз ресурса можно получить дополнительную информацию о текущем состоянии процесса.Я бы также использовал общий URI, а не какое-то перенаправление.
В вашей настройке маршрута я обычно храню все зависящие от маршрута вещи в блоке .route()
.Мы поддерживаем процесс сборки каталога и систему архивирования сообщений EDI, которая собирает сообщения, отправленные и / или полученные в определенный период времени, в соответствии с немецким законодательством, обязывающим клиентов создавать резервные копии своих обмененных сообщений EDI.
Мы разделяем запуск новогоархивирование или сборка запроса и получение текущего состояния запроса.
rest("/archives")
.post()
.bindingMode(RestBindingMode.json)
.type(ArchiveRequestSettings.class)
.consumes(MediaType.APPLICATION_JSON)
.produces(MediaType.APPLICATION_JSON)
.description("Invokes the generation of a new message archive for
"messages matching a criteria contained in the payload")
.route().routeId("create-archives")
// Extract the IP address of the user who invokes the service
.bean(ExtractClientIP.class)
// Basic Authentication
.bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
// check the amount of requests received within a certain time-period
.bean(receivedRequestFilter)
// extract specified settings
.bean(ExtractArchiveRequestSettings.class)
// forward the task to the archive generation queue
.to(SomeEndpoints.ARCHIVE_GENERATION_QUEUE)
// return 202 Accepted response
.bean(ReturnArchiveRequestCreatedStatus.class)
.endRest()
.get("/{archiveId}")
.bindingMode(RestBindingMode.json)
.outType(ArchiveRequestEntity.class)
.produces(MediaType.APPLICATION_JSON)
.description("Returns the status of the message archive generation process."
+ " If the process has finished this operation will return the"
+ " link to the download location of the generated archive")
.route().routeId("archive-status")
// Extract the IP address of the user who invokes the service
.bean(ExtractClientIP.class)
// Basic Authentication
.bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
// check the amount of requests received within a certain time-period
.bean(receivedRequestFilter)
// return the current state of the task to the client. If the job is done,
// the response will also include a download link as wel as an MD5 hash to
// verify the correctness of the downloaded archive
.bean(ReturnArchiveRequestStatus.class)
.endRest();
Класс ExtractArchiveRequestSettings
просто выполняет проверки работоспособности полученной полезной нагрузки и устанавливает значения по умолчанию для пропущенных полей.После этого запрос сохраняется в базе данных, а его уникальный идентификатор сохраняется в заголовке.
ArchiveRequestSetting выглядит как пример ниже (слегка упрощенный)
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ArchiveRequestSettings {
/** Specifies if sent or received messages should be included in the artifact. Setting this field
* to 'DELIVERED' will include only delivered documents where the companyUuid of the requesting
* user matches the documents sender identifier. Specifying this field as 'RECEIVED' will include
* only documents whose receiver identifier matches the companyUuid of the requesting user. **/
private String direction;
/** The naming schema of entries within the archive **/
private String entryPattern;
/** The upper timestamp bound to include messages. Entries older than this value will be omitted **/
@JsonSerialize(using = Iso8601DateSerializer.class)
@JsonDeserialize(using = Iso8601DateDeserializer.class)
private Date from;
/** The lower timestamp bound to include messages. Entries younger than this value will be
* omitted. If left empty this will include even the most recent messages. **/
@JsonSerialize(using = Iso8601DateSerializer.class)
@JsonDeserialize(using = Iso8601DateDeserializer.class)
private Date till;
}
Класс ReturnArchiveRequestCreatedStatus
ищет класссохраненный объект запроса и возвращает его с ответом 202 Accepted
.
@Handler
public void returnStatus(Exchange exchange) {
String archiveId = exchange.getIn().getHeader(HeaderConstants.ARCHIVES_REQUEST_ID, String.class);
ArchiveRequestEntity archive = repository.findOne(archiveId);
Message msg = new DefaultMessage(exchange.getContext());
msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 202); // Accepted
msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
msg.setHeader("Location", archiveLocationUrl + "/" + archiveId);
msg.setBody(archive);
exchange.setOut(msg);
}
Возвращение текущего состояния сохраненного запроса гарантирует, что клиент может проверить, какие параметры фактически были применены, и может обновить их, если какой-либо из них по умолчаниюнастройки неудобны, или необходимо применить дальнейшие изменения.
Фактический процесс резервного копирования начинается с отправки обмена в очередь Redis, которая используется на другом компьютере.Результатом этого процесса будет архив, содержащий запрошенные файлы, которые загружены в общедоступное местоположение, и в объекте запроса будет сохранена только ссылка.Обратите внимание, что у нас есть специальный компонент верблюда, который имитирует точку входа seda только для очередей Redis.Использование seda
должно быть достаточным для запуска обработки задачи в другом потоке.
В зависимости от текущего состояния процесса поддержки сохраненная сущность запроса будет обновлена процессом поддержки.При получении запроса о состоянии (через GET
) к хранилищу данных запрашивается текущее состояние и сопоставляется с определенными ответами:
public class ReturnArchiveRequestStatus {
@Resource
private ArchiveRequestRepository repository;
@Handler
public void returnArchiveStatus(Exchange exchange) throws JSONException {
String archiveId = exchange.getIn().getHeader("archiveId", String.class);
if (StringUtils.isBlank(archiveId)) {
badRequest(exchange);
return;
}
ArchiveRequestEntity archive = repository.findOne(archiveId);
if (null == archive) {
notFound(archiveId, exchange);
return;
}
ok(archive, exchange);
}
private void badRequest(Exchange exchange) throws JSONException {
Message msg = new DefaultMessage(exchange.getContext());
msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
msg.setFault(false);
JSONObject json = new JSONObject();
json.put("status", "ERROR");
json.put("message", "No archive identifier found");
msg.setBody(json.toString());
exchange.setOut(msg);
}
private void notFound(String archiveId, Exchange exchange) throws JSONException {
Message msg = new DefaultMessage(exchange.getContext());
msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 403);
msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
msg.setFault(false);
JSONObject json = new JSONObject();
json.put("status", "ERROR");
json.put("message", "Could not find pending archive process with ID " + archiveId);
msg.setBody(json.toString());
exchange.setOut(msg);
}
private void ok(UserArchiveRequestEntity archive, Exchange exchange) throws JSONException {
Message msg = new DefaultMessage(exchange.getContext());
msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
msg.setFault(false);
msg.setBody(archive);
exchange.setOut(msg);
}
}
Фактическая сущность, сохраненная и обновленная в течение всего процесса, просматривает что-то вроде строки (упрощенный):
@Getter
@Setter
@Builder
@ToString
@Document(collection = "archive")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ArchiveRequestEntity {
/**
* The current state of the archiving process
*/
public enum State {
/** The request to create an archive was cued but not yet processed **/
QUEUED,
/** The archive is currently under construction **/
RUNNING,
/** The archive was generated successfully. {@link #downloadUrl} should contain the link the
* archive can be found **/
FINISHED,
/** Indicates that the archive generation failed. {@link #error} should indicate the actual
* reason why the request failed **/
FAILED
}
@Id
@JsonIgnore
private String id;
/** Timestamp the process was triggered **/
@JsonIgnore
@Indexed(expireAfterSeconds = DEFAULT_EXPIRE_TIME)
private Date timestamp = new Date();
/** The identifier of the company to create the archive for **/
private String companyUuid;
/** The state this archive is currently in **/
private State state = State.QUEUED;
...
/** Marks the upper limit to include entries to the archive. Entries older then this field will
* not be included in the archives while entries equal or younger than this timestamp will be
* included unless they are younger than {@link #till} timestamp **/
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
private Date from;
/** Marks the lower limit to include entries to the archive. Entries younger than this field will
* not be included in the archive **/
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
private Date till;
/** Information on why the archive creation failed **/
private String error;
/** The URL of the final archive to download **/
private String downloadUrl;
/** The MD5 Hash of the final artifact in order to guarantee clients an unmodified version of the
* archive **/
private String md5Hash;
...
}
Обратите внимание, что независимо от текущего состояния статуса обработки возвращается 200 OK
с текущим представлением JSON состояния процессов.Клиент увидит либо состояние FINISHED
с установленными свойствами downloadUrl
и md5Hash
, либо другой статус с еще другими доступными свойствами.
Процесс поддержки, разумеется, должен обновить статус запросасоответственно, в противном случае клиент не получит правильную информацию о текущем состоянии запроса.
Этот подход должен применяться практически к любым длительным процессам, хотя внутренняя часть передаваемой вами информации, вероятно, будет отличаться от нашего сценария.,Надеюсь, это поможет, хотя