Мои потоки Mulesoft выполняются не раз - PullRequest
0 голосов
/ 08 апреля 2020

Я занимаюсь разработкой приложения для переноса данных между двумя базами данных с использованием платформы Anypoint от Mulesoft. Я продолжаю получать дубликаты ошибок ключа, и во время отладки я понял, что мои потоки выполняются более одного раза.

Вот часть кода:

    <flow name="docLogicFlow" doc:id="7e901506-e31d-4d1a-baf0-e54fef27f083" >
        <flow-ref doc:name="DGD_TCONTENT Migration Flow Reference" doc:id="9139e02d-36b2-4c4c-a8bb-12fa1e372134" name="DGD_TCONTENTMigrationFlow" />
        <flow-ref doc:name="DOC_TFILECONTENTTYPE Migration Flow Reference" doc:id="ef67a89a-ecd9-416b-9bc4-4a67a91efd67" name="DOC_TFILECONTENTTYPEMigrationFlow" />
        <flow-ref doc:name="DOC_TFILESTORAGE Migration Flow Reference" doc:id="2adbb928-9f7e-44be-8a50-33bdceacfc2f" name="DOC_TFILESTORAGEMigrationFlow" />
    </flow>
    <flow name="docCleanFlow" doc:id="8278a9e1-64c7-4abc-8647-ce22c28f66c5" >
        <db:delete doc:name="Delete" doc:id="bc7920fa-ed99-4fe1-9337-a061f5ec800b" config-ref="Database_Config">
            <db:sql >DELETE FROM DGD_TCONTENT;
DELETE FROM DOC_TFILE;
DELETE FROM DOC_TFILECONTENTTYPE;
DELETE FROM DOC_TFILESTORAGE;</db:sql>
        </db:delete>
    </flow>
    <flow name="DGD_TCONTENTMigrationFlow" doc:id="cbdcdcb7-a22c-4636-ba02-639f12882700" >
        <db:select doc:name="Select DGD_TCONTENT from Oracle DB" doc:id="3b60f901-03f5-4213-8373-92b29abe6a3d" config-ref="MYCAREER_DEV_DB" >
            <db:sql >SELECT * FROM DGD_TCONTENT</db:sql>
        </db:select>
        <batch:job jobName="DGD_TCONTENTMigrationBatchJob" doc:id="3317b3d8-582c-4f6c-82be-2d84ccdbd28f" >
            <batch:process-records >
                <batch:step name="DGD_TCONTENTMigrationBatchStep" doc:id="a4d82fbe-96c7-4ff8-8fc9-5614f2963e60" >
                    <batch:aggregator doc:name="DGD_TCONTENT Batch Aggregator" doc:id="64d3b1a3-bf3e-44c5-80f9-cf3f6a063cdf" size="20" >
                        <foreach doc:name="For Each" doc:id="1bea88df-b68a-4af9-9f7d-a0dbdf65d720" >
                            <db:stored-procedure doc:name="Insert into DGD_TCONTENT" doc:id="e942f30b-7f3c-4a1b-b175-2ad969ab39b9" config-ref="Database_Config">
                                <db:sql >{call InsertIntoContent (:CONTENT_ID, :CONTENT_TYPE, :CONTENT_EXT_ID, :CONTENT_TITLE, :CONTENT_SUMMARY, :CONTENT_URL, :CONTENT_FORMAT,
:CONTENT_OBSOLETE, :CONTENT_IMAGE_URL, :CONTENT_LANGUAGE, :CONTENT_DURATION, :CONTENT_DURATION_TYPE, :CONTENT_PROVIDER,
:CONTENT_INTERNAL, :CONTENT_CREATED_DATE, :CONTENT_MODIFIED_DATE)}</db:sql>
                                <db:input-parameters ><![CDATA[#[output application/java
---
{
    CONTENT_ID              : payload.content_id,
    CONTENT_TYPE            : payload.content_type,
    CONTENT_EXT_ID          : payload.content_ext_id,
    CONTENT_TITLE           : payload.content_title,
    CONTENT_SUMMARY         : payload.content_summary,
    CONTENT_URL             : payload.content_url,
    CONTENT_FORMAT          : payload.content_format,
    CONTENT_OBSOLETE        : payload.content_obsolete,
    CONTENT_IMAGE_URL       : payload.content_image_url,
    CONTENT_LANGUAGE        : payload.content_language,
    CONTENT_DURATION        : payload.content_duration,
    CONTENT_DURATION_TYPE   : payload.content_duration_type,
    CONTENT_PROVIDER        : payload.content_provider,
    CONTENT_INTERNAL        : payload.content_internal,
    CONTENT_CREATED_DATE    : payload.content_created_date,
    CONTENT_MODIFIED_DATE   : payload.content_modified_date,
}]]]></db:input-parameters>
                            </db:stored-procedure>
                        </foreach>
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>
            <batch:on-complete >
                <logger level="INFO" doc:name="Logger" doc:id="42ae6570-e00d-4f0f-a2fa-3cd36ccc5a98" message="DGD_TCONTENT finished data migration."/>
            </batch:on-complete>
        </batch:job>
    </flow>
    <flow name="DOC_TFILEMigrationFlow" doc:id="40bf34a9-6b57-4640-9ebe-801f86da6331" >
        <db:select doc:name="Select DOC_TFILE from Oracle DB" doc:id="c29cd80b-272f-4d92-97b6-9c1c8a214fb9" config-ref="MYCAREER_DEV_DB" >
            <db:sql >SELECT * FROM DOC_TFILE</db:sql>
        </db:select>
        <batch:job jobName="MigrateDOC_TFILEBatchJob" doc:id="6cfb7fa7-eab5-4591-b188-10bd88f40efc" >
            <batch:process-records >
                <batch:step name="MigrateDOC_TFILEBatchStep" doc:id="82ab6f35-8867-46a7-9591-eb6f79ec64b4" >
                    <batch:aggregator doc:name="DOC_TFILE Batch Aggregator" doc:id="8c9277d0-65aa-4bba-9c28-3acf272f2936" size="20" >
                        <foreach doc:name="For Each" doc:id="9f50fb18-16b9-4be5-bf98-b7b024546ee8" >
                            <db:stored-procedure doc:name="Insert into DOC_TFILE" doc:id="6a7b3f09-284d-4bc5-91ea-945d02195395" config-ref="Database_Config">
                                <db:sql >{call InsertIntoFile (:FILE_CODE,:FILECONTENTTYPE_CODE,:FILE_NAME,:FILE_SIZE,  :FILE_STATUS,:USER_CODE,    :FILE_DATA,:FILE_STORAGE_VALUE,:FILE_UPLOAD_CODE,
                                :FILE_UPLOAD_TEMP,:FILESTORAGE_CODE,:FILE_DATE_INSERT,:FILE_DATE_UPDATE,:FILE_DATE_DELETE)}</db:sql>
                                <db:input-parameters ><![CDATA[#[output application/java
---
{
    FILE_CODE               : payload.file_code,
    FILECONTENTTYPE_CODE            : payload.filecontenttype_code,
    FILE_NAME           : payload.file_name,
    FILE_SIZE           : payload.file_size,
    FILE_STATUS         : payload.file_status,
    USER_CODE               : payload.user_code,
    FILE_DATA           : payload.file_data,
    FILE_STORAGE_VALUE      : payload.file_storage_value,
    FILE_UPLOAD_CODE        : payload.file_upload_code,
    FILE_UPLOAD_TEMP        : payload.file_upload_temp,
    FILESTORAGE_CODE        : payload.filestorage_code,
    FILE_DATE_INSERT        : payload.file_date_insert,
    FILE_DATE_UPDATE        : payload.file_date_upload,
    CONTENT_CREATED_DATE    : payload.content_created_date,
    FILE_DATE_DELETE    : payload.file_date_delete
}]]]></db:input-parameters>
                            </db:stored-procedure>
                        </foreach>
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>
            <batch:on-complete >
                <logger level="INFO" doc:name="Logger" doc:id="c51d684a-f195-4758-84b8-355fc8cda3b5" message="DOC_TFILE finished data migration."/>
            </batch:on-complete>
        </batch:job>
    </flow>
    <flow name="DOC_TFILECONTENTTYPEMigrationFlow" doc:id="4447f39a-aabf-48df-ae81-9c23a9c17927" >
        <db:select doc:name="Select DOC_TFILECONTENTTYPE from Oracle DB" doc:id="54a25f9f-d84f-4621-bc5b-b2578dcf4891" config-ref="MYCAREER_DEV_DB" >
            <db:sql >SELECT * FROM DOC_TFILECONTENTTYPE</db:sql>
        </db:select>
        <batch:job jobName="MigrateDOC_TFILECONTENTTYPEBatchJob" doc:id="f895d255-e3f4-4139-8c9c-bdba1f292abf" >
            <batch:process-records >
                <batch:step name="MigrateDOC_TFILECONTENTTYPEBatchStep" doc:id="3ff4c3ff-659d-4efc-ad81-e3e3f42d0afb" >
                    <batch:aggregator doc:name="DOC_TFILECONTENTTYPE Batch Aggregator" doc:id="d7e56e72-172a-4322-acdd-a66a7e626224" size="20" >
                        <foreach doc:name="For Each" doc:id="d0e3b9ee-c820-4722-94cf-52ffabd14b04" >
                            <db:stored-procedure doc:name="Insert into DOC_TFILECONTENTTYPE" doc:id="0d873708-4611-42af-98c0-22f656bc5ec4" config-ref="Database_Config">
                                <db:sql >{call InsertIntoFileContentType (:CONTENTTYPE_CODE,:CONTENTTYPE_ID,:CONTENTTYPE_STATUS,:CONTENTTYPE_ICON,:CONTENTTYPE_NAME,
                            :CONTENTTYPE_DESCRIPTION)}</db:sql>
                                <db:input-parameters ><![CDATA[#[{
    CONTENTTYPE_CODE            : payload.contenttype_code, 
    CONTENTTYPE_ID              : payload.contenttype_id,
    CONTENTTYPE_STATUS          : payload.contenttype_status,
    CONTENTTYPE_ICON            : payload.contenttype_icon,
    CONTENTTYPE_NAME            : payload.contenttype_name,
    CONTENTTYPE_DESCRIPTION     : payload.contenttype_description,
}]]]></db:input-parameters>
                            </db:stored-procedure>
                        </foreach>
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>
            <batch:on-complete >
                <logger level="INFO" doc:name="Logger" doc:id="fd4797d9-bca0-4b97-bca4-b95dce09c2bb" message="DOC_TFILE_CONTENTTYPE finished data migration."/>
            </batch:on-complete>
        </batch:job>
    </flow>

BussinessLogicFlow, который вызывает DocLogicFlow:

    <flow name="mainFlow" doc:id="7c0a5bef-b3d5-442f-bff3-10d038f69a5e">
        <flow-ref doc:name="businesslogicFlow" doc:id="91360ede-4d71-44c7-9b64-8ee762e04ea0" name="businesslogicFlow" />
        <error-handler>
            <on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="488b507d-e26c-4c56-8759-8bb4f6645d71" type="ANY">
                <flow-ref doc:name="errorHandlingFlow" doc:id="afdaf73c-0137-4d60-84f6-5c41234771a3" name="errorHandlingFlow" />
            </on-error-propagate>
        </error-handler>
    </flow>
    <flow name="businesslogicFlow" doc:id="5aa7011d-8abd-453d-9459-c7322838f14a" tracking:enable-default-events="true">
        <db:select doc:name="Select" doc:id="58bc689c-b708-4b1b-b645-693735104a25" config-ref="MYCAREER_DEV_DB">
            <db:sql >select 1 from dual</db:sql>
        </db:select>
        <batch:job jobName="template-db2db-account-migrationBatch_Job" doc:id="af55c5cf-807b-4582-9868-66f144b0a8e9">
            <batch:process-records>
                <batch:step name="Batch_Step" doc:id="428bb0a0-5082-451d-9253-2b6f0a147719" >
                    <flow-ref doc:name="Flow Reference" doc:id="ebf025cf-70b1-4145-8fd9-270d92c06420" name="docCleanFlow"/>
                    <logger level="INFO" doc:name="Logger" doc:id="92c2d1da-fb59-4cb3-b1e6-ac5e9ae28922" message="DELETE COMPLETED"/>
                </batch:step>
            </batch:process-records>
            <batch:on-complete >
                <flow-ref doc:name="Flow Reference" doc:id="9a331b54-55e4-4818-9050-cf70cc348581" name="docLogicFlow"/>
            </batch:on-complete>

</batch:job>

Конечная точка, которая вызывает bussinessLogicFlow:

    <flow name="triggerFlow" doc:id="25a15396-5def-4f1c-bac2-6c7a769f4278" >
        <http:listener doc:name="/migrate" doc:id="ca1efe4f-1e53-428a-b439-c1d905246a34" config-ref="HTTP_Listener_config" path="/migrate"/>
        <flow-ref doc:name="mainFlow" doc:id="9c665dd4-7df3-4e0d-a1eb-01ac63781ce7" name="mainFlow"/>
        <ee:transform doc:name="Build response" doc:id="2abddd58-c707-435a-a004-ec5ba9107429">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
 ---
 {
    Message: "Batch Process initiated",
    ID: payload.id,
    RecordCount: payload.recordCount,
    StartExecutionOn: payload.creationTime as DateTime
 }
 ]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </flow> 

Кто-нибудь знает, почему это происходит? И как этого избежать? Спасибо!

1 Ответ

0 голосов
/ 08 апреля 2020

Похоже, что предоставленный код не является источником дублирования. Это может быть что-то, что вызывает docLogicFlow

Какой поток вызывает это? Это генерируется каким-то планировщиком? Дважды проверьте его - он может запустить поток снова, даже если предыдущий поток все еще работает. Планировщик не зависит от исключения потока. Это зависит только от графика. В общем случае это может быть много потоков, работающих параллельно из одного и того же планировщика.

Если это не планировщик - проверьте события, которые запускают этот поток. Я думаю - эти события происходят слишком часто, и ваш обычный поток все еще работает. Посмотрите о планировщиках и нескольких планировщиках здесь https://simpleflatservice.com/mule4/Multipleschedules.html

Это хорошо, что вы используете базу данных. Вы можете использовать его механизм трансформации для предотвращения дублирования. Всегда будьте готовы, что что-то может вызывать поток снова и снова и отклонять дублированные запросы. Это легко в среде БД. Просто установите sh любую блокировку и отклоните / проигнорируйте дублированные запросы.

PS Ваша третья ссылка на поток не связана ни с чем. Это один

<flow-ref doc:name="DOC_TFILESTORAGE Migration Flow Reference" doc:id="0bda80b0-e330-4e86-b0f1-acfeb57e031a" name="DOC_TFILESTORAGEMigrationFlow" />
...