From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Konstantin Belyavskiy Subject: [PATCH] replication: automatic skip duplicating rows in replication Date: Thu, 12 Apr 2018 14:01:25 +0300 Message-Id: <20180412110125.84663-1-k.belyavskiy@tarantool.org> To: vdavydov@tarantool.org, georgy@tarantool.org Cc: tarantool-patches@freelists.org List-ID: ticket: https://github.com/tarantool/tarantool/issues/3270 branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND occured, which led to disconnect. Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of this type will be ignored. Closes #3270 --- src/box/applier.cc | 15 +++- src/box/lua/load_cfg.lua | 3 + test/app-tap/init_script.result | 39 +++++---- test/box/admin.result | 2 + test/box/cfg.result | 4 + test/replication/replica_skip_row.lua | 10 +++ test/replication/skip_conflict_row.result | 129 ++++++++++++++++++++++++++++ test/replication/skip_conflict_row.test.lua | 46 ++++++++++ 8 files changed, 228 insertions(+), 20 deletions(-) create mode 100644 test/replication/replica_skip_row.lua create mode 100644 test/replication/skip_conflict_row.result create mode 100644 test/replication/skip_conflict_row.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 9aa951c34..5ebe67e27 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -47,6 +47,7 @@ #include "xrow_io.h" #include "error.h" #include "session.h" +#include "cfg.h" STRS(applier_state, applier_STATE); @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier) */ vclock_follow(&replicaset.vclock, row.replica_id, row.lsn); - xstream_write_xc(applier->subscribe_stream, &row); + if (xstream_write(applier->subscribe_stream, &row) != 0) { + struct error *e = diag_last_error(diag_get()); + /** + * Silently skip ER_TUPLE_FOUND error if such + * option is set in config. + */ + if (e->type == &type_ClientError && + box_error_code(e) == ER_TUPLE_FOUND && + cfg_geti("silent_skip_conflict_rows")) + diag_clear(diag_get()); + else + diag_raise(); + } } if (applier->state == APPLIER_SYNC || applier->state == APPLIER_FOLLOW) diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 89fd7745e..372f00f25 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -62,6 +62,7 @@ local default_cfg = { feedback_enabled = true, feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, + silent_skip_conflict_rows = false, } -- types of available options @@ -121,6 +122,7 @@ local template_cfg = { feedback_enabled = 'boolean', feedback_host = 'string', feedback_interval = 'number', + silent_skip_conflict_rows = 'boolean', } local function normalize_uri(port) @@ -192,6 +194,7 @@ local dynamic_cfg = { force_recovery = function() end, replication_timeout = private.cfg_set_replication_timeout, replication_connect_quorum = private.cfg_set_replication_connect_quorum, + silent_skip_conflict_rows = function() end, } local dynamic_cfg_skip_at_load = { diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index ae01b488d..79f17649c 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -28,25 +28,26 @@ box.cfg 23 replication_sync_lag:10 24 replication_timeout:1 25 rows_per_wal:500000 -26 slab_alloc_factor:1.05 -27 too_long_threshold:0.5 -28 vinyl_bloom_fpr:0.05 -29 vinyl_cache:134217728 -30 vinyl_dir:. -31 vinyl_max_tuple_size:1048576 -32 vinyl_memory:134217728 -33 vinyl_page_size:8192 -34 vinyl_range_size:1073741824 -35 vinyl_read_threads:1 -36 vinyl_run_count_per_level:2 -37 vinyl_run_size_ratio:3.5 -38 vinyl_timeout:60 -39 vinyl_write_threads:2 -40 wal_dir:. -41 wal_dir_rescan_delay:2 -42 wal_max_size:268435456 -43 wal_mode:write -44 worker_pool_threads:4 +26 silent_skip_conflict_rows:false +27 slab_alloc_factor:1.05 +28 too_long_threshold:0.5 +29 vinyl_bloom_fpr:0.05 +30 vinyl_cache:134217728 +31 vinyl_dir:. +32 vinyl_max_tuple_size:1048576 +33 vinyl_memory:134217728 +34 vinyl_page_size:8192 +35 vinyl_range_size:1073741824 +36 vinyl_read_threads:1 +37 vinyl_run_count_per_level:2 +38 vinyl_run_size_ratio:3.5 +39 vinyl_timeout:60 +40 vinyl_write_threads:2 +41 wal_dir:. +42 wal_dir_rescan_delay:2 +43 wal_max_size:268435456 +44 wal_mode:write +45 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box/admin.result b/test/box/admin.result index 296976629..bee3c39fe 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -68,6 +68,8 @@ cfg_filter(box.cfg) - 1 - - rows_per_wal - 500000 + - - silent_skip_conflict_rows + - false - - slab_alloc_factor - 1.05 - - too_long_threshold diff --git a/test/box/cfg.result b/test/box/cfg.result index 717fa31c9..b3bf9d23c 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -64,6 +64,8 @@ cfg_filter(box.cfg) - 1 - - rows_per_wal - 500000 + - - silent_skip_conflict_rows + - false - - slab_alloc_factor - 1.05 - - too_long_threshold @@ -159,6 +161,8 @@ cfg_filter(box.cfg) - 1 - - rows_per_wal - 500000 + - - silent_skip_conflict_rows + - false - - slab_alloc_factor - 1.05 - - too_long_threshold diff --git a/test/replication/replica_skip_row.lua b/test/replication/replica_skip_row.lua new file mode 100644 index 000000000..4a4d99882 --- /dev/null +++ b/test/replication/replica_skip_row.lua @@ -0,0 +1,10 @@ +#!/usr/bin/env tarantool + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + silent_skip_conflict_rows = true, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result new file mode 100644 index 000000000..38ff5e58c --- /dev/null +++ b/test/replication/skip_conflict_row.result @@ -0,0 +1,129 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +engine = test_run:get_cfg('engine') +--- +... +box.schema.user.grant('guest', 'read,write,execute', 'universe') +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +space = box.schema.space.create('test', {engine = engine}); +--- +... +index = box.space.test:create_index('primary') +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +repl = box.cfg.replication +--- +... +box.cfg{replication = ""} +--- +... +box.info.status +--- +- running +... +test_run:cmd("switch replica") +--- +- true +... +repl = box.cfg.replication +--- +... +box.cfg{replication = ""} +--- +... +box.space.test:insert{2} +--- +- [2] +... +box.space.test:insert{1} +--- +- [1] +... +test_run:cmd("switch default") +--- +- true +... +space:insert{1} +--- +- [1] +... +space:select{} +--- +- - [1] +... +box.cfg{replication = repl} +--- +... +box.info.status +--- +- running +... +test_run:cmd("switch replica") +--- +- true +... +box.cfg{replication = repl} +--- +... +require('fiber').sleep(0.01) +--- +... +box.info.replication[1].upstream.message +--- +- null +... +box.info.replication[1].upstream.status +--- +- follow +... +box.space.test:select{} +--- +- - [1] + - [2] +... +test_run:cmd("switch default") +--- +- true +... +space:select{} +--- +- - [1] +... +box.info.status +--- +- running +... +-- cleanup +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.space.test:drop() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua new file mode 100644 index 000000000..5cfbff339 --- /dev/null +++ b/test/replication/skip_conflict_row.test.lua @@ -0,0 +1,46 @@ +env = require('test_run') +test_run = env.new() +engine = test_run:get_cfg('engine') + +box.schema.user.grant('guest', 'read,write,execute', 'universe') +box.schema.user.grant('guest', 'replication') + +space = box.schema.space.create('test', {engine = engine}); +index = box.space.test:create_index('primary') + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'") +test_run:cmd("start server replica") + +repl = box.cfg.replication +box.cfg{replication = ""} +box.info.status + +test_run:cmd("switch replica") +repl = box.cfg.replication +box.cfg{replication = ""} +box.space.test:insert{2} +box.space.test:insert{1} + +test_run:cmd("switch default") +space:insert{1} +space:select{} +box.cfg{replication = repl} +box.info.status + +test_run:cmd("switch replica") +box.cfg{replication = repl} +require('fiber').sleep(0.01) +box.info.replication[1].upstream.message +box.info.replication[1].upstream.status +box.space.test:select{} + +test_run:cmd("switch default") +space:select{} +box.info.status + +-- cleanup +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +box.space.test:drop() +box.schema.user.revoke('guest', 'replication') +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98)