From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 1/9] wal: separate checkpoint and flush paths Date: Wed, 28 Nov 2018 19:14:39 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Currently, wal_checkpoint() is used for two purposes. First, to make a checkpoint (rotate = true). Second, to flush all pending WAL requests (rotate = false). Since checkpointing has to fail if a cascading rollback is in progress, so does flushing. This is confusing. Let's separate the two paths. While we are at it, let's also rewrite WAL checkpointing using cbus_call instead of cpipe_push as it's a more convenient way of exchanging simple two-hop messages between two threads. --- src/box/box.cc | 5 ++-- src/box/vinyl.c | 5 +--- src/box/wal.c | 91 +++++++++++++++++++++++++++------------------------------ src/box/wal.h | 15 +++++++--- 4 files changed, 57 insertions(+), 59 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 8d6e966e..5ea2f014 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2190,10 +2190,9 @@ box_checkpoint() goto end; struct vclock vclock; - if ((rc = wal_checkpoint(&vclock, true))) { - tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK); + if ((rc = wal_checkpoint(&vclock))) goto end; - } + rc = engine_commit_checkpoint(&vclock); end: if (rc) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 1794489d..05df1329 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm) * Wait for prepared transactions to complete * (we can't abort them as they reached WAL). 
*/ - struct vclock unused; - if (wal_checkpoint(&unused, false) != 0) - return -1; - + wal_flush(); return 0; } diff --git a/src/box/wal.c b/src/box/wal.c index 11aae5fc..bc396ca5 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -461,29 +461,39 @@ wal_thread_stop() wal_writer_destroy(&wal_writer_singleton); } +void +wal_flush(void) +{ + struct wal_writer *writer = &wal_writer_singleton; + if (writer->wal_mode == WAL_NONE) + return; + cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL); +} + struct wal_checkpoint { - struct cmsg base; - struct vclock *vclock; - struct fiber *fiber; - bool rotate; - int res; + struct cbus_call_msg base; + struct vclock vclock; }; -void -wal_checkpoint_f(struct cmsg *data) +static int +wal_checkpoint_f(struct cbus_call_msg *data) { struct wal_checkpoint *msg = (struct wal_checkpoint *) data; struct wal_writer *writer = &wal_writer_singleton; if (writer->in_rollback.route != NULL) { - /* We're rolling back a failed write. */ - msg->res = -1; - return; + /* + * We're rolling back a failed write and so + * can't make a checkpoint - see the comment + * in wal_checkpoint() for the explanation. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; } /* * Avoid closing the current WAL if it has no rows (empty). */ - if (msg->rotate && xlog_is_open(&writer->current_wal) && + if (xlog_is_open(&writer->current_wal) && vclock_sum(&writer->current_wal.meta.vclock) != vclock_sum(&writer->vclock)) { @@ -492,53 +502,38 @@ wal_checkpoint_f(struct cmsg *data) * The next WAL will be created on the first write. */ } - vclock_copy(msg->vclock, &writer->vclock); -} - -void -wal_checkpoint_done_f(struct cmsg *data) -{ - struct wal_checkpoint *msg = (struct wal_checkpoint *) data; - fiber_wakeup(msg->fiber); + vclock_copy(&msg->vclock, &writer->vclock); + return 0; } int -wal_checkpoint(struct vclock *vclock, bool rotate) +wal_checkpoint(struct vclock *vclock) { struct wal_writer *writer = &wal_writer_singleton; - if (! 
stailq_empty(&writer->rollback)) { - /* - * The writer rollback queue is not empty, - * roll back this transaction immediately. - * This is to ensure we do not accidentally - * commit a transaction which has seen changes - * that will be rolled back. - */ - say_error("Aborting transaction %llu during " - "cascading rollback", - vclock_sum(&writer->vclock)); - return -1; - } if (writer->wal_mode == WAL_NONE) { vclock_copy(vclock, &writer->vclock); return 0; } - static struct cmsg_hop wal_checkpoint_route[] = { - {wal_checkpoint_f, &wal_thread.tx_prio_pipe}, - {wal_checkpoint_done_f, NULL}, - }; - vclock_create(vclock); + if (!stailq_empty(&writer->rollback)) { + /* + * If cascading rollback is in progress, in-memory + * indexes can contain changes scheduled for rollback. + * If we made a checkpoint, we could write them to + * the snapshot. So we abort checkpointing in this + * case. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; + } struct wal_checkpoint msg; - cmsg_init(&msg.base, wal_checkpoint_route); - msg.vclock = vclock; - msg.fiber = fiber(); - msg.rotate = rotate; - msg.res = 0; - cpipe_push(&wal_thread.wal_pipe, &msg.base); - fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(true); - return msg.res; + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + if (rc != 0) + return -1; + vclock_copy(vclock, &msg.vclock); + return 0; } struct wal_gc_msg diff --git a/src/box/wal.h b/src/box/wal.h index e4094b1e..7ca27f1a 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -166,13 +166,20 @@ wal_mode(); /** * Wait till all pending changes to the WAL are flushed. - * Rotates the WAL. - * - * @param[out] vclock WAL vclock + */ +void +wal_flush(void); + +/** + * Prepare WAL for checkpointing. * + * This function flushes all pending changes and rotates the + * current WAL. 
The vclock of the last record written to the + * rotated WAL is returned in @vclock. This is the vclock that + * is supposed to be used to identify the new checkpoint. */ int -wal_checkpoint(struct vclock *vclock, bool rotate); +wal_checkpoint(struct vclock *vclock); /** * Remove WAL files that are not needed by consumers reading -- 2.11.0