Включить BinLogs для чтения-реплики с binlog_format = "row" - PullRequest
0 голосов
/ 12 октября 2018

Я недавно включил binlogs в моей реплике для чтения, включив автоматическое резервное копирование (как уже упоминалось здесь ).Однако для binlog_format по умолчанию было задано значение MIXED.

binlog_format=MIXED

Из-за этого несоответствия происходит сбой коннектора Debezium, так как он находит начальные журналы в формате MIXED.Есть ли способ включить бины с форматом ROW с самого начала?

Добавление журнала ошибок:

Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00
}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0
.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', me
ta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', se
rvice_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where
 transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:225)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:297)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:637)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:436)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 7 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Expecting token type 128 at line 1, column 1 but found 'update':  ===>> update user_payments
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.text.TokenStream.consume(TokenStream.java:750)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.consumeStatement(LegacyDdlParser.java:462)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parseUnknownStatement(LegacyDdlParser.java:309)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlDdlParser.parseNextStatement(MySqlDdlParser.java:191)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:219)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 11 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,348] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.BinlogReader:457)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] ERROR WorkerSourceTask{id=um-users-qa-test-7-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: org.apache.kafka.connect.errors.ConnectException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', meta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', service_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:452)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.completeTransaction(EventBuffer.java:187)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.add(EventBuffer.java:101)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 07 ноября 2018

После включения резервного копирования вы можете изменить группу параметров binlog_format на ROW в RDS.

Выберите продукт RDS -> Выберите свой экземпляр.

Нажмите в группе параметров своего экземпляра:

enter image description here

Поиск параметра binlog_format -> Выберите его -> Изменить параметры -> Выбрать Row.

enter image description here

После того, как вы это сделаете, вам нужно будет перезапустить ваш экземпляр, чтобы применить это новое значение параметра.После того, как ваша база данных снова подключится к сети, вы можете проверить правильность значений, выполнив следующие команды:

show global variables like 'log_bin';
show global variables like 'binlog_format';

Результат должен быть примерно таким: enter image description here

Затем вы можете удалить свой соединитель с помощью REST API из Kafka Connect и снова зарегистрировать свой соединитель с помощью "snapshot.mode": "when_needed".Это создаст все строки из ваших таблиц в конфигурации белого списка для соответствующих тем в Kafka.

Также, как сказал Майкл, вы можете увеличить параметр binlog retention hours.

CALL mysql.rds_show_configuration;
CALL mysql.rds_set_configuration('binlog retention hours', 24);

Надеюсь, это поможет.

...