Разбор строки DateTime из файла CSV в качестве метки времени события - PullRequest
0 голосов
/ 17 апреля 2020

Используя WSO2 SP, мое приложение считывает строки из следующего файла CSV:

20170801 000001237,1.321420,1.321510,0
20170801 000001487,1.321440,1.321530,0
20170801 000001737,1.321450,1.321530,0
20170801 000001987,1.321440,1.321530,0

Первый столбец - это строка метки времени, которую необходимо проанализировать для метки времени событие:

yyyyMMdd hhmmssfff

, где fff - миллисекунды

мое текущее приложение выглядит так:

@App:name('ReceiveAndCount')
@App:description('count events in csv file')
@source(type = 'file', 
    mode='line',
    tailing='false',
    file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", 
    action.after.process='NONE',
        @map(type = 'csv', header='false', delimiter = ",",
        @attributes(
        dateTime = '0',
        bid='1',
        ask='2',
        ignore='3'  ) ))

define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority='info')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name = 'query1')
from csvGBPUSDstream 
select count() as totalCount
insert into TotalCountStream;

Любые указания по синтаксическому анализу строки даты в первом столбце как метки времени события приветствуются

Ответы [ 2 ]

0 голосов
/ 06 мая 2020

Запуск wso2sp-editor в docker с помощью следующей команды с переключателем громкости, позволяющим читать локальный файл данных:

docker run -it -p 9390:9390 -v /Users/A/Desktop/wso2sp-editor-docker-data:/home/wso2carbon/wso2sp-4.4.0/data/ --name editor wso2/wso2sp-editor:4.4.0

Наконец код приложения:

 @App:name('CsvParseDateTime')

@ Приложение: описание ('разбор строки метки времени из CSV-файла') --abrakadabra @source (type = 'file', mode = "line", tailing = "false", file.uri = "file: // home / wso2carbon / wso2sp-4.4.0 / data / DAT_ASCII_GBPUSD_T_201708.csv ", -" файл: /Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv ", action.after.process =" НЕТ ", @map (type = 'csv', header = "false", delimiter = ",", @attributes (dateTime = "0", ask = "2", ignore = "3", bid = "1"))) определить поток csvGBPUSDstream (строка dateTime, двойной ставки, двойной спрос, игнорировать int);

@ сток (type = 'log', priority = "info") определить поток fiveSecRangeStream (timestamp long, fiveSecRange double);

- строка для анализа строки dateTime по времени в миллисекундах, которая может использоваться как индекс для времени windows --time: timestampInMilliseconds (date.value, date.format) гггг -MM-dd ЧЧ: мм: ss.SSS @info (name = 'Parse Timestamp') из csvGBPUSDstream выбрать время: timestampInMilliseconds (dateTime, 'yyyyMMdd HHmmssSSS') как отметку времени, ставку, запросить вставку в индексированный GBPUSDstream;

- время внешнего окна (временная метка в потоке событий) из indexedGBPUSDstream # window.externalTime (временная метка, 5 Se c) выбрать временную метку, max (bid) -min (bid) для вставки fiveSecRange в fiveSecRangeStream; - для всех событий;

0 голосов
/ 28 апреля 2020

Ответ заключается в использовании функции # window.externalTime ()

@App:name('CsvParseDateTime')

@ App: description ('анализ строки метки времени из файла CSV')

@ source (type = 'file', mode = "line", tailing = "false", file.uri = "file: /Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", action.after.process = "NONE", @map (type = 'csv', header = "false", delimiter = ",", @attributes (dateTime = "0", ask = "2", ignore = "3", bid = " 1 "))) определить поток csvGBPUSDstream (строка dateTime, двойной ставки, двойной спрос, игнорировать int);

@ сток (type = 'log', priority =" info ") определить поток fiveSecRangeStream (timestamp long, fiveSecRange double);

- строка для анализа строки dateTime по времени в миллисекундах, которая может использоваться в качестве индекса для времени windows --time: timestampInMilliseconds (date.value, date.format) гггг-ММ-дд ЧЧ: мм: ss.SSS @info (name = 'Parse Timestamp') из csvGBPUSDstream выбрать время: timestampInMilliseconds (dateTime, 'yyyyMMdd HHmmssSSS') в качестве отметки времени, ставки, запроса вставки into indexedGBPUSDstream;

- время внешнего окна (отметка времени в потоке событий) из indexedGBPUSDstream # window.externalTime (отметка времени, 5 Se c) выберите отметку времени, max (bid) -min (bid) в качестве вставки fiveSecRange в fiveSecRangeStream; - для всех событий;

...