[PATCH] replication: automatic skip duplicating rows in replication

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Fri Apr 13 16:44:12 MSK 2018


ticket: https://github.com/tarantool/tarantool/issues/3270
branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option

In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
occured, which led to disconnect.
Introduce new oftion: 'replication_skip_conflict', if set, then error of
this type will be ignored.

Closes #3270
---
 src/box/applier.cc                          |  15 +++-
 src/box/lua/load_cfg.lua                    |   3 +
 test/app-tap/init_script.result             |  45 ++++++------
 test/box/admin.result                       |   2 +
 test/box/cfg.result                         |   4 ++
 test/replication/skip_conflict_row.result   | 105 ++++++++++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua |  37 ++++++++++
 7 files changed, 188 insertions(+), 23 deletions(-)
 create mode 100644 test/replication/skip_conflict_row.result
 create mode 100644 test/replication/skip_conflict_row.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9aa951c34..6f6970b13 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -47,6 +47,7 @@
 #include "xrow_io.h"
 #include "error.h"
 #include "session.h"
+#include "cfg.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
 			 */
 			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (xstream_write(applier->subscribe_stream, &row) != 0) {
+				struct error *e = diag_last_error(diag_get());
+				/**
+				 * Silently skip ER_TUPLE_FOUND error if such
+				 * option is set in config.
+				 */
+				if (e->type == &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    cfg_geti("replication_skip_conflict"))
+					diag_clear(diag_get());
+				else
+					diag_raise();
+			}
 		}
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 89fd7745e..3a5a6d46a 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -59,6 +59,7 @@ local default_cfg = {
     replication_sync_lag = 10,
     replication_connect_timeout = 4,
     replication_connect_quorum = nil, -- connect all
+    replication_skip_conflict = false,
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
@@ -118,6 +119,7 @@ local template_cfg = {
     replication_sync_lag = 'number',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
+    replication_skip_conflict = 'boolean',
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
@@ -192,6 +194,7 @@ local dynamic_cfg = {
     force_recovery          = function() end,
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
+    replication_skip_conflict = function() end,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index ae01b488d..5625f1466 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -25,28 +25,29 @@ box.cfg
 20	read_only:false
 21	readahead:16320
 22	replication_connect_timeout:4
-23	replication_sync_lag:10
-24	replication_timeout:1
-25	rows_per_wal:500000
-26	slab_alloc_factor:1.05
-27	too_long_threshold:0.5
-28	vinyl_bloom_fpr:0.05
-29	vinyl_cache:134217728
-30	vinyl_dir:.
-31	vinyl_max_tuple_size:1048576
-32	vinyl_memory:134217728
-33	vinyl_page_size:8192
-34	vinyl_range_size:1073741824
-35	vinyl_read_threads:1
-36	vinyl_run_count_per_level:2
-37	vinyl_run_size_ratio:3.5
-38	vinyl_timeout:60
-39	vinyl_write_threads:2
-40	wal_dir:.
-41	wal_dir_rescan_delay:2
-42	wal_max_size:268435456
-43	wal_mode:write
-44	worker_pool_threads:4
+23	replication_skip_conflict:false
+24	replication_sync_lag:10
+25	replication_timeout:1
+26	rows_per_wal:500000
+27	slab_alloc_factor:1.05
+28	too_long_threshold:0.5
+29	vinyl_bloom_fpr:0.05
+30	vinyl_cache:134217728
+31	vinyl_dir:.
+32	vinyl_max_tuple_size:1048576
+33	vinyl_memory:134217728
+34	vinyl_page_size:8192
+35	vinyl_range_size:1073741824
+36	vinyl_read_threads:1
+37	vinyl_run_count_per_level:2
+38	vinyl_run_size_ratio:3.5
+39	vinyl_timeout:60
+40	vinyl_write_threads:2
+41	wal_dir:.
+42	wal_dir_rescan_delay:2
+43	wal_max_size:268435456
+44	wal_mode:write
+45	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 296976629..2168c3adb 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -62,6 +62,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 717fa31c9..28449d9cc 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -58,6 +58,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
@@ -153,6 +155,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
new file mode 100644
index 000000000..bf794db5a
--- /dev/null
+++ b/test/replication/skip_conflict_row.result
@@ -0,0 +1,105 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+space = box.schema.space.create('test', {engine = engine});
+---
+...
+index = box.space.test:create_index('primary')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication_skip_conflict = true}
+---
+...
+box.space.test:insert{1}
+---
+- [1]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:insert{1, 1}
+---
+- [1, 1]
+...
+space:insert{2}
+---
+- [2]
+...
+box.info.status
+---
+- running
+...
+vclock = test_run:get_vclock('default')
+---
+...
+_ = test_run:wait_vclock("replica", vclock)
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- null
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+box.space.test:select()
+---
+- - [1]
+  - [2]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.info.status
+---
+- running
+...
+-- cleanup
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
new file mode 100644
index 000000000..695cce9db
--- /dev/null
+++ b/test/replication/skip_conflict_row.test.lua
@@ -0,0 +1,37 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+box.schema.user.grant('guest', 'replication')
+
+space = box.schema.space.create('test', {engine = engine});
+index = box.space.test:create_index('primary')
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.cfg{replication_skip_conflict = true}
+box.space.test:insert{1}
+
+test_run:cmd("switch default")
+space:insert{1, 1}
+space:insert{2}
+box.info.status
+
+vclock = test_run:get_vclock('default')
+_ = test_run:wait_vclock("replica", vclock)
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select()
+
+test_run:cmd("switch default")
+box.info.status
+
+-- cleanup
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list