Tarantool development patches archive
 help / color / mirror / Atom feed
* [PATCH] replication: automatic skip duplicating rows in replication
@ 2018-04-12 11:01 Konstantin Belyavskiy
  2018-04-13  8:23 ` Vladimir Davydov
  0 siblings, 1 reply; 7+ messages in thread
From: Konstantin Belyavskiy @ 2018-04-12 11:01 UTC (permalink / raw)
  To: vdavydov, georgy; +Cc: tarantool-patches

ticket: https://github.com/tarantool/tarantool/issues/3270
branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option

In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
occured, which led to disconnect. 
Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of
this type will be ignored.

Closes #3270
---
 src/box/applier.cc                          |  15 +++-
 src/box/lua/load_cfg.lua                    |   3 +
 test/app-tap/init_script.result             |  39 +++++----
 test/box/admin.result                       |   2 +
 test/box/cfg.result                         |   4 +
 test/replication/replica_skip_row.lua       |  10 +++
 test/replication/skip_conflict_row.result   | 129 ++++++++++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua |  46 ++++++++++
 8 files changed, 228 insertions(+), 20 deletions(-)
 create mode 100644 test/replication/replica_skip_row.lua
 create mode 100644 test/replication/skip_conflict_row.result
 create mode 100644 test/replication/skip_conflict_row.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9aa951c34..5ebe67e27 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -47,6 +47,7 @@
 #include "xrow_io.h"
 #include "error.h"
 #include "session.h"
+#include "cfg.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
 			 */
 			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (xstream_write(applier->subscribe_stream, &row) != 0) {
+				struct error *e = diag_last_error(diag_get());
+				/**
+				 * Silently skip ER_TUPLE_FOUND error if such
+				 * option is set in config.
+				 */
+				if (e->type == &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    cfg_geti("silent_skip_conflict_rows"))
+					diag_clear(diag_get());
+				else
+					diag_raise();
+			}
 		}
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 89fd7745e..372f00f25 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -62,6 +62,7 @@ local default_cfg = {
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
+    silent_skip_conflict_rows = false,
 }
 
 -- types of available options
@@ -121,6 +122,7 @@ local template_cfg = {
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
+    silent_skip_conflict_rows = 'boolean',
 }
 
 local function normalize_uri(port)
@@ -192,6 +194,7 @@ local dynamic_cfg = {
     force_recovery          = function() end,
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
+    silent_skip_conflict_rows = function() end,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index ae01b488d..79f17649c 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -28,25 +28,26 @@ box.cfg
 23	replication_sync_lag:10
 24	replication_timeout:1
 25	rows_per_wal:500000
-26	slab_alloc_factor:1.05
-27	too_long_threshold:0.5
-28	vinyl_bloom_fpr:0.05
-29	vinyl_cache:134217728
-30	vinyl_dir:.
-31	vinyl_max_tuple_size:1048576
-32	vinyl_memory:134217728
-33	vinyl_page_size:8192
-34	vinyl_range_size:1073741824
-35	vinyl_read_threads:1
-36	vinyl_run_count_per_level:2
-37	vinyl_run_size_ratio:3.5
-38	vinyl_timeout:60
-39	vinyl_write_threads:2
-40	wal_dir:.
-41	wal_dir_rescan_delay:2
-42	wal_max_size:268435456
-43	wal_mode:write
-44	worker_pool_threads:4
+26	silent_skip_conflict_rows:false
+27	slab_alloc_factor:1.05
+28	too_long_threshold:0.5
+29	vinyl_bloom_fpr:0.05
+30	vinyl_cache:134217728
+31	vinyl_dir:.
+32	vinyl_max_tuple_size:1048576
+33	vinyl_memory:134217728
+34	vinyl_page_size:8192
+35	vinyl_range_size:1073741824
+36	vinyl_read_threads:1
+37	vinyl_run_count_per_level:2
+38	vinyl_run_size_ratio:3.5
+39	vinyl_timeout:60
+40	vinyl_write_threads:2
+41	wal_dir:.
+42	wal_dir_rescan_delay:2
+43	wal_max_size:268435456
+44	wal_mode:write
+45	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 296976629..bee3c39fe 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -68,6 +68,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 717fa31c9..b3bf9d23c 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -64,6 +64,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
@@ -159,6 +161,8 @@ cfg_filter(box.cfg)
     - 1
   - - rows_per_wal
     - 500000
+  - - silent_skip_conflict_rows
+    - false
   - - slab_alloc_factor
     - 1.05
   - - too_long_threshold
diff --git a/test/replication/replica_skip_row.lua b/test/replication/replica_skip_row.lua
new file mode 100644
index 000000000..4a4d99882
--- /dev/null
+++ b/test/replication/replica_skip_row.lua
@@ -0,0 +1,10 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    replication         = os.getenv("MASTER"),
+    memtx_memory        = 107374182,
+    silent_skip_conflict_rows = true,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
new file mode 100644
index 000000000..38ff5e58c
--- /dev/null
+++ b/test/replication/skip_conflict_row.result
@@ -0,0 +1,129 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+space = box.schema.space.create('test', {engine = engine});
+---
+...
+index = box.space.test:create_index('primary')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+box.info.status
+---
+- running
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+box.space.test:insert{2}
+---
+- [2]
+...
+box.space.test:insert{1}
+---
+- [1]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:insert{1}
+---
+- [1]
+...
+space:select{}
+---
+- - [1]
+...
+box.cfg{replication = repl}
+---
+...
+box.info.status
+---
+- running
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication = repl}
+---
+...
+require('fiber').sleep(0.01)
+---
+...
+box.info.replication[1].upstream.message
+---
+- null
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+box.space.test:select{}
+---
+- - [1]
+  - [2]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:select{}
+---
+- - [1]
+...
+box.info.status
+---
+- running
+...
+-- cleanup
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
new file mode 100644
index 000000000..5cfbff339
--- /dev/null
+++ b/test/replication/skip_conflict_row.test.lua
@@ -0,0 +1,46 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+box.schema.user.grant('guest', 'replication')
+
+space = box.schema.space.create('test', {engine = engine});
+index = box.space.test:create_index('primary')
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")
+test_run:cmd("start server replica")
+
+repl = box.cfg.replication
+box.cfg{replication = ""}
+box.info.status
+
+test_run:cmd("switch replica")
+repl = box.cfg.replication
+box.cfg{replication = ""}
+box.space.test:insert{2}
+box.space.test:insert{1}
+
+test_run:cmd("switch default")
+space:insert{1}
+space:select{}
+box.cfg{replication = repl}
+box.info.status
+
+test_run:cmd("switch replica")
+box.cfg{replication = repl}
+require('fiber').sleep(0.01)
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select{}
+
+test_run:cmd("switch default")
+space:select{}
+box.info.status
+
+-- cleanup
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] replication: automatic skip duplicating rows in replication
  2018-04-12 11:01 [PATCH] replication: automatic skip duplicating rows in replication Konstantin Belyavskiy
@ 2018-04-13  8:23 ` Vladimir Davydov
  2018-04-13 11:46   ` [tarantool-patches] " Konstantin Belyavskiy
  0 siblings, 1 reply; 7+ messages in thread
From: Vladimir Davydov @ 2018-04-13  8:23 UTC (permalink / raw)
  To: Konstantin Belyavskiy; +Cc: georgy, tarantool-patches

On Thu, Apr 12, 2018 at 02:01:25PM +0300, Konstantin Belyavskiy wrote:
> ticket: https://github.com/tarantool/tarantool/issues/3270
> branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option
> 
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occured, which led to disconnect. 
> Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of
> this type will be ignored.
> 
> Closes #3270
> ---
>  src/box/applier.cc                          |  15 +++-
>  src/box/lua/load_cfg.lua                    |   3 +
>  test/app-tap/init_script.result             |  39 +++++----
>  test/box/admin.result                       |   2 +
>  test/box/cfg.result                         |   4 +
>  test/replication/replica_skip_row.lua       |  10 +++
>  test/replication/skip_conflict_row.result   | 129 ++++++++++++++++++++++++++++
>  test/replication/skip_conflict_row.test.lua |  46 ++++++++++
>  8 files changed, 228 insertions(+), 20 deletions(-)
>  create mode 100644 test/replication/replica_skip_row.lua
>  create mode 100644 test/replication/skip_conflict_row.result
>  create mode 100644 test/replication/skip_conflict_row.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9aa951c34..5ebe67e27 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -47,6 +47,7 @@
>  #include "xrow_io.h"
>  #include "error.h"
>  #include "session.h"
> +#include "cfg.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
>  			 */
>  			vclock_follow(&replicaset.vclock, row.replica_id,
>  				      row.lsn);
> -			xstream_write_xc(applier->subscribe_stream, &row);
> +			if (xstream_write(applier->subscribe_stream, &row) != 0) {
> +				struct error *e = diag_last_error(diag_get());
> +				/**
> +				 * Silently skip ER_TUPLE_FOUND error if such
> +				 * option is set in config.
> +				 */
> +				if (e->type == &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&

> +				    cfg_geti("silent_skip_conflict_rows"))

cfg_geti() is a heavy operation. I think you should cache the value of
this configuration option in C.

> +					diag_clear(diag_get());
> +				else
> +					diag_raise();
> +			}

What about UPDATE over a non-existent key or vinyl transaction conflict?
Those can also happen due to replication conflicts. May be, we should
ignore all errors of type ClientError?

>  		}
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 89fd7745e..372f00f25 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -62,6 +62,7 @@ local default_cfg = {
>      feedback_enabled      = true,
>      feedback_host         = "https://feedback.tarantool.io",
>      feedback_interval     = 3600,
> +    silent_skip_conflict_rows = false,

All replication-related options should have replication_ prefix.
What about replication_ignore_conflicts?

> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
> new file mode 100644
> index 000000000..5cfbff339
> --- /dev/null
> +++ b/test/replication/skip_conflict_row.test.lua
> @@ -0,0 +1,46 @@
> +env = require('test_run')
> +test_run = env.new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'read,write,execute', 'universe')
> +box.schema.user.grant('guest', 'replication')
> +
> +space = box.schema.space.create('test', {engine = engine});
> +index = box.space.test:create_index('primary')
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")

AFAIU you don't need to add a new script - you can reuse replica.lua and
set the new configuration option after bootstrap.

> +test_run:cmd("start server replica")
> +
> +repl = box.cfg.replication
> +box.cfg{replication = ""}
> +box.info.status

I don't understand why you turn off replication here.

> +
> +test_run:cmd("switch replica")
> +repl = box.cfg.replication
> +box.cfg{replication = ""}

... and here

> +box.space.test:insert{2}
> +box.space.test:insert{1}
> +
> +test_run:cmd("switch default")
> +space:insert{1}
> +space:select{}
> +box.cfg{replication = repl}
> +box.info.status
> +
> +test_run:cmd("switch replica")
> +box.cfg{replication = repl}
> +require('fiber').sleep(0.01)
> +box.info.replication[1].upstream.message
> +box.info.replication[1].upstream.status
> +box.space.test:select{}
> +
> +test_run:cmd("switch default")
> +space:select{}
> +box.info.status
> +
> +-- cleanup
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [tarantool-patches] Re: [PATCH] replication: automatic skip duplicating rows in replication
  2018-04-13  8:23 ` Vladimir Davydov
@ 2018-04-13 11:46   ` Konstantin Belyavskiy
  2018-04-13 12:59     ` Vladimir Davydov
  0 siblings, 1 reply; 7+ messages in thread
From: Konstantin Belyavskiy @ 2018-04-13 11:46 UTC (permalink / raw)
  To: tarantool-patches, Vladimir Davydov, georgy

[-- Attachment #1: Type: text/plain, Size: 5618 bytes --]

Please take a look at newer version.

>Пятница, 13 апреля 2018, 11:24 +03:00 от Vladimir Davydov <vdavydov.dev@gmail.com>:
>
>On Thu, Apr 12, 2018 at 02:01:25PM +0300, Konstantin Belyavskiy wrote:
>> ticket:  https://github.com/tarantool/tarantool/issues/3270
>> branch:  https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option
>> 
>> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
>> occured, which led to disconnect. 
>> Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of
>> this type will be ignored.
>> 
>> Closes #3270
>> ---
>>  src/box/applier.cc                          |  15 +++-
>>  src/box/lua/load_cfg.lua                    |   3 +
>>  test/app-tap/init_script.result             |  39 +++++----
>>  test/box/admin.result                       |   2 +
>>  test/box/cfg.result                         |   4 +
>>  test/replication/replica_skip_row.lua       |  10 +++
>>  test/replication/skip_conflict_row.result   | 129 ++++++++++++++++++++++++++++
>>  test/replication/skip_conflict_row.test.lua |  46 ++++++++++
>>  8 files changed, 228 insertions(+), 20 deletions(-)
>>  create mode 100644 test/replication/replica_skip_row.lua
>>  create mode 100644 test/replication/skip_conflict_row.result
>>  create mode 100644 test/replication/skip_conflict_row.test.lua
>> 
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 9aa951c34..5ebe67e27 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -47,6 +47,7 @@
>>  #include "xrow_io.h"
>>  #include "error.h"
>>  #include "session.h"
>> +#include "cfg.h"
>> 
>>  STRS(applier_state, applier_STATE);
>> 
>> @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
>>  			 */
>>  			vclock_follow(&replicaset.vclock, row.replica_id,
>>  				      row.lsn);
>> -			xstream_write_xc(applier->subscribe_stream, &row);
>> +			if (xstream_write(applier->subscribe_stream, &row) != 0) {
>> +				struct error *e = diag_last_error(diag_get());
>> +				/**
>> +				 * Silently skip ER_TUPLE_FOUND error if such
>> +				 * option is set in config.
>> +				 */
>> +				if (e->type == &type_ClientError &&
>> +				    box_error_code(e) == ER_TUPLE_FOUND &&
>
>> +				    cfg_geti("silent_skip_conflict_rows"))
>
>cfg_geti() is a heavy operation. I think you should cache the value of
>this configuration option in C. 
I hope conflicts will occurs rather rare, over-wise something goes wrong..
So it's not a big overkill.
>
>> +					diag_clear(diag_get());
>> +				else
>> +					diag_raise();
>> +			}
>
>What about UPDATE over a non-existent key or vinyl transaction conflict?
>Those can also happen due to replication conflicts. May be, we should
>ignore all errors of type ClientError?
No, in this ticket I want to solve a certain problem, let's consider other cases in separate ticket.
>
>>  		}
>>  		if (applier->state == APPLIER_SYNC ||
>>  		    applier->state == APPLIER_FOLLOW)
>> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
>> index 89fd7745e..372f00f25 100644
>> --- a/src/box/lua/load_cfg.lua
>> +++ b/src/box/lua/load_cfg.lua
>> @@ -62,6 +62,7 @@ local default_cfg = {
>>      feedback_enabled      = true,
>>      feedback_host         = " https://feedback.tarantool.io ",
>>      feedback_interval     = 3600,
>> +    silent_skip_conflict_rows = false,
>
>All replication-related options should have replication_ prefix.
>What about replication_ignore_conflicts? 
Replace with "replication_skip_conflict" (from ticket Name)
>> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
>> new file mode 100644
>> index 000000000..5cfbff339
>> --- /dev/null
>> +++ b/test/replication/skip_conflict_row.test.lua
>> @@ -0,0 +1,46 @@
>> +env = require('test_run')
>> +test_run = env.new()
>> +engine = test_run:get_cfg('engine')
>> +
>> +box.schema.user.grant('guest', 'read,write,execute', 'universe')
>> +box.schema.user.grant('guest', 'replication')
>> +
>> +space = box.schema.space.create('test', {engine = engine});
>> +index = box.space.test:create_index('primary')
>> +
>> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica_skip_row.lua'")
>
>AFAIU you don't need to add a new script - you can reuse replica.lua and
>set the new configuration option after bootstrap. 
Yes, you're right, updated and remove new replica_skip_row.lua
>
>> +test_run:cmd("start server replica")
>> +
>> +repl = box.cfg.replication
>> +box.cfg{replication = ""}
>> +box.info.status
>
>I don't understand why you turn off replication here. 
Fixed, you are right, it's not necessary, since it's not a mesh.
>
>> +
>> +test_run:cmd("switch replica")
>> +repl = box.cfg.replication
>> +box.cfg{replication = ""}
>
>... and here 
Removed
>
>> +box.space.test:insert{2}
>> +box.space.test:insert{1}
>> +
>> +test_run:cmd("switch default")
>> +space:insert{1}
>> +space:select{}
>> +box.cfg{replication = repl}
>> +box.info.status
>> +
>> +test_run:cmd("switch replica")
>> +box.cfg{replication = repl}
>> +require('fiber').sleep(0.01)
>> +box.info.replication[1].upstream.message
>> +box.info.replication[1].upstream.status
>> +box.space.test:select{}
>> +
>> +test_run:cmd("switch default")
>> +space:select{}
>> +box.info.status
>> +
>> +-- cleanup
>> +test_run:cmd("stop server replica")
>> +test_run:cmd("cleanup server replica")
>> +box.space.test:drop()
>> +box.schema.user.revoke('guest', 'replication')
>> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')
>


Best regards,
Konstantin Belyavskiy
k.belyavskiy@tarantool.org

[-- Attachment #2: Type: text/html, Size: 8292 bytes --]

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [tarantool-patches] Re: [PATCH] replication: automatic skip duplicating rows in replication
  2018-04-13 11:46   ` [tarantool-patches] " Konstantin Belyavskiy
@ 2018-04-13 12:59     ` Vladimir Davydov
  0 siblings, 0 replies; 7+ messages in thread
From: Vladimir Davydov @ 2018-04-13 12:59 UTC (permalink / raw)
  To: Konstantin Belyavskiy; +Cc: tarantool-patches, georgy

On Fri, Apr 13, 2018 at 02:46:28PM +0300, Konstantin Belyavskiy wrote:
> Please take a look at newer version.

Once again, please enclose the patch in the email if you've fixed
something and want it to get reviewed.

> From e20b33f032b7f50289f6c7445352220e7ea81101 Mon Sep 17 00:00:00 2001
> From: Konstantin Belyavskiy <k.belyavskiy@tarantool.org>
> Date: Wed, 4 Apr 2018 20:07:49 +0300
> Subject: [PATCH] replication: automatic skip duplicating rows in replication
> 
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occured, which led to disconnect.
> Introduce new oftion: 'silent_skip_conflict_rows', if set, then error of

The option is called replication_skip_conflict now...

> this type will be ignored.
> 
> Closes #3270
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9aa951c3..6f6970b1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -47,6 +47,7 @@
>  #include "xrow_io.h"
>  #include "error.h"
>  #include "session.h"
> +#include "cfg.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
>  			 */
>  			vclock_follow(&replicaset.vclock, row.replica_id,
>  				      row.lsn);
> -			xstream_write_xc(applier->subscribe_stream, &row);
> +			if (xstream_write(applier->subscribe_stream, &row) != 0) {
> +				struct error *e = diag_last_error(diag_get());
> +				/**
> +				 * Silently skip ER_TUPLE_FOUND error if such
> +				 * option is set in config.
> +				 */
> +				if (e->type == &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    cfg_geti("replication_skip_conflict"))
> +					diag_clear(diag_get());
> +				else
> +					diag_raise();
> +			}
>  		}
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 89fd7745..ff1757f8 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -62,6 +62,7 @@ local default_cfg = {
>      feedback_enabled      = true,
>      feedback_host         = "https://feedback.tarantool.io",
>      feedback_interval     = 3600,
> +    replication_skip_conflict = false,

Nit: please put this new option where other replication options are
defined so that they are all neatly grouped together.

>  }
>  
>  -- types of available options
> @@ -121,6 +122,7 @@ local template_cfg = {
>      feedback_enabled      = 'boolean',
>      feedback_host         = 'string',
>      feedback_interval     = 'number',
> +    replication_skip_conflict = 'boolean',

Ditto

>  }
>  
>  local function normalize_uri(port)
> @@ -192,6 +194,7 @@ local dynamic_cfg = {
>      force_recovery          = function() end,
>      replication_timeout     = private.cfg_set_replication_timeout,
>      replication_connect_quorum = private.cfg_set_replication_connect_quorum,
> +    replication_skip_conflict = function() end,
>  }
>  
>  local dynamic_cfg_skip_at_load = {

> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
> new file mode 100644
> index 00000000..403b4635
> --- /dev/null
> +++ b/test/replication/skip_conflict_row.test.lua
> @@ -0,0 +1,34 @@
> +env = require('test_run')
> +test_run = env.new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'read,write,execute', 'universe')
> +box.schema.user.grant('guest', 'replication')
> +
> +space = box.schema.space.create('test', {engine = engine});
> +index = box.space.test:create_index('primary')
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +box.cfg{replication_skip_conflict = true}
> +box.space.test:insert{1}
> +
> +test_run:cmd("switch default")
> +space:insert{1}

I think you should insert a different tuple (say {1, 1}) and check that
it doesn't overwrite the old one. Also, I think you should insert more
tuples and see that the replication is still working after stumbling
upon a conflicting row.

> +box.info.status
> +
> +test_run:cmd("switch replica")

> +require('fiber').sleep(0.01)

Using sleep like this is racy. Use wait_vclock instead, please.

> +box.info.replication[1].upstream.message
> +box.info.replication[1].upstream.status
> +
> +test_run:cmd("switch default")
> +box.info.status
> +
> +-- cleanup
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] replication: automatic skip duplicating rows in replication
  2018-04-13 13:44 Konstantin Belyavskiy
  2018-04-13 13:46 ` Vladimir Davydov
@ 2018-04-18 12:30 ` Vladimir Davydov
  1 sibling, 0 replies; 7+ messages in thread
From: Vladimir Davydov @ 2018-04-18 12:30 UTC (permalink / raw)
  To: Konstantin Belyavskiy; +Cc: georgy, tarantool-patches

On Fri, Apr 13, 2018 at 04:44:12PM +0300, Konstantin Belyavskiy wrote:
> ticket: https://github.com/tarantool/tarantool/issues/3270
> branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option
> 
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occured, which led to disconnect.
> Introduce new oftion: 'replication_skip_conflict', if set, then error of
> this type will be ignored.
> 
> Closes #3270
> ---
>  src/box/applier.cc                          |  15 +++-
>  src/box/lua/load_cfg.lua                    |   3 +
>  test/app-tap/init_script.result             |  45 ++++++------
>  test/box/admin.result                       |   2 +
>  test/box/cfg.result                         |   4 ++
>  test/replication/skip_conflict_row.result   | 105 ++++++++++++++++++++++++++++
>  test/replication/skip_conflict_row.test.lua |  37 ++++++++++
>  7 files changed, 188 insertions(+), 23 deletions(-)
>  create mode 100644 test/replication/skip_conflict_row.result
>  create mode 100644 test/replication/skip_conflict_row.test.lua

Pushed to 1.10.

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] replication: automatic skip duplicating rows in replication
  2018-04-13 13:44 Konstantin Belyavskiy
@ 2018-04-13 13:46 ` Vladimir Davydov
  2018-04-18 12:30 ` Vladimir Davydov
  1 sibling, 0 replies; 7+ messages in thread
From: Vladimir Davydov @ 2018-04-13 13:46 UTC (permalink / raw)
  To: Konstantin Belyavskiy; +Cc: georgy, tarantool-patches

On Fri, Apr 13, 2018 at 04:44:12PM +0300, Konstantin Belyavskiy wrote:
> ticket: https://github.com/tarantool/tarantool/issues/3270
> branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option
> 
> In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
> occured, which led to disconnect.
> Introduce new oftion: 'replication_skip_conflict', if set, then error of
> this type will be ignored.
> 
> Closes #3270
> ---
>  src/box/applier.cc                          |  15 +++-
>  src/box/lua/load_cfg.lua                    |   3 +
>  test/app-tap/init_script.result             |  45 ++++++------
>  test/box/admin.result                       |   2 +
>  test/box/cfg.result                         |   4 ++
>  test/replication/skip_conflict_row.result   | 105 ++++++++++++++++++++++++++++
>  test/replication/skip_conflict_row.test.lua |  37 ++++++++++
>  7 files changed, 188 insertions(+), 23 deletions(-)
>  create mode 100644 test/replication/skip_conflict_row.result
>  create mode 100644 test/replication/skip_conflict_row.test.lua

Looks OK to me.

^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH] replication: automatic skip duplicating rows in replication
@ 2018-04-13 13:44 Konstantin Belyavskiy
  2018-04-13 13:46 ` Vladimir Davydov
  2018-04-18 12:30 ` Vladimir Davydov
  0 siblings, 2 replies; 7+ messages in thread
From: Konstantin Belyavskiy @ 2018-04-13 13:44 UTC (permalink / raw)
  To: vdavydov, georgy; +Cc: tarantool-patches

ticket: https://github.com/tarantool/tarantool/issues/3270
branch: https://github.com/tarantool/tarantool/compare/gh-3270-add-skip-conflict-row-option

In case of attempting to insert a duplicate key, an error ER_TUPLE_FOUND
occured, which led to disconnect.
Introduce new oftion: 'replication_skip_conflict', if set, then error of
this type will be ignored.

Closes #3270
---
 src/box/applier.cc                          |  15 +++-
 src/box/lua/load_cfg.lua                    |   3 +
 test/app-tap/init_script.result             |  45 ++++++------
 test/box/admin.result                       |   2 +
 test/box/cfg.result                         |   4 ++
 test/replication/skip_conflict_row.result   | 105 ++++++++++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua |  37 ++++++++++
 7 files changed, 188 insertions(+), 23 deletions(-)
 create mode 100644 test/replication/skip_conflict_row.result
 create mode 100644 test/replication/skip_conflict_row.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9aa951c34..6f6970b13 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -47,6 +47,7 @@
 #include "xrow_io.h"
 #include "error.h"
 #include "session.h"
+#include "cfg.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -505,7 +506,19 @@ applier_subscribe(struct applier *applier)
 			 */
 			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (xstream_write(applier->subscribe_stream, &row) != 0) {
+				struct error *e = diag_last_error(diag_get());
+				/**
+				 * Silently skip ER_TUPLE_FOUND error if such
+				 * option is set in config.
+				 */
+				if (e->type == &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    cfg_geti("replication_skip_conflict"))
+					diag_clear(diag_get());
+				else
+					diag_raise();
+			}
 		}
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 89fd7745e..3a5a6d46a 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -59,6 +59,7 @@ local default_cfg = {
     replication_sync_lag = 10,
     replication_connect_timeout = 4,
     replication_connect_quorum = nil, -- connect all
+    replication_skip_conflict = false,
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
@@ -118,6 +119,7 @@ local template_cfg = {
     replication_sync_lag = 'number',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
+    replication_skip_conflict = 'boolean',
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
@@ -192,6 +194,7 @@ local dynamic_cfg = {
     force_recovery          = function() end,
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
+    replication_skip_conflict = function() end,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index ae01b488d..5625f1466 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -25,28 +25,29 @@ box.cfg
 20	read_only:false
 21	readahead:16320
 22	replication_connect_timeout:4
-23	replication_sync_lag:10
-24	replication_timeout:1
-25	rows_per_wal:500000
-26	slab_alloc_factor:1.05
-27	too_long_threshold:0.5
-28	vinyl_bloom_fpr:0.05
-29	vinyl_cache:134217728
-30	vinyl_dir:.
-31	vinyl_max_tuple_size:1048576
-32	vinyl_memory:134217728
-33	vinyl_page_size:8192
-34	vinyl_range_size:1073741824
-35	vinyl_read_threads:1
-36	vinyl_run_count_per_level:2
-37	vinyl_run_size_ratio:3.5
-38	vinyl_timeout:60
-39	vinyl_write_threads:2
-40	wal_dir:.
-41	wal_dir_rescan_delay:2
-42	wal_max_size:268435456
-43	wal_mode:write
-44	worker_pool_threads:4
+23	replication_skip_conflict:false
+24	replication_sync_lag:10
+25	replication_timeout:1
+26	rows_per_wal:500000
+27	slab_alloc_factor:1.05
+28	too_long_threshold:0.5
+29	vinyl_bloom_fpr:0.05
+30	vinyl_cache:134217728
+31	vinyl_dir:.
+32	vinyl_max_tuple_size:1048576
+33	vinyl_memory:134217728
+34	vinyl_page_size:8192
+35	vinyl_range_size:1073741824
+36	vinyl_read_threads:1
+37	vinyl_run_count_per_level:2
+38	vinyl_run_size_ratio:3.5
+39	vinyl_timeout:60
+40	vinyl_write_threads:2
+41	wal_dir:.
+42	wal_dir_rescan_delay:2
+43	wal_max_size:268435456
+44	wal_mode:write
+45	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 296976629..2168c3adb 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -62,6 +62,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 717fa31c9..28449d9cc 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -58,6 +58,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
@@ -153,6 +155,8 @@ cfg_filter(box.cfg)
     - 16320
   - - replication_connect_timeout
     - 4
+  - - replication_skip_conflict
+    - false
   - - replication_sync_lag
     - 10
   - - replication_timeout
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
new file mode 100644
index 000000000..bf794db5a
--- /dev/null
+++ b/test/replication/skip_conflict_row.result
@@ -0,0 +1,105 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+space = box.schema.space.create('test', {engine = engine});
+---
+...
+index = box.space.test:create_index('primary')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication_skip_conflict = true}
+---
+...
+box.space.test:insert{1}
+---
+- [1]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+space:insert{1, 1}
+---
+- [1, 1]
+...
+space:insert{2}
+---
+- [2]
+...
+box.info.status
+---
+- running
+...
+vclock = test_run:get_vclock('default')
+---
+...
+_ = test_run:wait_vclock("replica", vclock)
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- null
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+box.space.test:select()
+---
+- - [1]
+  - [2]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.info.status
+---
+- running
+...
+-- cleanup
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
new file mode 100644
index 000000000..695cce9db
--- /dev/null
+++ b/test/replication/skip_conflict_row.test.lua
@@ -0,0 +1,37 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+box.schema.user.grant('guest', 'replication')
+
+space = box.schema.space.create('test', {engine = engine});
+index = box.space.test:create_index('primary')
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.cfg{replication_skip_conflict = true}
+box.space.test:insert{1}
+
+test_run:cmd("switch default")
+space:insert{1, 1}
+space:insert{2}
+box.info.status
+
+vclock = test_run:get_vclock('default')
+_ = test_run:wait_vclock("replica", vclock)
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select()
+
+test_run:cmd("switch default")
+box.info.status
+
+-- cleanup
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2018-04-18 12:30 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-12 11:01 [PATCH] replication: automatic skip duplicating rows in replication Konstantin Belyavskiy
2018-04-13  8:23 ` Vladimir Davydov
2018-04-13 11:46   ` [tarantool-patches] " Konstantin Belyavskiy
2018-04-13 12:59     ` Vladimir Davydov
2018-04-13 13:44 Konstantin Belyavskiy
2018-04-13 13:46 ` Vladimir Davydov
2018-04-18 12:30 ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox