From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Konstantin Belyavskiy Subject: [PATCH] replication: automatic skip duplicating rows in replication Date: Fri, 13 Apr 2018 16:44:12 +0300 Message-Id: <20180413134412.20737-1-k.belyavskiy@tarantool.org> To: vdavydov@tarantool.org, georgy@tarantool.org Cc: tarantool-patches@freelists.org List-ID: ticket: https://github.com/tarantool/tarantool/issues/3270 branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND occured, which led to disconnect. Introduce new oftion: 'replication_skip_conflict', if set, then error of this type will be ignored. Closes #3270 --- src/box/applier.cc | 15 +++- src/box/lua/load_cfg.lua | 3 + test/app-tap/init_script.result | 45 ++++++------ test/box/admin.result | 2 + test/box/cfg.result | 4 ++ test/replication/skip_conflict_row.result | 105 ++++++++++++++++++++++++++++ test/replication/skip_conflict_row.test.lua | 37 ++++++++++ 7 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 test/replication/skip_conflict_row.result create mode 100644 test/replication/skip_conflict_row.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 9aa951c34..6f6970b13 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -47,6 +47,7 @@ #include "xrow_io.h" #include "error.h" #include "session.h" +#include "cfg.h" STRS(applier_state, applier_STATE); @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier) */ vclock_follow(&replicaset.vclock, row.replica_id, row.lsn); - xstream_write_xc(applier->subscribe_stream, &row); + if (xstream_write(applier->subscribe_stream, &row) != 0) { + struct error *e = diag_last_error(diag_get()); + /** + * Silently skip ER_TUPLE_FOUND error if such + * option is set in config. + */ + if (e->type == &type_ClientError && + box_error_code(e) == ER_TUPLE_FOUND && + cfg_geti("replication_skip_conflict")) + diag_clear(diag_get()); + else + diag_raise(); + } } if (applier->state == APPLIER_SYNC || applier->state == APPLIER_FOLLOW) diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 89fd7745e..3a5a6d46a 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -59,6 +59,7 @@ local default_cfg = { replication_sync_lag = 10, replication_connect_timeout = 4, replication_connect_quorum = nil, -- connect all + replication_skip_conflict = false, feedback_enabled = true, feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, @@ -118,6 +119,7 @@ local template_cfg = { replication_sync_lag = 'number', replication_connect_timeout = 'number', replication_connect_quorum = 'number', + replication_skip_conflict = 'boolean', feedback_enabled = 'boolean', feedback_host = 'string', feedback_interval = 'number', @@ -192,6 +194,7 @@ local dynamic_cfg = { force_recovery = function() end, replication_timeout = private.cfg_set_replication_timeout, replication_connect_quorum = private.cfg_set_replication_connect_quorum, + replication_skip_conflict = function() end, } local dynamic_cfg_skip_at_load = { diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index ae01b488d..5625f1466 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -25,28 +25,29 @@ box.cfg 20 read_only:false 21 readahead:16320 22 replication_connect_timeout:4 -23 replication_sync_lag:10 -24 replication_timeout:1 -25 rows_per_wal:500000 -26 slab_alloc_factor:1.05 -27 too_long_threshold:0.5 -28 vinyl_bloom_fpr:0.05 -29 vinyl_cache:134217728 -30 vinyl_dir:. -31 vinyl_max_tuple_size:1048576 -32 vinyl_memory:134217728 -33 vinyl_page_size:8192 -34 vinyl_range_size:1073741824 -35 vinyl_read_threads:1 -36 vinyl_run_count_per_level:2 -37 vinyl_run_size_ratio:3.5 -38 vinyl_timeout:60 -39 vinyl_write_threads:2 -40 wal_dir:. -41 wal_dir_rescan_delay:2 -42 wal_max_size:268435456 -43 wal_mode:write -44 worker_pool_threads:4 +23 replication_skip_conflict:false +24 replication_sync_lag:10 +25 replication_timeout:1 +26 rows_per_wal:500000 +27 slab_alloc_factor:1.05 +28 too_long_threshold:0.5 +29 vinyl_bloom_fpr:0.05 +30 vinyl_cache:134217728 +31 vinyl_dir:. +32 vinyl_max_tuple_size:1048576 +33 vinyl_memory:134217728 +34 vinyl_page_size:8192 +35 vinyl_range_size:1073741824 +36 vinyl_read_threads:1 +37 vinyl_run_count_per_level:2 +38 vinyl_run_size_ratio:3.5 +39 vinyl_timeout:60 +40 vinyl_write_threads:2 +41 wal_dir:. +42 wal_dir_rescan_delay:2 +43 wal_max_size:268435456 +44 wal_mode:write +45 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box/admin.result b/test/box/admin.result index 296976629..2168c3adb 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -62,6 +62,8 @@ cfg_filter(box.cfg) - 16320 - - replication_connect_timeout - 4 + - - replication_skip_conflict + - false - - replication_sync_lag - 10 - - replication_timeout diff --git a/test/box/cfg.result b/test/box/cfg.result index 717fa31c9..28449d9cc 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -58,6 +58,8 @@ cfg_filter(box.cfg) - 16320 - - replication_connect_timeout - 4 + - - replication_skip_conflict + - false - - replication_sync_lag - 10 - - replication_timeout @@ -153,6 +155,8 @@ cfg_filter(box.cfg) - 16320 - - replication_connect_timeout - 4 + - - replication_skip_conflict + - false - - replication_sync_lag - 10 - - replication_timeout diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result new file mode 100644 index 000000000..bf794db5a --- /dev/null +++ b/test/replication/skip_conflict_row.result @@ -0,0 +1,105 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +engine = test_run:get_cfg('engine') +--- +... +box.schema.user.grant('guest', 'read,write,execute', 'universe') +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +space = box.schema.space.create('test', {engine = engine}); +--- +... +index = box.space.test:create_index('primary') +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.cfg{replication_skip_conflict = true} +--- +... +box.space.test:insert{1} +--- +- [1] +... +test_run:cmd("switch default") +--- +- true +... +space:insert{1, 1} +--- +- [1, 1] +... +space:insert{2} +--- +- [2] +... +box.info.status +--- +- running +... +vclock = test_run:get_vclock('default') +--- +... +_ = test_run:wait_vclock("replica", vclock) +--- +... +test_run:cmd("switch replica") +--- +- true +... +box.info.replication[1].upstream.message +--- +- null +... +box.info.replication[1].upstream.status +--- +- follow +... +box.space.test:select() +--- +- - [1] + - [2] +... +test_run:cmd("switch default") +--- +- true +... +box.info.status +--- +- running +... +-- cleanup +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.space.test:drop() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua new file mode 100644 index 000000000..695cce9db --- /dev/null +++ b/test/replication/skip_conflict_row.test.lua @@ -0,0 +1,37 @@ +env = require('test_run') +test_run = env.new() +engine = test_run:get_cfg('engine') + +box.schema.user.grant('guest', 'read,write,execute', 'universe') +box.schema.user.grant('guest', 'replication') + +space = box.schema.space.create('test', {engine = engine}); +index = box.space.test:create_index('primary') + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") +box.cfg{replication_skip_conflict = true} +box.space.test:insert{1} + +test_run:cmd("switch default") +space:insert{1, 1} +space:insert{2} +box.info.status + +vclock = test_run:get_vclock('default') +_ = test_run:wait_vclock("replica", vclock) +test_run:cmd("switch replica") +box.info.replication[1].upstream.message +box.info.replication[1].upstream.status +box.space.test:select() + +test_run:cmd("switch default") +box.info.status + +-- cleanup +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +box.space.test:drop() +box.schema.user.revoke('guest', 'replication') +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98)