[PATCH] replication: automatic skip duplicating rows in replication

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Thu Apr 12 14:01:25 MSK 2018


ticket: https://github.com/tarantool/tarantool/issues/3270
branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option

In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
occured, which led to disconnect. 
Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of
this type will be ignored.

Closes #3270
---
 src/box/applier.cc                          |  15 +++-
 src/box/lua/load_cfg.lua                    |   3 +
 test/app-tap/init_script.result             |  39 +++++----
 test/box/admin.result                       |   2 +
 test/box/cfg.result                         |   4 +
 test/replication/replica_skip_row.lua       |  10 +++
 test/replication/skip_conflict_row.result   | 129 ++++++++++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua |  46 ++++++++++
 8 files changed, 228 insertions(+), 20 deletions(-)
 create mode 100644 test/replication/replica_skip_row.lua
 create mode 100644 test/replication/skip_conflict_row.result
 create mode 100644 test/replication/skip_conflict_row.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9aa951c34..5ebe67e27 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -47,6 +47,7 @@
 #include "xrow_io.h"
 #include "error.h"
 #include "session.h"
+#include "cfg.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
 			 */
 			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (xstream_write(applier->subscribe_stream, &row) != 0) {
+				struct error *e = diag_last_error(diag_get());
+				/**
+				 * Silently skip ER_TUPLE_FOUND error if such
+				 * option is set in config.
+				 */
+				if (e->type == &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    cfg_geti("silent_skip_conflict_rows"))
+					diag_clear(diag_get());
+				else
+					diag_raise();
+			}
 		}
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 89fd7745e..372f00f25 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -62,6 +62,7 @@ local default_cfg = {
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
+    silent_skip_conflict_rows = false,
 }
 
 -- types of available options
@@ -121,6 +122,7 @@ local template_cfg = {
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
+    silent_skip_conflict_rows = 'boolean',
 }
 
 local function normalize_uri(port)
@@ -192,6 +194,7 @@ local dynamic_cfg = {
     force_recovery          = function() end,
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
+    silent_skip_conflict_rows = function() end,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index ae01b488d..79f17649c 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -28,25 +28,26 @@ box.cfg
 23	replication_sync_lag:10
 24	replication_timeout:1
 25	rows_per_wal:500000
-26	slab_alloc_factor:1.05
-27	too_long_threshold:0.5
-28	vinyl_bloom_fpr:0.05
-29	vinyl_cache:134217728
-30	vinyl_dir:.
-31	vinyl_max_tuple_size:1048576
-32	vinyl_memory:134217728
-33	vinyl_page_size:8192
-34	vinyl_range_size:1073741824
-35	vinyl_read_threads:1
-36	vinyl_run_count_per_level:2
-37	vinyl_run_size_ratio:3.5
-38	vinyl_timeout:60
-39	vinyl_write_threads:2
-40	wal_dir:.
-41	wal_dir_rescan_delay:2
-42	wal_max_size:268435456
-43	wal_mode:write
-44	worker_pool_threads:4
+26	silent_skip_conflict_rows:false
+27	slab_alloc_factor:1.05
+28	too_long_threshold:0.5
+29	vinyl_bloom_fpr:0.05
+30	vinyl_cache:134217728
+31	vinyl_dir:.
+32	vinyl_max_tuple_size:1048576
+33	vinyl_memory:134217728
+34	vinyl_page_size:8192
+35	vinyl_range_size:1073741824
+36	vinyl_read_threads:1
+37	vinyl_run_count_per_level:2
+38	vinyl_run_size_ratio:3.5
+39	vinyl_timeout:60
+40	vinyl_write_threads:2
+41	wal_dir:.
+42	wal_dir_rescan_delay:2
+43	wal_max_size:268435456
+44	wal_mode:write
+45	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 296976629..bee3c39fe 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -68,6 +68,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 717fa31c9..b3bf9d23c 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -64,6 +64,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
@@ -159,6 +161,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
diff --git a/test/replication/replica_skip_row.lua b/test/replication/replica_skip_row.lua
new file mode 100644
index 000000000..4a4d99882
--- /dev/null
+++ b/test/replication/replica_skip_row.lua
@@ -0,0 +1,10 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    replication         = os.getenv("MASTER"),
+    memtx_memory        = 107374182,
+    silent_skip_conflict_rows = true,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
new file mode 100644
index 000000000..38ff5e58c
--- /dev/null
+++ b/test/replication/skip_conflict_row.result
@@ -0,0 +1,129 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+space = box.schema.space.create('test', {engine = engine});
+---
+...
+index = box.space.test:create_index('primary')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+box.info.status
+---
+- running
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+box.space.test:insert{2}
+---
+- [2]
+...
+box.space.test:insert{1}
+---
+- [1]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:insert{1}
+---
+- [1]
+...
+space:select{}
+---
+- - [1]
+...
+box.cfg{replication = repl}
+---
+...
+box.info.status
+---
+- running
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication = repl}
+---
+...
+require('fiber').sleep(0.01)
+---
+...
+box.info.replication[1].upstream.message
+---
+- null
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+box.space.test:select{}
+---
+- - [1]
+  - [2]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:select{}
+---
+- - [1]
+...
+box.info.status
+---
+- running
+...
+-- cleanup
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
new file mode 100644
index 000000000..5cfbff339
--- /dev/null
+++ b/test/replication/skip_conflict_row.test.lua
@@ -0,0 +1,46 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+box.schema.user.grant('guest', 'replication')
+
+space = box.schema.space.create('test', {engine = engine});
+index = box.space.test:create_index('primary')
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")
+test_run:cmd("start server replica")
+
+repl = box.cfg.replication
+box.cfg{replication = ""}
+box.info.status
+
+test_run:cmd("switch replica")
+repl = box.cfg.replication
+box.cfg{replication = ""}
+box.space.test:insert{2}
+box.space.test:insert{1}
+
+test_run:cmd("switch default")
+space:insert{1}
+space:select{}
+box.cfg{replication = repl}
+box.info.status
+
+test_run:cmd("switch replica")
+box.cfg{replication = repl}
+require('fiber').sleep(0.01)
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select{}
+
+test_run:cmd("switch default")
+space:select{}
+box.info.status
+
+-- cleanup
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list