У меня уходит некоторое время на изучение исходных кодов, и я пытаюсь ответить на вопрос самостоятельно.
- Postgresql Walsender включает только совершенные транзакции, игнорируя отмененные.
Путь к коду и фрагмент кода:
pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() ->
ReorderBufferCommit() -> ReorderBufferIterTXNNext()
DecodeXactOp ():
switch (info)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
- Встроенный логический работник применения не воспроизводит SQL, но использует множество внутренних API для применения изменений на стороне получателя.
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
Логическая репликация PG10 использует pgoutput
в качестве выходного модуля, который находится в двоичном формате и подходит для непосредственного доступа к данным.
https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c