From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Date: Fri, 13 Apr 2018 15:59:23 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] Re: [PATCH] replication: automatic skip duplicating rows in replication
Message-ID: <20180413125923.q2j3mnv7cckwigbt@esperanza>
References: <20180412110125.84663-1-k.belyavskiy@tarantool.org> <20180413082348.tlj3ejjis4hjrgo3@esperanza> <1523619988.284799840@f105.i.mail.ru>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <1523619988.284799840@f105.i.mail.ru>
To: Konstantin Belyavskiy
Cc: tarantool-patches@freelists.org, georgy
List-ID:

On Fri, Apr 13, 2018 at 02:46:28PM +0300, Konstantin Belyavskiy wrote:
> Please take a look at newer version.

Once again, please enclose the patch in the email if you've fixed
something and want it to get reviewed.

> From e20b33f032b7f50289f6c7445352220e7ea81101 Mon Sep 17 00:00:00 2001
> From: Konstantin Belyavskiy
> Date: Wed, 4 Apr 2018 20:07:49 +0300
> Subject: [PATCH] replication: automatic skip duplicating rows in replication
>
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occurred, which led to disconnect.
> Introduce new option: 'silent_skip_conflict_rows', if set, then error of

The option is called replication_skip_conflict now...

> this type will be ignored.
>
> Closes #3270
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9aa951c3..6f6970b1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -47,6 +47,7 @@
>  #include "xrow_io.h"
>  #include "error.h"
>  #include "session.h"
> +#include "cfg.h"
>
>  STRS(applier_state, applier_STATE);
>
> @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
>  			 */
>  			vclock_follow(&replicaset.vclock, row.replica_id,
>  				      row.lsn);
> -			xstream_write_xc(applier->subscribe_stream, &row);
> +			if (xstream_write(applier->subscribe_stream, &row) != 0) {
> +				struct error *e = diag_last_error(diag_get());
> +				/**
> +				 * Silently skip ER_TUPLE_FOUND error if such
> +				 * option is set in config.
> +				 */
> +				if (e->type == &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    cfg_geti("replication_skip_conflict"))
> +					diag_clear(diag_get());
> +				else
> +					diag_raise();
> +			}
>  		}
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 89fd7745..ff1757f8 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -62,6 +62,7 @@ local default_cfg = {
>      feedback_enabled = true,
>      feedback_host = "https://feedback.tarantool.io",
>      feedback_interval = 3600,
> +    replication_skip_conflict = false,

Nit: please put this new option where other replication options are
defined so that they are all neatly grouped together.

>  }
>
>  -- types of available options
> @@ -121,6 +122,7 @@ local template_cfg = {
>      feedback_enabled = 'boolean',
>      feedback_host = 'string',
>      feedback_interval = 'number',
> +    replication_skip_conflict = 'boolean',

Ditto

>  }
>
>  local function normalize_uri(port)
> @@ -192,6 +194,7 @@ local dynamic_cfg = {
>      force_recovery = function() end,
>      replication_timeout = private.cfg_set_replication_timeout,
>      replication_connect_quorum = private.cfg_set_replication_connect_quorum,
> +    replication_skip_conflict = function() end,
>  }
>
>  local dynamic_cfg_skip_at_load = {
> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
> new file mode 100644
> index 00000000..403b4635
> --- /dev/null
> +++ b/test/replication/skip_conflict_row.test.lua
> @@ -0,0 +1,34 @@
> +env = require('test_run')
> +test_run = env.new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'read,write,execute', 'universe')
> +box.schema.user.grant('guest', 'replication')
> +
> +space = box.schema.space.create('test', {engine = engine});
> +index = box.space.test:create_index('primary')
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +box.cfg{replication_skip_conflict = true}
> +box.space.test:insert{1}
> +
> +test_run:cmd("switch default")
> +space:insert{1}

I think you should insert a different tuple (say {1, 1}) and check that
it doesn't overwrite the old one. Also, I think you should insert more
tuples and see that the replication is still working after stumbling
upon a conflicting row.

> +box.info.status
> +
> +test_run:cmd("switch replica")
> +require('fiber').sleep(0.01)

Using sleep like this is racy. Use wait_vclock instead, please.

> +box.info.replication[1].upstream.message
> +box.info.replication[1].upstream.status
> +
> +test_run:cmd("switch default")
> +box.info.status
> +
> +-- cleanup
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')
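
For what it's worth, here is roughly the behaviour I'd expect the
extended test to pin down, written as a standalone C++ sketch. It is not
Tarantool code: Space, Row, ErrCode and apply_rows() are hypothetical
stand-ins for the space, the replicated row, the error code and the
applier loop, and skip_conflict plays the role of
replication_skip_conflict. The point is that a duplicate key is skipped
without touching the existing tuple, later rows are still applied, and
with the option off the error propagates as before.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; not Tarantool types.
enum ErrCode { ERR_NONE = 0, ERR_TUPLE_FOUND };

struct Row {
	int key;
	std::string value;
};

// A toy space with a unique primary key.
struct Space {
	std::map<int, std::string> tuples;

	// On a duplicate key, report ERR_TUPLE_FOUND and leave the
	// existing tuple untouched (emplace never overwrites).
	ErrCode insert(const Row &row)
	{
		if (!tuples.emplace(row.key, row.value).second)
			return ERR_TUPLE_FOUND;
		return ERR_NONE;
	}
};

// Toy applier loop: apply rows in order and return how many were
// skipped because of a duplicate key. Without skip_conflict, the
// first conflict aborts the whole stream.
int apply_rows(Space &space, const std::vector<Row> &rows, bool skip_conflict)
{
	int skipped = 0;
	for (const Row &row : rows) {
		ErrCode rc = space.insert(row);
		if (rc == ERR_NONE)
			continue;
		if (rc == ERR_TUPLE_FOUND && skip_conflict) {
			// Plays the role of diag_clear() in the patch.
			++skipped;
			continue;
		}
		// Plays the role of diag_raise() in the patch.
		throw std::runtime_error("duplicate key in replication stream");
	}
	return skipped;
}
```

With a replica that already holds {1, "local"}, applying {1, "remote"},
{2, "b"}, {3, "c"} with skip_conflict = true should skip exactly one
row, keep "local" under key 1 and leave three tuples in the space. With
skip_conflict = false the first row throws.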