* [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm
@ 2020-06-09 12:20 Serge Petrenko
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option Serge Petrenko
                   ` (11 more replies)
  0 siblings, 12 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Issues:
https://github.com/tarantool/tarantool/issues/4844
https://github.com/tarantool/tarantool/issues/4845
https://github.com/tarantool/tarantool/issues/4847

Branch:
https://github.com/tarantool/tarantool/tree/sp/gh-4847-wal-confirm-msg

This patchset contains Vlad's patches introducing txn_limbo, the is_sync space
flag, and synchronous transactions waiting for replica LSN, plus my patches for
writing a CONFIRM entry to the WAL and waiting for it to appear on replicas.

Local recovery with respect to CONFIRM is not yet implemented, and neither is
the ROLLBACK message or its processing.

Serge Petrenko (4):
  txn_limbo: follow-up fixes
  txn_limbo: fix instance id assignment
  xrow: introduce CONFIRM entry
  replication: write and read CONFIRM entries

Vladislav Shpilevoy (4):
  replication: introduce space.is_sync option
  replication: introduce replication_sync_quorum cfg
  txn: add TXN_WAIT_ACK flag
  replication: make sync transactions wait quorum

 src/box/CMakeLists.txt                        |   1 +
 src/box/applier.cc                            |  81 +++++-
 src/box/box.cc                                |  32 +++
 src/box/box.h                                 |   1 +
 src/box/errcode.h                             |   2 +
 src/box/iproto_constants.h                    |   3 +
 src/box/lua/cfg.cc                            |   9 +
 src/box/lua/load_cfg.lua                      |   5 +
 src/box/lua/schema.lua                        |   2 +
 src/box/lua/space.cc                          |   5 +
 src/box/relay.cc                              |  14 +-
 src/box/replication.cc                        |   1 +
 src/box/replication.h                         |   6 +
 src/box/space_def.c                           |   2 +
 src/box/space_def.h                           |   6 +
 src/box/txn.c                                 | 111 ++++++++-
 src/box/txn.h                                 |  30 +++
 src/box/txn_limbo.c                           | 232 ++++++++++++++++++
 src/box/txn_limbo.h                           | 174 +++++++++++++
 src/box/xrow.c                                |  74 ++++++
 src/box/xrow.h                                |  23 ++
 test/app-tap/init_script.result               |   1 +
 test/box/admin.result                         |   2 +
 test/box/cfg.result                           |   4 +
 test/box/error.result                         |   2 +
 .../sync_replication_sanity.result            |  39 +++
 .../sync_replication_sanity.test.lua          |  16 ++
 27 files changed, 869 insertions(+), 9 deletions(-)
 create mode 100644 src/box/txn_limbo.c
 create mode 100644 src/box/txn_limbo.h
 create mode 100644 test/replication/sync_replication_sanity.result
 create mode 100644 test/replication/sync_replication_sanity.test.lua

-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg Serge Petrenko
                   ` (10 subsequent siblings)
  11 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>

A synchronous space makes every transaction that affects its data
wait until it is replicated to a quorum of replicas before it is
committed.

Part of #4844
---
 src/box/lua/schema.lua                        |  2 +
 src/box/lua/space.cc                          |  5 +++
 src/box/space_def.c                           |  2 +
 src/box/space_def.h                           |  6 +++
 .../sync_replication_sanity.result            | 39 +++++++++++++++++++
 .../sync_replication_sanity.test.lua          | 16 ++++++++
 6 files changed, 70 insertions(+)
 create mode 100644 test/replication/sync_replication_sanity.result
 create mode 100644 test/replication/sync_replication_sanity.test.lua

diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index e6844b45f..a91b4fbad 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -413,6 +413,7 @@ box.schema.space.create = function(name, options)
         format = 'table',
         is_local = 'boolean',
         temporary = 'boolean',
+        is_sync = 'boolean',
     }
     local options_defaults = {
         engine = 'memtx',
@@ -457,6 +458,7 @@ box.schema.space.create = function(name, options)
     local space_options = setmap({
         group_id = options.is_local and 1 or nil,
         temporary = options.temporary and true or nil,
+        is_sync = options.is_sync
     })
     _space:insert{id, uid, name, options.engine, options.field_count,
         space_options, format}
diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc
index d0e44dd41..177c58830 100644
--- a/src/box/lua/space.cc
+++ b/src/box/lua/space.cc
@@ -253,6 +253,11 @@ lbox_fillspace(struct lua_State *L, struct space *space, int i)
 	lua_pushstring(L, space->def->engine_name);
 	lua_settable(L, i);
 
+	/* space.is_sync */
+	lua_pushstring(L, "is_sync");
+	lua_pushboolean(L, space->def->opts.is_sync);
+	lua_settable(L, i);
+
 	lua_pushstring(L, "enabled");
 	lua_pushboolean(L, space_index(space, 0) != 0);
 	lua_settable(L, i);
diff --git a/src/box/space_def.c b/src/box/space_def.c
index efb1c8ee9..83566bf02 100644
--- a/src/box/space_def.c
+++ b/src/box/space_def.c
@@ -41,6 +41,7 @@ const struct space_opts space_opts_default = {
 	/* .is_temporary = */ false,
 	/* .is_ephemeral = */ false,
 	/* .view = */ false,
+	/* .is_sync = */ false,
 	/* .sql        = */ NULL,
 };
 
@@ -48,6 +49,7 @@ const struct opt_def space_opts_reg[] = {
 	OPT_DEF("group_id", OPT_UINT32, struct space_opts, group_id),
 	OPT_DEF("temporary", OPT_BOOL, struct space_opts, is_temporary),
 	OPT_DEF("view", OPT_BOOL, struct space_opts, is_view),
+	OPT_DEF("is_sync", OPT_BOOL, struct space_opts, is_sync),
 	OPT_DEF("sql", OPT_STRPTR, struct space_opts, sql),
 	OPT_DEF_LEGACY("checks"),
 	OPT_END,
diff --git a/src/box/space_def.h b/src/box/space_def.h
index 788b601e6..198242d02 100644
--- a/src/box/space_def.h
+++ b/src/box/space_def.h
@@ -67,6 +67,12 @@ struct space_opts {
 	 * this flag can't be changed after space creation.
 	 */
 	bool is_view;
+	/**
+	 * Synchronous space makes all transactions, affecting its
+	 * data, synchronous. That means they are not applied
+	 * until replicated to a quorum of replicas.
+	 */
+	bool is_sync;
 	/** SQL statement that produced this space. */
 	char *sql;
 };
diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
new file mode 100644
index 000000000..813c7b27b
--- /dev/null
+++ b/test/replication/sync_replication_sanity.result
@@ -0,0 +1,39 @@
+-- test-run result file version 2
+--
+-- gh-4282: synchronous replication. It allows to make certain
+-- spaces commit only when their changes are replicated to a
+-- quorum of replicas.
+--
+s = box.schema.create_space('test', {is_sync = true})
+ | ---
+ | ...
+s.is_sync
+ | ---
+ | - true
+ | ...
+pk = s:create_index('pk')
+ | ---
+ | ...
+box.begin() s:insert({1}) s:insert({2}) box.commit()
+ | ---
+ | ...
+s:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+s:drop()
+ | ---
+ | ...
+
+-- Default is async.
+s = box.schema.create_space('test')
+ | ---
+ | ...
+s.is_sync
+ | ---
+ | - false
+ | ...
+s:drop()
+ | ---
+ | ...
diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
new file mode 100644
index 000000000..34dcaee61
--- /dev/null
+++ b/test/replication/sync_replication_sanity.test.lua
@@ -0,0 +1,16 @@
+--
+-- gh-4282: synchronous replication. It allows to make certain
+-- spaces commit only when their changes are replicated to a
+-- quorum of replicas.
+--
+s = box.schema.create_space('test', {is_sync = true})
+s.is_sync
+pk = s:create_index('pk')
+box.begin() s:insert({1}) s:insert({2}) box.commit()
+s:select{}
+s:drop()
+
+-- Default is async.
+s = box.schema.create_space('test')
+s.is_sync
+s:drop()
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-15 23:05   ` Vladislav Shpilevoy
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag Serge Petrenko
                   ` (9 subsequent siblings)
  11 siblings, 2 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>

Synchronous transactions are supposed to be replicated to a
specified number of replicas before they are committed on the
master. The number can be set with the replication_sync_quorum
option. It is 1 by default, so sync transactions behave like
asynchronous ones unless the option is configured. A value of 1
means a successful WAL write on the master is enough for commit.

Part of #4844
---
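A minimal sketch of the range check this patch adds, for poking at the
boundaries in isolation. It is not the patch's code: check_sync_quorum and
SKETCH_VCLOCK_MAX are hypothetical names, the value 32 is an assumption
standing in for VCLOCK_MAX, and the upper bound is assumed to be exclusive,
as the error message says.

```c
#include <assert.h>

/* Assumption: stand-in for VCLOCK_MAX; the real value lives in vclock.h. */
enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * Hypothetical helper modelled on box_check_replication_sync_quorum():
 * returns the quorum when it lies in (0, SKETCH_VCLOCK_MAX), -1 otherwise.
 */
static int
check_sync_quorum(int quorum)
{
	if (quorum <= 0 || quorum >= SKETCH_VCLOCK_MAX)
		return -1;
	return quorum;
}
```

With the default of 1 the check passes, which matches the "WAL write on the
master is enough" behaviour described in the commit message.
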
 src/box/box.cc                  | 27 +++++++++++++++++++++++++++
 src/box/box.h                   |  1 +
 src/box/lua/cfg.cc              |  9 +++++++++
 src/box/lua/load_cfg.lua        |  5 +++++
 src/box/replication.cc          |  1 +
 src/box/replication.h           |  6 ++++++
 test/app-tap/init_script.result |  1 +
 test/box/admin.result           |  2 ++
 test/box/cfg.result             |  4 ++++
 9 files changed, 56 insertions(+)

diff --git a/src/box/box.cc b/src/box/box.cc
index 96557651b..9b67aeb1f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -476,6 +476,19 @@ box_check_replication_sync_lag(void)
 	return lag;
 }
 
+static int
+box_check_replication_sync_quorum(void)
+{
+	int quorum = cfg_geti("replication_sync_quorum");
+	if (quorum <= 0 || quorum >= VCLOCK_MAX) {
+		diag_set(ClientError, ER_CFG, "replication_sync_quorum",
+			 "the value must be greater than zero and less than "
+			 "maximal number of replicas");
+		return -1;
+	}
+	return quorum;
+}
+
 static double
 box_check_replication_sync_timeout(void)
 {
@@ -658,6 +671,8 @@ box_check_config()
 	box_check_replication_connect_timeout();
 	box_check_replication_connect_quorum();
 	box_check_replication_sync_lag();
+	if (box_check_replication_sync_quorum() < 0)
+		diag_raise();
 	box_check_replication_sync_timeout();
 	box_check_readahead(cfg_geti("readahead"));
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
@@ -777,6 +792,16 @@ box_set_replication_sync_lag(void)
 	replication_sync_lag = box_check_replication_sync_lag();
 }
 
+int
+box_set_replication_sync_quorum(void)
+{
+	int value = box_check_replication_sync_quorum();
+	if (value < 0)
+		return -1;
+	replication_sync_quorum = value;
+	return 0;
+}
+
 void
 box_set_replication_sync_timeout(void)
 {
@@ -2417,6 +2442,8 @@ box_cfg_xc(void)
 	box_set_replication_connect_timeout();
 	box_set_replication_connect_quorum();
 	box_set_replication_sync_lag();
+	if (box_set_replication_sync_quorum() != 0)
+		diag_raise();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
 	box_set_replication_anon();
diff --git a/src/box/box.h b/src/box/box.h
index 557542a83..e81ff9ea4 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -243,6 +243,7 @@ void box_set_replication_timeout(void);
 void box_set_replication_connect_timeout(void);
 void box_set_replication_connect_quorum(void);
 void box_set_replication_sync_lag(void);
+int box_set_replication_sync_quorum(void);
 void box_set_replication_sync_timeout(void);
 void box_set_replication_skip_conflict(void);
 void box_set_replication_anon(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index a5b15e527..67dc89242 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -313,6 +313,14 @@ lbox_cfg_set_replication_sync_lag(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_replication_sync_quorum(struct lua_State *L)
+{
+	if (box_set_replication_sync_quorum() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
 {
@@ -370,6 +378,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
 		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
 		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
+		{"cfg_set_replication_sync_quorum", lbox_cfg_set_replication_sync_quorum},
 		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
 		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 7dc40a47f..182c1dfdd 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -88,6 +88,7 @@ local default_cfg = {
     worker_pool_threads = 4,
     replication_timeout = 1,
     replication_sync_lag = 10,
+    replication_sync_quorum = 1,
     replication_sync_timeout = 300,
     replication_connect_timeout = 30,
     replication_connect_quorum = nil, -- connect all
@@ -163,6 +164,7 @@ local template_cfg = {
     worker_pool_threads = 'number',
     replication_timeout = 'number',
     replication_sync_lag = 'number',
+    replication_sync_quorum = 'number',
     replication_sync_timeout = 'number',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
@@ -274,6 +276,7 @@ local dynamic_cfg = {
     replication_connect_timeout = private.cfg_set_replication_connect_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
     replication_sync_lag    = private.cfg_set_replication_sync_lag,
+    replication_sync_quorum = private.cfg_set_replication_sync_quorum,
     replication_sync_timeout = private.cfg_set_replication_sync_timeout,
     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
     replication_anon        = private.cfg_set_replication_anon,
@@ -307,6 +310,7 @@ local dynamic_cfg_order = {
     -- rule - apply before replication itself.
     replication_timeout     = 150,
     replication_sync_lag    = 150,
+    replication_sync_quorum = 150,
     replication_sync_timeout    = 150,
     replication_connect_timeout = 150,
     replication_connect_quorum  = 150,
@@ -342,6 +346,7 @@ local dynamic_cfg_skip_at_load = {
     replication_connect_timeout = true,
     replication_connect_quorum = true,
     replication_sync_lag    = true,
+    replication_sync_quorum = true,
     replication_sync_timeout = true,
     replication_skip_conflict = true,
     replication_anon        = true,
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 273a7cb66..f2c18fa21 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -51,6 +51,7 @@ double replication_timeout = 1.0; /* seconds */
 double replication_connect_timeout = 30.0; /* seconds */
 int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
 double replication_sync_lag = 10.0; /* seconds */
+int replication_sync_quorum = 1;
 double replication_sync_timeout = 300.0; /* seconds */
 bool replication_skip_conflict = false;
 bool replication_anon = false;
diff --git a/src/box/replication.h b/src/box/replication.h
index 93a25c8a7..3624019fb 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -125,6 +125,12 @@ extern int replication_connect_quorum;
  */
 extern double replication_sync_lag;
 
+/**
+ * Minimal number of replicas a synchronous transaction must be
+ * replicated to before it can finish its commit.
+ */
+extern int replication_sync_quorum;
+
 /**
  * Max time to wait for appliers to synchronize before entering
  * the orphan mode.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 7c4454285..7d2a610fa 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -29,6 +29,7 @@ replication_anon:false
 replication_connect_timeout:30
 replication_skip_conflict:false
 replication_sync_lag:10
+replication_sync_quorum:1
 replication_sync_timeout:300
 replication_timeout:1
 slab_alloc_factor:1.05
diff --git a/test/box/admin.result b/test/box/admin.result
index d94da8c5d..c4f849f2d 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -79,6 +79,8 @@ cfg_filter(box.cfg)
     - false
   - - replication_sync_lag
     - 10
+  - - replication_sync_quorum
+    - 1
   - - replication_sync_timeout
     - 300
   - - replication_timeout
diff --git a/test/box/cfg.result b/test/box/cfg.result
index b41d54599..c511458a8 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -67,6 +67,8 @@ cfg_filter(box.cfg)
  |     - false
  |   - - replication_sync_lag
  |     - 10
+ |   - - replication_sync_quorum
+ |     - 1
  |   - - replication_sync_timeout
  |     - 300
  |   - - replication_timeout
@@ -170,6 +172,8 @@ cfg_filter(box.cfg)
  |     - false
  |   - - replication_sync_lag
  |     - 10
+ |   - - replication_sync_quorum
+ |     - 1
  |   - - replication_sync_timeout
  |     - 300
  |   - - replication_timeout
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option Serge Petrenko
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-18 23:12   ` Leonid Vasiliev
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
                   ` (8 subsequent siblings)
  11 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>

When a transaction touches a synchronous space, its commit
procedure changes. The transaction enters the 'waiting for acks'
state, in which it waits for acks from replicas and from the
master's own WAL.

A new transaction flag, TXN_WAIT_ACK, denotes this state.

Part of #4844
---
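The rule for setting the flag can be sketched outside of txn.c as follows.
This is an illustration under assumptions, not the patch itself: struct
sketch_stmt and sketch_txn_needs_ack are hypothetical names that keep only
the fields the decision depends on.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, stripped-down statement: only what the rule needs. */
struct sketch_stmt {
	/* True if the statement touches a space created with is_sync. */
	bool space_is_sync;
	/* True if the statement has a row (false for reads like select). */
	bool has_row;
	/* 0 for rows authored locally, otherwise the id of the origin. */
	uint32_t replica_id;
};

/*
 * Returns true when the transaction should wait for acks: at least one
 * statement touches a sync space and no row came from another instance.
 */
static bool
sketch_txn_needs_ack(const struct sketch_stmt *stmts, size_t count)
{
	bool is_sync = false;
	bool is_local = true;
	for (size_t i = 0; i < count; i++) {
		is_sync = is_sync || stmts[i].space_is_sync;
		/* Reads carry no row, so they cannot make the txn remote. */
		if (!stmts[i].has_row)
			continue;
		if (stmts[i].replica_id != 0)
			is_local = false;
	}
	return is_sync && is_local;
}
```

A single remote row is enough to keep the transaction asynchronous, which is
why replicas treat replicated sync transactions as ordinary ones for now.
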
 src/box/txn.c | 17 +++++++++++++++--
 src/box/txn.h |  7 +++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 123520166..60870f536 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -495,24 +495,37 @@ txn_journal_entry_new(struct txn *txn)
 
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+	bool is_sync = false;
+	/*
+	 * Only local transactions, originated from the master,
+	 * can enter 'waiting for acks' state. It means, only
+	 * author of the transaction can collect acks. Replicas
+	 * consider it a normal async transaction so far.
+	 */
+	bool is_local = true;
 
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->has_triggers) {
 			txn_init_triggers(txn);
 			rlist_splice(&txn->on_commit, &stmt->on_commit);
 		}
+		is_sync = is_sync || stmt->space->def->opts.is_sync;
 
 		/* A read (e.g. select) request */
 		if (stmt->row == NULL)
 			continue;
 
-		if (stmt->row->replica_id == 0)
+		if (stmt->row->replica_id == 0) {
 			*local_row++ = stmt->row;
-		else
+		} else {
 			*remote_row++ = stmt->row;
+			is_local = false;
+		}
 
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
+	if (is_sync && is_local)
+		txn_set_flag(txn, TXN_WAIT_ACK);
 
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
diff --git a/src/box/txn.h b/src/box/txn.h
index 3f6d79d5c..232cc07a8 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -66,6 +66,13 @@ enum txn_flag {
 	TXN_CAN_YIELD,
 	/** on_commit and/or on_rollback list is not empty. */
 	TXN_HAS_TRIGGERS,
+	/**
+	 * A transaction that touched sync spaces enters the 'waiting
+	 * for acks' state before commit. In this state it waits until
+	 * it is replicated to a quorum of replicas, and only then
+	 * finishes commit and returns success to the user.
+	 */
+	TXN_WAIT_ACK,
 };
 
 enum {
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (2 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
                     ` (3 more replies)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes Serge Petrenko
                   ` (7 subsequent siblings)
  11 siblings, 4 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>

A synchronous transaction (one that changes anything in a
synchronous space) waits before commit until it is replicated to
a quorum of replicas.

So far the 'synchronousness' is basically the same as the
well-known 'wait_lsn' technique, except that the transaction
really is not committed until it is replicated.

The wait_lsn problem is still present, though, if the master
restarts, because there is no 'confirm' record in the WAL telling
which transactions are replicated and can be applied.

Closes #4844
Closes #4845
---
 src/box/CMakeLists.txt |   1 +
 src/box/box.cc         |   2 +
 src/box/errcode.h      |   1 +
 src/box/relay.cc       |   7 ++
 src/box/txn.c          |  40 +++++++++-
 src/box/txn_limbo.c    | 167 ++++++++++++++++++++++++++++++++++++++++
 src/box/txn_limbo.h    | 168 +++++++++++++++++++++++++++++++++++++++++
 test/box/error.result  |   1 +
 8 files changed, 385 insertions(+), 2 deletions(-)
 create mode 100644 src/box/txn_limbo.c
 create mode 100644 src/box/txn_limbo.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 230e7427d..c0833e50a 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -124,6 +124,7 @@ add_library(box STATIC
     session.cc
     port.c
     txn.c
+    txn_limbo.c
     box.cc
     gc.c
     checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index 9b67aeb1f..64ac89975 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -59,6 +59,7 @@
 #include "index.h"
 #include "port.h"
 #include "txn.h"
+#include "txn_limbo.h"
 #include "user.h"
 #include "cfg.h"
 #include "coio.h"
@@ -2389,6 +2390,7 @@ box_init(void)
 	if (tuple_init(lua_hash) != 0)
 		diag_raise();
 
+	txn_limbo_init();
 	sequence_init();
 }
 
diff --git a/src/box/errcode.h b/src/box/errcode.h
index d1e4d02a9..019c582af 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -266,6 +266,7 @@ struct errcode_record {
 	/*211 */_(ER_WRONG_QUERY_ID,		"Prepared statement with id %u does not exist") \
 	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
 	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
+	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..333e91ea9 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xrow_io.h"
 #include "xstream.h"
 #include "wal.h"
+#include "txn_limbo.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -399,6 +400,12 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	/*
+	 * Let pending synchronous transactions know, which of
+	 * them were successfully sent to the replica.
+	 */
+	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+		     vclock_get(&status->vclock, instance_id));
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
diff --git a/src/box/txn.c b/src/box/txn.c
index 60870f536..1d6518e29 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "txn.h"
+#include "txn_limbo.h"
 #include "engine.h"
 #include "tuple.h"
 #include "journal.h"
@@ -433,7 +434,7 @@ txn_complete(struct txn *txn)
 			engine_rollback(txn->engine, txn);
 		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
 			txn_run_rollback_triggers(txn, &txn->on_rollback);
-	} else {
+	} else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
 		/* Commit the transaction. */
 		if (txn->engine != NULL)
 			engine_commit(txn->engine, txn);
@@ -448,6 +449,19 @@ txn_complete(struct txn *txn)
 					     txn->signature - n_rows + 1,
 					     stop_tm - txn->start_tm);
 		}
+	} else {
+		/*
+		 * Complete is called on every WAL operation
+		 * authored by this transaction. There may be
+		 * more than one, and one is not always enough
+		 * for commit. A transaction waiting for acks
+		 * can't be committed right away. Give control
+		 * back to the fiber owning the transaction so
+		 * that it can decide what to do next.
+		 */
+		if (txn->fiber != fiber())
+			fiber_wakeup(txn->fiber);
+		return;
 	}
 	/*
 	 * If there is no fiber waiting for the transaction then
@@ -634,6 +648,7 @@ int
 txn_commit(struct txn *txn)
 {
 	struct journal_entry *req;
+	struct txn_limbo_entry *limbo_entry;
 
 	txn->fiber = fiber();
 
@@ -655,8 +670,25 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	if (is_sync) {
+		/*
+		 * Append now. Before even WAL write is done.
+		 * After WAL write nothing should fail, even OOM
+		 * wouldn't be acceptable.
+		 */
+		limbo_entry = txn_limbo_append(&txn_limbo, txn);
+		if (limbo_entry == NULL) {
+			txn_rollback(txn);
+			txn_free(txn);
+			return -1;
+		}
+	}
+
 	fiber_set_txn(fiber(), NULL);
 	if (journal_write(req) != 0) {
+		if (is_sync)
+			txn_limbo_abort(&txn_limbo, limbo_entry);
 		fiber_set_txn(fiber(), txn);
 		txn_rollback(txn);
 		txn_free(txn);
@@ -665,7 +697,11 @@ txn_commit(struct txn *txn)
 		diag_log();
 		return -1;
 	}
-
+	if (is_sync) {
+		txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
+				     req->rows[req->n_rows - 1]->lsn);
+		txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+	}
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
 		txn->signature = req->res;
 		txn_complete(txn);
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
new file mode 100644
index 000000000..d28b2a28b
--- /dev/null
+++ b/src/box/txn_limbo.c
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "txn.h"
+#include "txn_limbo.h"
+#include "replication.h"
+
+struct txn_limbo txn_limbo;
+
+static inline void
+txn_limbo_create(struct txn_limbo *limbo)
+{
+	rlist_create(&limbo->queue);
+	limbo->instance_id = REPLICA_ID_NIL;
+	vclock_create(&limbo->vclock);
+}
+
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, struct txn *txn)
+{
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (limbo->instance_id != instance_id) {
+		if (limbo->instance_id == REPLICA_ID_NIL ||
+		    rlist_empty(&limbo->queue)) {
+			limbo->instance_id = instance_id;
+		} else {
+			diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
+				 limbo->instance_id);
+			return NULL;
+		}
+	}
+	struct txn_limbo_entry *e = (struct txn_limbo_entry *)
+		region_alloc(&txn->region, sizeof(*e));
+	if (e == NULL) {
+		diag_set(OutOfMemory, sizeof(*e), "region_alloc", "e");
+		return NULL;
+	}
+	e->txn = txn;
+	e->lsn = -1;
+	e->ack_count = 0;
+	e->is_commit = false;
+	e->is_rollback = false;
+	rlist_add_tail_entry(&limbo->queue, e, in_queue);
+	return e;
+}
+
+static inline void
+txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	assert(!rlist_empty(&entry->in_queue));
+	assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry,
+				 in_queue) == entry);
+	rlist_del_entry(entry, in_queue);
+}
+
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	entry->is_rollback = true;
+	txn_limbo_remove(limbo, entry);
+}
+
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+		     int64_t lsn)
+{
+	assert(limbo->instance_id != REPLICA_ID_NIL);
+	entry->lsn = lsn;
+	++entry->ack_count;
+	vclock_follow(&limbo->vclock, limbo->instance_id, lsn);
+}
+
+static bool
+txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct vclock_iterator iter;
+	vclock_iterator_init(&iter, &limbo->vclock);
+	int ack_count = 0;
+	int64_t lsn = entry->lsn;
+	vclock_foreach(&iter, vc)
+		ack_count += vc.lsn >= lsn;
+	assert(ack_count >= entry->ack_count);
+	entry->ack_count = ack_count;
+	entry->is_commit = ack_count >= replication_sync_quorum;
+	return entry->is_commit;
+}
+
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct txn *txn = entry->txn;
+	assert(entry->lsn > 0);
+	assert(!txn_has_flag(txn, TXN_IS_DONE));
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (txn_limbo_check_complete(limbo, entry))
+		return;
+	bool cancellable = fiber_set_cancellable(false);
+	while (!txn_limbo_entry_is_complete(entry))
+		fiber_yield();
+	fiber_set_cancellable(cancellable);
+	// TODO: implement rollback.
+	// TODO: implement confirm.
+	assert(!entry->is_rollback);
+	txn_limbo_remove(limbo, entry);
+	txn_clear_flag(txn, TXN_WAIT_ACK);
+}
+
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
+{
+	if (rlist_empty(&limbo->queue))
+		return;
+	assert(limbo->instance_id != REPLICA_ID_NIL);
+	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
+	vclock_follow(&limbo->vclock, replica_id, lsn);
+	struct txn_limbo_entry *e;
+	rlist_foreach_entry(e, &limbo->queue, in_queue) {
+		if (e->lsn <= prev_lsn)
+			continue;
+		if (e->lsn > lsn)
+			break;
+		if (++e->ack_count >= replication_sync_quorum) {
+			// TODO: better call complete() right
+			// here. Appliers use async transactions,
+			// and their txns don't have fibers to
+			// wake up. That becomes actual, when
+			// appliers will be supposed to wait for
+			// 'confirm' message.
+			e->is_commit = true;
+			fiber_wakeup(e->txn->fiber);
+		}
+		assert(e->ack_count <= VCLOCK_MAX);
+	}
+}
+
+void
+txn_limbo_init(void)
+{
+	txn_limbo_create(&txn_limbo);
+}
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
new file mode 100644
index 000000000..112fa8902
--- /dev/null
+++ b/src/box/txn_limbo.h
@@ -0,0 +1,168 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "small/rlist.h"
+#include "vclock.h"
+
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct txn;
+
+/**
+ * Transaction and its quorum metadata, to be stored in limbo.
+ */
+struct txn_limbo_entry {
+	/** Link for limbo's queue. */
+	struct rlist in_queue;
+	/** Transaction, waiting for a quorum. */
+	struct txn *txn;
+	/**
+	 * LSN of the transaction by the originator's vclock
+	 * component. May be -1 in case the transaction is not
+	 * written to WAL yet.
+	 */
+	int64_t lsn;
+	/**
+	 * Number of ACKs. Or in other words - how many replicas
+	 * confirmed receipt of the transaction.
+	 */
+	int ack_count;
+	/**
+	 * Result flags. Only one of them can be true. But both
+	 * can be false if the transaction is still waiting for
+	 * its resolution.
+	 */
+	bool is_commit;
+	bool is_rollback;
+};
+
+static inline bool
+txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
+{
+	return e->is_commit || e->is_rollback;
+}
+
+/**
+ * Limbo is a place where transactions are stored, which are
+ * finished, but not committed nor rolled back. These are
+ * synchronous transactions in progress of collecting ACKs from
+ * replicas.
+ * Limbo's main purposes
+ *   - maintain the transactions ordered by LSN of their emitter;
+ *   - be a link between transaction and replication modules, so
+ *     as they wouldn't depend on each other directly.
+ */
+struct txn_limbo {
+	/**
+	 * Queue of limbo entries. Ordered by LSN. Some of the
+	 * entries in the end may not have an LSN yet (their local
+	 * WAL write is still in progress), but their order won't
+	 * change anyway. Because WAL write completions will give
+	 * them LSNs in the same order.
+	 */
+	struct rlist queue;
+	/**
+	 * Instance ID of the owner of all the transactions in the
+	 * queue. Strictly speaking, nothing prevents storing
+	 * foreign transactions here, originated from some other
+	 * instance. But still the queue may contain only
+	 * transactions of the same instance. Otherwise LSN order
+	 * won't make sense - different nodes have their own
+	 * independent LSNs in their vclock components.
+	 */
+	uint32_t instance_id;
+	/**
+	 * All components of the vclock are versions of the limbo
+	 * owner's LSN, how it is visible on other nodes. For
+	 * example, assume instance ID of the limbo is 1. Then
+	 * vclock[1] here is local LSN of the instance 1.
+	 * vclock[2] is how replica with ID 2 sees LSN of
+	 * instance 1.
+	 * vclock[3] is how replica with ID 3 sees LSN of
+	 * instance 1, and so on.
+	 * That way, by looking at this vclock, it can always be
+	 * said up to which LSN there is a sync quorum for
+	 * transactions created on the limbo's owner node.
+	 */
+	struct vclock vclock;
+};
+
+/**
+ * Global limbo entry. So far an instance can have only one limbo,
+ * where master's transactions are stored. Eventually there may
+ * appear more than one limbo for master-master support.
+ */
+extern struct txn_limbo txn_limbo;
+
+/**
+ * Allocate, create, and append a new transaction to the limbo.
+ * The limbo entry is allocated on the transaction's region.
+ */
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, struct txn *txn);
+
+/** Remove the entry from the limbo, mark as rolled back. */
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+/**
+ * Assign local LSN to the limbo entry. That happens when the
+ * transaction is added to the limbo, writes to WAL, and gets an
+ * LSN.
+ */
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+		     int64_t lsn);
+
+/**
+ * Ack all transactions up to the given LSN on behalf of the
+ * replica with the specified ID.
+ */
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Block the current fiber until the transaction in the limbo
+ * entry is either committed or rolled back.
+ */
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+void
+txn_limbo_init();
+
+#if defined(__cplusplus)
+}
+#endif /* defined(__cplusplus) */
diff --git a/test/box/error.result b/test/box/error.result
index 2196fa541..69c471085 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -432,6 +432,7 @@ t;
  |   211: box.error.WRONG_QUERY_ID
  |   212: box.error.SEQUENCE_NOT_STARTED
  |   213: box.error.NO_SUCH_SESSION_SETTING
+ |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (3 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment Serge Petrenko
                   ` (6 subsequent siblings)
  11 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Part-of #4847
Follow-up #4844
Follow-up #4845
---
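A note on the quorum comparison changed below, as a minimal standalone
sketch. It is not the patch code: the plain array stands in for
struct vclock, and every sketch_* name is hypothetical. It only shows why
the check has to be ">=" for a transaction to commit once exactly
replication_sync_quorum instances (the owner included) have seen its LSN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical upper bound, standing in for VCLOCK_MAX. */
enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * seen_lsn[i] is the owner's LSN as seen by instance i (the owner's
 * own component is part of the array, so a local WAL write counts as
 * one ack). Returns true when at least `quorum` instances have
 * reached `lsn`.
 */
static bool
sketch_has_quorum(const int64_t *seen_lsn, int n, int64_t lsn, int quorum)
{
	assert(n >= 0 && n <= SKETCH_VCLOCK_MAX);
	int ack_count = 0;
	for (int i = 0; i < n; i++)
		ack_count += seen_lsn[i] >= lsn;
	/* With '>' here, exactly `quorum` acks would never commit. */
	return ack_count >= quorum;
}
```

With seen_lsn = {10, 10, 7} and a quorum of 2, LSN 10 is considered
committed; the old ">" comparison would have required a third ack.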
 src/box/txn.c       | 5 +++--
 src/box/txn_limbo.c | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 1d6518e29..f30e20944 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -459,7 +459,7 @@ txn_complete(struct txn *txn)
 		 * back to the fiber, owning the transaction so as
 		 * it could decide what to do next.
 		 */
-		if (txn->fiber != fiber())
+		if (txn->fiber != NULL && txn->fiber != fiber())
 			fiber_wakeup(txn->fiber);
 		return;
 	}
@@ -523,7 +523,8 @@ txn_journal_entry_new(struct txn *txn)
 			txn_init_triggers(txn);
 			rlist_splice(&txn->on_commit, &stmt->on_commit);
 		}
-		is_sync = is_sync || stmt->space->def->opts.is_sync;
+		is_sync = is_sync || (stmt->space != NULL &&
+				     stmt->space->def->opts.is_sync);
 
 		/* A read (e.g. select) request */
 		if (stmt->row == NULL)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index d28b2a28b..47c1bd249 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -108,7 +108,7 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		ack_count += vc.lsn >= lsn;
 	assert(ack_count >= entry->ack_count);
 	entry->ack_count = ack_count;
-	entry->is_commit = ack_count > replication_sync_quorum;
+	entry->is_commit = ack_count >= replication_sync_quorum;
 	return entry->is_commit;
 }
 
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (4 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry Serge Petrenko
                   ` (5 subsequent siblings)
  11 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

When a txn_limbo is running on a replica, it holds changes from a remote
master. Reflect this by assigning the limbo instance id accordingly.

Follow-up #4844
Part-of #4847
---
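The ownership rule this patch implements can be sketched in isolation as
follows. This is an illustration under assumptions, not the patch code:
the struct, the queue_len counter and all sketch_* names are made up, and
the nil id is assumed to be 0, as the "id == 0" check in the diff
suggests.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed value of the "no owner yet" id. */
enum { SKETCH_ID_NIL = 0 };

/* Hypothetical reduced limbo: owner id plus number of queued txns. */
struct sketch_limbo {
	uint32_t owner_id;
	int queue_len;
};

/*
 * Try to append a transaction whose first row carries row_replica_id.
 * A zero id means the row was created locally, so the local instance
 * id (self_id) is used instead. The owner may change only while the
 * queue is empty. Returns false when foreign transactions are still
 * pending.
 */
static bool
sketch_limbo_claim(struct sketch_limbo *limbo, uint32_t row_replica_id,
		   uint32_t self_id)
{
	assert(self_id != SKETCH_ID_NIL);
	uint32_t id = row_replica_id == 0 ? self_id : row_replica_id;
	if (limbo->owner_id != id) {
		if (limbo->owner_id != SKETCH_ID_NIL && limbo->queue_len > 0)
			return false;
		limbo->owner_id = id;
	}
	limbo->queue_len++;
	return true;
}
```

On a replica applying rows from master 2, the limbo ends up owned by 2
rather than by the replica's own id, which is what the later CONFIRM
processing relies on.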
 src/box/txn.c       | 8 +++++++-
 src/box/txn_limbo.c | 8 +++++---
 src/box/txn_limbo.h | 2 +-
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index f30e20944..a65100b31 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -673,12 +673,18 @@ txn_commit(struct txn *txn)
 
 	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
 	if (is_sync) {
+		/*
+		 * Remote rows, if any, come before local rows, so
+		 * check for originating instance id here.
+		 */
+		uint32_t origin_id = req->rows[0]->replica_id;
+
 		/*
 		 * Append now. Before even WAL write is done.
 		 * After WAL write nothing should fail, even OOM
 		 * wouldn't be acceptable.
 		 */
-		limbo_entry = txn_limbo_append(&txn_limbo, txn);
+		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
 		if (limbo_entry == NULL) {
 			txn_rollback(txn);
 			txn_free(txn);
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 47c1bd249..efb97a591 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -43,13 +43,15 @@ txn_limbo_create(struct txn_limbo *limbo)
 }
 
 struct txn_limbo_entry *
-txn_limbo_append(struct txn_limbo *limbo, struct txn *txn)
+txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
 {
 	assert(txn_has_flag(txn, TXN_WAIT_ACK));
-	if (limbo->instance_id != instance_id) {
+	if (id == 0)
+		id = instance_id;
+	if (limbo->instance_id != id) {
 		if (limbo->instance_id == REPLICA_ID_NIL ||
 		    rlist_empty(&limbo->queue)) {
-			limbo->instance_id = instance_id;
+			limbo->instance_id = id;
 		} else {
 			diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
 				 limbo->instance_id);
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 112fa8902..1ad1c567a 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -131,7 +131,7 @@ extern struct txn_limbo txn_limbo;
  * The limbo entry is allocated on the transaction's region.
  */
 struct txn_limbo_entry *
-txn_limbo_append(struct txn_limbo *limbo, struct txn *txn);
+txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn);
 
 /** Remove the entry from the limbo, mark as rolled back. */
 void
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (5 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-19 15:18   ` Leonid Vasiliev
  2020-06-23  8:33   ` Serge Petrenko
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
                   ` (4 subsequent siblings)
  11 siblings, 2 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Add methods to encode and decode a CONFIRM entry.
A CONFIRM entry is written to WAL by the synchronous replication master
as soon as it finds that a transaction was applied on a quorum of
replicas.
CONFIRM rows share the same header with other rows in WAL, but their body
differs: it is just a map containing the replica_id and the lsn of the
last confirmed transaction.

Part-of #4847
---
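For reference, the body layout described above can be sketched without
msgpuck. The code below is a standalone illustration, not what the patch
does: it hand-rolls the MessagePack encoding of unsigned integers, the
sketch_* names are hypothetical, and the key values 0x02 and 0x03 are
assumed here to match IPROTO_REPLICA_ID and IPROTO_LSN. No bounds
checking is done, so the buffer must hold at least 17 bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed key values, see the note above. */
enum { SKETCH_KEY_REPLICA_ID = 0x02, SKETCH_KEY_LSN = 0x03 };

/* Write v as a MessagePack unsigned integer, return the new position. */
static uint8_t *
sketch_put_uint(uint8_t *p, uint64_t v)
{
	int bytes;
	if (v <= 0x7f) {
		*p++ = (uint8_t)v;	/* positive fixint */
		return p;
	} else if (v <= 0xff) {
		*p++ = 0xcc; bytes = 1;	/* uint 8 */
	} else if (v <= 0xffff) {
		*p++ = 0xcd; bytes = 2;	/* uint 16 */
	} else if (v <= 0xffffffffu) {
		*p++ = 0xce; bytes = 4;	/* uint 32 */
	} else {
		*p++ = 0xcf; bytes = 8;	/* uint 64 */
	}
	for (int i = bytes - 1; i >= 0; i--)	/* big-endian payload */
		*p++ = (uint8_t)(v >> (8 * i));
	return p;
}

/* Read a MessagePack unsigned integer, NULL if the tag is not a uint. */
static const uint8_t *
sketch_get_uint(const uint8_t *p, uint64_t *v)
{
	uint8_t tag = *p++;
	if (tag <= 0x7f) {
		*v = tag;
		return p;
	}
	if (tag < 0xcc || tag > 0xcf)
		return NULL;
	int bytes = 1 << (tag - 0xcc);
	*v = 0;
	for (int i = 0; i < bytes; i++)
		*v = (*v << 8) | *p++;
	return p;
}

/* Encode {REPLICA_ID: replica_id, LSN: lsn}; returns the body length. */
static size_t
sketch_encode_confirm(uint8_t *buf, uint32_t replica_id, int64_t lsn)
{
	assert(lsn >= 0);
	uint8_t *p = buf;
	*p++ = 0x82;	/* fixmap with two pairs */
	p = sketch_put_uint(p, SKETCH_KEY_REPLICA_ID);
	p = sketch_put_uint(p, replica_id);
	p = sketch_put_uint(p, SKETCH_KEY_LSN);
	p = sketch_put_uint(p, (uint64_t)lsn);
	return (size_t)(p - buf);
}

/* Decode a body made only of uint keys and values; -1 on anything else. */
static int
sketch_decode_confirm(const uint8_t *buf, uint32_t *replica_id, int64_t *lsn)
{
	const uint8_t *p = buf;
	if ((*p & 0xf0) != 0x80)	/* not a fixmap */
		return -1;
	int pairs = *p++ & 0x0f;
	for (int i = 0; i < pairs; i++) {
		uint64_t key, val;
		if ((p = sketch_get_uint(p, &key)) == NULL ||
		    (p = sketch_get_uint(p, &val)) == NULL)
			return -1;
		if (key == SKETCH_KEY_REPLICA_ID)
			*replica_id = (uint32_t)val;
		else if (key == SKETCH_KEY_LSN)
			*lsn = (int64_t)val;
	}
	return 0;
}
```

Unlike xrow_decode_confirm() in the diff, this decoder rejects any
non-integer key or value instead of skipping it, which keeps the sketch
short.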
 src/box/iproto_constants.h |  3 ++
 src/box/xrow.c             | 74 ++++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             | 23 ++++++++++++
 3 files changed, 100 insertions(+)

diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f8eee0f3f..1466b456f 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,9 @@ enum iproto_type {
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
+	/** A confirmation message for synchronous transactions. */
+	IPROTO_CONFIRM = 40,
+
 	/** PING request */
 	IPROTO_PING = 64,
 	/** Replication JOIN command */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index bb64864b2..f197e0d85 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -878,6 +878,80 @@ xrow_encode_dml(const struct request *request, struct region *region,
 	return iovcnt;
 }
 
+int
+xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn)
+{
+	size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
+		     mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
+		     mp_sizeof_uint(lsn);
+	char *buf = (char *)region_alloc(&fiber()->gc, len);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, len, "region_alloc", "buf");
+		return -1;
+	}
+	char *pos = buf;
+
+	pos = mp_encode_map(pos, 2);
+	pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
+	pos = mp_encode_uint(pos, replica_id);
+	pos = mp_encode_uint(pos, IPROTO_LSN);
+	pos = mp_encode_uint(pos, lsn);
+
+	row->body[0].iov_base = buf;
+	row->body[0].iov_len = len;
+
+	row->type = IPROTO_CONFIRM;
+
+	return 1;
+}
+
+int
+xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
+{
+	if (row->bodycnt == 0) {
+		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
+		return -1;
+	}
+
+	assert(row->bodycnt == 1);
+
+	const char * const data = (const char *)row->body[0].iov_base;
+	const char * const end = data + row->body[0].iov_len;
+	const char *d = data;
+	if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) {
+		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+				   "request body");
+		return -1;
+	}
+
+	d = data;
+	uint32_t map_size = mp_decode_map(&d);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*d) != MP_UINT) {
+			mp_next(&d);
+			mp_next(&d);
+			continue;
+		}
+		uint8_t key = mp_decode_uint(&d);
+		if (key >= IPROTO_KEY_MAX || iproto_key_type[key] !=
+					     mp_typeof(*d)) {
+				xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "request body");
+		}
+		switch (key) {
+		case IPROTO_REPLICA_ID:
+			*replica_id = mp_decode_uint(&d);
+			break;
+		case IPROTO_LSN:
+			*lsn = mp_decode_uint(&d);
+			break;
+		default:
+			mp_next(&d);
+		}
+	}
+	return 0;
+}
+
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2a0a9c852..75af71b77 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -207,6 +207,29 @@ int
 xrow_encode_dml(const struct request *request, struct region *region,
 		struct iovec *iov);
 
+/**
+ * Encode the CONFIRM to row body and set row type to
+ * IPROTO_CONFIRM.
+ * @param row xrow header.
+ * @param replica_id master's instance id.
+ * @param lsn last confirmed lsn.
+ * @retval -1 on error.
+ * @retval > 0 xrow bodycnt.
+ */
+int
+xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Decode the CONFIRM request body.
+ * @param row xrow header.
+ * @param[out] replica_id master's instance id.
+ * @param[out] lsn last confirmed lsn.
+ * @retval -1 on error.
+ * @retval 0 success.
+ */
+int
+xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+
 /**
  * CALL/EVAL request.
  */
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (6 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry Serge Petrenko
@ 2020-06-09 12:20 ` Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
                     ` (5 more replies)
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 0/2] A few fixes for building Cyrill Gorcunov
                   ` (3 subsequent siblings)
  11 siblings, 6 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-09 12:20 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Make txn_limbo write a CONFIRM entry as soon as a batch of entries
receives its acks. The CONFIRM entry is written to WAL and later
replicated to all the replicas.

Now replicas put synchronous transactions into txn_limbo and wait for
the corresponding confirmation entries to arrive and end up in their WAL
before committing the transactions.

Part-of #4847
---
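The two sides of the flow described above can be sketched on a plain
array. This is only an illustration under assumptions: the real limbo
uses an rlist of txn_limbo_entry, writes the CONFIRM row through a
transaction and wakes fibers; all sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical reduced limbo entry; the queue is ordered by LSN. */
struct sketch_entry {
	int64_t lsn;
	int ack_count;
};

/*
 * Master side: a replica moved from prev_lsn to lsn. Bump ack counters
 * of the entries it has newly covered and return the LSN of the last
 * entry that reached the quorum, or -1 if none did. That LSN is what a
 * single CONFIRM row would carry for the whole batch.
 */
static int64_t
sketch_ack(struct sketch_entry *queue, int n, int64_t prev_lsn, int64_t lsn,
	   int quorum)
{
	assert(n >= 0);
	int64_t confirm_lsn = -1;
	for (int i = 0; i < n; i++) {
		if (queue[i].lsn <= prev_lsn)
			continue;	/* already acked by this replica */
		if (queue[i].lsn > lsn)
			break;		/* not received by it yet */
		if (++queue[i].ack_count >= quorum)
			confirm_lsn = queue[i].lsn;
	}
	return confirm_lsn;
}

/*
 * Replica side: number of leading entries that a CONFIRM with
 * confirm_lsn allows to complete.
 */
static int
sketch_read_confirm(const struct sketch_entry *queue, int n,
		    int64_t confirm_lsn)
{
	int done = 0;
	while (done < n && queue[done].lsn <= confirm_lsn)
		done++;
	return done;
}
```

Entries start with ack_count == 1 in this sketch because the owner's own
WAL write counts as an ack, mirroring txn_limbo_assign_lsn().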
 src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++-
 src/box/box.cc        |  3 ++
 src/box/errcode.h     |  1 +
 src/box/relay.cc      | 13 ++++---
 src/box/txn.c         | 75 ++++++++++++++++++++++++++++++---------
 src/box/txn.h         | 23 ++++++++++++
 src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++-----
 src/box/txn_limbo.h   |  6 ++++
 test/box/error.result |  1 +
 9 files changed, 252 insertions(+), 30 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index df48b4796..1dc977424 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,6 +51,7 @@
 #include "txn.h"
 #include "box.h"
 #include "scoped_guard.h"
+#include "txn_limbo.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	/*
+	 * Do not wait for confirmation when fetching a snapshot.
+	 * Master only sends confirmed rows during join.
+	 */
+	txn_force_async(txn);
 	if (txn_begin_stmt(txn, space) != 0)
 		goto rollback;
 	/* no access checks here - applier always works with admin privs */
@@ -249,10 +255,73 @@ process_nop(struct request *request)
 	return txn_commit_stmt(txn, request);
 }
 
+/*
+ * An on_commit trigger set on a txn containing a CONFIRM entry.
+ * Confirms some of the txs waiting in txn_limbo.
+ */
+static int
+applier_on_confirm(struct trigger *trig, void *data)
+{
+	(void) trig;
+	int64_t lsn = *(int64_t *)data;
+	txn_limbo_read_confirm(&txn_limbo, lsn);
+	return 0;
+}
+
+static int
+process_confirm(struct request *request)
+{
+	assert(request->header->type == IPROTO_CONFIRM);
+	uint32_t replica_id;
+	struct txn *txn = in_txn();
+	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
+	if (lsn == NULL) {
+		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
+		return -1;
+	}
+	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
+		return -1;
+	/*
+	 * on_commit trigger failure is not allowed, so check for
+	 * instance id early.
+	 */
+	if (replica_id != txn_limbo.instance_id) {
+		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
+			 txn_limbo.instance_id);
+		return -1;
+	}
+
+	/*
+	 * Set an on_commit trigger which will perform the actual
+	 * confirmation processing.
+	 */
+	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
+							      sizeof(*trig));
+	if (trig == NULL) {
+		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
+		return -1;
+	}
+	trigger_create(trig, applier_on_confirm, lsn, NULL);
+
+	if (txn_begin_stmt(txn, NULL) != 0)
+		return -1;
+
+	if (txn_commit_stmt(txn, request) == 0) {
+		txn_on_commit(txn, trig);
+		return 0;
+	} else {
+		return -1;
+	}
+}
+
 static int
 apply_row(struct xrow_header *row)
 {
 	struct request request;
+	if (row->type == IPROTO_CONFIRM) {
+		request.header = row;
+		return process_confirm(&request);
+	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
 	if (request.type == IPROTO_NOP)
@@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	/*
+	 * Do not wait for confirmation while processing final
+	 * join rows. See apply_snapshot_row().
+	 */
+	txn_force_async(txn);
 	if (apply_row(row) != 0) {
 		txn_rollback(txn);
 		fiber_gc();
@@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_final_join_row(&row) != 0)
+			/*
+			 * Confirms are ignored during join. All the
+			 * data master sends us is valid.
+			 */
+			if (row.type != IPROTO_CONFIRM &&
+			    apply_final_join_row(&row) != 0)
 				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
diff --git a/src/box/box.cc b/src/box/box.cc
index 64ac89975..792c3c394 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -342,6 +342,9 @@ static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct request request;
+	// TODO: process confirmation during recovery.
+	if (row->type == IPROTO_CONFIRM)
+		return;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 019c582af..3ba6866e5 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -267,6 +267,7 @@ struct errcode_record {
 	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
 	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
 	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
+	/*215 */_(ER_SYNC_MASTER_MISMATCH,	"CONFIRM message arrived for an unknown master id %d, expected %d") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 333e91ea9..4df3c2f26 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
 	/*
 	 * Let pending synchronous transactions know, which of
-	 * them were successfully sent to the replica.
+	 * them were successfully sent to the replica. Acks are
+	 * collected only on the master. Other instances wait for
+	 * master's CONFIRM message instead.
 	 */
-	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
-		     vclock_get(&status->vclock, instance_id));
+	if (txn_limbo.instance_id == instance_id) {
+		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+			      vclock_get(&status->vclock, instance_id));
+	}
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
@@ -766,7 +770,8 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	assert(iproto_type_is_dml(packet->type) ||
+	       packet->type == IPROTO_CONFIRM);
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index a65100b31..3b331fecc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -36,6 +36,7 @@
 #include <fiber.h>
 #include "xrow.h"
 #include "errinj.h"
+#include "iproto_constants.h"
 
 double too_long_threshold;
 
@@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 	 */
 	struct space *space = stmt->space;
 	row->group_id = space != NULL ? space_group_id(space) : 0;
-	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
+	/*
+	 * IPROTO_CONFIRM entries are supplementary and aren't
+	 * valid dml requests. They're encoded manually.
+	 */
+	if (likely(row->type != IPROTO_CONFIRM))
+		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
 	if (row->bodycnt < 0)
 		return -1;
 	stmt->row = row;
@@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 */
 	struct txn_stmt *stmt = txn_current_stmt(txn);
 
-	/* Create WAL record for the write requests in non-temporary spaces.
-	 * stmt->space can be NULL for IRPOTO_NOP.
+	/*
+	 * Create WAL record for the write requests in
+	 * non-temporary spaces. stmt->space can be NULL for
+	 * IPROTO_NOP or IPROTO_CONFIRM.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(txn, stmt, request) != 0)
@@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers)
 /**
  * Complete transaction processing.
  */
-static void
+void
 txn_complete(struct txn *txn)
 {
 	/*
 	 * Note, engine can be NULL if transaction contains
-	 * IPROTO_NOP statements only.
+	 * IPROTO_NOP or IPROTO_CONFIRM statements.
 	 */
 	if (txn->signature < 0) {
 		/* Undo the transaction. */
@@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
 	bool is_sync = false;
-	/*
-	 * Only local transactions, originated from the master,
-	 * can enter 'waiting for acks' state. It means, only
-	 * author of the transaction can collect acks. Replicas
-	 * consider it a normal async transaction so far.
-	 */
-	bool is_local = true;
 
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->has_triggers) {
@@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
 		if (stmt->row == NULL)
 			continue;
 
-		if (stmt->row->replica_id == 0) {
+		if (stmt->row->replica_id == 0)
 			*local_row++ = stmt->row;
-		} else {
+		else
 			*remote_row++ = stmt->row;
-			is_local = false;
-		}
 
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	if (is_sync && is_local)
+
+	is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
+	if (is_sync) {
 		txn_set_flag(txn, TXN_WAIT_ACK);
+	}
 
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
@@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
 	return false;
 }
 
+/*
+ * A trigger called on tx rollback due to a failed WAL write,
+ * when tx is waiting for confirmation.
+ */
+static int
+txn_limbo_on_rollback(struct trigger *trig, void *data)
+{
+	(void) trig;
+	struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
+	txn_limbo_abort(&txn_limbo, entry);
+	return 0;
+}
+
 int
 txn_commit_async(struct txn *txn)
 {
@@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	struct txn_limbo_entry *limbo_entry;
+	if (is_sync) {
+		/* See txn_commit(). */
+		uint32_t origin_id = req->rows[0]->replica_id;
+		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
+		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+		if (limbo_entry == NULL) {
+			txn_rollback(txn);
+			txn_free(txn);
+			return -1;
+		}
+		assert(lsn > 0);
+		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+	}
+
 	fiber_set_txn(fiber(), NULL);
 	if (journal_write_async(req) != 0) {
 		fiber_set_txn(fiber(), txn);
 		txn_rollback(txn);
+		txn_limbo_abort(&txn_limbo, limbo_entry);
 
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 		return -1;
 	}
 
+	/*
+	 * Set a trigger to abort waiting for confirm on WAL write
+	 * failure.
+	 */
+	if (is_sync) {
+		struct trigger trig;
+		trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
+		txn_on_rollback(txn, &trig);
+	}
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index 232cc07a8..e7705bb48 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -73,6 +73,13 @@ enum txn_flag {
 	 * then finishes commit and returns success to a user.
 	 */
 	TXN_WAIT_ACK,
+	/**
+	 * A transaction mustn't wait for confirmation, even if it
+	 * touches synchronous spaces. Needed for join stage on
+	 * replica, when all the data coming from the master is
+	 * already confirmed by design.
+	 */
+	TXN_FORCE_ASYNC,
 };
 
 enum {
@@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
 	txn->flags &= ~(1 << flag);
 }
 
+/**
+ * Force async mode for transaction. It won't wait for acks
+ * or confirmation.
+ */
+static inline void
+txn_force_async(struct txn *txn)
+{
+	txn_set_flag(txn, TXN_FORCE_ASYNC);
+}
+
 /* Pointer to the current transaction (if any) */
 static inline struct txn *
 in_txn(void)
@@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
 struct txn *
 txn_begin(void);
 
+/**
+ * Complete transaction processing.
+ */
+void
+txn_complete(struct txn *txn);
+
 /**
  * Commit a transaction.
  * @pre txn == in_txn()
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index efb97a591..daec98317 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		fiber_yield();
 	fiber_set_cancellable(cancellable);
 	// TODO: implement rollback.
-	// TODO: implement confirm.
 	assert(!entry->is_rollback);
+	assert(entry->is_commit);
 	txn_limbo_remove(limbo, entry);
 	txn_clear_flag(txn, TXN_WAIT_ACK);
 }
 
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	/* Prepare a confirm entry. */
+	struct xrow_header row = {0};
+	struct request request = {0};
+	request.header = &row;
+
+	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+	if (row.bodycnt < 0)
+		return -1;
+
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
+
+	if (txn_begin_stmt(txn, NULL) != 0)
+		goto rollback;
+	if (txn_commit_stmt(txn, &request) != 0)
+		goto rollback;
+
+	return txn_commit(txn);
+rollback:
+	txn_rollback(txn);
+	return -1;
+}
+
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
+{
+	assert(limbo->instance_id != REPLICA_ID_NIL &&
+	       limbo->instance_id != instance_id);
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
+		if (e->lsn > lsn)
+			break;
+		assert(e->txn->fiber == NULL);
+		e->is_commit = true;
+		txn_limbo_remove(limbo, e);
+		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+		/*
+		 * txn_complete_async must've been called already,
+		 * since CONFIRM always follows the tx in question.
+		 * So, finish this tx processing right away.
+		 */
+		txn_complete(e->txn);
+	}
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
 	struct txn_limbo_entry *e;
+	struct txn_limbo_entry *last_quorum = NULL;
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
 		if (e->lsn <= prev_lsn)
 			continue;
 		if (e->lsn > lsn)
 			break;
 		if (++e->ack_count >= replication_sync_quorum) {
-			// TODO: better call complete() right
-			// here. Appliers use async transactions,
-			// and their txns don't have fibers to
-			// wake up. That becomes actual, when
-			// appliers will be supposed to wait for
-			// 'confirm' message.
 			e->is_commit = true;
-			fiber_wakeup(e->txn->fiber);
+			last_quorum = e;
 		}
 		assert(e->ack_count <= VCLOCK_MAX);
 	}
+	if (last_quorum != NULL) {
+		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
+			// TODO: rollback.
+			return;
+		}
+		/*
+		 * Wakeup all the entries in direct order as soon
+		 * as confirmation message is written to WAL.
+		 */
+		rlist_foreach_entry(e, &limbo->queue, in_queue) {
+			fiber_wakeup(e->txn->fiber);
+			if (e == last_quorum)
+				break;
+		}
+	}
 }
 
 void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 1ad1c567a..de415cd97 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 void
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Confirm all the entries up to the given master's LSN.
+ */
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
+
 void
 txn_limbo_init();
 
diff --git a/test/box/error.result b/test/box/error.result
index 69c471085..34ded3930 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -433,6 +433,7 @@ t;
  |   212: box.error.SEQUENCE_NOT_STARTED
  |   213: box.error.NO_SUCH_SESSION_SETTING
  |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
+ |   215: box.error.SYNC_MASTER_MISMATCH
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.24.3 (Apple Git-128)
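
The ack-counting loop in txn_limbo_ack() above can be modeled in isolation. The sketch below is a simplified illustration, not the patch's code: the queue is a plain array sorted by LSN instead of an rlist, the replica's previously acked LSN is passed in by the caller instead of being read from the limbo vclock, and all example_* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified limbo entry. */
struct example_entry {
	int64_t lsn;
	int ack_count;
	bool is_commit;
};

/*
 * Register an ack from one replica that previously acked prev_lsn
 * and now acks lsn. The queue must be sorted by lsn.
 * Returns the index of the last entry that has the quorum after
 * this call, or -1 if no entry in the acked range has it.
 */
static int
example_limbo_ack(struct example_entry *queue, size_t count,
		  int64_t prev_lsn, int64_t lsn, int quorum)
{
	int last_quorum = -1;
	for (size_t i = 0; i < count; i++) {
		/* Already counted for this replica. */
		if (queue[i].lsn <= prev_lsn)
			continue;
		/* Not received by this replica yet. */
		if (queue[i].lsn > lsn)
			break;
		if (++queue[i].ack_count >= quorum) {
			queue[i].is_commit = true;
			last_quorum = (int)i;
		}
	}
	return last_quorum;
}
```

In the patch, a non-NULL last_quorum is what triggers a single CONFIRM write covering every entry up to that LSN, so one WAL record can confirm several transactions at once.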

^ permalink raw reply	[flat|nested] 56+ messages in thread

* [Tarantool-patches] [PATCH 0/2] A few fixes for building
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (7 preceding siblings ...)
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
@ 2020-06-09 12:53 ` Cyrill Gorcunov
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 1/2] box/applier: fix typo Cyrill Gorcunov
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 56+ messages in thread
From: Cyrill Gorcunov @ 2020-06-09 12:53 UTC (permalink / raw)
  To: tml; +Cc: v.shpilevoy

Fix a typo and eliminate an unneeded diag_raise().

Sergey, these are just from a glance. Up to you whether they are worth
picking up.

Cyrill Gorcunov (2):
  box/applier: fix typo
  box: use tnt_raise for quorum check

 src/box/applier.cc | 2 +-
 src/box/box.cc     | 9 ++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)

-- 
2.26.2


* [Tarantool-patches] [PATCH 1/2] box/applier: fix typo
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (8 preceding siblings ...)
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 0/2] A few fixes for building Cyrill Gorcunov
@ 2020-06-09 12:53 ` Cyrill Gorcunov
  2020-06-10  9:18   ` Sergey Ostanevich
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check Cyrill Gorcunov
  2020-06-22 21:51 ` [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Vladislav Shpilevoy
  11 siblings, 1 reply; 56+ messages in thread
From: Cyrill Gorcunov @ 2020-06-09 12:53 UTC (permalink / raw)
  To: tml; +Cc: v.shpilevoy

fixup 3210e1e6f867cfd1c1f65e05f28a32deae63c172

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1dc977424..cf7215d43 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -271,7 +271,7 @@ applier_on_confirm(struct trigger *trig, void *data)
 static int
 process_confirm(struct request *request)
 {
-	assert(request->header->type = IPROTO_CONFIRM);
+	assert(request->header->type == IPROTO_CONFIRM);
 	uint32_t replica_id;
 	struct txn *txn = in_txn();
 	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
-- 
2.26.2
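
Why the one-character change matters can be shown with a small standalone sketch. It is only an illustration under assumptions: struct example_header and EXAMPLE_TYPE_CONFIRM are hypothetical stand-ins, not the real xrow_header or IPROTO_CONFIRM definitions.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the CONFIRM request type code. */
enum { EXAMPLE_TYPE_CONFIRM = 40 };

/* Hypothetical stand-in for a row header. */
struct example_header {
	uint32_t type;
};

/*
 * Buggy check: '=' assigns, so the expression evaluates to the
 * non-zero constant and is always true. It also overwrites the
 * field being checked.
 */
static int
check_with_assignment(struct example_header *h)
{
	return (h->type = EXAMPLE_TYPE_CONFIRM) != 0;
}

/* Intended check: '==' compares and leaves the field untouched. */
static int
check_with_comparison(const struct example_header *h)
{
	return h->type == EXAMPLE_TYPE_CONFIRM;
}
```

Inside assert() the assignment form is worse than a check that never fires: it modifies the header in debug builds only, because the whole expression is compiled out when NDEBUG is defined.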


* [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (9 preceding siblings ...)
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 1/2] box/applier: fix typo Cyrill Gorcunov
@ 2020-06-09 12:53 ` Cyrill Gorcunov
  2020-06-10  9:17   ` Sergey Ostanevich
  2020-06-10 10:45   ` Serge Petrenko
  2020-06-22 21:51 ` [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Vladislav Shpilevoy
  11 siblings, 2 replies; 56+ messages in thread
From: Cyrill Gorcunov @ 2020-06-09 12:53 UTC (permalink / raw)
  To: tml; +Cc: v.shpilevoy

All other check routines use tnt_raise,
so there is no need to introduce a different approach.

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 792c3c394..179feccf6 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -485,9 +485,9 @@ box_check_replication_sync_quorum(void)
 {
 	int quorum = cfg_geti("replication_sync_quorum");
 	if (quorum <= 0 || quorum > VCLOCK_MAX) {
-		diag_set(ClientError, ER_CFG, "replication_sync_quorum",
-			 "the value must be greater than and less than "
-			 "maximal number of replicas");
+		tnt_raise(ClientError, ER_CFG, "replication_sync_quorum",
+			  "the value must be greater than and less than "
+			  "maximal number of replicas");
 		return -1;
 	}
 	return quorum;
@@ -675,8 +675,7 @@ box_check_config()
 	box_check_replication_connect_timeout();
 	box_check_replication_connect_quorum();
 	box_check_replication_sync_lag();
-	if (box_check_replication_sync_quorum() < 0)
-		diag_raise();
+	box_check_replication_sync_quorum();
 	box_check_replication_sync_timeout();
 	box_check_readahead(cfg_geti("readahead"));
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
-- 
2.26.2


* Re: [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check Cyrill Gorcunov
@ 2020-06-10  9:17   ` Sergey Ostanevich
  2020-06-10 10:45   ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Sergey Ostanevich @ 2020-06-10  9:17 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml, v.shpilevoy

Hi! 

Although it's not part of your change, I have a question about the message
itself. Should the value be both greater than and less than the same number?

I suppose a number is missing after 'greater than', and that number is 0?

Sergos.


On 09 Jun 15:53, Cyrill Gorcunov wrote:
> All other check routines uses tnt_raise
> so no need to introduce a different approach.
> 
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/box.cc | 9 ++++-----
>  1 file changed, 4 insertions(+), 5 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 792c3c394..179feccf6 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -485,9 +485,9 @@ box_check_replication_sync_quorum(void)
>  {
>  	int quorum = cfg_geti("replication_sync_quorum");
>  	if (quorum <= 0 || quorum > VCLOCK_MAX) {
> -		diag_set(ClientError, ER_CFG, "replication_sync_quorum",
> -			 "the value must be greater than and less than "
> -			 "maximal number of replicas");
> +		tnt_raise(ClientError, ER_CFG, "replication_sync_quorum",
> +			  "the value must be greater than and less than "
> +			  "maximal number of replicas");
>  		return -1;
>  	}
>  	return quorum;
> @@ -675,8 +675,7 @@ box_check_config()
>  	box_check_replication_connect_timeout();
>  	box_check_replication_connect_quorum();
>  	box_check_replication_sync_lag();
> -	if (box_check_replication_sync_quorum() < 0)
> -		diag_raise();
> +	box_check_replication_sync_quorum();
>  	box_check_replication_sync_timeout();
>  	box_check_readahead(cfg_geti("readahead"));
>  	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
> -- 
> 2.26.2
> 


* Re: [Tarantool-patches] [PATCH 1/2] box/applier: fix typo
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 1/2] box/applier: fix typo Cyrill Gorcunov
@ 2020-06-10  9:18   ` Sergey Ostanevich
  0 siblings, 0 replies; 56+ messages in thread
From: Sergey Ostanevich @ 2020-06-10  9:18 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml, v.shpilevoy

Hi!

Thanks for the catch! 

LGTM
Sergos.


On 09 Jun 15:53, Cyrill Gorcunov wrote:
> fixup 3210e1e6f867cfd1c1f65e05f28a32deae63c172
> 
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/applier.cc | 2 +-
>  1 file changed, 1 insertion(+), 1 deletion(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 1dc977424..cf7215d43 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -271,7 +271,7 @@ applier_on_confirm(struct trigger *trig, void *data)
>  static int
>  process_confirm(struct request *request)
>  {
> -	assert(request->header->type = IPROTO_CONFIRM);
> +	assert(request->header->type == IPROTO_CONFIRM);
>  	uint32_t replica_id;
>  	struct txn *txn = in_txn();
>  	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
> -- 
> 2.26.2
> 


* Re: [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check Cyrill Gorcunov
  2020-06-10  9:17   ` Sergey Ostanevich
@ 2020-06-10 10:45   ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-10 10:45 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml; +Cc: v.shpilevoy


09.06.2020 15:53, Cyrill Gorcunov wrote:
> All other check routines uses tnt_raise
> so no need to introduce a different approach.
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>   src/box/box.cc | 9 ++++-----
>   1 file changed, 4 insertions(+), 5 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 792c3c394..179feccf6 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -485,9 +485,9 @@ box_check_replication_sync_quorum(void)
>   {
>   	int quorum = cfg_geti("replication_sync_quorum");
>   	if (quorum <= 0 || quorum > VCLOCK_MAX) {
> -		diag_set(ClientError, ER_CFG, "replication_sync_quorum",
> -			 "the value must be greater than and less than "
> -			 "maximal number of replicas");
> +		tnt_raise(ClientError, ER_CFG, "replication_sync_quorum",
> +			  "the value must be greater than and less than "
> +			  "maximal number of replicas");
>   		return -1;
>   	}
>   	return quorum;
> @@ -675,8 +675,7 @@ box_check_config()
>   	box_check_replication_connect_timeout();
>   	box_check_replication_connect_quorum();
>   	box_check_replication_sync_lag();
> -	if (box_check_replication_sync_quorum() < 0)
> -		diag_raise();
> +	box_check_replication_sync_quorum();
>   	box_check_replication_sync_timeout();
>   	box_check_readahead(cfg_geti("readahead"));
>   	box_check_checkpoint_count(cfg_geti("checkpoint_count"));


Hi! Thanks for the catches. I applied your fixes, fixed 2 more tiny
errors, and pushed everything to the new branch:
sp/gh-4847-wal-confirm-msg-v3

Here's the diff (it's copy-pasted, so don't be alarmed by spaces instead of
tabs):

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1dc977424..cf7215d43 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -271,7 +271,7 @@ applier_on_confirm(struct trigger *trig, void *data)
  static int
  process_confirm(struct request *request)
  {
-       assert(request->header->type = IPROTO_CONFIRM);
+       assert(request->header->type == IPROTO_CONFIRM);
         uint32_t replica_id;
         struct txn *txn = in_txn();
         int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
diff --git a/src/box/box.cc b/src/box/box.cc
index 792c3c394..12ae258a8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -485,9 +485,9 @@ box_check_replication_sync_quorum(void)
  {
         int quorum = cfg_geti("replication_sync_quorum");
         if (quorum <= 0 || quorum > VCLOCK_MAX) {
-               diag_set(ClientError, ER_CFG, "replication_sync_quorum",
-                        "the value must be greater than and less than "
-                        "maximal number of replicas");
+               tnt_raise(ClientError, ER_CFG, "replication_sync_quorum",
+                         "the value must be greater than zero and less "
+                         "than maximal number of replicas");
                 return -1;
         }
         return quorum;
@@ -675,8 +675,7 @@ box_check_config()
         box_check_replication_connect_timeout();
         box_check_replication_connect_quorum();
         box_check_replication_sync_lag();
-       if (box_check_replication_sync_quorum() < 0)
-               diag_raise();
+       box_check_replication_sync_quorum();
         box_check_replication_sync_timeout();
         box_check_readahead(cfg_geti("readahead"));
         box_check_checkpoint_count(cfg_geti("checkpoint_count"));
diff --git a/src/box/txn.c b/src/box/txn.c
index 3b331fecc..07704e304 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -667,7 +667,8 @@ txn_commit_async(struct txn *txn)
         if (journal_write_async(req) != 0) {
                 fiber_set_txn(fiber(), txn);
                 txn_rollback(txn);
-               txn_limbo_abort(&txn_limbo, limbo_entry);
+               if (is_sync)
+                       txn_limbo_abort(&txn_limbo, limbo_entry);

                 diag_set(ClientError, ER_WAL_IO);
                 diag_log();
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index daec98317..896078db4 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -79,6 +79,7 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
         assert(!rlist_empty(&entry->in_queue));
         assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry,
                                  in_queue) == entry);
+       (void) limbo;
         rlist_del_entry(entry, in_queue);
  }


-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-18 22:27     ` Leonid Vasiliev
  0 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for sending the patch!

I added the 'is_sync' flag to the netbox schema, so that
netbox.space.<name>.is_sync is the same as
box.space.<name>.is_sync.

I also added a check that is_local and is_sync
can't be specified together for one space. It
makes no sense.

I force pushed it to the branch gh-4842-sync-replication,
along with all the other things I did in this email
thread.
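
The is_local + is_sync restriction described above could look roughly like the following. This is a minimal sketch, not the code that was pushed: struct example_space_opts, the function name, and the message text are all hypothetical. The reasoning it assumes is that a local space is not replicated, so there is nobody to collect acks from.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical subset of space options. */
struct example_space_opts {
	bool is_local;
	bool is_sync;
};

/*
 * Returns NULL when the options are consistent, or a static
 * error message when they contradict each other.
 */
static const char *
example_space_opts_check(const struct example_space_opts *opts)
{
	if (opts->is_local && opts->is_sync)
		return "a local space can't be synchronous";
	return NULL;
}
```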


* Re: [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-15 23:05   ` Vladislav Shpilevoy
  1 sibling, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for the review fixes!

I applied the new error message, and updated a comment for the
option declaration.

> commit 1e5fcbe66c23b5371f0422c425c7f19ffecae83b
> Author: Serge Petrenko <sergepetrenko@tarantool.org>
> Date:   Wed Jun 10 13:42:16 2020 +0300
> 
>     follow-up fixes
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 9b67aeb1f..27be7a7b1 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -481,9 +481,9 @@ box_check_replication_sync_quorum(void)
>  {
>  	int quorum = cfg_geti("replication_sync_quorum");
>  	if (quorum <= 0 || quorum > VCLOCK_MAX) {
> -		diag_set(ClientError, ER_CFG, "replication_sync_quorum",
> -			 "the value must be greater than and less than "
> -			 "maximal number of replicas");
> +		tnt_raise(ClientError, ER_CFG, "replication_sync_quorum",
> +			  "the value must be greater than zero and less "
> +			  "than maximal number of replicas");

However, I kept diag_set instead of tnt_raise, because I hope this
file will gradually become more C-like. This is why you can already see
some box_check_*() functions returning -1 instead of throwing an
error.

>  		return -1;
>  	}
>  	return quorum;
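
The C-style convention mentioned above (set a diagnostic, return -1, let the caller decide) can be sketched as follows. Everything here is an assumption made for illustration: example_diag_set() is a toy one-slot replacement for the real diag_set(), and EXAMPLE_VCLOCK_MAX stands in for VCLOCK_MAX.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical upper bound; stands in for VCLOCK_MAX. */
enum { EXAMPLE_VCLOCK_MAX = 32 };

/* Hypothetical one-slot diagnostics area. */
static char example_diag[256];

static void
example_diag_set(const char *option, const char *reason)
{
	snprintf(example_diag, sizeof(example_diag),
		 "Incorrect value for option '%s': %s", option, reason);
}

/* Returns the validated quorum, or -1 with the diagnostic set. */
static int
example_check_sync_quorum(int quorum)
{
	if (quorum <= 0 || quorum > EXAMPLE_VCLOCK_MAX) {
		example_diag_set("replication_sync_quorum",
				 "the value must be greater than zero and "
				 "less than maximal number of replicas");
		return -1;
	}
	return quorum;
}

/* The caller propagates the failure instead of catching an exception. */
static int
example_check_config(int quorum)
{
	if (example_check_sync_quorum(quorum) < 0)
		return -1;
	return 0;
}
```

With this shape the `return -1` after setting the diagnostic is reachable and meaningful, which is not the case once the error is thrown with an exception.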


* Re: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-11 14:57   ` Vladislav Shpilevoy
                     ` (2 subsequent siblings)
  3 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for sending the patch!

> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> new file mode 100644
> index 000000000..d28b2a28b
> --- /dev/null
> +++ b/src/box/txn_limbo.c
> @@ -0,0 +1,167 @@
> +#include "txn.h"
> +#include "txn_limbo.h"
> +#include "replication.h"
> +
> +struct txn_limbo txn_limbo;
> +
> +static inline void
> +txn_limbo_create(struct txn_limbo *limbo)
> +{
> +	rlist_create(&limbo->queue);
> +	limbo->instance_id = REPLICA_ID_NIL;
> +	vclock_create(&limbo->vclock);
> +}
> +
> +struct txn_limbo_entry *
> +txn_limbo_append(struct txn_limbo *limbo, struct txn *txn)
> +{
> +	assert(txn_has_flag(txn, TXN_WAIT_ACK));
> +	if (limbo->instance_id != instance_id) {
> +		if (limbo->instance_id == REPLICA_ID_NIL ||
> +		    rlist_empty(&limbo->queue)) {
> +			limbo->instance_id = instance_id;
> +		} else {
> +			diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
> +				 limbo->instance_id);
> +			return NULL;
> +		}
> +	}
> +	struct txn_limbo_entry *e = (struct txn_limbo_entry *)
> +		region_alloc(&txn->region, sizeof(*e));

I changed that to region_alloc_object(), according to recently
submitted ASAN changes about alignment.
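
To illustrate the alignment concern, here is a minimal sketch of a bump allocator with a raw and an aligned allocation path. It is not the real region API: struct example_region, example_alloc(), example_aligned_alloc() and the example_alloc_object() macro are hypothetical, and the real region_alloc_object() may have a different signature.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical bump allocator over a fixed, maximally aligned buffer. */
struct example_region {
	_Alignas(max_align_t) unsigned char buf[256];
	size_t used;
};

/* Raw allocation: returns the next free byte, whatever its alignment. */
static void *
example_alloc(struct example_region *r, size_t size)
{
	if (size > sizeof(r->buf) - r->used)
		return NULL;
	void *p = r->buf + r->used;
	r->used += size;
	return p;
}

/*
 * Aligned allocation: rounds the offset up to a multiple of 'align'
 * first. 'align' must be a power of two.
 */
static void *
example_aligned_alloc(struct example_region *r, size_t size, size_t align)
{
	size_t start = (r->used + align - 1) & ~(align - 1);
	if (start > sizeof(r->buf) || size > sizeof(r->buf) - start)
		return NULL;
	r->used = start + size;
	return r->buf + start;
}

/* Allocate one object of type T with the alignment T requires. */
#define example_alloc_object(r, T) \
	((T *)example_aligned_alloc((r), sizeof(T), _Alignof(T)))
```

After a 3-byte raw allocation, a further raw allocation would start at offset 3, which is misaligned for a struct or an int64_t; the object macro pads the offset first, so the returned pointer is safe to dereference.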


* Re: [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-11  8:46     ` Serge Petrenko
  0 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for the fixes!

On 09/06/2020 14:20, Serge Petrenko wrote:
> Part-of #4847
> Follow-up #4844
> Follow-up #4845
> ---
>  src/box/txn.c       | 5 +++--
>  src/box/txn_limbo.c | 2 +-
>  2 files changed, 4 insertions(+), 3 deletions(-)
> 
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 1d6518e29..f30e20944 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -459,7 +459,7 @@ txn_complete(struct txn *txn)
>  		 * back to the fiber, owning the transaction so as
>  		 * it could decide what to do next.
>  		 */
> -		if (txn->fiber != fiber())
> +		if (txn->fiber != NULL && txn->fiber != fiber())
>  			fiber_wakeup(txn->fiber);

Does it crash if you wake up the current fiber? I thought it was fine, since
the current fiber already has FIBER_IS_READY set (doesn't it?).


* Re: [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for the fixes!

Merged into txn limbo patch.


* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
@ 2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-11  8:56     ` Serge Petrenko
  2020-06-11 14:57   ` Vladislav Shpilevoy
                     ` (4 subsequent siblings)
  5 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-10 23:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for the patch!

> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index df48b4796..1dc977424 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>  	return txn_commit_stmt(txn, request);
>  }
>  
> +/*
> + * An on_commit trigger set on a txn containing a CONFIRM entry.
> + * Confirms some of the txs waiting in txn_limbo.
> + */
> +static int
> +applier_on_confirm(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	int64_t lsn = *(int64_t *)data;
> +	txn_limbo_read_confirm(&txn_limbo, lsn);
> +	return 0;
> +}
> +
> +static int
> +process_confirm(struct request *request)
> +{
> +	assert(request->header->type = IPROTO_CONFIRM);
> +	uint32_t replica_id;
> +	struct txn *txn = in_txn();
> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
> +	if (lsn == NULL) {

I changed that to region_alloc_object() to keep the alignment correct. Generally,
we should keep in mind that raw region_alloc() is now close to being forbidden.
It can be used only for byte buffers such as strings or MessagePack.

> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
> +		return -1;
> +	}
> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
> +		return -1;
> +	/*
> +	 * on_commit trigger failure is not allowed, so check for
> +	 * instance id early.
> +	 */
> +	if (replica_id != txn_limbo.instance_id) {
> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
> +			 txn_limbo.instance_id);
> +		return -1;
> +	}
> +
> +	/*
> +	 * Set an on_commit trigger which will perform the actual
> +	 * confirmation processing.
> +	 */
> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
> +							      sizeof(*trig));

Changed to region_alloc_object().

> +	if (trig == NULL) {
> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
> +		return -1;
> +	}
> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		return -1;
> +
> +	if (txn_commit_stmt(txn, request) == 0) {
> +		txn_on_commit(txn, trig);
> +		return 0;
> +	} else {
> +		return -1;
> +	}
> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
>  			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> +			/*
> +			 * Confirms are ignored during join. All the
> +			 * data master sends us is valid.
> +			 */
> +			if (row.type != IPROTO_CONFIRM &&

I moved the check into apply_final_join_row(), to be consistent with apply_row() and
apply_wal_row().

> +			    apply_final_join_row(&row) != 0)
>  				diag_raise();
>  			if (++row_count % 100000 == 0)
>  				say_info("%.1fM rows received", row_count / 1e6);
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 333e91ea9..4df3c2f26 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>  	vclock_copy(&status->relay->tx.vclock, &status->vclock);
>  	/*
>  	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica.
> +	 * them were successfully sent to the replica. Acks are
> +	 * collected only on the master. Other instances wait for
> +	 * master's CONFIRM message instead.
>  	 */
> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> -		     vclock_get(&status->vclock, instance_id));
> +	if (txn_limbo.instance_id == instance_id) {
> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> +			      vclock_get(&status->vclock, instance_id));
> +	}

Nice, I moved that to the patch introducing the limbo.

>  	static const struct cmsg_hop route[] = {
>  		{relay_status_update, NULL}
>  	};
> diff --git a/src/box/txn.c b/src/box/txn.c
> index a65100b31..3b331fecc 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>  	struct xrow_header **remote_row = req->rows;
>  	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>  	bool is_sync = false;
> -	/*
> -	 * Only local transactions, originated from the master,
> -	 * can enter 'waiting for acks' state. It means, only
> -	 * author of the transaction can collect acks. Replicas
> -	 * consider it a normal async transaction so far.
> -	 */
> -	bool is_local = true;

I squashed the is_local removal into the first commits, as if it had never
existed at all.

Why did you remove it, btw? I applied the removal because I realized
that if all the spaces are local, then none of them can be sync.
So I banned combining the is_sync and is_local options in the first commit.
Did you remove it for the same reason?

>  
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->has_triggers) {
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index efb97a591..daec98317 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +static int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> +	/* Prepare a confirm entry. */
> +	struct xrow_header row = {0};
> +	struct request request = {0};
> +	request.header = &row;
> +
> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
> +	if (row.bodycnt < 0)
> +		return -1;
> +
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		goto rollback;
> +	if (txn_commit_stmt(txn, &request) != 0)
> +		goto rollback;
> +
> +	return txn_commit(txn);

We definitely shouldn't use transactions for non-DML data. We need a
separate API for that, so as not to spoil the hot path and to keep the
DML commit code 'simple'. Not now, though. We just need to keep this
kind of follow-up in mind and file it as a follow-up
issue after the sync replication is done.

> +rollback:
> +	txn_rollback(txn);
> +	return -1;
> +}


* Re: [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-11  8:46     ` Serge Petrenko
  2020-06-11 13:01       ` Vladislav Shpilevoy
  0 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-11  8:46 UTC (permalink / raw)
  To: Vladislav Shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


11.06.2020 02:51, Vladislav Shpilevoy wrote:
> Thanks for the fixes!

Thanks for the answer!


>
> On 09/06/2020 14:20, Serge Petrenko wrote:
>> Part-of #4847
>> Follow-up #4844
>> Follow-up #4845
>> ---
>>   src/box/txn.c       | 5 +++--
>>   src/box/txn_limbo.c | 2 +-
>>   2 files changed, 4 insertions(+), 3 deletions(-)
>>
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index 1d6518e29..f30e20944 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -459,7 +459,7 @@ txn_complete(struct txn *txn)
>>   		 * back to the fiber, owning the transaction so as
>>   		 * it could decide what to do next.
>>   		 */
>> -		if (txn->fiber != fiber())
>> +		if (txn->fiber != NULL && txn->fiber != fiber())
>>   			fiber_wakeup(txn->fiber);
> Does it crash if you wakeup self? I thought it was fine, since
> the self fiber already contains FIBER_IS_READY (doesn't it?).


No. But you're waking up txn->fiber, which may be NULL.
Hence I added the check.
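
A minimal model of the guarded wakeup, assuming (as the limbo patch comments say) that transactions applied asynchronously have no owner fiber. The example_* types and functions are hypothetical stand-ins for struct fiber, struct txn and fiber_wakeup().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical fiber that only counts how often it was woken up. */
struct example_fiber {
	int wakeups;
};

/* Hypothetical transaction; fiber is NULL for async transactions. */
struct example_txn {
	struct example_fiber *fiber;
};

static void
example_fiber_wakeup(struct example_fiber *f)
{
	f->wakeups++;
}

/*
 * Wake the owner only if there is one and it is not the fiber
 * that is completing the transaction right now.
 */
static void
example_txn_notify_owner(struct example_txn *txn,
			 struct example_fiber *current)
{
	if (txn->fiber != NULL && txn->fiber != current)
		example_fiber_wakeup(txn->fiber);
}
```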

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-11  8:56     ` Serge Petrenko
  2020-06-11 13:04       ` Vladislav Shpilevoy
  0 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-11  8:56 UTC (permalink / raw)
  To: Vladislav Shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


11.06.2020 02:51, Vladislav Shpilevoy wrote:
> Thanks for the patch!

Hi! Thanks for the fixes!

Looks good.

See my comments inline.

>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index df48b4796..1dc977424 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>>   	return txn_commit_stmt(txn, request);
>>   }
>>   
>> +/*
>> + * An on_commit trigger set on a txn containing a CONFIRM entry.
>> + * Confirms some of the txs waiting in txn_limbo.
>> + */
>> +static int
>> +applier_on_confirm(struct trigger *trig, void *data)
>> +{
>> +	(void) trig;
>> +	int64_t lsn = *(int64_t *)data;
>> +	txn_limbo_read_confirm(&txn_limbo, lsn);
>> +	return 0;
>> +}
>> +
>> +static int
>> +process_confirm(struct request *request)
>> +{
>> +	assert(request->header->type = IPROTO_CONFIRM);
>> +	uint32_t replica_id;
>> +	struct txn *txn = in_txn();
>> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
>> +	if (lsn == NULL) {
> I changed that to region_alloc_object(). To keep alignment correct. Generally,
> we should keep in mind, that raw region_alloc() now is close to being forbidden.
> It can be used only for byte buffers like strings, MessagePack.
Ok, I see.
>
>> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
>> +		return -1;
>> +	}
>> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
>> +		return -1;
>> +	/*
>> +	 * on_commit trigger failure is not allowed, so check for
>> +	 * instance id early.
>> +	 */
>> +	if (replica_id != txn_limbo.instance_id) {
>> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>> +			 txn_limbo.instance_id);
>> +		return -1;
>> +	}
>> +
>> +	/*
>> +	 * Set an on_commit trigger which will perform the actual
>> +	 * confirmation processing.
>> +	 */
>> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
>> +							      sizeof(*trig));
> Changed to region_alloc_object().
>
>> +	if (trig == NULL) {
>> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
>> +		return -1;
>> +	}
>> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
>> +
>> +	if (txn_begin_stmt(txn, NULL) != 0)
>> +		return -1;
>> +
>> +	if (txn_commit_stmt(txn, request) == 0) {
>> +		txn_on_commit(txn, trig);
>> +		return 0;
>> +	} else {
>> +		return -1;
>> +	}
>> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>   		applier->last_row_time = ev_monotonic_now(loop());
>>   		if (iproto_type_is_dml(row.type)) {
>>   			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			if (apply_final_join_row(&row) != 0)
>> +			/*
>> +			 * Confirms are ignored during join. All the
>> +			 * data master sends us is valid.
>> +			 */
>> +			if (row.type != IPROTO_CONFIRM &&
> I moved the check into apply_final_join_row(). To be consistent with apply_row() and
> apply_wal_row().
>
>> +			    apply_final_join_row(&row) != 0)
>>   				diag_raise();
>>   			if (++row_count % 100000 == 0)
>>   				say_info("%.1fM rows received", row_count / 1e6);
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 333e91ea9..4df3c2f26 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>>   	vclock_copy(&status->relay->tx.vclock, &status->vclock);
>>   	/*
>>   	 * Let pending synchronous transactions know, which of
>> -	 * them were successfully sent to the replica.
>> +	 * them were successfully sent to the replica. Acks are
>> +	 * collected only on the master. Other instances wait for
>> +	 * master's CONFIRM message instead.
>>   	 */
>> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> -		     vclock_get(&status->vclock, instance_id));
>> +	if (txn_limbo.instance_id == instance_id) {
>> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> +			      vclock_get(&status->vclock, instance_id));
>> +	}
> Nice, I moved that to the patch introducing the limbo.
>
>>   	static const struct cmsg_hop route[] = {
>>   		{relay_status_update, NULL}
>>   	};
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index a65100b31..3b331fecc 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>>   	struct xrow_header **remote_row = req->rows;
>>   	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>>   	bool is_sync = false;
>> -	/*
>> -	 * Only local transactions, originated from the master,
>> -	 * can enter 'waiting for acks' state. It means, only
>> -	 * author of the transaction can collect acks. Replicas
>> -	 * consider it a normal async transaction so far.
>> -	 */
>> -	bool is_local = true;
> I squashed is_local removal into the first commits. So like it didn't
> exist at all.
>
> Why did you remove it, btw? I applied the removal because I realized
> that if all the spaces are local, then none of them can be sync.
> So I banned is_sync + is_local options in the first commit. Did you
> remove it for the same reason?

If I remember correctly, your is_local check was not about local spaces,
but rather about whether the transaction originated from the local
instance.

I separated txn_limbo behaviour judging by where the limbo_entries come
from. If they come from txn_commit_async(), this is a replica, and
txn_limbo waits for confirm messages. If they come from txn_commit(),
this is the master, so txn_limbo gathers acks. It'd be better to judge by
your is_local flag, I believe.

We probably should put a fixme here also.

>
>>   
>>   	stailq_foreach_entry(stmt, &txn->stmts, next) {
>>   		if (stmt->has_triggers) {
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>>   
>> +/**
>> + * Write a confirmation entry to WAL. After it's written all the
>> + * transactions waiting for confirmation may be finished.
>> + */
>> +static int
>> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> +{
>> +	/* Prepare a confirm entry. */
>> +	struct xrow_header row = {0};
>> +	struct request request = {0};
>> +	request.header = &row;
>> +
>> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
>> +	if (row.bodycnt < 0)
>> +		return -1;
>> +
>> +	struct txn *txn = txn_begin();
>> +	if (txn == NULL)
>> +		return -1;
>> +
>> +	if (txn_begin_stmt(txn, NULL) != 0)
>> +		goto rollback;
>> +	if (txn_commit_stmt(txn, &request) != 0)
>> +		goto rollback;
>> +
>> +	return txn_commit(txn);
> We definitely shouldn't use transactions for non DML data. We need
> separate API for that, not to spoil the hotpath, and to keep the
> DML commit code 'simple'. Not now though. We just need to keep these
> kinds of follow-ups in mind/on track, and file them as a follow-up
> issue after the sync replication is done.

I agree. We'll have to rework journal callbacks then, cos now journal calls
txn_complete_async() on each successful write.

>
>> +rollback:
>> +	txn_rollback(txn);
>> +	return -1;
>> +}
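
On the region_alloc_object() remark earlier in this message, here is a
minimal sketch of why alignment matters for a bump allocator. Everything
in it (toy_region, toy_alloc, toy_alloc_aligned, toy_alloc_object) is a
hypothetical toy written for illustration, not Tarantool's region API. A
raw bump allocation returns whatever offset comes next, so an int64_t
placed right after an odd-sized byte buffer may land misaligned; the
object-style helper rounds the offset up first.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical toy arena, not Tarantool's struct region. */
struct toy_region {
	_Alignas(max_align_t) unsigned char buf[256];
	size_t used;
};

/* Raw bump allocation: no alignment guarantee, fine for byte buffers. */
static void *
toy_alloc(struct toy_region *r, size_t size)
{
	if (size > sizeof(r->buf) - r->used)
		return NULL;
	void *res = r->buf + r->used;
	r->used += size;
	return res;
}

/* Aligned allocation: round the offset up to `align` (a power of two). */
static void *
toy_alloc_aligned(struct toy_region *r, size_t size, size_t align)
{
	size_t start = (r->used + align - 1) & ~(align - 1);
	if (start > sizeof(r->buf) || size > sizeof(r->buf) - start)
		return NULL;
	r->used = start + size;
	return r->buf + start;
}

/* Object-style helper in the spirit of region_alloc_object(). */
#define toy_alloc_object(r, T) \
	((T *)toy_alloc_aligned((r), sizeof(T), _Alignof(T)))
```

With this toy, allocating 3 raw bytes and then an int64_t through
toy_alloc_object() yields a pointer that is a multiple of
_Alignof(int64_t), whereas a second raw toy_alloc() would have returned
offset 3.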

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes
  2020-06-11  8:46     ` Serge Petrenko
@ 2020-06-11 13:01       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-11 13:01 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

>> On 09/06/2020 14:20, Serge Petrenko wrote:
>>> Part-of #4847
>>> Follow-up #4844
>>> Follow-up #4845
>>> ---
>>>   src/box/txn.c       | 5 +++--
>>>   src/box/txn_limbo.c | 2 +-
>>>   2 files changed, 4 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/src/box/txn.c b/src/box/txn.c
>>> index 1d6518e29..f30e20944 100644
>>> --- a/src/box/txn.c
>>> +++ b/src/box/txn.c
>>> @@ -459,7 +459,7 @@ txn_complete(struct txn *txn)
>>>            * back to the fiber, owning the transaction so as
>>>            * it could decide what to do next.
>>>            */
>>> -        if (txn->fiber != fiber())
>>> +        if (txn->fiber != NULL && txn->fiber != fiber())
>>>               fiber_wakeup(txn->fiber);
>> Does it crash if you wakeup self? I thought it was fine, since
>> the self fiber already contains FIBER_IS_READY (doesn't it?).
> 
> 
> No. But you're waking up txn->fiber, which may be NULL.
> Hence I added the check.

Yeah, my bad. For some reason I thought you added the self check, and
the NULL check was there before.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-11  8:56     ` Serge Petrenko
@ 2020-06-11 13:04       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-11 13:04 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

>>> diff --git a/src/box/txn.c b/src/box/txn.c
>>> index a65100b31..3b331fecc 100644
>>> --- a/src/box/txn.c
>>> +++ b/src/box/txn.c
>>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>>>       struct xrow_header **remote_row = req->rows;
>>>       struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>>>       bool is_sync = false;
>>> -    /*
>>> -     * Only local transactions, originated from the master,
>>> -     * can enter 'waiting for acks' state. It means, only
>>> -     * author of the transaction can collect acks. Replicas
>>> -     * consider it a normal async transaction so far.
>>> -     */
>>> -    bool is_local = true;
>> I squashed is_local removal into the first commits. So like it didn't
>> exist at all.
>>
>> Why did you remove it, btw? I applied the removal because I realized
>> that if all the spaces are local, then none of them can be sync.
>> So I banned is_sync + is_local options in the first commit. Did you
>> remove it for the same reason?
> 
> If I remember correctly, your is_local check was not about local spaces, but
> rather about whether the transaction originated from the local instance.

Yeah, also my bad. The name of my own variable confused me.

> I separated txn_limbo behaviour judging by where the limbo_entries come from.
> If they come from txn_commit_async(), this is a replica, and txn_limbo
> waits for confirm messages. If they come from txn_commit(), this is the master, so
> txn_limbo gathers acks. It'd be better to judge by your is_local flag, I believe.

Yeah, it is better to rely on the rows' content than on whether the commit
is async or normal. In both txn_commit_async() and txn_commit().

> We probably should put a fixme here also.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-11 14:57   ` Vladislav Shpilevoy
  2020-06-15 23:05     ` Vladislav Shpilevoy
  2020-06-19 12:39   ` Leonid Vasiliev
  2020-06-25 21:48   ` Vladislav Shpilevoy
  3 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-11 14:57 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

I added a new commit with a small fix on top of
this one:

====================
    limbo: remove entry from limbo before txn completion
    
    [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
    
    - txn_limbo_wait_complete() didn't remove the limbo entry from
      the limbo when the transaction appeared to be completed right
      away. As a result, the limbo entry stayed in the limbo, and
      after the transaction terminated it turned into garbage, because
      it was allocated on the transaction's region;
    
    - txn_limbo_ack() had the same problem - completed transaction
      should be removed from limbo.

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 3b9aa6d40..f38639676 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -123,8 +123,10 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	assert(entry->lsn > 0);
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(txn_has_flag(txn, TXN_WAIT_ACK));
-	if (txn_limbo_check_complete(limbo, entry))
+	if (txn_limbo_check_complete(limbo, entry)) {
+		txn_limbo_remove(limbo, entry);
 		return;
+	}
 	bool cancellable = fiber_set_cancellable(false);
 	while (!txn_limbo_entry_is_complete(entry))
 		fiber_yield();
@@ -144,8 +146,8 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
-	struct txn_limbo_entry *e;
-	rlist_foreach_entry(e, &limbo->queue, in_queue) {
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
 		if (e->lsn <= prev_lsn)
 			continue;
 		if (e->lsn > lsn)
@@ -158,6 +160,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 			// appliers will be supposed to wait for
 			// 'confirm' message.
 			e->is_commit = true;
+			rlist_del_entry(e, in_queue);
 			fiber_wakeup(e->txn->fiber);
 		}
 		assert(e->ack_count <= VCLOCK_MAX);
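
The switch to rlist_foreach_entry_safe() above is needed because the loop
body now unlinks the current entry. A minimal sketch of the idea, assuming
a hypothetical intrusive circular list (struct node, list_del,
queue_confirm and the other names are made up here and only loosely
modelled on rlist): the successor is saved before the current node is
removed, so the iteration never follows a pointer of an unlinked node.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical intrusive circular list, loosely modelled on rlist. */
struct node {
	struct node *prev, *next;
};

struct entry {
	struct node in_queue;
	int64_t lsn;
};

static void
list_init(struct node *head)
{
	head->prev = head->next = head;
}

static void
list_add_tail(struct node *head, struct node *item)
{
	item->prev = head->prev;
	item->next = head;
	head->prev->next = item;
	head->prev = item;
}

static void
list_del(struct node *item)
{
	item->prev->next = item->next;
	item->next->prev = item->prev;
	/* Self-link, so a stale ->next cannot walk back into the list. */
	item->prev = item->next = item;
}

/*
 * Remove every entry with lsn <= confirmed_lsn and return how many were
 * removed. `tmp` holds the successor before `cur` is unlinked, which is
 * what a *_safe iteration macro does.
 */
static int
queue_confirm(struct node *head, int64_t confirmed_lsn)
{
	int removed = 0;
	struct node *cur = head->next, *tmp;
	for (tmp = cur->next; cur != head; cur = tmp, tmp = cur->next) {
		struct entry *e = (struct entry *)
			((char *)cur - offsetof(struct entry, in_queue));
		if (e->lsn > confirmed_lsn)
			break;
		list_del(cur);
		removed++;
	}
	return removed;
}
```

With the plain (non-safe) form, advancing through cur->next after
list_del(cur) would follow the self-link forever in this toy, or read
freed memory in a real queue.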

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-11 14:57   ` Vladislav Shpilevoy
  2020-06-15 23:05     ` Vladislav Shpilevoy
  2020-06-19 17:50   ` Serge Petrenko
                     ` (3 subsequent siblings)
  5 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-11 14:57 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

I added a new commit with a small fix on top of
this one:

====================
    limbo: remove entry from limbo before txn completion
    
    [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
    
    This is a port of the previous limbo entry removal commit. Fixes
    the same problem.

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index f5d14421e..92fc5540e 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -198,7 +198,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
-	struct txn_limbo_entry *e;
+	struct txn_limbo_entry *e, *tmp;
 	struct txn_limbo_entry *last_quorum = NULL;
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
 		if (e->lsn <= prev_lsn)
@@ -220,7 +220,8 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 		 * Wakeup all the entries in direct order as soon
 		 * as confirmation message is written to WAL.
 		 */
-		rlist_foreach_entry(e, &limbo->queue, in_queue) {
+		rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
+			rlist_del_entry(e, in_queue);
 			fiber_wakeup(e->txn->fiber);
 			if (e == last_quorum)
 				break;

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-15 23:05   ` Vladislav Shpilevoy
  2020-06-18 22:54     ` Leonid Vasiliev
  2020-06-19 17:45     ` Serge Petrenko
  1 sibling, 2 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-15 23:05 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

I appended a new commit on top of this one on the
branch. In the quorum commit I renamed the
option to

    replication_synchro_quorum

====================
commit fc19662ec528c5217c7b611ae16d417497d9fe35
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Tue Jun 16 00:47:24 2020 +0200

    replication: introduce replication_synchro_timeout cfg
    
    [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
    
    Part of #4844
    Part of #5073

diff --git a/src/box/box.cc b/src/box/box.cc
index c7a5f2e3c..9db55e05a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -489,6 +489,18 @@ box_check_replication_synchro_quorum(void)
 	return quorum;
 }
 
+static double
+box_check_replication_synchro_timeout(void)
+{
+	double timeout = cfg_getd("replication_synchro_timeout");
+	if (timeout <= 0) {
+		diag_set(ClientError, ER_CFG, "replication_synchro_timeout",
+			 "the value must be greater than zero");
+		return -1;
+	}
+	return timeout;
+}
+
 static double
 box_check_replication_sync_timeout(void)
 {
@@ -673,6 +685,8 @@ box_check_config()
 	box_check_replication_sync_lag();
 	if (box_check_replication_synchro_quorum() < 0)
 		diag_raise();
+	if (box_check_replication_synchro_timeout() < 0)
+		diag_raise();
 	box_check_replication_sync_timeout();
 	box_check_readahead(cfg_geti("readahead"));
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
@@ -802,6 +816,16 @@ box_set_replication_synchro_quorum(void)
 	return 0;
 }
 
+int
+box_set_replication_synchro_timeout(void)
+{
+	double value = box_check_replication_synchro_timeout();
+	if (value < 0)
+		return -1;
+	replication_synchro_timeout = value;
+	return 0;
+}
+
 void
 box_set_replication_sync_timeout(void)
 {
@@ -2444,6 +2468,8 @@ box_cfg_xc(void)
 	box_set_replication_sync_lag();
 	if (box_set_replication_synchro_quorum() != 0)
 		diag_raise();
+	if (box_set_replication_synchro_timeout() != 0)
+		diag_raise();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
 	box_set_replication_anon();
diff --git a/src/box/box.h b/src/box/box.h
index 24802d0f1..f9789154e 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -244,6 +244,7 @@ void box_set_replication_connect_timeout(void);
 void box_set_replication_connect_quorum(void);
 void box_set_replication_sync_lag(void);
 int box_set_replication_synchro_quorum(void);
+int box_set_replication_synchro_timeout(void);
 void box_set_replication_sync_timeout(void);
 void box_set_replication_skip_conflict(void);
 void box_set_replication_anon(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 01e8958cd..d481155cd 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -321,6 +321,14 @@ lbox_cfg_set_replication_synchro_quorum(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
+{
+	if (box_set_replication_synchro_timeout() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
 {
@@ -379,6 +387,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
 		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
 		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
+		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
 		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
 		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 991e919e4..1155248a5 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -90,6 +90,7 @@ local default_cfg = {
     replication_sync_lag = 10,
     replication_sync_timeout = 300,
     replication_synchro_quorum = 1,
+    replication_synchro_timeout = 5,
     replication_connect_timeout = 30,
     replication_connect_quorum = nil, -- connect all
     replication_skip_conflict = false,
@@ -166,6 +167,7 @@ local template_cfg = {
     replication_sync_lag = 'number',
     replication_sync_timeout = 'number',
     replication_synchro_quorum = 'number',
+    replication_synchro_timeout = 'number',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
     replication_skip_conflict = 'boolean',
@@ -278,6 +280,7 @@ local dynamic_cfg = {
     replication_sync_lag    = private.cfg_set_replication_sync_lag,
     replication_sync_timeout = private.cfg_set_replication_sync_timeout,
     replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
+    replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
     replication_anon        = private.cfg_set_replication_anon,
     instance_uuid           = check_instance_uuid,
@@ -312,6 +315,7 @@ local dynamic_cfg_order = {
     replication_sync_lag    = 150,
     replication_sync_timeout    = 150,
     replication_synchro_quorum  = 150,
+    replication_synchro_timeout = 150,
     replication_connect_timeout = 150,
     replication_connect_quorum  = 150,
     replication             = 200,
@@ -348,6 +352,7 @@ local dynamic_cfg_skip_at_load = {
     replication_sync_lag    = true,
     replication_sync_timeout = true,
     replication_synchro_quorum = true,
+    replication_synchro_timeout = true,
     replication_skip_conflict = true,
     replication_anon        = true,
     wal_dir_rescan_delay    = true,
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 5b52f3864..01e9e876a 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -52,6 +52,7 @@ double replication_connect_timeout = 30.0; /* seconds */
 int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
 double replication_sync_lag = 10.0; /* seconds */
 int replication_synchro_quorum = 1;
+double replication_synchro_timeout = 5.0; /* seconds */
 double replication_sync_timeout = 300.0; /* seconds */
 bool replication_skip_conflict = false;
 bool replication_anon = false;
diff --git a/src/box/replication.h b/src/box/replication.h
index 05e3eb943..a081870f9 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -131,6 +131,12 @@ extern double replication_sync_lag;
  */
 extern int replication_synchro_quorum;
 
+/**
+ * Time in seconds which the master node is able to wait for ACKs
+ * for a synchronous transaction until it is rolled back.
+ */
+extern double replication_synchro_timeout;
+
 /**
  * Max time to wait for appliers to synchronize before entering
  * the orphan mode.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 2987b60b9..857f0c95f 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -31,6 +31,7 @@ replication_skip_conflict:false
 replication_sync_lag:10
 replication_sync_timeout:300
 replication_synchro_quorum:1
+replication_synchro_timeout:5
 replication_timeout:1
 slab_alloc_factor:1.05
 sql_cache_size:5242880
diff --git a/test/box/admin.result b/test/box/admin.result
index 35ecc7617..ab3e80a97 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -83,6 +83,8 @@ cfg_filter(box.cfg)
     - 300
   - - replication_synchro_quorum
     - 1
+  - - replication_synchro_timeout
+    - 5
   - - replication_timeout
     - 1
   - - slab_alloc_factor
diff --git a/test/box/cfg.result b/test/box/cfg.result
index cdc0773f2..bdd210b09 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -71,6 +71,8 @@ cfg_filter(box.cfg)
  |     - 300
  |   - - replication_synchro_quorum
  |     - 1
+ |   - - replication_synchro_timeout
+ |     - 5
  |   - - replication_timeout
  |     - 1
  |   - - slab_alloc_factor
@@ -176,6 +178,8 @@ cfg_filter(box.cfg)
  |     - 300
  |   - - replication_synchro_quorum
  |     - 1
+ |   - - replication_synchro_timeout
+ |     - 5
  |   - - replication_timeout
  |     - 1
  |   - - slab_alloc_factor
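
The check/set split used for replication_synchro_timeout in the patch
above can be sketched in isolation as follows. This is an illustration
under assumptions, not the Tarantool code: check_synchro_timeout(),
set_synchro_timeout() and the static variable are hypothetical stand-ins,
and the value is passed as an argument instead of being read with
cfg_getd(). The sketch also rejects NaN, which is an addition of this
example and not something the patch does.

```c
#include <assert.h>

/* Hypothetical stand-in for the stored option, in seconds. */
static double synchro_timeout = 5.0;

/* Validate only: return the value if it is usable, -1 otherwise. */
static double
check_synchro_timeout(double value)
{
	/* !(value > 0) is also true for NaN, unlike (value <= 0). */
	if (!(value > 0))
		return -1;
	return value;
}

/* Validate, then apply. The stored value is untouched on failure. */
static int
set_synchro_timeout(double value)
{
	double checked = check_synchro_timeout(value);
	if (checked < 0)
		return -1;
	synchro_timeout = checked;
	return 0;
}
```

Keeping validation separate from assignment lets the same check run both
when the whole config is verified and when the option is changed
dynamically, and a rejected value never overwrites the current one.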

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-11 14:57   ` Vladislav Shpilevoy
@ 2020-06-15 23:05     ` Vladislav Shpilevoy
  2020-06-18 11:32       ` Leonid Vasiliev
  0 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-15 23:05 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

The commit is squashed into the previous one.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-11 14:57   ` Vladislav Shpilevoy
@ 2020-06-15 23:05     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-15 23:05 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

The commit is squashed into the previous one.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-15 23:05     ` Vladislav Shpilevoy
@ 2020-06-18 11:32       ` Leonid Vasiliev
  2020-06-18 21:49         ` Vladislav Shpilevoy
  2020-06-19 17:48         ` Serge Petrenko
  0 siblings, 2 replies; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-18 11:32 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Add commit:

replication: fix for "replication: write and read CONFIRM entries"

As I understand it, an entry is deleted in txn_limbo_wait_complete(),
so don't delete it from the queue in txn_limbo_ack().

[TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 03cbed7..a07ed24 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -198,7 +198,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
         assert(limbo->instance_id != REPLICA_ID_NIL);
         int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
         vclock_follow(&limbo->vclock, replica_id, lsn);
-       struct txn_limbo_entry *e, *tmp;
+       struct txn_limbo_entry *e;
         struct txn_limbo_entry *last_quorum = NULL;
         rlist_foreach_entry(e, &limbo->queue, in_queue) {
                 if (e->lsn <= prev_lsn)
@@ -220,8 +220,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
                  * Wakeup all the entries in direct order as soon
                  * as confirmation message is written to WAL.
                  */
-               rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
-                       rlist_del_entry(e, in_queue);
+               rlist_foreach_entry(e, &limbo->queue, in_queue) {
                         fiber_wakeup(e->txn->fiber);
                         if (e == last_quorum)
                                 break;

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-18 11:32       ` Leonid Vasiliev
@ 2020-06-18 21:49         ` Vladislav Shpilevoy
  2020-06-19 17:48         ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-18 21:49 UTC (permalink / raw)
  To: Leonid Vasiliev, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

LGTM. I suggest squashing it the next time someone touches the
branch.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option
  2020-06-10 23:51   ` Vladislav Shpilevoy
@ 2020-06-18 22:27     ` Leonid Vasiliev
  2020-06-21 16:24       ` Vladislav Shpilevoy
  0 siblings, 1 reply; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-18 22:27 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thank you for the patch.
Maybe is_sync should also be added to the box_space_mt() function in
schema.lua.
LGTM.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-15 23:05   ` Vladislav Shpilevoy
@ 2020-06-18 22:54     ` Leonid Vasiliev
  2020-06-19 17:45     ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-18 22:54 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thank you for the patch.
LGTM.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag Serge Petrenko
@ 2020-06-18 23:12   ` Leonid Vasiliev
  2020-06-21 16:25     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-18 23:12 UTC (permalink / raw)
  To: Serge Petrenko, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thank you for the patch.


>   	stailq_foreach_entry(stmt, &txn->stmts, next) {
>   		if (stmt->has_triggers) {
>   			txn_init_triggers(txn);
>   			rlist_splice(&txn->on_commit, &stmt->on_commit);
>   		}
> +		is_sync = is_sync || stmt->space->def->opts.is_sync;

Seems like
"... || (stmt->space != NULL && stmt->space->def->opts.is_sync);"
can be moved here from "replication: make sync transactions wait quorum".

LGTM.

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-11 14:57   ` Vladislav Shpilevoy
@ 2020-06-19 12:39   ` Leonid Vasiliev
  2020-06-25 21:48   ` Vladislav Shpilevoy
  3 siblings, 0 replies; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-19 12:39 UTC (permalink / raw)
  To: Serge Petrenko, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thank you for the patch.
LGTM (version with fixes).

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry Serge Petrenko
@ 2020-06-19 15:18   ` Leonid Vasiliev
  2020-06-22 10:14     ` Serge Petrenko
  2020-06-23  8:33   ` Serge Petrenko
  1 sibling, 1 reply; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-19 15:18 UTC (permalink / raw)
  To: Serge Petrenko, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

LGTM.
All the following comments can be skipped silently.

On 09.06.2020 15:20, Serge Petrenko wrote:
> Add methods to encode/decode CONFIRM entry.
> A CONFIRM entry will be written to WAL by synchronous replication master
> as soon as it finds that the transaction was applied on a quorum of
> replicas.
> CONFIRM rows share the same header with other rows in WAL, but their body
> differs: it's just a map containing replica_id and lsn of the last
> confirmed transaction.
> 
> Part-of #4847
> ---
>   src/box/iproto_constants.h |  3 ++
>   src/box/xrow.c             | 74 ++++++++++++++++++++++++++++++++++++++
>   src/box/xrow.h             | 23 ++++++++++++
>   3 files changed, 100 insertions(+)
> 
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index f8eee0f3f..1466b456f 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -219,6 +219,9 @@ enum iproto_type {
>   	/** The maximum typecode used for box.stat() */
>   	IPROTO_TYPE_STAT_MAX,
>   
> +	/** A confirmation message for synchronous transactions. */
> +	IPROTO_CONFIRM = 40,
> +

Seems like IPROTO_CONFIRM must be added to the documentation. If it's
true, please add @TarantoolBot.

>   	/** PING request */
>   	IPROTO_PING = 64,
>   	/** Replication JOIN command */
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index bb64864b2..f197e0d85 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -878,6 +878,80 @@ xrow_encode_dml(const struct request *request, struct region *region,
>   	return iovcnt;
>   }
>   
> +int
> +xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn)
> +{
> +	size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
> +		     mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
> +		     mp_sizeof_uint(lsn);
> +	char *buf = (char *)region_alloc(&fiber()->gc, len);
> +	if (buf == NULL) {
> +		diag_set(OutOfMemory, len, "region_alloc", "buf");
> +		return -1;
> +	}
> +	char *pos = buf;
> +
> +	pos = mp_encode_map(pos, 2);
> +	pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
> +	pos = mp_encode_uint(pos, replica_id);
> +	pos = mp_encode_uint(pos, IPROTO_LSN);
> +	pos = mp_encode_uint(pos, lsn);
> +
> +	row->body[0].iov_base = buf;
> +	row->body[0].iov_len = len;
> +
> +	row->type = IPROTO_CONFIRM;
> +
> +	return 1;
> +}

In the latest version,
memset(row, 0, sizeof(*row));
was added. Usually, though, such initialization is done at the beginning
of the function, because otherwise the function can return without
having initialized the row. It does not look like a problem; the decision
is yours.

> +
> +int
> +xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
> +{
> +	if (row->bodycnt == 0) {
> +		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
> +		return -1;
> +	}
> +
> +	assert(row->bodycnt == 1);
> +
> +	const char * const data = (const char *)row->body[0].iov_base;
> +	const char * const end = data + row->body[0].iov_len;
> +	const char *d = data;
> +	if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) {
> +		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +				   "request body");
> +		return -1;
> +	}
> +
> +	d = data;
> +	uint32_t map_size = mp_decode_map(&d);
> +	for (uint32_t i = 0; i < map_size; i++) {
> +		if (mp_typeof(*d) != MP_UINT) {
> +			mp_next(&d);
> +			mp_next(&d);
> +			continue;
> +		}
> +		uint8_t key = mp_decode_uint(&d);
> +		if (key >= IPROTO_KEY_MAX || iproto_key_type[key] !=
> +					     mp_typeof(*d)) {
> +				xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +						   "request body");
> +		}
> +		switch (key) {
> +		case IPROTO_REPLICA_ID:
> +			*replica_id = mp_decode_uint(&d);
> +			break;
> +		case IPROTO_LSN:
> +			*lsn = mp_decode_uint(&d);
> +			break;
> +		default:
> +			mp_next(&d);
> +		}
> +	}
> +	return 0;
> +}
> +
>   int
>   xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
>   {
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 2a0a9c852..75af71b77 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -207,6 +207,29 @@ int
>   xrow_encode_dml(const struct request *request, struct region *region,
>   		struct iovec *iov);
>   
> +/**
> + * Encode the CONFIRM to row body and set row type to
> + * IPROTO_CONFIRM.
> + * @param row xrow header.
> + * @param replica_id master's instance id.
> + * @param lsn last confirmed lsn.
> + * @retval -1 on error.
> + * @retval > 0 xrow bodycnt.
> + */
> +int
> +xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn);
> +
> +/**
> + * Decode the CONFIRM request body.
> + * @param row xrow header.
> + * @param[out] replica_id master's instance id.
> + * @param[out] lsn last confirmed lsn.
> + * @retwal -1 on error.
> + * @retwal 0 success.

You have already fixed this typo in "xrow: fix comment typo".

> + */
> +int
> +xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
> +
>   /**
>    * CALL/EVAL request.
>    */
> 

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-15 23:05   ` Vladislav Shpilevoy
  2020-06-18 22:54     ` Leonid Vasiliev
@ 2020-06-19 17:45     ` Serge Petrenko
  2020-06-21 16:25       ` Vladislav Shpilevoy
  1 sibling, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-19 17:45 UTC (permalink / raw)
  To: Vladislav Shpilevoy, sergos, gorcunov, Leonid Vasiliev; +Cc: tarantool-patches


On 16.06.2020 02:05, Vladislav Shpilevoy wrote:
> I appended a new commit on top of this one on the
> branch. In the quorum commit I renamed the
> option to
>
>      replication_synchro_quorum
>
> ====================
> commit fc19662ec528c5217c7b611ae16d417497d9fe35
> Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> Date:   Tue Jun 16 00:47:24 2020 +0200
>
>      replication: introduce replication_synchro_timeout cfg
>      
>      [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
>      
>      Part of #4844
>      Part of #5073
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index c7a5f2e3c..9db55e05a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -489,6 +489,18 @@ box_check_replication_synchro_quorum(void)
>   	return quorum;
>   }
>   
> +static double
> +box_check_replication_synchro_timeout(void)
> +{
> +	double timeout = cfg_getd("replication_synchro_timeout");
> +	if (timeout <= 0) {
> +		diag_set(ClientError, ER_CFG, "replication_synchro_timeout",
> +			 "the value must be greater than zero");
> +		return -1;
> +	}
> +	return timeout;
> +}
> +
>   static double
>   box_check_replication_sync_timeout(void)
>   {
> @@ -673,6 +685,8 @@ box_check_config()
>   	box_check_replication_sync_lag();
>   	if (box_check_replication_synchro_quorum() < 0)
>   		diag_raise();
> +	if (box_check_replication_synchro_timeout() < 0)
> +		diag_raise();
>   	box_check_replication_sync_timeout();
>   	box_check_readahead(cfg_geti("readahead"));
>   	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
> @@ -802,6 +816,16 @@ box_set_replication_synchro_quorum(void)
>   	return 0;
>   }
>   
> +int
> +box_set_replication_synchro_timeout(void)
> +{
> +	double value = box_check_replication_synchro_timeout();
> +	if (value < 0)
> +		return -1;
> +	replication_synchro_timeout = value;
> +	return 0;
> +}
> +
>   void
>   box_set_replication_sync_timeout(void)
>   {
> @@ -2444,6 +2468,8 @@ box_cfg_xc(void)
>   	box_set_replication_sync_lag();
>   	if (box_set_replication_synchro_quorum() != 0)
>   		diag_raise();
> +	if (box_set_replication_synchro_timeout() != 0)
> +		diag_raise();
>   	box_set_replication_sync_timeout();
>   	box_set_replication_skip_conflict();
>   	box_set_replication_anon();
> diff --git a/src/box/box.h b/src/box/box.h
> index 24802d0f1..f9789154e 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -244,6 +244,7 @@ void box_set_replication_connect_timeout(void);
>   void box_set_replication_connect_quorum(void);
>   void box_set_replication_sync_lag(void);
>   int box_set_replication_synchro_quorum(void);
> +int box_set_replication_synchro_timeout(void);
>   void box_set_replication_sync_timeout(void);
>   void box_set_replication_skip_conflict(void);
>   void box_set_replication_anon(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index 01e8958cd..d481155cd 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -321,6 +321,14 @@ lbox_cfg_set_replication_synchro_quorum(struct lua_State *L)
>   	return 0;
>   }
>   
> +static int
> +lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
> +{
> +	if (box_set_replication_synchro_timeout() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
>   static int
>   lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
>   {
> @@ -379,6 +387,7 @@ box_lua_cfg_init(struct lua_State *L)
>   		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
>   		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
>   		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
> +		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
>   		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
>   		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
>   		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 991e919e4..1155248a5 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -90,6 +90,7 @@ local default_cfg = {
>       replication_sync_lag = 10,
>       replication_sync_timeout = 300,
>       replication_synchro_quorum = 1,
> +    replication_synchro_timeout = 5,
>       replication_connect_timeout = 30,
>       replication_connect_quorum = nil, -- connect all
>       replication_skip_conflict = false,
> @@ -166,6 +167,7 @@ local template_cfg = {
>       replication_sync_lag = 'number',
>       replication_sync_timeout = 'number',
>       replication_synchro_quorum = 'number',
> +    replication_synchro_timeout = 'number',
>       replication_connect_timeout = 'number',
>       replication_connect_quorum = 'number',
>       replication_skip_conflict = 'boolean',
> @@ -278,6 +280,7 @@ local dynamic_cfg = {
>       replication_sync_lag    = private.cfg_set_replication_sync_lag,
>       replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>       replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
> +    replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
>       replication_skip_conflict = private.cfg_set_replication_skip_conflict,
>       replication_anon        = private.cfg_set_replication_anon,
>       instance_uuid           = check_instance_uuid,
> @@ -312,6 +315,7 @@ local dynamic_cfg_order = {
>       replication_sync_lag    = 150,
>       replication_sync_timeout    = 150,
>       replication_synchro_quorum  = 150,
> +    replication_synchro_timeout = 150,
>       replication_connect_timeout = 150,
>       replication_connect_quorum  = 150,
>       replication             = 200,
> @@ -348,6 +352,7 @@ local dynamic_cfg_skip_at_load = {
>       replication_sync_lag    = true,
>       replication_sync_timeout = true,
>       replication_synchro_quorum = true,
> +    replication_synchro_timeout = true,
>       replication_skip_conflict = true,
>       replication_anon        = true,
>       wal_dir_rescan_delay    = true,
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 5b52f3864..01e9e876a 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -52,6 +52,7 @@ double replication_connect_timeout = 30.0; /* seconds */
>   int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
>   double replication_sync_lag = 10.0; /* seconds */
>   int replication_synchro_quorum = 1;
> +double replication_synchro_timeout = 5.0; /* seconds */
>   double replication_sync_timeout = 300.0; /* seconds */
>   bool replication_skip_conflict = false;
>   bool replication_anon = false;
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 05e3eb943..a081870f9 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -131,6 +131,12 @@ extern double replication_sync_lag;
>    */
>   extern int replication_synchro_quorum;
>   
> +/**
> + * Time in seconds which the master node is able to wait for ACKs
> + * for a synchronous transaction until it is rolled back.
> + */
> +extern double replication_synchro_timeout;
> +
>   /**
>    * Max time to wait for appliers to synchronize before entering
>    * the orphan mode.
> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
> index 2987b60b9..857f0c95f 100644
> --- a/test/app-tap/init_script.result
> +++ b/test/app-tap/init_script.result
> @@ -31,6 +31,7 @@ replication_skip_conflict:false
>   replication_sync_lag:10
>   replication_sync_timeout:300
>   replication_synchro_quorum:1
> +replication_synchro_timeout:5
>   replication_timeout:1
>   slab_alloc_factor:1.05
>   sql_cache_size:5242880
> diff --git a/test/box/admin.result b/test/box/admin.result
> index 35ecc7617..ab3e80a97 100644
> --- a/test/box/admin.result
> +++ b/test/box/admin.result
> @@ -83,6 +83,8 @@ cfg_filter(box.cfg)
>       - 300
>     - - replication_synchro_quorum
>       - 1
> +  - - replication_synchro_timeout
> +    - 5
>     - - replication_timeout
>       - 1
>     - - slab_alloc_factor
> diff --git a/test/box/cfg.result b/test/box/cfg.result
> index cdc0773f2..bdd210b09 100644
> --- a/test/box/cfg.result
> +++ b/test/box/cfg.result
> @@ -71,6 +71,8 @@ cfg_filter(box.cfg)
>    |     - 300
>    |   - - replication_synchro_quorum
>    |     - 1
> + |   - - replication_synchro_timeout
> + |     - 5
>    |   - - replication_timeout
>    |     - 1
>    |   - - slab_alloc_factor
> @@ -176,6 +178,8 @@ cfg_filter(box.cfg)
>    |     - 300
>    |   - - replication_synchro_quorum
>    |     - 1
> + |   - - replication_synchro_timeout
> + |     - 5
>    |   - - replication_timeout
>    |     - 1
>    |   - - slab_alloc_factor

Thanks! Looks good. I squashed the patch into the previous commit.

Here's the new commit message I came up with:


     replication: introduce replication_synchro_quorum and replication_synchro_timeout cfg options

     Synchronous transactions are supposed to be replicated on a
     specified number of replicas before being committed on the
     master. The number of replicas can be specified using the
     replication_synchro_quorum option. It is 1 by default, so sync
     transactions work like asynchronous ones unless the option is
     changed. 1 means a successful WAL write on the master is enough
     for commit.

     When replication_synchro_quorum is greater than 1, an instance has to
     wait for the specified number of replicas to reply with success. If
     enough replies aren't collected during replication_synchro_timeout,
     the instance rolls back the tx in question.

     Part of #4844
     Part of #5073
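
The rule from the message above can be sketched as a tiny standalone
function. This is only an illustration, not the txn_limbo code: the
names demo_sync_state and demo_sync_decide() are invented, and
ack_count is assumed to include the master's own WAL write, which is
how the "quorum 1" case is described above.

```c
#include <assert.h>

/* Hypothetical outcome of one check of a waiting sync transaction. */
enum demo_sync_state {
	DEMO_SYNC_WAIT,
	DEMO_SYNC_COMMIT,
	DEMO_SYNC_ROLLBACK,
};

/*
 * Decide what to do with a synchronous transaction.
 * ack_count - successful writes so far, master included (assumption).
 * quorum    - value of replication_synchro_quorum.
 * elapsed   - seconds spent waiting so far.
 * timeout   - value of replication_synchro_timeout.
 */
static enum demo_sync_state
demo_sync_decide(int ack_count, int quorum, double elapsed, double timeout)
{
	assert(quorum >= 1 && timeout > 0);
	if (ack_count >= quorum)
		return DEMO_SYNC_COMMIT;
	if (elapsed >= timeout)
		return DEMO_SYNC_ROLLBACK;
	return DEMO_SYNC_WAIT;
}
```

With the defaults (quorum 1, timeout 5) the first branch fires right
after the local WAL write, so nothing ever waits.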


-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-18 11:32       ` Leonid Vasiliev
  2020-06-18 21:49         ` Vladislav Shpilevoy
@ 2020-06-19 17:48         ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-19 17:48 UTC (permalink / raw)
  To: Leonid Vasiliev, Vladislav Shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


On 18.06.2020 14:32, Leonid Vasiliev wrote:
> Add commit:
>
> replication: fix for "replication: write and read CONFIRM entries"
>
> As I understand it, an entry is deleted in txn_limbo_wait_complete(),
> so don't delete it from the queue in txn_limbo_ack().
>
> [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
>
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 03cbed7..a07ed24 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -198,7 +198,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>         assert(limbo->instance_id != REPLICA_ID_NIL);
>         int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>         vclock_follow(&limbo->vclock, replica_id, lsn);
> -       struct txn_limbo_entry *e, *tmp;
> +       struct txn_limbo_entry *e;
>         struct txn_limbo_entry *last_quorum = NULL;
>         rlist_foreach_entry(e, &limbo->queue, in_queue) {
>                 if (e->lsn <= prev_lsn)
> @@ -220,8 +220,7 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>                  * Wakeup all the entries in direct order as soon
>                  * as confirmation message is written to WAL.
>                  */
> -               rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> -                       rlist_del_entry(e, in_queue);
> +               rlist_foreach_entry(e, &limbo->queue, in_queue) {
>                         fiber_wakeup(e->txn->fiber);
>                         if (e == last_quorum)
>                                 break;
>
LGTM. Squashed.

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
  2020-06-10 23:51   ` Vladislav Shpilevoy
  2020-06-11 14:57   ` Vladislav Shpilevoy
@ 2020-06-19 17:50   ` Serge Petrenko
  2020-06-23  8:35     ` Serge Petrenko
  2020-06-20 15:06   ` Leonid Vasiliev
                     ` (2 subsequent siblings)
  5 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-19 17:50 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov, Leonid Vasiliev; +Cc: tarantool-patches

On 09.06.2020 15:20, Serge Petrenko wrote:
> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
> receive their acks. CONFIRM entry is written to WAL and later replicated
> to all the replicas.
>
> Now replicas put synchronous transactions into txn_limbo and wait for
> corresponding confirmation entries to arrive and end up in their WAL
> before committing the transactions.
>
Added a new commit:

commit 88884031e6ea8ca836228432b400ca1557afafa6
Author: Serge Petrenko <sergepetrenko@tarantool.org>
Date:   Fri Jun 19 18:21:11 2020 +0300

     txn: rework synchronous tx on_rollback trigger

     Instead of allocating the trigger on the stack, which leaves a
     dangling pointer once txn_commit_async() returns, allocate the
     trigger together with the txn.
     Also, clear the trigger when asynchronous journal write succeeds.

     [TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/txn.c b/src/box/txn.c
index 4f787db79..16856da0d 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -238,6 +238,7 @@ txn_begin(void)
      /* fiber_on_yield is initialized by engine on demand */
      trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
      trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
+    trigger_create(&txn->on_write_failure, NULL, NULL, NULL);
      /*
       * By default all transactions may yield.
       * It's a responsibility of an engine to disable yields
@@ -458,6 +459,12 @@ txn_complete(struct txn *txn)
                           stop_tm - txn->start_tm);
          }
      } else {
+        /*
+         * Async write succeeded. Clear the trigger which
+         * would remove the corresponding txn_limbo entry
+         * in case of failure.
+         */
+        trigger_clear(&txn->on_write_failure);
          /*
           * Complete is called on every WAL operation
           * authored by this transaction. And it not always
@@ -681,10 +688,10 @@ txn_commit_async(struct txn *txn)
       * Set a trigger to abort waiting for confirm on WAL write
       * failure.
       */
+    trigger_create(&txn->on_write_failure, txn_limbo_on_rollback,
+               limbo_entry, NULL);
      if (is_sync) {
-        struct trigger trig;
-        trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
-        txn_on_rollback(txn, &trig);
+        txn_on_rollback(txn, &txn->on_write_failure);
      }
      return 0;
  }
diff --git a/src/box/txn.h b/src/box/txn.h
index e7705bb48..9efd6fd0d 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -228,6 +228,14 @@ struct txn {
       * in case a fiber stops (all engines).
       */
      struct trigger fiber_on_stop;
+    /**
+     * An on_rollback trigger for synchronous transactions
+     * removing the txn_limbo entry which would wait for
+     * confirmation otherwise.
+     * Is issued on asynchronous write failure and is cleared
+     * on write success.
+     */
+    struct trigger on_write_failure;
      /** Commit and rollback triggers. */
      struct rlist on_commit, on_rollback;
      /**
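
For reference, the lifetime problem and the fix can be shown with a
standalone sketch. None of this is the real txn or trigger API:
struct demo_trigger, struct demo_txn and the demo_*() functions are
invented for the example and only mimic the shape of the change.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature of a trigger: a callback plus its argument. */
struct demo_trigger {
	void (*run)(void *data);
	void *data;
};

/* Hypothetical miniature of struct txn. */
struct demo_txn {
	/* Registered rollback trigger or NULL. Only a pointer is kept. */
	struct demo_trigger *on_rollback;
	/* Trigger storage which lives exactly as long as the txn. */
	struct demo_trigger on_write_failure;
	/* Counts how many times the abort callback ran. */
	int aborted_entries;
};

static void
demo_limbo_abort(void *data)
{
	struct demo_txn *txn = (struct demo_txn *)data;
	txn->aborted_entries++;
}

/* Returns before the write outcome is known, like an async commit. */
static void
demo_commit_async(struct demo_txn *txn)
{
	assert(txn != NULL);
	/*
	 * A local 'struct demo_trigger trig' registered here would
	 * be gone by the time demo_write_done() runs. The embedded
	 * one stays valid for the whole life of the txn.
	 */
	txn->on_write_failure.run = demo_limbo_abort;
	txn->on_write_failure.data = txn;
	txn->on_rollback = &txn->on_write_failure;
}

/* Called later, when the write result arrives. */
static void
demo_write_done(struct demo_txn *txn, int is_success)
{
	if (is_success) {
		/* Nothing to abort anymore, drop the trigger. */
		txn->on_rollback = NULL;
		return;
	}
	if (txn->on_rollback != NULL)
		txn->on_rollback->run(txn->on_rollback->data);
}
```

The point is only that the storage of the trigger must outlive the
function which registers it, and that the success path has to unhook it.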

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
                     ` (2 preceding siblings ...)
  2020-06-19 17:50   ` Serge Petrenko
@ 2020-06-20 15:06   ` Leonid Vasiliev
  2020-06-22 10:34     ` Serge Petrenko
  2020-06-23  8:34   ` Serge Petrenko
  2020-06-25 22:04   ` Vladislav Shpilevoy
  5 siblings, 1 reply; 56+ messages in thread
From: Leonid Vasiliev @ 2020-06-20 15:06 UTC (permalink / raw)
  To: Serge Petrenko, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thank you for the patch.
LGTM.
All the following comments can be skipped silently.

On 09.06.2020 15:20, Serge Petrenko wrote:
> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
> receive their acks. CONFIRM entry is written to WAL and later replicated
> to all the replicas.
> 
> Now replicas put synchronous transactions into txn_limbo and wait for
> corresponding confirmation entries to arrive and end up in their WAL
> before committing the transactions.
> 
> Part-of #4847
> ---
>   src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++-
>   src/box/box.cc        |  3 ++
>   src/box/errcode.h     |  1 +
>   src/box/relay.cc      | 13 ++++---
>   src/box/txn.c         | 75 ++++++++++++++++++++++++++++++---------
>   src/box/txn.h         | 23 ++++++++++++
>   src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++-----
>   src/box/txn_limbo.h   |  6 ++++
>   test/box/error.result |  1 +
>   9 files changed, 252 insertions(+), 30 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index df48b4796..1dc977424 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -51,6 +51,7 @@
>   #include "txn.h"
>   #include "box.h"
>   #include "scoped_guard.h"
> +#include "txn_limbo.h"
>   
>   STRS(applier_state, applier_STATE);
>   
> @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
>   	struct txn *txn = txn_begin();
>   	if (txn == NULL)
>   		return -1;
> +	/*
> +	 * Do not wait for confirmation when fetching a snapshot.
> +	 * Master only sends confirmed rows during join.
> +	 */
> +	txn_force_async(txn);
>   	if (txn_begin_stmt(txn, space) != 0)
>   		goto rollback;
>   	/* no access checks here - applier always works with admin privs */
> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>   	return txn_commit_stmt(txn, request);
>   }
>   
> +/*
> + * An on_commit trigger set on a txn containing a CONFIRM entry.
> + * Confirms some of the txs waiting in txn_limbo.

txn.h uses the "txns" spelling (up to you).

> + */
> +static int
> +applier_on_confirm(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	int64_t lsn = *(int64_t *)data;
> +	txn_limbo_read_confirm(&txn_limbo, lsn);
> +	return 0;
> +}
> +
> +static int
> +process_confirm(struct request *request)
> +{
> +	assert(request->header->type = IPROTO_CONFIRM);
> +	uint32_t replica_id;
> +	struct txn *txn = in_txn();
> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
> +	if (lsn == NULL) {
> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
> +		return -1;
> +	}
> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
> +		return -1;
> +	/*
> +	 * on_commit trigger failure is not allowed, so check for
> +	 * instance id early.
> +	 */
> +	if (replica_id != txn_limbo.instance_id) {
> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
> +			 txn_limbo.instance_id);
> +		return -1;
> +	}
> +
> +	/*
> +	 * Set an on_commit trigger which will perform the actual
> +	 * confirmation processing.
> +	 */
> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
> +							      sizeof(*trig));
> +	if (trig == NULL) {
> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
> +		return -1;
> +	}
> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		return -1;
> +
> +	if (txn_commit_stmt(txn, request) == 0) {
> +		txn_on_commit(txn, trig);
> +		return 0;
> +	} else {
> +		return -1;
> +	}

Do I understand correctly that this is a trick, like in process_nop(),
to promote the vclock and ...? Maybe add a comment?

> +}
> +
>   static int
>   apply_row(struct xrow_header *row)
>   {
>   	struct request request;
> +	if (row->type == IPROTO_CONFIRM) {
> +		request.header = row;
> +		return process_confirm(&request);
> +	}
>   	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
>   		return -1;
>   	if (request.type == IPROTO_NOP)
> @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
>   	struct txn *txn = txn_begin();
>   	if (txn == NULL)
>   		return -1;
> +	/*
> +	 * Do not wait for confirmation while processing final
> +	 * join rows. See apply_snapshot_row().
> +	 */
> +	txn_force_async(txn);
>   	if (apply_row(row) != 0) {
>   		txn_rollback(txn);
>   		fiber_gc();
> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>   		applier->last_row_time = ev_monotonic_now(loop());
>   		if (iproto_type_is_dml(row.type)) {
>   			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> +			/*
> +			 * Confirms are ignored during join. All the
> +			 * data master sends us is valid.
> +			 */
> +			if (row.type != IPROTO_CONFIRM &&
> +			    apply_final_join_row(&row) != 0)
>   				diag_raise();
>   			if (++row_count % 100000 == 0)
>   				say_info("%.1fM rows received", row_count / 1e6);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 64ac89975..792c3c394 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -342,6 +342,9 @@ static void
>   apply_wal_row(struct xstream *stream, struct xrow_header *row)
>   {
>   	struct request request;
> +	// TODO: process confirmation during recovery.
> +	if (row->type == IPROTO_CONFIRM)
> +		return;
>   	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>   	if (request.type != IPROTO_NOP) {
>   		struct space *space = space_cache_find_xc(request.space_id);
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index 019c582af..3ba6866e5 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -267,6 +267,7 @@ struct errcode_record {
>   	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
>   	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
>   	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
> +	/*215 */_(ER_SYNC_MASTER_MISMATCH,	"CONFIRM message arrived for an unknown master id %d, expected %d") \
>   
>   /*
>    * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 333e91ea9..4df3c2f26 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>   	vclock_copy(&status->relay->tx.vclock, &status->vclock);
>   	/*
>   	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica.
> +	 * them were successfully sent to the replica. Acks are
> +	 * collected only on the master. Other instances wait for
> +	 * master's CONFIRM message instead.
>   	 */
> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> -		     vclock_get(&status->vclock, instance_id));
> +	if (txn_limbo.instance_id == instance_id) {
> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> +			      vclock_get(&status->vclock, instance_id));
> +	}
>   	static const struct cmsg_hop route[] = {
>   		{relay_status_update, NULL}
>   	};
> @@ -766,7 +770,8 @@ static void
>   relay_send_row(struct xstream *stream, struct xrow_header *packet)
>   {
>   	struct relay *relay = container_of(stream, struct relay, stream);
> -	assert(iproto_type_is_dml(packet->type));
> +	assert(iproto_type_is_dml(packet->type) ||
> +	       packet->type == IPROTO_CONFIRM);
>   	if (packet->group_id == GROUP_LOCAL) {
>   		/*
>   		 * We do not relay replica-local rows to other
> diff --git a/src/box/txn.c b/src/box/txn.c
> index a65100b31..3b331fecc 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -36,6 +36,7 @@
>   #include <fiber.h>
>   #include "xrow.h"
>   #include "errinj.h"
> +#include "iproto_constants.h"
>   
>   double too_long_threshold;
>   
> @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
>   	 */
>   	struct space *space = stmt->space;
>   	row->group_id = space != NULL ? space_group_id(space) : 0;
> -	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
> +	/*
> +	 * IPROTO_CONFIRM entries are supplementary and aren't
> +	 * valid dml requests. They're encoded manually.
> +	 */
> +	if (likely(row->type != IPROTO_CONFIRM))
> +		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>   	if (row->bodycnt < 0)
>   		return -1;
>   	stmt->row = row;
> @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>   	 */
>   	struct txn_stmt *stmt = txn_current_stmt(txn);
>   
> -	/* Create WAL record for the write requests in non-temporary spaces.
> -	 * stmt->space can be NULL for IRPOTO_NOP.
> +	/*
> +	 * Create WAL record for the write requests in
> +	 * non-temporary spaces. stmt->space can be NULL for
> +	 * IRPOTO_NOP or IPROTO_CONFIRM.
>   	 */
>   	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>   		if (txn_add_redo(txn, stmt, request) != 0)
> @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers)
>   /**
>    * Complete transaction processing.
>    */
> -static void
> +void
>   txn_complete(struct txn *txn)
>   {
>   	/*
>   	 * Note, engine can be NULL if transaction contains
> -	 * IPROTO_NOP statements only.
> +	 * IPROTO_NOP or IPROTO_CONFIRM statements.
>   	 */
>   	if (txn->signature < 0) {
>   		/* Undo the transaction. */
> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>   	struct xrow_header **remote_row = req->rows;
>   	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>   	bool is_sync = false;
> -	/*
> -	 * Only local transactions, originated from the master,
> -	 * can enter 'waiting for acks' state. It means, only
> -	 * author of the transaction can collect acks. Replicas
> -	 * consider it a normal async transaction so far.
> -	 */
> -	bool is_local = true;
>   
>   	stailq_foreach_entry(stmt, &txn->stmts, next) {
>   		if (stmt->has_triggers) {
> @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
>   		if (stmt->row == NULL)
>   			continue;
>   
> -		if (stmt->row->replica_id == 0) {
> +		if (stmt->row->replica_id == 0)
>   			*local_row++ = stmt->row;
> -		} else {
> +		else
>   			*remote_row++ = stmt->row;
> -			is_local = false;
> -		}
>   
>   		req->approx_len += xrow_approx_len(stmt->row);
>   	}
> -	if (is_sync && is_local)
> +
> +	is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
> +	if (is_sync) {
>   		txn_set_flag(txn, TXN_WAIT_ACK);
> +	}
>   
>   	assert(remote_row == req->rows + txn->n_applier_rows);
>   	assert(local_row == remote_row + txn->n_new_rows);
> @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
>   	return false;
>   }
>   
> +/*
> + * A trigger called on tx rollback due to a failed WAL write,
> + * when tx is waiting for confirmation.
> + */
> +static int
> +txn_limbo_on_rollback(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
> +	txn_limbo_abort(&txn_limbo, entry);
> +	return 0;
> +}
> +
>   int
>   txn_commit_async(struct txn *txn)
>   {
> @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
>   		return -1;
>   	}
>   
> +	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> +	struct txn_limbo_entry *limbo_entry;
> +	if (is_sync) {
> +		/* See txn_commit(). */
> +		uint32_t origin_id = req->rows[0]->replica_id;
> +		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
> +		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
> +		if (limbo_entry == NULL) {
> +			txn_rollback(txn);
> +			txn_free(txn);
> +			return -1;
> +		}
> +		assert(lsn > 0);
> +		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> +	}
> +
>   	fiber_set_txn(fiber(), NULL);
>   	if (journal_write_async(req) != 0) {
>   		fiber_set_txn(fiber(), txn);
>   		txn_rollback(txn);
> +		txn_limbo_abort(&txn_limbo, limbo_entry);
>   
>   		diag_set(ClientError, ER_WAL_IO);
>   		diag_log();
>   		return -1;
>   	}
>   
> +	/*
> +	 * Set a trigger to abort waiting for confirm on WAL write
> +	 * failure.
> +	 */
> +	if (is_sync) {
> +		struct trigger trig;
> +		trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
> +		txn_on_rollback(txn, &trig);
> +	}
>   	return 0;
>   }
>   
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 232cc07a8..e7705bb48 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -73,6 +73,13 @@ enum txn_flag {
>   	 * then finishes commit and returns success to a user.
>   	 */
>   	TXN_WAIT_ACK,
> +	/**
> +	 * A transaction mustn't wait for confirmation, even if it
> +	 * touches synchronous spaces. Needed for join stage on
> +	 * replica, when all the data coming from the master is
> +	 * already confirmed by design.
> +	 */
> +	TXN_FORCE_ASYNC,
>   };
>   
>   enum {
> @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
>   	txn->flags &= ~(1 << flag);
>   }
>   
> +/**
> + * Force async mode for transaction. It won't wait for acks
> + * or confirmation.
> + */
> +static inline void
> +txn_force_async(struct txn *txn)
> +{
> +	txn_set_flag(txn, TXN_FORCE_ASYNC);
> +}
> +
>   /* Pointer to the current transaction (if any) */
>   static inline struct txn *
>   in_txn(void)
> @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
>   struct txn *
>   txn_begin(void);
>   
> +/**
> + * Complete transaction processing.
> + */
> +void
> +txn_complete(struct txn *txn);
> +
>   /**
>    * Commit a transaction.
>    * @pre txn == in_txn()
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index efb97a591..daec98317 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   		fiber_yield();
>   	fiber_set_cancellable(cancellable);
>   	// TODO: implement rollback.
> -	// TODO: implement confirm.
>   	assert(!entry->is_rollback);
> +	assert(entry->is_commit);
>   	txn_limbo_remove(limbo, entry);
>   	txn_clear_flag(txn, TXN_WAIT_ACK);
>   }
>   
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +static int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> +	/* Prepare a confirm entry. */
> +	struct xrow_header row = {0};
> +	struct request request = {0};
> +	request.header = &row;
> +
> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
> +	if (row.bodycnt < 0)
> +		return -1;
> +
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		goto rollback;
> +	if (txn_commit_stmt(txn, &request) != 0)
> +		goto rollback;
> +
> +	return txn_commit(txn);
> +rollback:
> +	txn_rollback(txn);
> +	return -1;
> +}
> +
> +void
> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> +{
> +	assert(limbo->instance_id != REPLICA_ID_NIL &&
> +	       limbo->instance_id != instance_id);
> +	struct txn_limbo_entry *e, *tmp;
> +	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> +		if (e->lsn > lsn)
> +			break;
> +		assert(e->txn->fiber == NULL);
> +		e->is_commit = true;
> +		txn_limbo_remove(limbo, e);
> +		txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +		/*
> +		 * txn_complete_async must've been called already,
> +		 * since CONFIRM always follows the tx in question.
> +		 * So, finish this tx processing right away.
> +		 */
> +		txn_complete(e->txn);
> +	}
> +}
> +
>   void
>   txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   {
> @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>   	vclock_follow(&limbo->vclock, replica_id, lsn);
>   	struct txn_limbo_entry *e;
> +	struct txn_limbo_entry *last_quorum = NULL;
>   	rlist_foreach_entry(e, &limbo->queue, in_queue) {
>   		if (e->lsn <= prev_lsn)
>   			continue;
>   		if (e->lsn > lsn)
>   			break;
>   		if (++e->ack_count >= replication_sync_quorum) {
> -			// TODO: better call complete() right
> -			// here. Appliers use async transactions,
> -			// and their txns don't have fibers to
> -			// wake up. That becomes actual, when
> -			// appliers will be supposed to wait for
> -			// 'confirm' message.
>   			e->is_commit = true;
> -			fiber_wakeup(e->txn->fiber);
> +			last_quorum = e;
>   		}
>   		assert(e->ack_count <= VCLOCK_MAX);
>   	}
> +	if (last_quorum != NULL) {
> +		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
> +			// TODO: rollback.
> +			return;
> +		}
> +		/*
> +		 * Wakeup all the entries in direct order as soon
> +		 * as confirmation message is written to WAL.
> +		 */
> +		rlist_foreach_entry(e, &limbo->queue, in_queue) {
> +			fiber_wakeup(e->txn->fiber);
> +			if (e == last_quorum)
> +				break;
> +		}
> +	}
>   }
>   
>   void
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 1ad1c567a..de415cd97 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   void
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> +/**
> + * Confirm all the entries up to the given master's LSN.
> + */
> +void
> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
>   void
>   txn_limbo_init();
>   
> diff --git a/test/box/error.result b/test/box/error.result
> index 69c471085..34ded3930 100644
> --- a/test/box/error.result
> +++ b/test/box/error.result
> @@ -433,6 +433,7 @@ t;
>    |   212: box.error.SEQUENCE_NOT_STARTED
>    |   213: box.error.NO_SUCH_SESSION_SETTING
>    |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
> + |   215: box.error.SYNC_MASTER_MISMATCH
>    | ...
>   
>   test_run:cmd("setopt delimiter ''");
> 
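
The reworked txn_limbo_ack() quoted above boils down to: count the ack
for every entry the replica newly covers, remember the last entry that
reached the quorum, and write one CONFIRM for it. Below is a minimal
standalone sketch of that bookkeeping. It assumes a plain LSN-ordered
array instead of the rlist queue and returns the LSN instead of writing
to WAL; struct sketch_entry and sketch_limbo_ack() are hypothetical
names, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct txn_limbo_entry. */
struct sketch_entry {
	int64_t lsn;
	int ack_count;
	bool is_commit;
};

/*
 * Account an ack from a replica whose acked LSN moved from
 * prev_lsn to lsn. The queue is ordered by LSN. Returns the LSN
 * a single CONFIRM should carry, or -1 if no entry reached the
 * quorum on this call.
 */
int64_t
sketch_limbo_ack(struct sketch_entry *queue, size_t count,
		 int64_t prev_lsn, int64_t lsn, int quorum)
{
	int64_t confirm_lsn = -1;
	for (size_t i = 0; i < count; i++) {
		struct sketch_entry *e = &queue[i];
		/* Already acked by this replica earlier. */
		if (e->lsn <= prev_lsn)
			continue;
		/* Not yet received by this replica. */
		if (e->lsn > lsn)
			break;
		if (++e->ack_count >= quorum) {
			e->is_commit = true;
			confirm_lsn = e->lsn;
		}
	}
	return confirm_lsn;
}
```

With a quorum of 2, an ack up to LSN 2 from one replica confirms
nothing yet; a later ack up to LSN 3 from another replica yields a
single CONFIRM for LSN 2, which covers LSN 1 as well.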


* Re: [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option
  2020-06-18 22:27     ` Leonid Vasiliev
@ 2020-06-21 16:24       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-21 16:24 UTC (permalink / raw)
  To: Leonid Vasiliev, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the review!

> Maybe, it is needed to add is_sync to the box_space_mt() function in
> schema.lua
> LGTM.

Added in a separate commit:

====================
commit 8ea5476e1c34a7592567644b9d58af838e97317c
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Sun Jun 21 18:07:40 2020 +0200

    replication: add is_sync to space serialization
    
    [TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index a91b4fbad..2edf25fae 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2706,6 +2706,7 @@ local function box_space_mt(tab)
                 engine = v.engine,
                 is_local = v.is_local,
                 temporary = v.temporary,
+                is_sync = v.is_sync,
             }
         end
     end


* Re: [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag
  2020-06-18 23:12   ` Leonid Vasiliev
@ 2020-06-21 16:25     ` Vladislav Shpilevoy
  2020-06-22  9:44       ` Serge Petrenko
  0 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-21 16:25 UTC (permalink / raw)
  To: Leonid Vasiliev, Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

Thanks for the review!

>>       stailq_foreach_entry(stmt, &txn->stmts, next) {
>>           if (stmt->has_triggers) {
>>               txn_init_triggers(txn);
>>               rlist_splice(&txn->on_commit, &stmt->on_commit);
>>           }
>> +        is_sync = is_sync || stmt->space->def->opts.is_sync;
> 
> Seems like
> "... || (stmt->space != NULL && stmt->space->def->opts.is_sync);"
> can be moved here from "replication: make sync transactions wait quorum".
> 
> LGTM.

I don't think it is a good idea. At this commit stmt->space can't be
NULL. This becomes possible only after Sergey P. starts using txns
to write confirm and rollback entries, which don't relate to any
space.

However, I changed this place: I moved the is_sync calculation after
the stmt->row == NULL check, so that a read from a sync space doesn't
turn the transaction into a synchronous one:

====================
commit e29fd482e0954913b87776d0a83ba04ef14683d8
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Sun Jun 21 18:12:53 2020 +0200

    txn: don't do sync flag calculation for read statements
    
    [TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/txn.c b/src/box/txn.c
index b50e7c1f8..22591a365 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -502,12 +502,13 @@ txn_journal_entry_new(struct txn *txn)
 			txn_init_triggers(txn);
 			rlist_splice(&txn->on_commit, &stmt->on_commit);
 		}
-		is_sync = is_sync || stmt->space->def->opts.is_sync;
 
 		/* A read (e.g. select) request */
 		if (stmt->row == NULL)
 			continue;
 
+		is_sync = is_sync || stmt->space->def->opts.is_sync;
+
 		if (stmt->row->replica_id == 0)
 			*local_row++ = stmt->row;
 		else
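
To illustrate the effect of moving the check, here is a minimal
standalone sketch, not the actual txn code. struct sketch_stmt and
sketch_txn_is_sync() are hypothetical names. The NULL guard is the
variant proposed in the review and is not part of this commit; it is
included only so that the sketch also covers statements that have no
space.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified statement: only what the check needs. */
struct sketch_stmt {
	/* Points to the space's is_sync option, NULL when no space. */
	const bool *space_is_sync;
	/* False for a read (e.g. select): no row goes to the WAL. */
	bool has_row;
};

/* A transaction is synchronous if it writes to a sync space. */
bool
sketch_txn_is_sync(const struct sketch_stmt *stmts, size_t count)
{
	bool is_sync = false;
	for (size_t i = 0; i < count; i++) {
		/* A read request: skip before looking at the space. */
		if (!stmts[i].has_row)
			continue;
		is_sync = is_sync || (stmts[i].space_is_sync != NULL &&
				      *stmts[i].space_is_sync);
	}
	return is_sync;
}
```

A transaction that only reads from a sync space and writes to an async
one stays asynchronous in this sketch; a single write to a sync space
is enough to make it synchronous.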


* Re: [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg
  2020-06-19 17:45     ` Serge Petrenko
@ 2020-06-21 16:25       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-21 16:25 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov, Leonid Vasiliev; +Cc: tarantool-patches

> Thanks! Looks good. I squashed the patch into the previous commit.
> 
> Here's the new commit message I came up with:
> 
> 
>     replication: introduce replication_synchro_quorum and replication_synchro_timeout cfg options

Commit titles should be no longer than 50 characters if possible, so
I changed it to:

    replication: introduce replication_synchro_* cfg options

It is still a bit longer, though.


* Re: [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag
  2020-06-21 16:25     ` Vladislav Shpilevoy
@ 2020-06-22  9:44       ` Serge Petrenko
  2020-06-23 22:13         ` Vladislav Shpilevoy
  0 siblings, 1 reply; 56+ messages in thread
From: Serge Petrenko @ 2020-06-22  9:44 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Leonid Vasiliev, sergos, gorcunov; +Cc: tarantool-patches


21.06.2020 19:25, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
>>>        stailq_foreach_entry(stmt, &txn->stmts, next) {
>>>            if (stmt->has_triggers) {
>>>                txn_init_triggers(txn);
>>>                rlist_splice(&txn->on_commit, &stmt->on_commit);
>>>            }
>>> +        is_sync = is_sync || stmt->space->def->opts.is_sync;
>> Seems like
>> "... || (stmt->space != NULL && stmt->space->def->opts.is_sync);"
>> can be moved here from "replication: make sync transactions wait quorum".
>>
>> LGTM.
> I don't think it is a good idea. At this commit stmt->space can't be
> NULL. This becomes possible only after Sergey P. starts using txns
> to write confirm and rollback entries, which don't relate to any
> space.

Actually, stmt->space is NULL for NOPs.

>
> However, I changed this place: I moved the is_sync calculation after
> the stmt->row == NULL check, so that a read from a sync space doesn't
> turn the transaction into a synchronous one:
>
> ====================
> commit e29fd482e0954913b87776d0a83ba04ef14683d8
> Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> Date:   Sun Jun 21 18:12:53 2020 +0200
>
>      txn: don't do sync flag calculation for read statements
>      
>      [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index b50e7c1f8..22591a365 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -502,12 +502,13 @@ txn_journal_entry_new(struct txn *txn)
>   			txn_init_triggers(txn);
>   			rlist_splice(&txn->on_commit, &stmt->on_commit);
>   		}
> -		is_sync = is_sync || stmt->space->def->opts.is_sync;
>   
>   		/* A read (e.g. select) request */
>   		if (stmt->row == NULL)
>   			continue;
>   
> +		is_sync = is_sync || stmt->space->def->opts.is_sync;
> +
>   		if (stmt->row->replica_id == 0)
>   			*local_row++ = stmt->row;
>   		else
>
-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry
  2020-06-19 15:18   ` Leonid Vasiliev
@ 2020-06-22 10:14     ` Serge Petrenko
  0 siblings, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-22 10:14 UTC (permalink / raw)
  To: Leonid Vasiliev, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


19.06.2020 18:18, Leonid Vasiliev wrote:
> LGTM.
> All the following comments can be skipped silently.
Hi! Thanks for the review!
>
> On 09.06.2020 15:20, Serge Petrenko wrote:
>> Add methods to encode/decode CONFIRM entry.
>> A CONFIRM entry will be written to WAL by synchronous replication master
>> as soon as it finds that the transaction was applied on a quorum of
>> replicas.
>> CONFIRM rows share the same header with other rows in WAL,but their body
>> differs: it's just a map containing replica_id and lsn of the last
>> confirmed transaction.
>>
>> Part-of #4847
>> ---
>>   src/box/iproto_constants.h |  3 ++
>>   src/box/xrow.c             | 74 ++++++++++++++++++++++++++++++++++++++
>>   src/box/xrow.h             | 23 ++++++++++++
>>   3 files changed, 100 insertions(+)
>>
>> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
>> index f8eee0f3f..1466b456f 100644
>> --- a/src/box/iproto_constants.h
>> +++ b/src/box/iproto_constants.h
>> @@ -219,6 +219,9 @@ enum iproto_type {
>>       /** The maximum typecode used for box.stat() */
>>       IPROTO_TYPE_STAT_MAX,
>>   +    /** A confirmation message for synchronous transactions. */
>> +    IPROTO_CONFIRM = 40,
>> +
>
> Seems like IPROTO_CONFIRM must be added to the documentation. If it's
> true, please add @TarantoolBot.
I agree, added doc requests for CONFIRM and ROLLBACK.
>
>>       /** PING request */
>>       IPROTO_PING = 64,
>>       /** Replication JOIN command */
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index bb64864b2..f197e0d85 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -878,6 +878,80 @@ xrow_encode_dml(const struct request *request, 
>> struct region *region,
>>       return iovcnt;
>>   }
>>   +int
>> +xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, 
>> int64_t lsn)
>> +{
>> +    size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
>> +             mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
>> +             mp_sizeof_uint(lsn);
>> +    char *buf = (char *)region_alloc(&fiber()->gc, len);
>> +    if (buf == NULL) {
>> +        diag_set(OutOfMemory, len, "region_alloc", "buf");
>> +        return -1;
>> +    }
>> +    char *pos = buf;
>> +
>> +    pos = mp_encode_map(pos, 2);
>> +    pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
>> +    pos = mp_encode_uint(pos, replica_id);
>> +    pos = mp_encode_uint(pos, IPROTO_LSN);
>> +    pos = mp_encode_uint(pos, lsn);
>> +
>> +    row->body[0].iov_base = buf;
>> +    row->body[0].iov_len = len;
>> +
>> +    row->type = IPROTO_CONFIRM;
>> +
>> +    return 1;
>> +}
>
> At the last version
> memset(row, 0, sizeof(*row));
> was added. But, usually it is initialized at the beginning of the
> function, because otherwise it is possible to do return without
> initializing of the row. It does not look like a problem, the decision
> is yours.

I left it as is: IMO it looks strange to memset the row before an
error can happen.

>
>> +
>> +int
>> +xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, 
>> int64_t *lsn)
>> +{
>> +    if (row->bodycnt == 0) {
>> +        diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
>> +        return -1;
>> +    }
>> +
>> +    assert(row->bodycnt == 1);
>> +
>> +    const char * const data = (const char *)row->body[0].iov_base;
>> +    const char * const end = data + row->body[0].iov_len;
>> +    const char *d = data;
>> +    if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) {
>> +        xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +                   "request body");
>> +        return -1;
>> +    }
>> +
>> +    d = data;
>> +    uint32_t map_size = mp_decode_map(&d);
>> +    for (uint32_t i = 0; i < map_size; i++) {
>> +        if (mp_typeof(*d) != MP_UINT) {
>> +            mp_next(&d);
>> +            mp_next(&d);
>> +            continue;
>> +        }
>> +        uint8_t key = mp_decode_uint(&d);
>> +        if (key >= IPROTO_KEY_MAX || iproto_key_type[key] !=
>> +                         mp_typeof(*d)) {
>> +                xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +                           "request body");
>> +        }
>> +        switch (key) {
>> +        case IPROTO_REPLICA_ID:
>> +            *replica_id = mp_decode_uint(&d);
>> +            break;
>> +        case IPROTO_LSN:
>> +            *lsn = mp_decode_uint(&d);
>> +            break;
>> +        default:
>> +            mp_next(&d);
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>>   int
>>   xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
>>   {
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 2a0a9c852..75af71b77 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -207,6 +207,29 @@ int
>>   xrow_encode_dml(const struct request *request, struct region *region,
>>           struct iovec *iov);
>>   +/**
>> + * Encode the CONFIRM to row body and set row type to
>> + * IPROTO_CONFIRM.
>> + * @param row xrow header.
>> + * @param replica_id master's instance id.
>> + * @param lsn last confirmed lsn.
>> + * @retval -1 on error.
>> + * @retval > 0 xrow bodycnt.
>> + */
>> +int
>> +xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, 
>> int64_t lsn);
>> +
>> +/**
>> + * Decode the CONFIRM request body.
>> + * @param row xrow header.
>> + * @param[out] replica_id master's instance id.
>> + * @param[out] lsn last confirmed lsn.
>> + * @retwal -1 on error.
>> + * @retwal 0 success.
>
> The typo is fixed by you in "xrow: fix comment typo".
>
Squashed it into this commit.

>> + */
>> +int
>> +xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, 
>> int64_t *lsn);
>> +
>>   /**
>>    * CALL/EVAL request.
>>    */
>>
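
For reference, the CONFIRM body described in the commit message (a map
holding replica_id and lsn) can be sketched without the msgpuck
library. This is a standalone illustration, not the xrow code. The
sketch_* names are hypothetical, and the key codes 0x02 and 0x03 are an
assumption that should be checked against iproto_constants.h. Unlike
xrow_decode_confirm(), the sketch rejects non-integer keys and values
instead of skipping them.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed key codes; check iproto_constants.h for the real ones. */
enum { SKETCH_KEY_REPLICA_ID = 0x02, SKETCH_KEY_LSN = 0x03 };

/* Worst case: map header + 2 one-byte keys + uint32 + uint64. */
enum { SKETCH_CONFIRM_MAX = 1 + 1 + 5 + 1 + 9 };

/* Write v in the shortest MsgPack unsigned form. */
static uint8_t *
sketch_put_uint(uint8_t *p, uint64_t v)
{
	int bytes;
	if (v <= 0x7f) {
		*p++ = (uint8_t)v; /* positive fixint */
		return p;
	}
	if (v <= 0xff) {
		*p++ = 0xcc;
		bytes = 1;
	} else if (v <= 0xffff) {
		*p++ = 0xcd;
		bytes = 2;
	} else if (v <= 0xffffffff) {
		*p++ = 0xce;
		bytes = 4;
	} else {
		*p++ = 0xcf;
		bytes = 8;
	}
	/* The payload is big-endian. */
	for (int i = bytes - 1; i >= 0; i--)
		*p++ = (uint8_t)(v >> (8 * i));
	return p;
}

/* Read a MsgPack unsigned integer; NULL on bad type or overrun. */
static const uint8_t *
sketch_get_uint(const uint8_t *p, const uint8_t *end, uint64_t *v)
{
	if (p >= end)
		return NULL;
	uint8_t tag = *p++;
	if (tag <= 0x7f) {
		*v = tag;
		return p;
	}
	if (tag < 0xcc || tag > 0xcf)
		return NULL;
	int bytes = 1 << (tag - 0xcc);
	if (end - p < bytes)
		return NULL;
	*v = 0;
	for (int i = 0; i < bytes; i++)
		*v = (*v << 8) | *p++;
	return p;
}

/* Encode {REPLICA_ID: replica_id, LSN: lsn}; returns the length. */
size_t
sketch_encode_confirm(uint8_t *buf, uint32_t replica_id, int64_t lsn)
{
	uint8_t *p = buf;
	*p++ = 0x82; /* fixmap with 2 pairs */
	p = sketch_put_uint(p, SKETCH_KEY_REPLICA_ID);
	p = sketch_put_uint(p, replica_id);
	p = sketch_put_uint(p, SKETCH_KEY_LSN);
	p = sketch_put_uint(p, (uint64_t)lsn);
	return (size_t)(p - buf);
}

/* Decode a fixmap of uint pairs; unknown keys are ignored. */
int
sketch_decode_confirm(const uint8_t *buf, size_t len,
		      uint32_t *replica_id, int64_t *lsn)
{
	const uint8_t *p = buf, *end = buf + len;
	if (len == 0 || (*p & 0xf0) != 0x80)
		return -1;
	int pairs = *p++ & 0x0f;
	for (int i = 0; i < pairs; i++) {
		uint64_t key, val;
		if ((p = sketch_get_uint(p, end, &key)) == NULL ||
		    (p = sketch_get_uint(p, end, &val)) == NULL)
			return -1;
		if (key == SKETCH_KEY_REPLICA_ID)
			*replica_id = (uint32_t)val;
		else if (key == SKETCH_KEY_LSN)
			*lsn = (int64_t)val;
	}
	return 0;
}
```

A caller provides a buffer of SKETCH_CONFIRM_MAX bytes; small ids and
LSNs fit into five bytes in total, since each of them is stored as a
one-byte positive fixint.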
-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-20 15:06   ` Leonid Vasiliev
@ 2020-06-22 10:34     ` Serge Petrenko
  0 siblings, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-22 10:34 UTC (permalink / raw)
  To: Leonid Vasiliev, v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


20.06.2020 18:06, Leonid Vasiliev wrote:
> Hi! Thank you for the patch.
> LGTM.
> All the following comments can be skipped silently.
>
> On 09.06.2020 15:20, Serge Petrenko wrote:
>> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
>> receive their acks. CONFIRM entry is written to WAL and later replicated
>> to all the replicas.
>>
>> Now replicas put synchronous transactions into txn_limbo and wait for
>> corresponding confirmation entries to arrive and end up in their WAL
>> before committing the transactions.
>>
>> Part-of #4847
>> ---
>>   src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++-
>>   src/box/box.cc        |  3 ++
>>   src/box/errcode.h     |  1 +
>>   src/box/relay.cc      | 13 ++++---
>>   src/box/txn.c         | 75 ++++++++++++++++++++++++++++++---------
>>   src/box/txn.h         | 23 ++++++++++++
>>   src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++-----
>>   src/box/txn_limbo.h   |  6 ++++
>>   test/box/error.result |  1 +
>>   9 files changed, 252 insertions(+), 30 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index df48b4796..1dc977424 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -51,6 +51,7 @@
>>   #include "txn.h"
>>   #include "box.h"
>>   #include "scoped_guard.h"
>> +#include "txn_limbo.h"
>>     STRS(applier_state, applier_STATE);
>>   @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
>>       struct txn *txn = txn_begin();
>>       if (txn == NULL)
>>           return -1;
>> +    /*
>> +     * Do not wait for confirmation when fetching a snapshot.
>> +     * Master only sends confirmed rows during join.
>> +     */
>> +    txn_force_async(txn);
>>       if (txn_begin_stmt(txn, space) != 0)
>>           goto rollback;
>>       /* no access checks here - applier always works with admin 
>> privs */
>> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>>       return txn_commit_stmt(txn, request);
>>   }
>>   +/*
>> + * An on_commit trigger set on a txn containing a CONFIRM entry.
>> + * Confirms some of the txs waiting in txn_limbo.
>
> In txn.h "txns" notation is used (up to you).
Thanks! Fixed.
>
>> + */
>> +static int
>> +applier_on_confirm(struct trigger *trig, void *data)
>> +{
>> +    (void) trig;
>> +    int64_t lsn = *(int64_t *)data;
>> +    txn_limbo_read_confirm(&txn_limbo, lsn);
>> +    return 0;
>> +}
>> +
>> +static int
>> +process_confirm(struct request *request)
>> +{
>> +    assert(request->header->type = IPROTO_CONFIRM);
>> +    uint32_t replica_id;
>> +    struct txn *txn = in_txn();
>> +    int64_t *lsn = (int64_t *) region_alloc(&txn->region, 
>> sizeof(int64_t));
>> +    if (lsn == NULL) {
>> +        diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
>> +        return -1;
>> +    }
>> +    if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
>> +        return -1;
>> +    /*
>> +     * on_commit trigger failure is not allowed, so check for
>> +     * instance id early.
>> +     */
>> +    if (replica_id != txn_limbo.instance_id) {
>> +        diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>> +             txn_limbo.instance_id);
>> +        return -1;
>> +    }
>> +
>> +    /*
>> +     * Set an on_commit trigger which will perform the actual
>> +     * confirmation processing.
>> +     */
>> +    struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
>> +                                  sizeof(*trig));
>> +    if (trig == NULL) {
>> +        diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
>> +        return -1;
>> +    }
>> +    trigger_create(trig, applier_on_confirm, lsn, NULL);
>> +
>> +    if (txn_begin_stmt(txn, NULL) != 0)
>> +        return -1;
>> +
>> +    if (txn_commit_stmt(txn, request) == 0) {
>> +        txn_on_commit(txn, trig);
>> +        return 0;
>> +    } else {
>> +        return -1;
>> +    }
>
> Did I understand correctly that this is a trick like in process_nop()
> to promote vclock and ...? Maybe add a comment?

Yes, it's somewhat similar to process_nop. The difference is that,
unlike everything else the applier receives, CONFIRM and ROLLBACK
aren't dml requests and cannot be processed in the same manner. They
require some txn_limbo operations tied to their processing. Actually,
I've just come up with the comment you asked for. Added:

@@ -268,6 +268,12 @@ applier_on_confirm(struct trigger *trig, void *data)
         return 0;
  }

+/*
+ * CONFIRM entries aren't dml requests and require special
+ * handling: they're written to WAL right away, without any space
+ * operations, and, once they're committed, txn_limbo performs the
+ * actions dictated by the received CONFIRM entry.
+ */
  static int
  process_confirm(struct request *request)
  {
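
The "actions dictated by the received CONFIRM entry" amount to
completing every pending transaction whose LSN does not exceed the
confirmed one. Here is a minimal standalone sketch of that step. It
assumes a small fixed array of pending LSNs instead of the limbo queue
and only drops the entries where the real code would complete the
transactions; struct sketch_limbo and sketch_read_confirm() are
hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the replica-side limbo queue. */
struct sketch_limbo {
	/* LSNs of pending synchronous transactions, ascending. */
	int64_t lsns[8];
	size_t count;
};

/*
 * Handle a committed CONFIRM carrying the given LSN: every pending
 * transaction with an LSN not greater than it leaves the queue.
 * Returns the number of transactions confirmed.
 */
size_t
sketch_read_confirm(struct sketch_limbo *limbo, int64_t lsn)
{
	size_t confirmed = 0;
	while (confirmed < limbo->count && limbo->lsns[confirmed] <= lsn)
		confirmed++;
	limbo->count -= confirmed;
	/* Shift the still-pending tail to the queue head. */
	memmove(limbo->lsns, limbo->lsns + confirmed,
		limbo->count * sizeof(limbo->lsns[0]));
	return confirmed;
}
```

With pending LSNs 5, 6 and 9, a CONFIRM for LSN 6 completes the first
two transactions and leaves LSN 9 waiting; repeating the same CONFIRM
changes nothing.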

>
>> +}
>> +
>>   static int
>>   apply_row(struct xrow_header *row)
>>   {
>>       struct request request;
>> +    if (row->type == IPROTO_CONFIRM) {
>> +        request.header = row;
>> +        return process_confirm(&request);
>> +    }
>>       if (xrow_decode_dml(row, &request, 
>> dml_request_key_map(row->type)) != 0)
>>           return -1;
>>       if (request.type == IPROTO_NOP)
>> @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
>>       struct txn *txn = txn_begin();
>>       if (txn == NULL)
>>           return -1;
>> +    /*
>> +     * Do not wait for confirmation while processing final
>> +     * join rows. See apply_snapshot_row().
>> +     */
>> +    txn_force_async(txn);
>>       if (apply_row(row) != 0) {
>>           txn_rollback(txn);
>>           fiber_gc();
>> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, 
>> uint64_t row_count)
>>           applier->last_row_time = ev_monotonic_now(loop());
>>           if (iproto_type_is_dml(row.type)) {
>> vclock_follow_xrow(&replicaset.vclock, &row);
>> -            if (apply_final_join_row(&row) != 0)
>> +            /*
>> +             * Confirms are ignored during join. All the
>> +             * data master sends us is valid.
>> +             */
>> +            if (row.type != IPROTO_CONFIRM &&
>> +                apply_final_join_row(&row) != 0)
>>                   diag_raise();
>>               if (++row_count % 100000 == 0)
>>                   say_info("%.1fM rows received", row_count / 1e6);
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 64ac89975..792c3c394 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -342,6 +342,9 @@ static void
>>   apply_wal_row(struct xstream *stream, struct xrow_header *row)
>>   {
>>       struct request request;
>> +    // TODO: process confirmation during recovery.
>> +    if (row->type == IPROTO_CONFIRM)
>> +        return;
>>       xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>>       if (request.type != IPROTO_NOP) {
>>           struct space *space = space_cache_find_xc(request.space_id);
>> diff --git a/src/box/errcode.h b/src/box/errcode.h
>> index 019c582af..3ba6866e5 100644
>> --- a/src/box/errcode.h
>> +++ b/src/box/errcode.h
>> @@ -267,6 +267,7 @@ struct errcode_record {
>>       /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not 
>> started") \
>>       /*213 */_(ER_NO_SUCH_SESSION_SETTING,    "Session setting %s 
>> doesn't exist") \
>>       /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted 
>> sync transactions from other instance with id %u") \
>> +    /*215 */_(ER_SYNC_MASTER_MISMATCH,    "CONFIRM message arrived 
>> for an unknown master id %d, expected %d") \
>>     /*
>>    * !IMPORTANT! Please follow instructions at start of the file
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 333e91ea9..4df3c2f26 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>>       vclock_copy(&status->relay->tx.vclock, &status->vclock);
>>       /*
>>        * Let pending synchronous transactions know, which of
>> -     * them were successfully sent to the replica.
>> +     * them were successfully sent to the replica. Acks are
>> +     * collected only on the master. Other instances wait for
>> +     * master's CONFIRM message instead.
>>        */
>> -    txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> -             vclock_get(&status->vclock, instance_id));
>> +    if (txn_limbo.instance_id == instance_id) {
>> +        txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> +                  vclock_get(&status->vclock, instance_id));
>> +    }
>>       static const struct cmsg_hop route[] = {
>>           {relay_status_update, NULL}
>>       };
>> @@ -766,7 +770,8 @@ static void
>>   relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> -    assert(iproto_type_is_dml(packet->type));
>> +    assert(iproto_type_is_dml(packet->type) ||
>> +           packet->type == IPROTO_CONFIRM);
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>>            * We do not relay replica-local rows to other
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index a65100b31..3b331fecc 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -36,6 +36,7 @@
>>   #include <fiber.h>
>>   #include "xrow.h"
>>   #include "errinj.h"
>> +#include "iproto_constants.h"
>>     double too_long_threshold;
>>   @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt 
>> *stmt, struct request *request)
>>        */
>>       struct space *space = stmt->space;
>>       row->group_id = space != NULL ? space_group_id(space) : 0;
>> -    row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>> +    /*
>> +     * IPROTO_CONFIRM entries are supplementary and aren't
>> +     * valid dml requests. They're encoded manually.
>> +     */
>> +    if (likely(row->type != IPROTO_CONFIRM))
>> +        row->bodycnt = xrow_encode_dml(request, &txn->region, 
>> row->body);
>>       if (row->bodycnt < 0)
>>           return -1;
>>       stmt->row = row;
>> @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request 
>> *request)
>>        */
>>       struct txn_stmt *stmt = txn_current_stmt(txn);
>>   -    /* Create WAL record for the write requests in non-temporary 
>> spaces.
>> -     * stmt->space can be NULL for IRPOTO_NOP.
>> +    /*
>> +     * Create WAL record for the write requests in
>> +     * non-temporary spaces. stmt->space can be NULL for
>> +     * IRPOTO_NOP or IPROTO_CONFIRM.
>>        */
>>       if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>>           if (txn_add_redo(txn, stmt, request) != 0)
>> @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, 
>> struct rlist *triggers)
>>   /**
>>    * Complete transaction processing.
>>    */
>> -static void
>> +void
>>   txn_complete(struct txn *txn)
>>   {
>>       /*
>>        * Note, engine can be NULL if transaction contains
>> -     * IPROTO_NOP statements only.
>> +     * IPROTO_NOP or IPROTO_CONFIRM statements.
>>        */
>>       if (txn->signature < 0) {
>>           /* Undo the transaction. */
>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>>       struct xrow_header **remote_row = req->rows;
>>       struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>>       bool is_sync = false;
>> -    /*
>> -     * Only local transactions, originated from the master,
>> -     * can enter 'waiting for acks' state. It means, only
>> -     * author of the transaction can collect acks. Replicas
>> -     * consider it a normal async transaction so far.
>> -     */
>> -    bool is_local = true;
>>         stailq_foreach_entry(stmt, &txn->stmts, next) {
>>           if (stmt->has_triggers) {
>> @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
>>           if (stmt->row == NULL)
>>               continue;
>>   -        if (stmt->row->replica_id == 0) {
>> +        if (stmt->row->replica_id == 0)
>>               *local_row++ = stmt->row;
>> -        } else {
>> +        else
>>               *remote_row++ = stmt->row;
>> -            is_local = false;
>> -        }
>>             req->approx_len += xrow_approx_len(stmt->row);
>>       }
>> -    if (is_sync && is_local)
>> +
>> +    is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
>> +    if (is_sync) {
>>           txn_set_flag(txn, TXN_WAIT_ACK);
>> +    }
>>         assert(remote_row == req->rows + txn->n_applier_rows);
>>       assert(local_row == remote_row + txn->n_new_rows);
>> @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
>>       return false;
>>   }
>>   +/*
>> + * A trigger called on tx rollback due to a failed WAL write,
>> + * when tx is waiting for confirmation.
>> + */
>> +static int
>> +txn_limbo_on_rollback(struct trigger *trig, void *data)
>> +{
>> +    (void) trig;
>> +    struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
>> +    txn_limbo_abort(&txn_limbo, entry);
>> +    return 0;
>> +}
>> +
>>   int
>>   txn_commit_async(struct txn *txn)
>>   {
>> @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
>>           return -1;
>>       }
>>   +    bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
>> +    struct txn_limbo_entry *limbo_entry;
>> +    if (is_sync) {
>> +        /* See txn_commit(). */
>> +        uint32_t origin_id = req->rows[0]->replica_id;
>> +        int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
>> +        limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
>> +        if (limbo_entry == NULL) {
>> +            txn_rollback(txn);
>> +            txn_free(txn);
>> +            return -1;
>> +        }
>> +        assert(lsn > 0);
>> +        txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
>> +    }
>> +
>>       fiber_set_txn(fiber(), NULL);
>>       if (journal_write_async(req) != 0) {
>>           fiber_set_txn(fiber(), txn);
>>           txn_rollback(txn);
>> +        txn_limbo_abort(&txn_limbo, limbo_entry);
>>             diag_set(ClientError, ER_WAL_IO);
>>           diag_log();
>>           return -1;
>>       }
>>   +    /*
>> +     * Set a trigger to abort waiting for confirm on WAL write
>> +     * failure.
>> +     */
>> +    if (is_sync) {
>> +        struct trigger trig;
>> +        trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, 
>> NULL);
>> +        txn_on_rollback(txn, &trig);
>> +    }
>>       return 0;
>>   }
>>   diff --git a/src/box/txn.h b/src/box/txn.h
>> index 232cc07a8..e7705bb48 100644
>> --- a/src/box/txn.h
>> +++ b/src/box/txn.h
>> @@ -73,6 +73,13 @@ enum txn_flag {
>>        * then finishes commit and returns success to a user.
>>        */
>>       TXN_WAIT_ACK,
>> +    /**
>> +     * A transaction mustn't wait for confirmation, even if it
>> +     * touches synchronous spaces. Needed for join stage on
>> +     * replica, when all the data coming from the master is
>> +     * already confirmed by design.
>> +     */
>> +    TXN_FORCE_ASYNC,
>>   };
>>     enum {
>> @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
>>       txn->flags &= ~(1 << flag);
>>   }
>>   +/**
>> + * Force async mode for transaction. It won't wait for acks
>> + * or confirmation.
>> + */
>> +static inline void
>> +txn_force_async(struct txn *txn)
>> +{
>> +    txn_set_flag(txn, TXN_FORCE_ASYNC);
>> +}
>> +
>>   /* Pointer to the current transaction (if any) */
>>   static inline struct txn *
>>   in_txn(void)
>> @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
>>   struct txn *
>>   txn_begin(void);
>>
>> +/**
>> + * Complete transaction processing.
>> + */
>> +void
>> +txn_complete(struct txn *txn);
>> +
>>   /**
>>    * Commit a transaction.
>>    * @pre txn == in_txn()
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>>           fiber_yield();
>>       fiber_set_cancellable(cancellable);
>>       // TODO: implement rollback.
>> -    // TODO: implement confirm.
>>       assert(!entry->is_rollback);
>> +    assert(entry->is_commit);
>>       txn_limbo_remove(limbo, entry);
>>       txn_clear_flag(txn, TXN_WAIT_ACK);
>>   }
>>
>> +/**
>> + * Write a confirmation entry to WAL. After it's written all the
>> + * transactions waiting for confirmation may be finished.
>> + */
>> +static int
>> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> +{
>> +    /* Prepare a confirm entry. */
>> +    struct xrow_header row = {0};
>> +    struct request request = {0};
>> +    request.header = &row;
>> +
>> +    row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
>> +    if (row.bodycnt < 0)
>> +        return -1;
>> +
>> +    struct txn *txn = txn_begin();
>> +    if (txn == NULL)
>> +        return -1;
>> +
>> +    if (txn_begin_stmt(txn, NULL) != 0)
>> +        goto rollback;
>> +    if (txn_commit_stmt(txn, &request) != 0)
>> +        goto rollback;
>> +
>> +    return txn_commit(txn);
>> +rollback:
>> +    txn_rollback(txn);
>> +    return -1;
>> +}
>> +
>> +void
>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>> +{
>> +    assert(limbo->instance_id != REPLICA_ID_NIL &&
>> +           limbo->instance_id != instance_id);
>> +    struct txn_limbo_entry *e, *tmp;
>> +    rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
>> +        if (e->lsn > lsn)
>> +            break;
>> +        assert(e->txn->fiber == NULL);
>> +        e->is_commit = true;
>> +        txn_limbo_remove(limbo, e);
>> +        txn_clear_flag(e->txn, TXN_WAIT_ACK);
>> +        /*
>> +         * txn_complete_async must've been called already,
>> +         * since CONFIRM always follows the tx in question.
>> +         * So, finish this tx processing right away.
>> +         */
>> +        txn_complete(e->txn);
>> +    }
>> +}
>> +
>>   void
>>   txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>>   {
>> @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>>       int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>>       vclock_follow(&limbo->vclock, replica_id, lsn);
>>       struct txn_limbo_entry *e;
>> +    struct txn_limbo_entry *last_quorum = NULL;
>>       rlist_foreach_entry(e, &limbo->queue, in_queue) {
>>           if (e->lsn <= prev_lsn)
>>               continue;
>>           if (e->lsn > lsn)
>>               break;
>>           if (++e->ack_count >= replication_sync_quorum) {
>> -            // TODO: better call complete() right
>> -            // here. Appliers use async transactions,
>> -            // and their txns don't have fibers to
>> -            // wake up. That becomes actual, when
>> -            // appliers will be supposed to wait for
>> -            // 'confirm' message.
>>               e->is_commit = true;
>> -            fiber_wakeup(e->txn->fiber);
>> +            last_quorum = e;
>>           }
>>           assert(e->ack_count <= VCLOCK_MAX);
>>       }
>> +    if (last_quorum != NULL) {
>> +        if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
>> +            // TODO: rollback.
>> +            return;
>> +        }
>> +        /*
>> +         * Wakeup all the entries in direct order as soon
>> +         * as confirmation message is written to WAL.
>> +         */
>> +        rlist_foreach_entry(e, &limbo->queue, in_queue) {
>> +            fiber_wakeup(e->txn->fiber);
>> +            if (e == last_quorum)
>> +                break;
>> +        }
>> +    }
>>   }
>>     void
>> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
>> index 1ad1c567a..de415cd97 100644
>> --- a/src/box/txn_limbo.h
>> +++ b/src/box/txn_limbo.h
>> @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>>   void
>>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>>
>> +/**
>> + * Confirm all the entries up to the given master's LSN.
>> + */
>> +void
>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
>> +
>>   void
>>   txn_limbo_init();
>>   diff --git a/test/box/error.result b/test/box/error.result
>> index 69c471085..34ded3930 100644
>> --- a/test/box/error.result
>> +++ b/test/box/error.result
>> @@ -433,6 +433,7 @@ t;
>>    |   212: box.error.SEQUENCE_NOT_STARTED
>>    |   213: box.error.NO_SUCH_SESSION_SETTING
>>    |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
>> + |   215: box.error.SYNC_MASTER_MISMATCH
>>    | ...
>>     test_run:cmd("setopt delimiter ''");
>>
-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm
  2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
                   ` (10 preceding siblings ...)
  2020-06-09 12:53 ` [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check Cyrill Gorcunov
@ 2020-06-22 21:51 ` Vladislav Shpilevoy
  11 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-22 21:51 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

I've got an assertion fail on this test:

Instance 1:

box.cfg{
    listen = 3313,
    replication = {'localhost:3313', 'localhost:3314'},
    replication_timeout = 20,
    replication_synchro_quorum = 2,
    replication_synchro_timeout = 100,
}
box.schema.user.grant('guest', 'super')
s = box.schema.create_space('test', {is_sync = true})
pk = s:create_index('pk')
s:replace{1}


Instance 2:

box.cfg{
    listen = 3314,
    replication = {'localhost:3313', 'localhost:3314'},
    replication_timeout = 20,
    replication_synchro_quorum = 2,
    replication_synchro_timeout = 100,
}
box.schema.user.grant('guest', 'super')

The first instance crashes like this:

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGABRT
  * frame #0: 0x00007fff784192c2 libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fff784d4bf1 libsystem_pthread.dylib`pthread_kill + 284
    frame #2: 0x00007fff783836a6 libsystem_c.dylib`abort + 127
    frame #3: 0x00007fff7834c20d libsystem_c.dylib`__assert_rtn + 324
    frame #4: 0x0000000100469242 tarantool`vclock_follow(vclock=0x00000001005f8010, replica_id=1, lsn=0) at vclock.c:46:2
    frame #5: 0x00000001000fac50 tarantool`txn_limbo_ack(limbo=0x00000001005f7ff8, replica_id=1, lsn=0) at txn_limbo.c:287:2
    frame #6: 0x000000010012c2ee tarantool`tx_status_update(msg=0x000000010e0022f8) at relay.cc:411:3
    frame #7: 0x000000010024da48 tarantool`cmsg_deliver(msg=0x000000010e0022f8) at cbus.c:375:2
    frame #8: 0x000000010024fa39 tarantool`fiber_pool_f(ap=0x00000001040015e8) at fiber_pool.c:64:3
    frame #9: 0x000000010000672a tarantool`fiber_cxx_invoke(f=(tarantool`fiber_pool_f at fiber_pool.c:38), ap=0x00000001040015e8)(__va_list_tag*), __va_list_tag*) at fiber.h:782:10
    frame #10: 0x0000000100244acb tarantool`fiber_loop(data=0x0000000000000000) at fiber.c:869:18
    frame #11: 0x00000001004c2e77 tarantool`coro_init at coro.c:110:3
(lldb) f 5
frame #5: 0x00000001000fac50 tarantool`txn_limbo_ack(limbo=0x00000001005f7ff8, replica_id=1, lsn=0) at txn_limbo.c:287:2
   284 			return;
   285 		assert(limbo->instance_id != REPLICA_ID_NIL);
   286 		int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
-> 287 		vclock_follow(&limbo->vclock, replica_id, lsn);
   288 		struct txn_limbo_entry *e;
   289 		struct txn_limbo_entry *last_quorum = NULL;
   290 		rlist_foreach_entry(e, &limbo->queue, in_queue) {

Second instance has replica_id 1.
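
A note on the trace above: vclock_follow() is reached with lsn=0 for
replica_id=1, and the abort comes from an assertion inside it. Below is a
minimal sketch of that failure mode, assuming the assertion requires the new
LSN to be strictly greater than the stored component (the exact condition at
vclock.c:46 is not shown in this thread). All toy_* names are hypothetical
and are not Tarantool API. The guard in toy_limbo_ack() is only one possible
way to tolerate such an ack, not necessarily the fix that was applied.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { TOY_VCLOCK_MAX = 32 };

/* Toy stand-in for struct vclock: one LSN per replica id. */
struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

/*
 * Advance one component. Assumption: like the real function, this
 * asserts that the new LSN is strictly greater than the old one.
 */
static void
toy_vclock_follow(struct toy_vclock *vc, uint32_t replica_id, int64_t lsn)
{
	assert(replica_id < TOY_VCLOCK_MAX);
	assert(lsn > vc->lsn[replica_id]);
	vc->lsn[replica_id] = lsn;
}

/*
 * Hypothetical guard: ignore an ack that does not advance the
 * replica's component. Returns true if the vclock was updated.
 */
static bool
toy_limbo_ack(struct toy_vclock *vc, uint32_t replica_id, int64_t lsn)
{
	assert(replica_id < TOY_VCLOCK_MAX);
	if (lsn <= vc->lsn[replica_id])
		return false;
	toy_vclock_follow(vc, replica_id, lsn);
	return true;
}
```

With a zero-initialized vclock, toy_limbo_ack(&vc, 1, 0) returns false
instead of tripping the assertion, while a later ack with lsn=5 advances the
component as usual.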

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry Serge Petrenko
  2020-06-19 15:18   ` Leonid Vasiliev
@ 2020-06-23  8:33   ` Serge Petrenko
  1 sibling, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-23  8:33 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


09.06.2020 15:20, Serge Petrenko wrote:
> Add methods to encode/decode CONFIRM entry.
> A CONFIRM entry will be written to WAL by synchronous replication master
> as soon as it finds that the transaction was applied on a quorum of
> replicas.
> CONFIRM rows share the same header with other rows in WAL, but their body
> differs: it's just a map containing replica_id and lsn of the last
> confirmed transaction.
>
> Part-of #4847


New commit:

commit b6ff1a633655edf2c4285e699a757adfcd8926d0
Author: Serge Petrenko <sergepetrenko@tarantool.org>
Date:   Mon Jun 22 21:45:55 2020 +0300

     txn: introduce various reasons for txn rollback

     Transaction on_rollback triggers will need to distinguish
     txn_limbo-issued rollbacks from rollbacks that happened due to a failed
     WAL write or memory error.

     Prerequisite #4847
     Prerequisite #4848

diff --git a/src/box/txn.c b/src/box/txn.c
index 6cfa98212..9de72461b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -222,7 +222,7 @@ txn_begin(void)
      txn->flags = 0;
      txn->in_sub_stmt = 0;
      txn->id = ++tsn;
-    txn->signature = -1;
+    txn->signature = TXN_SIGNATURE_ROLLBACK;
      txn->engine = NULL;
      txn->engine_tx = NULL;
      txn->fk_deferred_count = 0;
@@ -589,7 +589,7 @@ static bool
  txn_commit_nop(struct txn *txn)
  {
      if (txn->n_new_rows + txn->n_applier_rows == 0) {
-        txn->signature = 0;
+        txn->signature = TXN_SIGNATURE_NOP;
          txn_complete(txn);
          fiber_set_txn(fiber(), NULL);
          return true;
@@ -738,7 +738,7 @@ txn_rollback(struct txn *txn)
      trigger_clear(&txn->fiber_on_stop);
      if (!txn_has_flag(txn, TXN_CAN_YIELD))
          trigger_clear(&txn->fiber_on_yield);
-    txn->signature = -1;
+    txn->signature = TXN_SIGNATURE_ROLLBACK;
      txn_complete(txn);
      fiber_set_txn(fiber(), NULL);
  }
diff --git a/src/box/txn.h b/src/box/txn.h
index 232cc07a8..8ec4a248c 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -83,6 +83,29 @@ enum {
      TXN_SUB_STMT_MAX = 3
  };

+enum {
+    /** Signature set for empty transactions. */
+    TXN_SIGNATURE_NOP = 0,
+    /**
+     * The default signature value for failed transactions.
+     * Indicates either write failure or any other failure
+     * not caused by synchronous transaction processing.
+     */
+    TXN_SIGNATURE_ROLLBACK = -1,
+    /**
+     * A value set for failed synchronous transactions
+     * on master, when not enough acks were collected.
+     */
+    TXN_SIGNATURE_QUORUM_TIMEOUT = -2,
+    /**
+     * A value set for failed synchronous transactions
+     * on replica (or any instance during recovery), when a
+     * transaction is rolled back because ROLLBACK message was
+     * read.
+     */
+    TXN_SIGNATURE_SYNC_ROLLBACK = -3,
+};
+
  /**
   * A single statement of a multi-statement
   * transaction: undo and redo info.
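
To illustrate how a trigger or a log message could tell these reasons apart,
here is a minimal sketch. The enum values are copied from the patch above;
toy_signature_str() is a hypothetical helper, and treating positive
signatures as "not a rollback" is an assumption of this sketch, not something
stated in the commit.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Values copied from the patch above. */
enum {
	TXN_SIGNATURE_NOP = 0,
	TXN_SIGNATURE_ROLLBACK = -1,
	TXN_SIGNATURE_QUORUM_TIMEOUT = -2,
	TXN_SIGNATURE_SYNC_ROLLBACK = -3,
};

/* Hypothetical helper: describe why a transaction finished. */
static const char *
toy_signature_str(int64_t signature)
{
	switch (signature) {
	case TXN_SIGNATURE_NOP:
		return "empty transaction";
	case TXN_SIGNATURE_ROLLBACK:
		return "write or other non-sync failure";
	case TXN_SIGNATURE_QUORUM_TIMEOUT:
		return "quorum timeout on master";
	case TXN_SIGNATURE_SYNC_ROLLBACK:
		return "ROLLBACK message read";
	default:
		/* Assumption: positive values mean a successful write. */
		return signature > 0 ? "not a rollback" : "unknown";
	}
}
```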

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
                     ` (3 preceding siblings ...)
  2020-06-20 15:06   ` Leonid Vasiliev
@ 2020-06-23  8:34   ` Serge Petrenko
  2020-06-25 22:04   ` Vladislav Shpilevoy
  5 siblings, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-23  8:34 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


09.06.2020 15:20, Serge Petrenko wrote:
> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
> receive their acks. CONFIRM entry is written to WAL and later replicated
> to all the replicas.
>
> Now replicas put synchronous transactions into txn_limbo and wait for
> corresponding confirmation entries to arrive and end up in their WAL
> before committing the transactions.
>
> Part-of #4847

New commit:

commit c3f2ad52947a894a3028aac5f4b974577419af3f
Author: Serge Petrenko <sergepetrenko@tarantool.org>
Date:   Mon Jun 22 21:04:48 2020 +0300

     fix for 'replication: write and read CONFIRM entries'

     Rename on_confirm trigger to on_confirm_written to look similar to
     on_rollback_written trigger, introduced later.

     Make txn_limbo_on_rollback trigger respect the newly-introduced
     rollback reasons in txn->signature. (Now there's no need to clear
     the trigger after WAL write success).

     Fix use of wrong parameters inside both triggers.

     [TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 009962d3b..5703a0698 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -260,10 +260,10 @@ process_nop(struct request *request)
   * Confirms some of the txns waiting in txn_limbo.
   */
  static int
-applier_on_confirm(struct trigger *trig, void *data)
+applier_on_confirm_written(struct trigger *trig, void *event)
  {
-    (void) trig;
-    int64_t lsn = *(int64_t *)data;
+    (void) event;
+    int64_t lsn = *(int64_t *)trig->data;
      txn_limbo_read_confirm(&txn_limbo, lsn);
      return 0;
  }
@@ -308,7 +308,7 @@ process_confirm(struct request *request)
          diag_set(OutOfMemory, size, "region_alloc_object", "trig");
          return -1;
      }
-    trigger_create(trig, applier_on_confirm, lsn, NULL);
+    trigger_create(trig, applier_on_confirm_written, lsn, NULL);

      if (txn_begin_stmt(txn, NULL) != 0)
          return -1;
diff --git a/src/box/txn.c b/src/box/txn.c
index a50dfeaf1..bf17fd749 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -611,10 +611,14 @@ txn_commit_nop(struct txn *txn)
   * when tx is waiting for confirmation.
   */
  static int
-txn_limbo_on_rollback(struct trigger *trig, void *data)
+txn_limbo_on_rollback(struct trigger *trig, void *event)
  {
-    (void) trig;
-    struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
+    (void) event;
+    struct txn *txn = (struct txn *) event;
+    /* Check whether limbo has performed the cleanup. */
+    if (txn->signature != TXN_SIGNATURE_ROLLBACK)
+        return 0;
+    struct txn_limbo_entry *entry = (struct txn_limbo_entry *) trig->data;
      txn_limbo_abort(&txn_limbo, entry);
      return 0;
  }
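
The "wrong parameters" fix above boils down to which pointer carries what:
the value bound at trigger creation arrives via trig->data, while the second
argument is the event (here, the transaction being rolled back). A minimal
sketch of that convention follows. The toy_* types are hypothetical
stand-ins, not the real struct trigger, struct txn, or struct
txn_limbo_entry.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

struct toy_trigger;
typedef int (*toy_trigger_f)(struct toy_trigger *trig, void *event);

/* Toy trigger: `data` is bound once, at creation time. */
struct toy_trigger {
	toy_trigger_f run;
	void *data;
};

struct toy_txn {
	int64_t signature;
};

struct toy_limbo_entry {
	int is_aborted;
};

/* Same value as TXN_SIGNATURE_ROLLBACK in the patch. */
enum { TOY_SIGNATURE_ROLLBACK = -1 };

static int
toy_on_rollback(struct toy_trigger *trig, void *event)
{
	/* The event is the transaction that is being rolled back. */
	struct toy_txn *txn = event;
	/* Any other reason means the limbo already cleaned up. */
	if (txn->signature != TOY_SIGNATURE_ROLLBACK)
		return 0;
	/* The limbo entry was bound when the trigger was created. */
	struct toy_limbo_entry *entry = trig->data;
	entry->is_aborted = 1;
	return 0;
}

/* Fire a trigger with the given event. */
static int
toy_trigger_run(struct toy_trigger *trig, void *event)
{
	return trig->run(trig, event);
}
```

Running the trigger with a signature of -3 leaves the entry untouched;
running it with -1 marks the entry as aborted.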

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-19 17:50   ` Serge Petrenko
@ 2020-06-23  8:35     ` Serge Petrenko
  0 siblings, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-23  8:35 UTC (permalink / raw)
  To: v.shpilevoy, sergos, gorcunov, Leonid Vasiliev; +Cc: tarantool-patches


19.06.2020 20:50, Serge Petrenko wrote:
> 09.06.2020 15:20, Serge Petrenko wrote:
>> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
>> receive their acks. CONFIRM entry is written to WAL and later replicated
>> to all the replicas.
>>
>> Now replicas put synchronous transactions into txn_limbo and wait for
>> corresponding confirmation entries to arrive and end up in their WAL
>> before committing the transactions.
>>
> Added a new commit:
>
> commit 88884031e6ea8ca836228432b400ca1557afafa6
> Author: Serge Petrenko <sergepetrenko@tarantool.org>
> Date:   Fri Jun 19 18:21:11 2020 +0300
>
>     txn: rework synchronous tx on_rollback trigger
>
>     Instead of allocating the trigger on heap, which would lead to use
>     after free, allocate the trigger together with the txn.
>     Also, clear the trigger when asynchronous journal write succeeds.
>
>     [TO BE SQUASHED INTO THE PREVIOUS COMMIT]
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 4f787db79..16856da0d 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -238,6 +238,7 @@ txn_begin(void)
>      /* fiber_on_yield is initialized by engine on demand */
>      trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
>      trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
> +    trigger_create(&txn->on_write_failure, NULL, NULL, NULL);
>      /*
>       * By default all transactions may yield.
>       * It's a responsibility of an engine to disable yields
> @@ -458,6 +459,12 @@ txn_complete(struct txn *txn)
>                           stop_tm - txn->start_tm);
>          }
>      } else {
> +        /*
> +         * Async write succeeded. Clear the trigger which
> +         * would remove the corresponding txn_limbo entry
> +         * in case of failure.
> +         */
> +        trigger_clear(&txn->on_write_failure);
>          /*
>           * Complete is called on every WAL operation
>           * authored by this transaction. And it not always
> @@ -681,10 +688,10 @@ txn_commit_async(struct txn *txn)
>       * Set a trigger to abort waiting for confirm on WAL write
>       * failure.
>       */
> +    trigger_create(&txn->on_write_failure, txn_limbo_on_rollback,
> +               limbo_entry, NULL);
>      if (is_sync) {
> -        struct trigger trig;
> -        trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
> -        txn_on_rollback(txn, &trig);
> +        txn_on_rollback(txn, &txn->on_write_failure);
>      }
>      return 0;
>  }
> diff --git a/src/box/txn.h b/src/box/txn.h
> index e7705bb48..9efd6fd0d 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -228,6 +228,14 @@ struct txn {
>       * in case a fiber stops (all engines).
>       */
>      struct trigger fiber_on_stop;
> +    /**
> +     * An on_rollback trigger for synchronous transactions
> +     * removing the txn_limbo entry which would wait for
> +     * confirmation otherwise.
> +     * Is issued on asynchronous write failure and is cleared
> +     * on write success.
> +     */
> +    struct trigger on_write_failure;
>      /** Commit and rollback triggers. */
>      struct rlist on_commit, on_rollback;
>      /**
>

Changed this commit:

commit d3bd829759dd3cf3d49170da701d410af111cb60
Author: Serge Petrenko <sergepetrenko@tarantool.org>
Date:   Fri Jun 19 18:21:11 2020 +0300

     txn: rework synchronous tx on_rollback trigger

     Instead of allocating the trigger on heap, which would lead to use
     after free, allocate the trigger on the txn region.

     [TO BE SQUASHED INTO THE PREVIOUS COMMIT]

diff --git a/src/box/txn.c b/src/box/txn.c
index bf17fd749..d960c3888 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -670,6 +670,22 @@ txn_commit_async(struct txn *txn)
                 txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
         }

+       /*
+        * We'll need this trigger for sync transactions later,
+        * but allocation failure is inappropriate after the entry
+        * is sent to journal, so allocate early.
+        */
+       struct trigger *trig;
+       if (is_sync) {
+               size_t size;
+               trig = region_alloc_object(&txn->region, typeof(*trig), &size);
+               if (trig == NULL) {
+                       diag_set(OutOfMemory, size, "region_alloc_object",
+                                "trig");
+                       return -1;
+               }
+       }
+
         fiber_set_txn(fiber(), NULL);
         if (journal_write_async(req) != 0) {
                 fiber_set_txn(fiber(), txn);
@@ -682,14 +698,14 @@ txn_commit_async(struct txn *txn)
                 return -1;
         }

-       /*
-        * Set a trigger to abort waiting for confirm on WAL write
-        * failure.
-        */
         if (is_sync) {
-               struct trigger trig;
-               trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
-               txn_on_rollback(txn, &trig);
+               /*
+                * Set a trigger to abort waiting for confirm on
+                * WAL write failure.
+                */
+               trigger_create(trig, txn_limbo_on_rollback,
+                              limbo_entry, NULL);
+               txn_on_rollback(txn, trig);
         }
         return 0;
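
The lifetime issue behind this change: the removed code declared the trigger
as a local variable of txn_commit_async(), yet the trigger stays linked into
the transaction's on_rollback list after the function returns. A minimal
sketch of the replacement pattern follows, using a toy fixed-size "region"
owned by the transaction. All toy_* names are hypothetical; the real
region_alloc_object() works differently in detail.

```c
#include <assert.h>
#include <stddef.h>

struct toy_trigger {
	struct toy_trigger *next;
	int fired;
};

enum { TOY_REGION_SLOTS = 4 };

/* Toy transaction that owns the storage for its triggers. */
struct toy_txn {
	struct toy_trigger *on_rollback;
	struct toy_trigger region[TOY_REGION_SLOTS];
	size_t region_used;
};

/* Allocate a trigger that lives as long as the transaction. */
static struct toy_trigger *
toy_region_alloc_trigger(struct toy_txn *txn)
{
	if (txn->region_used == TOY_REGION_SLOTS)
		return NULL;
	return &txn->region[txn->region_used++];
}

static int
toy_commit_async(struct toy_txn *txn)
{
	/* Allocate early: failing here is still easy to recover from. */
	struct toy_trigger *trig = toy_region_alloc_trigger(txn);
	if (trig == NULL)
		return -1;
	/* ... the journal write would be submitted here ... */
	trig->fired = 0;
	trig->next = txn->on_rollback;
	txn->on_rollback = trig;
	return 0;
}

/* Fire all rollback triggers; returns how many were run. */
static int
toy_rollback(struct toy_txn *txn)
{
	int count = 0;
	for (struct toy_trigger *t = txn->on_rollback; t != NULL; t = t->next) {
		t->fired = 1;
		count++;
	}
	return count;
}
```

Because the trigger storage belongs to the transaction, toy_rollback() can
safely walk the list long after toy_commit_async() has returned.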

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag
  2020-06-22  9:44       ` Serge Petrenko
@ 2020-06-23 22:13         ` Vladislav Shpilevoy
  0 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-23 22:13 UTC (permalink / raw)
  To: Serge Petrenko, Leonid Vasiliev, sergos, gorcunov; +Cc: tarantool-patches

>>>>        stailq_foreach_entry(stmt, &txn->stmts, next) {
>>>>            if (stmt->has_triggers) {
>>>>                txn_init_triggers(txn);
>>>>                rlist_splice(&txn->on_commit, &stmt->on_commit);
>>>>            }
>>>> +        is_sync = is_sync || stmt->space->def->opts.is_sync;
>>> Seems like
>>> "... || (stmt->space != NULL && stmt->space->def->opts.is_sync);"
>>> can be moved here from "replication: make sync transactions wait quorum".
>>>
>>> LGTM.
>> I don't think it is a good idea. At this commit stmt->space can't be
>> NULL. This becomes possible only after Sergey P. starts using txns
>> to write confirm and rollback entries, which don't relate to any
>> space.
> 
> Actually, stmt->space is NULL for NOPs.

Force pushed to this commit:

diff --git a/src/box/txn.c b/src/box/txn.c
index 22591a365..edc1f5180 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -507,7 +507,8 @@ txn_journal_entry_new(struct txn *txn)
 		if (stmt->row == NULL)
 			continue;
 
-		is_sync = is_sync || stmt->space->def->opts.is_sync;
+		is_sync = is_sync || (stmt->space != NULL &&
+				      stmt->space->def->opts.is_sync);
 
 		if (stmt->row->replica_id == 0)
 			*local_row++ = stmt->row;
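
For reference, the NULL-safe check in isolation: a minimal sketch with
hypothetical toy_* types, where a statement without a space (such as a NOP)
never makes the transaction synchronous.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_space {
	bool is_sync;
};

/* A statement may have no space at all, e.g. a NOP. */
struct toy_stmt {
	const struct toy_space *space;
};

/* A transaction is sync if any statement touches a sync space. */
static bool
toy_txn_is_sync(const struct toy_stmt *stmts, size_t count)
{
	bool is_sync = false;
	for (size_t i = 0; i < count; i++) {
		const struct toy_space *space = stmts[i].space;
		is_sync = is_sync || (space != NULL && space->is_sync);
	}
	return is_sync;
}
```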

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
                     ` (2 preceding siblings ...)
  2020-06-19 12:39   ` Leonid Vasiliev
@ 2020-06-25 21:48   ` Vladislav Shpilevoy
  3 siblings, 0 replies; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-25 21:48 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

I added a commit on top of this one:

====================
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Thu Jun 25 23:43:08 2020 +0200

    [tosquash] replication: txn_limbo_check_complete() should consider rollback too
    
    txn_limbo_check_complete() didn't consider rollback as a completion.
    Obviously it should. Even though fully functional rollback appears
    in later commits.

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index bab843c0a..9de91db93 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -104,6 +104,8 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
 static bool
 txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
+	if (txn_limbo_entry_is_complete(entry))
+		return true;
 	struct vclock_iterator iter;
 	vclock_iterator_init(&iter, &limbo->vclock);
 	int ack_count = 0;
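
A minimal sketch of the intended behaviour, assuming that
txn_limbo_entry_is_complete() means "committed or rolled back" and that the
rest of the function counts replicas whose acknowledged LSN has reached the
entry. The toy_* names and the array-based vclock are hypothetical
simplifications.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct toy_entry {
	int64_t lsn;
	bool is_commit;
	bool is_rollback;
};

/* Assumption: an entry is complete once its fate is decided. */
static bool
toy_entry_is_complete(const struct toy_entry *e)
{
	return e->is_commit || e->is_rollback;
}

/*
 * acked_lsn[i] is the last LSN acknowledged by replica i.
 * Returns true if the entry needs no further waiting.
 */
static bool
toy_check_complete(struct toy_entry *e, const int64_t *acked_lsn,
		   size_t replica_count, int quorum)
{
	/* A rolled back entry is complete too: nothing to wait for. */
	if (toy_entry_is_complete(e))
		return true;
	int ack_count = 0;
	for (size_t i = 0; i < replica_count; i++) {
		if (acked_lsn[i] >= e->lsn)
			ack_count++;
	}
	e->is_commit = ack_count >= quorum;
	return e->is_commit;
}
```

Without the early return, a rolled back entry with too few acks would be
reported as incomplete and its waiter would keep waiting.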

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
                     ` (4 preceding siblings ...)
  2020-06-23  8:34   ` Serge Petrenko
@ 2020-06-25 22:04   ` Vladislav Shpilevoy
  2020-06-25 22:31     ` Vladislav Shpilevoy
  5 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-25 22:04 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index efb97a591..daec98317 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +void
> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> +{
> +	assert(limbo->instance_id != REPLICA_ID_NIL &&
> +	       limbo->instance_id != instance_id);
> +	struct txn_limbo_entry *e, *tmp;
> +	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> +		if (e->lsn > lsn)
> +			break;

What if e->lsn is -1, because its LSN wasn't delivered to TX thread
from WAL thread yet?

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-25 22:04   ` Vladislav Shpilevoy
@ 2020-06-25 22:31     ` Vladislav Shpilevoy
  2020-06-26 10:58       ` Serge Petrenko
  0 siblings, 1 reply; 56+ messages in thread
From: Vladislav Shpilevoy @ 2020-06-25 22:31 UTC (permalink / raw)
  To: Serge Petrenko, sergos, gorcunov; +Cc: tarantool-patches

On 26/06/2020 00:04, Vladislav Shpilevoy wrote:
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> +void
>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>> +{
>> +	assert(limbo->instance_id != REPLICA_ID_NIL &&
>> +	       limbo->instance_id != instance_id);
>> +	struct txn_limbo_entry *e, *tmp;
>> +	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
>> +		if (e->lsn > lsn)
>> +			break;
> 
> What if e->lsn is -1, because its LSN wasn't delivered to TX thread
> from WAL thread yet?

Probably not a bug. At least now. Because txn_limbo_read_confirm() is called
on replica only, where LSN is known from master before it is written to local WAL.
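
To make the concern concrete, here is a minimal sketch of the scan, using a
plain array instead of the rlist queue (the toy_* names are hypothetical).
An entry whose LSN is still -1 never satisfies e->lsn > lsn, so the loop
would confirm it along with everything before the first larger LSN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct toy_entry {
	int64_t lsn;
	bool is_commit;
};

/*
 * Confirm entries in queue order up to the given LSN.
 * Returns the number of confirmed entries.
 */
static size_t
toy_read_confirm(struct toy_entry *queue, size_t count, int64_t lsn)
{
	size_t confirmed = 0;
	for (size_t i = 0; i < count; i++) {
		/* An unassigned LSN of -1 never stops the scan here. */
		if (queue[i].lsn > lsn)
			break;
		queue[i].is_commit = true;
		confirmed++;
	}
	return confirmed;
}
```

On a replica this cannot happen as long as every entry gets its LSN from the
master's row before it enters the queue, which is the argument made above.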

^ permalink raw reply	[flat|nested] 56+ messages in thread

* Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
  2020-06-25 22:31     ` Vladislav Shpilevoy
@ 2020-06-26 10:58       ` Serge Petrenko
  0 siblings, 0 replies; 56+ messages in thread
From: Serge Petrenko @ 2020-06-26 10:58 UTC (permalink / raw)
  To: Vladislav Shpilevoy, sergos, gorcunov; +Cc: tarantool-patches


26.06.2020 01:31, Vladislav Shpilevoy wrote:
> On 26/06/2020 00:04, Vladislav Shpilevoy wrote:
>>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>>> index efb97a591..daec98317 100644
>>> --- a/src/box/txn_limbo.c
>>> +++ b/src/box/txn_limbo.c
>>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>>> +void
>>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>>> +{
>>> +	assert(limbo->instance_id != REPLICA_ID_NIL &&
>>> +	       limbo->instance_id != instance_id);
>>> +	struct txn_limbo_entry *e, *tmp;
>>> +	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
>>> +		if (e->lsn > lsn)
>>> +			break;
>> What if e->lsn is -1, because its LSN wasn't delivered to TX thread
>> from WAL thread yet?
> Probably not a bug. At least now. Because txn_limbo_read_confirm() is called
> on replica only, where LSN is known from master before it is written to local WAL.
Yes, it's ok for now.

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 56+ messages in thread

end of thread, other threads:[~2020-06-26 10:58 UTC | newest]

Thread overview: 56+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-06-09 12:20 [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Serge Petrenko
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 1/8] replication: introduce space.is_sync option Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-18 22:27     ` Leonid Vasiliev
2020-06-21 16:24       ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 2/8] replication: introduce replication_sync_quorum cfg Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-15 23:05   ` Vladislav Shpilevoy
2020-06-18 22:54     ` Leonid Vasiliev
2020-06-19 17:45     ` Serge Petrenko
2020-06-21 16:25       ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 3/8] txn: add TXN_WAIT_ACK flag Serge Petrenko
2020-06-18 23:12   ` Leonid Vasiliev
2020-06-21 16:25     ` Vladislav Shpilevoy
2020-06-22  9:44       ` Serge Petrenko
2020-06-23 22:13         ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-11 14:57   ` Vladislav Shpilevoy
2020-06-15 23:05     ` Vladislav Shpilevoy
2020-06-19 12:39   ` Leonid Vasiliev
2020-06-25 21:48   ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 5/8] txn_limbo: follow-up fixes Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-11  8:46     ` Serge Petrenko
2020-06-11 13:01       ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 6/8] txn_limbo: fix instance id assignment Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 7/8] xrow: introduce CONFIRM entry Serge Petrenko
2020-06-19 15:18   ` Leonid Vasiliev
2020-06-22 10:14     ` Serge Petrenko
2020-06-23  8:33   ` Serge Petrenko
2020-06-09 12:20 ` [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries Serge Petrenko
2020-06-10 23:51   ` Vladislav Shpilevoy
2020-06-11  8:56     ` Serge Petrenko
2020-06-11 13:04       ` Vladislav Shpilevoy
2020-06-11 14:57   ` Vladislav Shpilevoy
2020-06-15 23:05     ` Vladislav Shpilevoy
2020-06-18 11:32       ` Leonid Vasiliev
2020-06-18 21:49         ` Vladislav Shpilevoy
2020-06-19 17:48         ` Serge Petrenko
2020-06-19 17:50   ` Serge Petrenko
2020-06-23  8:35     ` Serge Petrenko
2020-06-20 15:06   ` Leonid Vasiliev
2020-06-22 10:34     ` Serge Petrenko
2020-06-23  8:34   ` Serge Petrenko
2020-06-25 22:04   ` Vladislav Shpilevoy
2020-06-25 22:31     ` Vladislav Shpilevoy
2020-06-26 10:58       ` Serge Petrenko
2020-06-09 12:53 ` [Tarantool-patches] [PATCH 0/2] A few fixes for building Cyrill Gorcunov
2020-06-09 12:53 ` [Tarantool-patches] [PATCH 1/2] box/applier: fix typo Cyrill Gorcunov
2020-06-10  9:18   ` Sergey Ostanevich
2020-06-09 12:53 ` [Tarantool-patches] [PATCH 2/2] box: use tnt_raise for quorum check Cyrill Gorcunov
2020-06-10  9:17   ` Sergey Ostanevich
2020-06-10 10:45   ` Serge Petrenko
2020-06-22 21:51 ` [Tarantool-patches] [PATCH 0/8] wait for lsn and confirm Vladislav Shpilevoy
