[tarantool-patches] Re: [PATCH] replication: automatic skip duplicating rows in replication
Vladimir Davydov
vdavydov.dev at gmail.com
Fri Apr 13 15:59:23 MSK 2018
On Fri, Apr 13, 2018 at 02:46:28PM +0300, Konstantin Belyavskiy wrote:
> Please take a look at newer version.
Once again, please enclose the patch in the email if you've fixed
something and want it to get reviewed.
> From e20b33f032b7f50289f6c7445352220e7ea81101 Mon Sep 17 00:00:00 2001
> From: Konstantin Belyavskiy <k.belyavskiy at tarantool.org>
> Date: Wed, 4 Apr 2018 20:07:49 +0300
> Subject: [PATCH] replication: automatic skip duplicating rows in replication
>
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occured, which led to disconnect.
> Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of
The option is called replication_skip_conflict now...
> this type will be ignored.
>
> Closes #3270
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9aa951c3..6f6970b1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -47,6 +47,7 @@
> #include "xrow_io.h"
> #include "error.h"
> #include "session.h"
> +#include "cfg.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
> */
> vclock_follow(&replicaset.vclock, row.replica_id,
> row.lsn);
> - xstream_write_xc(applier->subscribe_stream, &row);
> + if (xstream_write(applier->subscribe_stream, &row) != 0) {
> + struct error *e = diag_last_error(diag_get());
> + /**
> + * Silently skip ER_TUPLE_FOUND error if such
> + * option is set in config.
> + */
> + if (e->type == &type_ClientError &&
> + box_error_code(e) == ER_TUPLE_FOUND &&
> + cfg_geti("replication_skip_conflict"))
> + diag_clear(diag_get());
> + else
> + diag_raise();
> + }
> }
> if (applier->state == APPLIER_SYNC ||
> applier->state == APPLIER_FOLLOW)
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 89fd7745..ff1757f8 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -62,6 +62,7 @@ local default_cfg = {
> feedback_enabled = true,
> feedback_host = "https://feedback.tarantool.io",
> feedback_interval = 3600,
> + replication_skip_conflict = false,
Nit: please put this new option where other replication options are
defined so that they are all neatly grouped together.
> }
>
> -- types of available options
> @@ -121,6 +122,7 @@ local template_cfg = {
> feedback_enabled = 'boolean',
> feedback_host = 'string',
> feedback_interval = 'number',
> + replication_skip_conflict = 'boolean',
Ditto
> }
>
> local function normalize_uri(port)
> @@ -192,6 +194,7 @@ local dynamic_cfg = {
> force_recovery = function() end,
> replication_timeout = private.cfg_set_replication_timeout,
> replication_connect_quorum = private.cfg_set_replication_connect_quorum,
> + replication_skip_conflict = function() end,
> }
>
> local dynamic_cfg_skip_at_load = {
> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
> new file mode 100644
> index 00000000..403b4635
> --- /dev/null
> +++ b/test/replication/skip_conflict_row.test.lua
> @@ -0,0 +1,34 @@
> +env = require('test_run')
> +test_run = env.new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'read,write,execute', 'universe')
> +box.schema.user.grant('guest', 'replication')
> +
> +space = box.schema.space.create('test', {engine = engine});
> +index = box.space.test:create_index('primary')
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +box.cfg{replication_skip_conflict = true}
> +box.space.test:insert{1}
> +
> +test_run:cmd("switch default")
> +space:insert{1}
I think you should insert a different tuple (say {1, 1}) and check that
it doesn't overwrite the old one. Also, I think you should insert more
tuples and see that the replication is still working after stumbling
upon a conflicting row.
> +box.info.status
> +
> +test_run:cmd("switch replica")
> +require('fiber').sleep(0.01)
Using sleep like this is racy. Use wait_vclock instead, please.
> +box.info.replication[1].upstream.message
> +box.info.replication[1].upstream.status
> +
> +test_run:cmd("switch default")
> +box.info.status
> +
> +-- cleanup
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')
More information about the Tarantool-patches
mailing list