From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH 3/3] Promote replicaset.vclock only after wal
Date: Wed, 6 Feb 2019 11:29:59 +0300
Message-Id: <94d6421460682117a3f06d7dc042f0581cdfa0e5.1549441084.git.georgy@tarantool.org>
In-Reply-To: 
References: 
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

Get rid of the appliers' vclock_follow and promote the vclock only
after the wal write.

Closes #2283
Prerequisite #980
---
A short, simplified sketch of the resulting commit-time vclock flow is
appended below the diff.

 src/box/applier.cc                          | 14 ++------
 src/box/wal.c                               | 38 ++++----------------
 test/replication/skip_conflict_row.test.lua | 19 +++++++++++
 test/xlog-py/dup_key.result                 | 12 +++++--
 test/xlog-py/dup_key.test.py                | 23 ++++++++++---
 5 files changed, 57 insertions(+), 49 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index d87b247e2..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -518,22 +518,14 @@ applier_subscribe(struct applier *applier)
 		/*
 		 * In a full mesh topology, the same set
 		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
+		 * concurrently running appliers.
+		 * Hence we need a latch to strictly order all
+		 * changes which belong to the same server id.
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock,
			       row.replica_id) < row.lsn) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			latch_lock(latch);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** Vclock after the batch is processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..c60999b9b 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -1,5 +1,6 @@
 env = require('test_run')
 test_run = env.new()
+test_run:cmd("restart server default with cleanup=1")
 engine = test_run:get_cfg('engine')
 
 box.schema.user.grant('guest', 'replication')
@@ -28,6 +29,24 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- Test that, if replication_skip_conflict is off, the vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.cfg{replication_skip_conflict=false}
+box.space.test:insert{3}
+box.info.vclock
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+box.info.vclock
+test_run:cmd("switch replica")
+box.info.vclock
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select()
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..e25b1d477 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -22,23 +22,36 @@ wal = os.path.join(vardir, filename)
 # Create wal#1
 server.admin("box.space.test:insert{1, 'first tuple'}")
 server.admin("box.space.test:insert{2, 'second tuple'}")
+lsn2 = int(yaml.load(server.admin("box.info.lsn", silent=True))[0])
 server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
+# Drop the empty xlog created on shutdown.
+filename2 = str(lsn2).zfill(20) + ".xlog"
+wal2 = os.path.join(vardir, filename2)
+os.unlink(wal2)
 
-# Write wal#2
+# Write wal#2 to bump the lsn.
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3, which conflicts with wal#1.
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1
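
Not part of the patch: below is a minimal, single-threaded sketch of the
commit-time vclock flow this change establishes, for reviewers who want the
ordering spelled out. The struct layouts and the fake_* names are simplified
stand-ins invented for illustration, not the real wal.c API; the real code
runs the WAL in a separate cord and uses struct wal_msg / tx_schedule_commit
as in the diff above. The point modelled: the WAL side assigns LSNs and
stamps the batch vclock only after the rows are "written", and the tx side
promotes replicaset.vclock exclusively from the commit callback, so the
published vclock can never run ahead of the WAL.

/*
 * Standalone model of the post-patch vclock flow.
 * All types and function names are simplified stand-ins.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define VCLOCK_MAX 32

struct vclock {
	int64_t lsn[VCLOCK_MAX];
};

static void
vclock_create(struct vclock *v)
{
	memset(v, 0, sizeof(*v));
}

static void
vclock_copy(struct vclock *dst, const struct vclock *src)
{
	*dst = *src;
}

/* A batch of rows sent to the WAL thread (cf. struct wal_msg). */
struct wal_batch {
	uint32_t replica_id;
	int n_rows;
	/* Vclock after the batch is processed, filled by the WAL side. */
	struct vclock vclock;
};

/* tx-side vclock: advanced only from the commit callback. */
static struct vclock replicaset_vclock;
/* WAL-side vclock: the only place where LSNs are assigned. */
static struct vclock wal_vclock;

/* WAL side: assign LSNs, "write" the rows, then stamp the batch. */
static void
fake_wal_write_batch(struct wal_batch *batch)
{
	wal_vclock.lsn[batch->replica_id] += batch->n_rows;
	/* ... the real code writes and syncs the xlog here ... */
	vclock_copy(&batch->vclock, &wal_vclock);
}

/* tx side: the commit callback is the only promoter of the vclock. */
static void
fake_tx_schedule_commit(struct wal_batch *batch)
{
	vclock_copy(&replicaset_vclock, &batch->vclock);
}

int
main(void)
{
	vclock_create(&replicaset_vclock);
	vclock_create(&wal_vclock);

	struct wal_batch batch = { .replica_id = 1, .n_rows = 2 };
	vclock_create(&batch.vclock);

	/* Nothing is promoted before the write. */
	printf("before write: %lld\n", (long long)replicaset_vclock.lsn[1]);
	fake_wal_write_batch(&batch);
	printf("after write, before commit: %lld\n",
	       (long long)replicaset_vclock.lsn[1]);
	fake_tx_schedule_commit(&batch);
	printf("after commit: %lld\n", (long long)replicaset_vclock.lsn[1]);
	return 0;
}

Building and running the sketch (cc sketch.c && ./a.out) prints 0, 0, 2: the
tx-side vclock stays untouched until the commit callback runs, which mirrors
why the vclock_follow calls in wal_write() and in the applier are removed.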