From: Georgy Kirichenko
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal
Date: Tue, 22 Jan 2019 13:31:10 +0300

The journal now maintains the replicaset vclock for recovery, local and
replicated operations. Introduce replicaset.applier.vclock to prevent
races between appliers.

Prerequisite #980
---
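A minimal standalone sketch of the ordering this change aims for, for
review context: appliers only advance replicaset.applier.vclock when a
row arrives, and replicaset.vclock is promoted from the WAL path once a
batch has actually been written. This is illustrative C only, not
tarantool code; the toy_vclock/toy_follow names are invented for the
example.

/* Illustrative only: a toy two-clock scheme, not tarantool code. */
#include <stdint.h>
#include <stdio.h>

#define TOY_VCLOCK_MAX 4

struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

/* Advance one component monotonically, as an applier does per received row. */
static void
toy_follow(struct toy_vclock *vc, uint32_t id, int64_t lsn)
{
	if (id < TOY_VCLOCK_MAX && lsn > vc->lsn[id])
		vc->lsn[id] = lsn;
}

int
main(void)
{
	struct toy_vclock applier_vclock = {{0}};    /* rows received from masters */
	struct toy_vclock replicaset_vclock = {{0}}; /* rows confirmed by the WAL */

	/* An applier receives row {replica 2, lsn 11}: only its clock moves. */
	toy_follow(&applier_vclock, 2, 11);

	/* A second applier offering the same row sees it is already
	 * accounted for and skips it instead of racing to apply it twice. */
	if (11 <= applier_vclock.lsn[2])
		printf("row {2, 11} already received, skipped\n");

	/* Only when the WAL thread reports the batch as written is the
	 * replicaset clock promoted (cf. tx_schedule_commit in the patch). */
	replicaset_vclock = applier_vclock;
	printf("confirmed lsn for replica 2: %lld\n",
	       (long long)replicaset_vclock.lsn[2]);
	return 0;
}

The same split is why wal_write() no longer promotes replicaset.vclock
itself: tx_schedule_commit() copies the batch vclock after the write
completes, so the tx thread only ever sees confirmed LSNs.
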
 src/box/applier.cc           | 68 +++++++++++++++++++++---------------
 src/box/box.cc               |  3 +-
 src/box/replication.cc       |  1 +
 src/box/replication.h        |  3 ++
 src/box/vclock.c             | 14 ++++++++
 src/box/vclock.h             |  3 ++
 src/box/wal.c                | 38 ++++----------------
 test/xlog-py/dup_key.result  | 12 +++++--
 test/xlog-py/dup_key.test.py | 18 +++++++---
 9 files changed, 93 insertions(+), 67 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..87873e970 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
 			coio_write_xrow(&io, &xrow);
 		} catch (SocketError *e) {
 			/*
@@ -300,7 +300,7 @@ applier_join(struct applier *applier)
 		 * Used to initialize the replica's initial
 		 * vclock in bootstrap_from_master()
 		 */
-		xrow_decode_vclock_xc(&row, &replicaset.vclock);
+		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
 	}
 
 	applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -326,7 +326,8 @@ applier_join(struct applier *applier)
 				 * vclock yet, do it now. In 1.7+
 				 * this vclock is not used.
 				 */
-				xrow_decode_vclock_xc(&row, &replicaset.vclock);
+				xrow_decode_vclock_xc(&row,
+						      &replicaset.applier.vclock);
 			}
 			break; /* end of stream */
 		} else if (iproto_type_is_error(row.type)) {
@@ -336,6 +337,7 @@ applier_join(struct applier *applier)
 				  (uint32_t) row.type);
 		}
 	}
+	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
 	say_info("initial data received");
 
 	applier_set_state(applier, APPLIER_FINAL_JOIN);
@@ -355,7 +357,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			xstream_write_xc(applier->subscribe_stream, &row);
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
@@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
 {
 	assert(applier->subscribe_stream != NULL);
 
+	if (!vclock_is_set(&replicaset.applier.vclock))
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
 		applier_set_state(applier, APPLIER_FOLLOW);
 	}
 
-	/*
-	 * Stay 'orphan' until appliers catch up with
-	 * the remote vclock at the time of SUBSCRIBE
-	 * and the lag is less than configured.
-	 */
-	if (applier->state == APPLIER_SYNC &&
-	    applier->lag <= replication_sync_lag &&
-	    vclock_compare(&remote_vclock_at_subscribe,
-			   &replicaset.vclock) <= 0) {
-		/* Applier is synced, switch to "follow". */
-		applier_set_state(applier, APPLIER_FOLLOW);
-	}
-
 	/*
 	 * Tarantool < 1.7.7 does not send periodic heartbeat
 	 * messages so we can't assume that if we haven't heard
@@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
+		if (vclock_get(&replicaset.applier.vclock,
+			       row.replica_id) < row.lsn) {
+			if (row.replica_id == instance_id &&
+			    vclock_get(&replicaset.vclock, instance_id) <
+			    row.lsn) {
+				/* Local row returned back. */
+				goto done;
+			}
+			/* Preserve old lsn value. */
+			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
+						     row.replica_id);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			struct replica *replica = replica_by_id(row.replica_id);
 			struct latch *latch = (replica ? &replica->order_latch :
 					       &replicaset.applier.order_latch);
@@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					/* Rollback lsn to have a chance for a retry. */
+					vclock_set(&replicaset.applier.vclock,
+						   row.replica_id, old_lsn);
 					diag_raise();
+				}
 			}
 		}
+done:
+		/*
+		 * Stay 'orphan' until appliers catch up with
+		 * the remote vclock at the time of SUBSCRIBE
+		 * and the lag is less than configured.
+		 */
+		if (applier->state == APPLIER_SYNC &&
+		    applier->lag <= replication_sync_lag &&
+		    vclock_compare(&remote_vclock_at_subscribe,
+				   &replicaset.vclock) <= 0) {
+			/* Applier is synced, switch to "follow". */
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/box.cc b/src/box/box.cc
index 9f2fd6da1..0df0875dd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry * /* entry */)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	vclock_copy(&replicaset.vclock, journal->vclock);
 	return vclock_sum(journal->vclock);
 }
 
@@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
 	 */
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&journal, &replicaset.applier.vclock);
 	journal_set(&journal.base);
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 2cb4ec0f8..51e08886c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,7 @@ replication_init(void)
 	fiber_cond_create(&replicaset.applier.cond);
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
+	vclock_create(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ac620d86..b9aebed14 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -194,6 +194,9 @@ struct replicaset {
 	struct vclock vclock;
 	/** Applier state. */
 	struct {
+		/**
+		 * Vclock sent to process from appliers. */
+		struct vclock vclock;
 		/**
 		 * Total number of replicas with attached
 		 * appliers.
diff --git a/src/box/vclock.c b/src/box/vclock.c
index b5eb2800b..c297d1ff9 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -36,6 +36,20 @@
 
 #include "diag.h"
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+	assert(lsn >= 0);
+	assert(replica_id < VCLOCK_MAX);
+	int64_t prev_lsn = vclock->lsn[replica_id];
+	if (lsn > 0)
+		vclock->map |= 1 << replica_id;
+	else
+		vclock->map &= ~(1 << replica_id);
+	vclock->lsn[replica_id] = lsn;
+	vclock->signature += lsn - prev_lsn;
+}
+
 int64_t
 vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 {
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 111e29160..d6cb14c2a 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
 	return vclock->lsn[replica_id];
 }
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
 static inline int64_t
 vclock_inc(struct vclock *vclock, uint32_t replica_id)
 {
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
 	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..3dacde771 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -26,19 +26,27 @@ server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
 
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3 - confliction with wal#1
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1