From mboxrd@z Thu Jan 1 00:00:00 1970
Received: from localhost (localhost [127.0.0.1])
	by turing.freelists.org (Avenir Technologies Mail Multiplex)
	with ESMTP id 0851B272A6; Thu, 7 Feb 2019 12:25:33 -0500 (EST)
Received: from turing.freelists.org ([127.0.0.1])
	by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024)
	with ESMTP id uwcKFv9ccRo3; Thu, 7 Feb 2019 12:25:32 -0500 (EST)
Received: from smtp16.mail.ru (smtp16.mail.ru [94.100.176.153])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by turing.freelists.org (Avenir Technologies Mail Multiplex)
	with ESMTPS id 95622272AD; Thu, 7 Feb 2019 12:25:32 -0500 (EST)
From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal
Date: Thu, 7 Feb 2019 20:27:31 +0300
Message-Id: <9a66ddf15aea56b09b6821b40f46b546c253d9da.1549556742.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

Applier used to promote vclock prior to applying the row. This led to a
situation where the master's row would be skipped forever if there was
an error trying to apply it. However, some errors are transient, and we
might be able to successfully apply the same row later.

While we're at it, make wal writer the only one responsible for
advancing replicaset vclock.
It was already doing it for rows coming from the local instance. Besides,
it makes the code cleaner, since now we want to advance vclock directly
from the wal batch reply, and it lets us get rid of unnecessary checks
whether applier or wal has already advanced the vclock.

Closes #2283
Prerequisite #980
---
 src/box/applier.cc                          | 46 ++++++---------
 src/box/wal.c                               | 43 ++++----------
 test/box/errinj.result                      | 56 ++++++++++++++----
 test/replication/skip_conflict_row.result   | 63 +++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua | 20 +++++++
 5 files changed, 159 insertions(+), 69 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -512,34 +512,20 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set
+		 * of changes may arrive via two
+		 * concurrently running appliers.
+		 * Hence we need a latch to strictly order all
+		 * changes which belong to the same server id.
+		 */
+		latch_lock(latch);
+		if (vclock_get(&replicaset.vclock,
+			       row.replica_id) < row.lsn) {
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -550,10 +536,14 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					latch_unlock(latch);
 					diag_raise();
+				}
 			}
 		}
+		latch_unlock(latch);
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 966f3bfb9..6d6dda390 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
+	/* Update the tx vclock to the latest written by wal. */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
 }
 
@@ -1033,6 +1038,12 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/*
+	 * Remember the vclock of the last successfully written row so
+	 * that we can update replicaset.vclock once this message gets
+	 * back to tx.
+	 */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1164,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1198,12 +1184,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 1d9a16d8d..9a797916c 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -141,31 +141,70 @@ errinj.set("ERRINJ_TESTING", false)
 ---
 - ok
 ...
-env = require('test_run')
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_IO", true)
 ---
+- ok
 ...
-test_run = env.new()
+space:insert{1}
 ---
+- error: Failed to write to disk
 ...
-lsn1 = box.info.vclock[box.info.id]
+space:get{1}
 ---
 ...
--- Check how well we handle a failed log write
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
 space:insert{1}
 ---
+- [1]
+...
+-- Check vclock was promoted only one time
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+space:update(1, {{'=', 2, 2}})
+---
 - error: Failed to write to disk
 ...
 space:get{1}
 ---
+- [1]
+...
+space:get{2}
+---
 ...
 errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
+space:update(1, {{'=', 2, 2}})
+---
+- [1, 2]
+...
+-- Check vclock was promoted only two times
+space:truncate()
+---
+...
+lsn1 = box.info.vclock[box.info.id]
+---
+...
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+---
+- ok
+...
+space:insert{1}
+---
+- error: Failed to write to disk
+...
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+---
+- ok
+...
 space:insert{1}
 ---
 - [1]
@@ -175,7 +214,7 @@ box.info.vclock[box.info.id] == lsn1 + 1
 ---
 - true
 ...
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
 ---
 - ok
 ...
@@ -187,10 +226,7 @@ space:get{1}
 ---
 - [1]
 ...
-space:get{2}
----
-...
-errinj.set("ERRINJ_WAL_IO", false)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
 ---
 - ok
 ...
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b472..0c45e15e2 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -82,6 +82,69 @@ box.info.status
 ---
 - running
 ...
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+lsn1 = box.info.vclock[1]
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+test_run:cmd("switch default")
+---
+- true
+...
 -- cleanup
 test_run:cmd("stop server replica")
 ---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..7eed4073c 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -28,6 +28,26 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.space.test:insert{3}
+lsn1 = box.info.vclock[1]
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+test_run:cmd("switch replica")
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
-- 
2.20.1
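
Illustration (not part of the patch): the ordering described in the commit message can be sketched in isolation as a toy model. Every name below (toy_vclock, toy_row, toy_wal, toy_wal_write, toy_apply_row) is hypothetical and is not Tarantool API. The sketch also collapses the tx/wal thread split into a single call, so it only shows the invariant the patch aims for: the vclock moves forward after a successful write, and a row whose write failed stays eligible for a retry.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { TOY_VCLOCK_MAX = 4 };

/* Hypothetical stand-in for a vclock: one LSN per replica id. */
struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

/* Hypothetical replicated row: origin replica id and its LSN. */
struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Hypothetical journal; fail_next simulates one transient write error. */
struct toy_wal {
	bool fail_next;
};

/* Pretend to write the row: returns 0 on success, -1 on failure. */
static int
toy_wal_write(struct toy_wal *wal, const struct toy_row *row)
{
	(void)row;
	if (wal->fail_next) {
		wal->fail_next = false;
		return -1;
	}
	return 0;
}

/*
 * Apply a row. Returns 1 if the row was written, 0 if it was skipped
 * because the vclock already covers it, -1 if the write failed.
 * The vclock is advanced only after the write has succeeded.
 */
static int
toy_apply_row(struct toy_vclock *vclock, struct toy_wal *wal,
	      const struct toy_row *row)
{
	assert(row->replica_id < TOY_VCLOCK_MAX);
	if (vclock->lsn[row->replica_id] >= row->lsn)
		return 0;
	if (toy_wal_write(wal, row) != 0)
		return -1; /* vclock untouched, so the row can be retried */
	vclock->lsn[row->replica_id] = row->lsn;
	return 1;
}
```

With a zeroed toy_vclock and fail_next set, the first toy_apply_row() call for a row {1, 5} returns -1 and leaves lsn[1] at 0. The second call succeeds and sets lsn[1] to 5, and a third call is skipped as already applied. Promoting the vclock before the write would instead make the retry look like a duplicate, which is the behaviour this patch removes.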