[tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries

Georgy Kirichenko georgy at tarantool.org
Tue Feb 12 23:04:31 MSK 2019


Add txn_id and txn_commit to the xrow_header structure. txn_id identifies
the transaction on the replica where the transaction was started; the lsn of
the first row in the transaction is used as the transaction id. txn_commit is
set to true for the last row in a transaction, so a transaction can be
committed by its last row or by an additional NOP request with txn_commit set,
and it can likewise be started by a NOP with the corresponding txn_id. In case
of replication, all local changes are moved to the tail of the journal entry
to form a separate transaction (like an autonomous transaction) so that the
changes can be replicated back.

The following encoding/decoding rules are assumed:
 * txn_id and txn_commit are encoded only for multi-row transactions.
   So if txn_id is absent when a row is decoded, the row is a
   single-row transaction.
 * TXN_ID field is differentially encoded as the value lsn - txn_id
 * txn_commit packed into TXN_FLAGS field

These rules provide compatibility with previous xlog format as well
as good compaction level.

Needed for: 2798
---
 src/box/iproto_constants.c |  4 ++--
 src/box/iproto_constants.h |  7 +++++++
 src/box/txn.c              | 21 +++++++++++++++++----
 src/box/txn.h              |  2 ++
 src/box/wal.c              | 28 ++++++++++++++++++++++++----
 src/box/xrow.c             | 36 ++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             |  4 +++-
 test/unit/xrow.cc          |  2 ++
 8 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..4d2e21752 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
 		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
 		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
+		/* 0x08 */	MP_UINT,   /* IPROTO_TXN_ID */
+		/* 0x09 */	MP_UINT,   /* IPROTO_TXN_FLAGS */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x08 */	MP_UINT,
-		/* 0x09 */	MP_UINT,
 		/* 0x0a */	MP_UINT,
 		/* 0x0b */	MP_UINT,
 		/* 0x0c */	MP_UINT,
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..fd80e3111 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
 	XLOG_FIXHEADER_SIZE = 19
 };
 
+enum {
+	/** Set for the last xrow in a transaction. */
+	TXN_FLAG_COMMIT = 0x01,
+};
+
 enum iproto_key {
 	IPROTO_REQUEST_TYPE = 0x00,
 	IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TXN_ID = 0x08,
+	IPROTO_TXN_FLAGS = 0x09,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/txn.c b/src/box/txn.c
index 7f4e85b47..de0152706 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
 #include "journal.h"
 #include <fiber.h>
 #include "xrow.h"
+#include "replication.h"
 
 double too_long_threshold;
 
@@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->remote_row_count = 0;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	return txn;
@@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 * stmt->space can be NULL for IRPOTO_NOP.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
+		if (request->header &&
+		    request->header->replica_id != instance_id &&
+		    request->header->replica_id != 0)
+			++txn->remote_row_count;
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
 		++txn->n_rows;
@@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->remote_row_count;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
-			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+			/* A read (e.g. select) request */
+			continue;
+		if (stmt->row->replica_id == instance_id ||
+		    stmt->row->replica_id == 0)
+			*local_row++ = stmt->row;
+		else
+			*remote_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->remote_row_count);
+	assert(local_row == req->rows + req->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..143f21715 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -180,6 +180,8 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/** Count of remote rows. */
+	uint32_t remote_row_count;
 };
 
 /* Pointer to the current transaction (if any) */
diff --git a/src/box/wal.c b/src/box/wal.c
index cdcaabc00..0ea3be68d 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
 }
 
-static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+static int
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	       struct xrow_header **end)
 {
+	struct xrow_header **row = begin;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 			vclock_follow_xrow(vclock, *row);
 		}
 	}
+	while (begin < end && begin[0]->replica_id != instance_id)
+		++begin;
+	/* Setup txn_id and tnx_replica_id for locally generated rows. */
+	row = begin;
+	while (row < end) {
+		if (row[0]->replica_id != instance_id) {
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Interleaved transactions");
+			return -1;
+		}
+		row[0]->txn_id = begin[0]->lsn;
+		row[0]->txn_commit = row == end - 1 ? 1 : 0;
+		++row;
+	}
+	return 0;
 }
 
 static void
@@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+		if (wal_assign_lsn(&vclock, entry->rows,
+				   entry->rows + entry->n_rows) < 0)
+			goto done;
 		entry->res = vclock_sum(&vclock);
 		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
@@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
-	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
+	if (wal_assign_lsn(&writer->vclock, entry->rows,
+			   entry->rows + entry->n_rows) != 0)
+		return -1;
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..29fa75de4 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
 
 	if (mp_typeof(**pos) != MP_MAP)
 		goto error;
+	bool txn_is_set = false;
+	uint32_t txn_flags = 0;
 
 	uint32_t size = mp_decode_map(pos);
 	for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,32 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TXN_ID:
+			txn_is_set = true;
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_FLAGS:
+			txn_flags = mp_decode_uint(pos);
+			header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
+			if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
+				/* Unknow flags. */
+				goto error;
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (!txn_is_set) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->txn_commit = true;
+	}
+	header->txn_id = header->lsn + header->txn_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != 0) {
+		if (header->txn_id != header->lsn || header->txn_commit == 0) {
+			/* Encode txn id for multi row transaction members. */
+			d = mp_encode_uint(d, IPROTO_TXN_ID);
+			d = mp_encode_uint(d, header->lsn - header->txn_id);
+			map_size++;
+		}
+		if (header->txn_commit && header->txn_id != header->lsn) {
+			/* Setup last row for multi row transaction. */
+			d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
+			d = mp_encode_uint(d, TXN_FLAG_COMMIT);
+			map_size++;
+		}
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 719add4f0..bc4c4a2d7 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 52,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	bool txn_commit;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..bc99285de 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.txn_commit = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
-- 
2.20.1





More information about the Tarantool-patches mailing list