diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d9e1279bb2..f96029f15a4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1703,7 +1703,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) ent = (ReorderBufferTupleCidEnt *) hash_search(txn->tuplecid_hash, (void *) &key, - HASH_ENTER | HASH_FIND, + HASH_ENTER, &found); if (!found) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 689a66cc72d..4b112593c65 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -177,7 +177,7 @@ bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; /* fields valid only when processing streamed transaction */ -bool in_streamed_transaction = false; +static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; @@ -345,7 +345,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) */ xid = pq_getmsgint(s, 4); - Assert(TransactionIdIsValid(xid)); + if (!TransactionIdIsValid(xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid transaction ID in streamed replication transaction"))); /* Add the new subxact to the array (unless already there). */ subxact_info_add(xid); @@ -785,7 +788,12 @@ apply_handle_commit(StringInfo s) logicalrep_read_commit(s, &commit_data); - Assert(commit_data.commit_lsn == remote_final_lsn); + if (commit_data.commit_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", + LSN_FORMAT_ARGS(commit_data.commit_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); apply_handle_commit_internal(s, &commit_data); @@ -812,7 +820,7 @@ apply_handle_origin(StringInfo s) (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("ORIGIN message sent out of order"))); + errmsg_internal("ORIGIN message sent out of order"))); } /* @@ -824,7 +832,10 @@ apply_handle_stream_start(StringInfo s) bool first_segment; HASHCTL hash_ctl; - Assert(!in_streamed_transaction); + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("duplicate STREAM START message"))); /* * Start a transaction on stream start, this transaction will be committed @@ -841,6 +852,11 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + if (!TransactionIdIsValid(stream_xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid transaction ID in streamed replication transaction"))); + /* * Initialize the xidhash table if we haven't yet. This will be used for * the entire duration of the apply worker so create it in permanent @@ -873,7 +889,10 @@ apply_handle_stream_start(StringInfo s) static void apply_handle_stream_stop(StringInfo s) { - Assert(in_streamed_transaction); + if (!in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM STOP message without STREAM START"))); /* * Close the file with serialized changes, and serialize information about @@ -905,7 +924,10 @@ apply_handle_stream_abort(StringInfo s) TransactionId xid; TransactionId subxid; - Assert(!in_streamed_transaction); + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM ABORT message without STREAM STOP"))); logicalrep_read_stream_abort(s, &xid, &subxid); @@ -932,7 +954,6 @@ apply_handle_stream_abort(StringInfo s) * performed rollback to savepoint for one of the earlier * sub-transaction. */ - int64 i; int64 subidx; BufFile *fd; @@ -967,13 +988,15 @@ apply_handle_stream_abort(StringInfo s) return; } - Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts)); - ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, HASH_FIND, - &found); - Assert(found); + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); @@ -1006,13 +1029,15 @@ apply_handle_stream_commit(StringInfo s) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - bool found; LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - Assert(!in_streamed_transaction); + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); @@ -1031,11 +1056,17 @@ apply_handle_stream_commit(StringInfo s) /* open the spool file for the committed transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); + ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, HASH_FIND, - &found); - Assert(found); + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); buffer = palloc(BLCKSZ); @@ -1080,7 +1111,9 @@ apply_handle_stream_commit(StringInfo s) errmsg("could not read from streaming transaction's changes file \"%s\": %m", path))); - Assert(len > 0); + if (len <= 0) + elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"", + len, path); /* make sure we have sufficiently large buffer */ buffer = repalloc(buffer, len); @@ -1108,7 +1141,7 @@ apply_handle_stream_commit(StringInfo s) nchanges++; if (nchanges % 1000 == 0) - elog(DEBUG1, "replayed %d changes from file '%s'", + elog(DEBUG1, "replayed %d changes from file \"%s\"", nchanges, path); } @@ -2053,7 +2086,8 @@ apply_dispatch(StringInfo s) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type \"%c\"", action))); + errmsg_internal("invalid logical replication message type \"%c\"", + action))); } /* @@ -2589,20 +2623,19 @@ static void subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - bool found; Size len; StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* find the xid entry in the xidhash */ + /* Find the xid entry in the xidhash */ ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, HASH_FIND, - &found); - /* we must found the entry for its top transaction by this time */ - Assert(found); + NULL); + /* By this time we must have created the transaction entry */ + Assert(ent); /* * If there is no subtransaction then nothing to do, but if already have @@ -2667,13 +2700,11 @@ static void subxact_info_read(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - bool found; Size len; BufFile *fd; StreamXidHash *ent; MemoryContext oldctx; - Assert(TransactionIdIsValid(xid)); Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); @@ -2682,7 +2713,12 @@ subxact_info_read(Oid subid, TransactionId xid) ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, HASH_FIND, - &found); + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); /* * If subxact_fileset is not valid that mean we don't have any subxact @@ -2836,14 +2872,17 @@ stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; StreamXidHash *ent; - bool found = false; - /* By this time we must have created the transaction entry */ + /* Find the xid entry in the xidhash */ ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, HASH_FIND, - &found); - Assert(found); + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); @@ -2893,9 +2932,9 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) /* create or find the xid entry in the xidhash */ ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid, - HASH_ENTER | HASH_FIND, + HASH_ENTER, &found); - Assert(first_segment || found); + changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -2915,6 +2954,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) MemoryContext savectx; SharedFileSet *fileset; + if (found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); + /* * We need to maintain shared fileset across multiple stream * start/stop calls. So, need to allocate it in a persistent context. @@ -2934,6 +2978,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) } else { + if (!found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); + /* * Open the file and seek to the end of the file because we always * append the changes file. @@ -3140,7 +3189,8 @@ ApplyWorkerMain(Datum main_arg) */ if (!myslotname) ereport(ERROR, - (errmsg("subscription has no replication slot set"))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); /* Setup replication origin tracking. */ StartTransactionCommand();