Я пытаюсь направить пустые или пустые сообщения в локальный архив в системе и обработать файлы с содержимым.Используя Camel 2.17, пакет сервисных миксов Jboss Fuse.Я не могу заставить его работать правильно.это должно быть что-то, что я делаю неправильно.Я попробовал несколько разных вещей, показанных ниже (все прокомментировано).
Примеры проб:
} else if (Boolean.parseBoolean(isCompressedOnly)) { //Only Zipped or Compressed
// .when(simple("${file:length} == 0"))
// .when(simple("${header.CamelFileLength} == 0" ))
// .when(header("CamelFileLength").isEqualTo(0))
// .when(body().isNull())
// .when(header("CamelFileLength").isEqualTo(0))
// .when(simple("${header.CamelFileLength} < 10" ))
// .when(simple("${header.CamelFileLength} &eq; 0" ))
// .when(simple("${file:length} == null"))
ZipFileDataFormat zipFile = new ZipFileDataFormat();
zipFile.setUsingIterator(true);
from(fromStr)
.routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
.choice()
.when(simple("${file:length} == null"))
.wireTap("file" + errorArchive)
.log(LoggingLevel.INFO,"File Size: ${header.CamelFileLength}, File is Empty or Zero Length!")
.endChoice()
.otherwise()
.wireTap("file:" + fileArchive)
.split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.to(toStr);
} else {
Единственный оператор, который работает вроде (потому что все сообщения направляются в нулевой файлконечная точка, даже если они имеют длину / размер / данные / содержимое)
.when(simple("${header.CamelFileLength} < 10" ))
Можете ли вы сказать мне, что я делаю неправильно?спасибо!
Где я могу найти методы, связанные с заголовком?Например,
.when (заголовок ("результат"). isEqualTo ("A") .to (routeA)
().
ZipFileDataFormat zipFile = new ZipFileDataFormat();
zipFile.setUsingIterator(true);
from(fromStr)
.routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
.choice()
.when(simple("${header.CamelFileLength} < 10" ))
.wireTap("file:" + errorArchive)
.log(LoggingLevel.INFO,"File Size: ${header.CamelFileLength}, File is Empty or Zero Length!")
.endChoice()
.when(simple("${header.CamelFileLength} > 10" ))
.wireTap("file:" + fileArchive)
.split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.to(toStr)
.endChoice()
.end();
├── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] enriched
│ ├── [drwxr-xr-x 4.0K] mt1
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt2
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt3
│ │ └── [drwxr-xr-x 4.0K] processed
│ └── [drwxr-xr-x 4.0K] tobeprocessed
├── [drwxr-xr-x 4.0K] dbase
├── [drwxr-xr-x 4.0K] fleet
│ └── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] archive
│ └── [drwxr-xr-x 4.0K] error
│ ├── [-rw-r--r-- 0] GCMS_20190420-0815.csv
│ ├── [-rw-r--r-- 0] GCMS_20190420-0816.csv
│ ├── [-rw-r--r-- 0] GCMS_20190420-0817.csv
│ ├── [-rw-r--r-- 1.1M] GCMS_20190420-0945.csv
│ ├── [-rw-r--r-- 1.1M] GCMS_20190420-0953.csv
│ └── [-rw-r--r-- 1.1M] GCMS_20190420-1001.csv
└── [drwxr-xr-x 4.0K] upload
Клаус, я пробовал предел блокировки чтения, здесь он не влияет.
@Override
public void configure() throws Exception {
if (validateConfiguration()) {
final String fromStr = String.format("%s://%s@%s:%s/%s?password=RAW(%s)&recursive=%s&stepwise=%s&useList=%s&passiveMode=%s&disconnect=%s"
+ "&move=.processed"
+ "&maxMessagesPerPoll=0"
+ "&eagerMaxMessagesPerPoll=false"
+ "&sortBy=file:modified"
+ "&sendEmptyMessageWhenIdle=false"
+ "&delay=60000"
+ "&initialDelay=60000"
+ "&connectTimeout=15000"
+ "&localWorkDirectory=/tmp"
+ "&readLockMinLength=0"
, transport, username, host, port, path, password, recursive, stepwise, useList, passiveMode, disconnect);
// Format the To Endpoint from Parameter(s).
final String toStr = String.format("%s", toEndpoint);
Я изменил маршрут, чтобы он выглядел следующим образом, попытался упроститьэто., но это все еще не работает, потому что он принимает все файлы, даже -0- файлы нулевой длины, которые я не хочу обрабатывать. Я действительно верю, что таков выбор. Когда формируется простое утверждение, что-то не такВыйти правильно. Я перепробовал много комбинаций, все из которых дают один и тот же результат.
from(fromStr)
.routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
.wireTap("file:" + fileArchive)
.choice()
.when(simple("${header.CamelFileLength} > 0" ))
.split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.to(toStr)
.endChoice()
.end();
дает, я хочу обрабатывать или перемещаться в папку tobeprocessed, когда файл имеет данные или file.length! = 0. что-тоневерно с тем, как использовалось простое выражение ??? Я буду продолжать пытаться.
ge-digital/
├── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] enriched
│ ├── [drwxr-xr-x 4.0K] mt1
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt2
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt3
│ │ └── [drwxr-xr-x 4.0K] processed
│ └── [drwxr-xr-x 4.0K] tobeprocessed
│ ├── [-rw-r--r-- 0] GCMS_20190420-0815.csv
│ ├── [-rw-r--r-- 0] GCMS_20190420-0816.csv
│ └── [-rw-r--r-- 0] GCMS_20190420-0817.csv
├── [drwxr-xr-x 4.0K] dbase
├── [drwxr-xr-x 4.0K] fleet
│ └── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] archive
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0815.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0816.csv
│ │ └── [-rw-r--r-- 0] GCMS_20190420-0817.csv
│ └── [drwxr-xr-x 4.0K] error
└── [drwxr-xr-x 4.0K] upload
Маршрут, который сейчас работает, за исключением длины / размера файла, является неправильным. Это исходная длина файла, НЕ выделеннаядлина файлов. Как я могу обойти это?
доsplit - CamelFileLength: 598 и file.length: 598 после split - CamelFileLength: 598 и file.length: 598
from(fromStr)
.routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
.wireTap("file:" + fileArchive)
.log(LoggingLevel.INFO, "before split - CamelFileLength: ${header.CamelFileLength} and file.length: ${file:length}")
.split(body().tokenize("\n"), new FleetAggregationStrategy()) //.streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.choice()
//.when(simple("${file:length} > 0"))
.when(simple("${header.CamelFileLength} > '0'"))
.log(LoggingLevel.INFO, "after split - CamelFileLength: ${header.CamelFileLength} and file.length: ${file:length}")
.to(toStr)
.otherwise()
.log(LoggingLevel.INFO, "Message received ${file:name} is Empty or Null")
.to("file:" + errorArchive)
.end();
FINAL, хорошо, наконец-то у нас все работает.чувак, такие мелочи могут немного отбросить тебя назад.для тех, кто заинтересован и может помочь вам.
Я использовал body.size, хммм, не указывал размер тела, затем я использовал body.length, и это сработало.многие из хороших статических методов, с которыми я не мог работать, я думаю, мне нужно перейти на новую версию верблюда.В любом случае, проблема в том, что заголовки, такие как CamelFileLength и другие, несут длину / размер исходного файла, а не ту, которая была выделена с помощью демаршалинга zipfile и разделения всех файлов.Я использовал заголовки, пока не понял вышеизложенное, поэтому мы использовали body () для нашего предиката, и это сработало, и мы использовали length
} else if (Boolean.parseBoolean(isCompressedOnly)) { //Only Zipped or Compressed
ZipFileDataFormat zipFile = new ZipFileDataFormat();
zipFile.setUsingIterator(true);
from(fromStr)
.routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
.wireTap("file:" + fileArchive)
.split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.choice()
.when(simple("${body.length} > '0'" ))
.to(toStr)
.end();
// .when(simple("${file:length} == 0"))
// .when(simple("${header.CamelFileLength} == 0" ))
// .when(header("CamelFileLength").isEqualTo(0))
// .when(body().isNull())
// .when(body().isNotNull())
// .when(header("CamelFileLength").isEqualTo(0))
// .when(simple("${header.CamelFileLength} < 10" ))
// .when(simple("${header.CamelFileLength} &eq; 0" ))
// .when(simple("${file:length} == null"))
} else {
Выход:
ge-digital/
├── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] enriched
│ ├── [drwxr-xr-x 4.0K] error
│ │ └── [-rw-r--r-- 47] GCMS_2019-04-23_05-27-18.666.csv
│ ├── [drwxr-xr-x 4.0K] mt1
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt2
│ │ └── [drwxr-xr-x 4.0K] processed
│ ├── [drwxr-xr-x 4.0K] mt3
│ │ └── [drwxr-xr-x 4.0K] processed
│ │ ├── [-rw-r--r-- 2.9K] GCMS_112-A-001_Gas_Turbine_2019-04-23_05-29-18.csv
│ │ ├── [-rw-r--r-- 2.1K] GCMS_112-A-001_Gas_Turbine_cw_2019-04-23_05-29-18.csv
│ │ ├── [-rw-r--r-- 911] GCMS_112-A-001_Generator_2019-04-23_05-29-18.csv
│ │ ├── [-rw-r--r-- 2.8K] GCMS_112-A-002_Gas_Turbine_2019-04-23_05-29-18.csv
│ │ ├── [-rw-r--r-- 2.1K] GCMS_112-A-002_Gas_Turbine_cw_2019-04-23_05-29-18.csv
│ │ └── [-rw-r--r-- 888] GCMS_112-A-002_Generator_2019-04-23_05-29-18.csv
│ └── [drwxr-xr-x 4.0K] tobeprocessed
│ ├── [-rw-r--r-- 935K] GCMS_2019-04-23_05-27-44.673.csv
│ ├── [-rw-r--r-- 0] GCMS_2019-04-23_05-27-44.673.csv.camelLock
│ ├── [-rw-r--r-- 941K] GCMS_2019-04-23_05-28-07.013.csv
│ ├── [-rw-r--r-- 936K] GCMS_2019-04-23_05-28-33.694.csv
│ ├── [-rw-r--r-- 939K] GCMS_2019-04-23_05-29-02.300.csv
│ └── [-rw-r--r-- 946K] GCMS_2019-04-23_05-29-28.928.csv
├── [drwxr-xr-x 4.0K] dbase
├── [drwxr-xr-x 4.0K] fleet
│ └── [drwxr-xr-x 4.0K] core
│ ├── [drwxr-xr-x 4.0K] archive
│ │ ├── [-rw-r--r-- 981K] GCMS_20190319-0429.csv
│ │ ├── [-rw-r--r-- 977K] GCMS_20190319-0437.csv
│ │ ├── [-rw-r--r-- 982K] GCMS_20190319-0446.csv
│ │ ├── [-rw-r--r-- 977K] GCMS_20190319-0454.csv
│ │ ├── [-rw-r--r-- 980K] GCMS_20190319-0502.csv
│ │ ├── [-rw-r--r-- 988K] GCMS_20190319-0511.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0815.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0816.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0817.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0818.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0819.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0820.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0821.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0822.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0823.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0824.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0825.csv
│ │ ├── [-rw-r--r-- 15K] GCMS_20190420-0826.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0827.csv
│ │ ├── [-rw-r--r-- 0] GCMS_20190420-0828.csv
│ │ └── [-rw-r--r-- 0] GCMS_20190420-0829.csv
│ └── [drwxr-xr-x 4.0K] error
└── [drwxr-xr-x 4.0K] upload