From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 5/6] wal: separate checkpoint and flush paths Date: Sun, 25 Nov 2018 16:48:12 +0300 Message-Id: <5ba6de6392cadfc93d0399de4a56c4401317b035.1543152574.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Currently, wal_checkpoint() is used for two purposes. First, to make a checkpoint (rotate = true). Second, to flush all pending WAL requests (rotate = false). Since checkpointing has to fail if cascading rollback is in progress, so does flushing. This is confusing. Let's separate the two paths. --- src/box/box.cc | 5 ++-- src/box/vinyl.c | 5 +--- src/box/wal.c | 93 ++++++++++++++++++++++++++------------------------------- src/box/wal.h | 15 +++++++--- 4 files changed, 56 insertions(+), 62 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 8d6e966e..5ea2f014 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2190,10 +2190,9 @@ box_checkpoint() goto end; struct vclock vclock; - if ((rc = wal_checkpoint(&vclock, true))) { - tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK); + if ((rc = wal_checkpoint(&vclock))) goto end; - } + rc = engine_commit_checkpoint(&vclock); end: if (rc) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 1794489d..05df1329 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm) * Wait for prepared transactions to complete * (we can't abort them as they reached WAL). 
*/ - struct vclock unused; - if (wal_checkpoint(&unused, false) != 0) - return -1; - + wal_flush(); return 0; } diff --git a/src/box/wal.c b/src/box/wal.c index 11aae5fc..3e6c1e7f 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -461,29 +461,35 @@ wal_thread_stop() wal_writer_destroy(&wal_writer_singleton); } -struct wal_checkpoint +void +wal_flush(void) { - struct cmsg base; - struct vclock *vclock; - struct fiber *fiber; - bool rotate; - int res; + cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL); +} + +struct wal_checkpoint_msg { + struct cbus_call_msg base; + struct vclock vclock; }; -void -wal_checkpoint_f(struct cmsg *data) +static int +wal_checkpoint_f(struct cbus_call_msg *data) { - struct wal_checkpoint *msg = (struct wal_checkpoint *) data; + struct wal_checkpoint_msg *msg = (struct wal_checkpoint_msg *)data; struct wal_writer *writer = &wal_writer_singleton; if (writer->in_rollback.route != NULL) { - /* We're rolling back a failed write. */ - msg->res = -1; - return; + /* + * We're rolling back a failed write and so + * can't make a checkpoint - see the comment + * in wal_checkpoint() for the explanation. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; } /* * Avoid closing the current WAL if it has no rows (empty). */ - if (msg->rotate && xlog_is_open(&writer->current_wal) && + if (xlog_is_open(&writer->current_wal) && vclock_sum(&writer->current_wal.meta.vclock) != vclock_sum(&writer->vclock)) { @@ -492,53 +498,38 @@ wal_checkpoint_f(struct cmsg *data) * The next WAL will be created on the first write. */ } - vclock_copy(msg->vclock, &writer->vclock); -} - -void -wal_checkpoint_done_f(struct cmsg *data) -{ - struct wal_checkpoint *msg = (struct wal_checkpoint *) data; - fiber_wakeup(msg->fiber); + vclock_copy(&msg->vclock, &writer->vclock); + return 0; } int -wal_checkpoint(struct vclock *vclock, bool rotate) +wal_checkpoint(struct vclock *vclock) { struct wal_writer *writer = &wal_writer_singleton; - if (! 
stailq_empty(&writer->rollback)) { - /* - * The writer rollback queue is not empty, - * roll back this transaction immediately. - * This is to ensure we do not accidentally - * commit a transaction which has seen changes - * that will be rolled back. - */ - say_error("Aborting transaction %llu during " - "cascading rollback", - vclock_sum(&writer->vclock)); - return -1; - } if (writer->wal_mode == WAL_NONE) { vclock_copy(vclock, &writer->vclock); return 0; } - static struct cmsg_hop wal_checkpoint_route[] = { - {wal_checkpoint_f, &wal_thread.tx_prio_pipe}, - {wal_checkpoint_done_f, NULL}, - }; - vclock_create(vclock); - struct wal_checkpoint msg; - cmsg_init(&msg.base, wal_checkpoint_route); - msg.vclock = vclock; - msg.fiber = fiber(); - msg.rotate = rotate; - msg.res = 0; - cpipe_push(&wal_thread.wal_pipe, &msg.base); - fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(true); - return msg.res; + if (!stailq_empty(&writer->rollback)) { + /* + * If cascading rollback is in progress, in-memory + * indexes can contain changes scheduled for rollback. + * If we made a checkpoint, we could write them to + * the snapshot. So we abort checkpointing in this + * case. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; + } + struct wal_checkpoint_msg msg; + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + if (rc != 0) + return -1; + vclock_copy(vclock, &msg.vclock); + return 0; } struct wal_gc_msg diff --git a/src/box/wal.h b/src/box/wal.h index e4094b1e..7ca27f1a 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -166,13 +166,20 @@ wal_mode(); /** * Wait till all pending changes to the WAL are flushed. - * Rotates the WAL. - * - * @param[out] vclock WAL vclock + */ +void +wal_flush(void); + +/** + * Prepare WAL for checkpointing. 
* + * This function flushes all pending changes and rotates the + * current WAL. The vclock of the last record written to the + * rotated WAL is returned in @vclock. This is the vclock that + * is supposed to be used to identify the new checkpoint. */ int -wal_checkpoint(struct vclock *vclock, bool rotate); +wal_checkpoint(struct vclock *vclock); /** * Remove WAL files that are not needed by consumers reading -- 2.11.0