From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id 412474696C7
	for ; Wed, 12 Feb 2020 12:39:25 +0300 (MSK)
From: Georgy Kirichenko
Date: Wed, 12 Feb 2020 12:39:15 +0300
Message-Id: <1f79b6013791a30b4f70f8679e99be9530e5c0c8.1581500169.git.georgy@tarantool.org>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v4 06/11] wal: extract log write batch into a separate routine
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: tarantool-patches@dev.tarantool.org

Introduce a routine that transfers journal entries from an input
queue to an output queue, writing them to an xlog file. As soon as
the xlog performs actual output (or fails), the routine breaks the
transfer loop and returns the write result code. After that, the
output queue contains the entries that were passed to the xlog
(regardless of the disk write status), whereas the input queue
contains the untouched entries. If the input queue is drained without
an actual xlog write, the xlog file is flushed explicitly.

This refactoring helps to implement a WAL memory buffer.

Part of #980, #3794
---
 src/box/wal.c | 87 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 0ae66ff32..ce15cb459 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -958,6 +958,36 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	}
 }
 
+/*
+ * This function shifts entries from input queue and writes
+ * them to the current log file until the current log flushes
+ * or write error happened. All touched entries are moved to
+ * the output queue. The function returns count of written
+ * bytes or -1 in case of error.
+ */
+static ssize_t
+wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
+		     struct stailq *output, struct vclock *vclock_diff)
+{
+	struct xlog *l = &writer->current_wal;
+	ssize_t rc;
+	do {
+		struct journal_entry *entry =
+			stailq_shift_entry(input, struct journal_entry, fifo);
+		stailq_add_tail(output, &entry->fifo);
+
+		wal_assign_lsn(vclock_diff, &writer->vclock,
+			       entry->rows, entry->rows + entry->n_rows);
+		entry->res = vclock_sum(vclock_diff) +
+			     vclock_sum(&writer->vclock);
+		rc = xlog_write_entry(l, entry);
+	} while (rc == 0 && !stailq_empty(input));
+	/* If log was not flushed then flush it explicitly. */
+	if (rc == 0)
+		rc = xlog_flush(l);
+	return rc;
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg)
 {
@@ -1017,36 +1047,31 @@ wal_write_to_disk(struct cmsg *msg)
 	 * of request in xlog file is stored inside `struct journal_entry`.
 	 */
 
-	struct xlog *l = &writer->current_wal;
-
-	/*
-	 * Iterate over requests (transactions)
-	 */
-	int rc;
-	struct journal_entry *entry;
-	struct stailq_entry *last_committed = NULL;
-	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock_diff, &writer->vclock,
-			       entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&vclock_diff) +
-			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
-			goto done;
-		if (rc > 0) {
+	struct stailq input;
+	stailq_create(&input);
+	stailq_concat(&input, &wal_msg->commit);
+	struct stailq output;
+	stailq_create(&output);
+	while (!stailq_empty(&input)) {
+		ssize_t rc = wal_write_xlog_batch(writer, &input, &output,
+						  &vclock_diff);
+		if (rc < 0) {
+			/*
+			 * Put processed entries and tail of write
+			 * queue to a rollback list.
+			 */
+			stailq_concat(&wal_msg->rollback, &output);
+			stailq_concat(&wal_msg->rollback, &input);
+		} else {
+			/*
+			 * Schedule processed entries to commit
+			 * and update the wal vclock.
+			 */
+			stailq_concat(&wal_msg->commit, &output);
 			writer->checkpoint_wal_size += rc;
-			last_committed = &entry->fifo;
 			vclock_merge(&writer->vclock, &vclock_diff);
 		}
-		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	rc = xlog_flush(l);
-	if (rc < 0)
-		goto done;
-
-	writer->checkpoint_wal_size += rc;
-	last_committed = stailq_last(&wal_msg->commit);
-	vclock_merge(&writer->vclock, &vclock_diff);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1070,7 +1095,6 @@ wal_write_to_disk(struct cmsg *msg)
 		}
 	}
 
-done:
 	error = diag_last_error(diag_get());
 	if (error) {
 		/* Until we can pass the error to tx, log it and clear. */
@@ -1090,15 +1114,12 @@ done:
 	 * nothing, and need to start rollback from the first
 	 * request. Otherwise we rollback from the first request.
 	 */
-	struct stailq rollback;
-	stailq_cut_tail(&wal_msg->commit, last_committed, &rollback);
-
-	if (!stailq_empty(&rollback)) {
+	if (!stailq_empty(&wal_msg->rollback)) {
+		struct journal_entry *entry;
 		/* Update status of the successfully committed requests. */
-		stailq_foreach_entry(entry, &rollback, fifo)
+		stailq_foreach_entry(entry, &wal_msg->rollback, fifo)
 			entry->res = -1;
 		/* Rollback unprocessed requests */
-		stailq_concat(&wal_msg->rollback, &rollback);
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
-- 
2.25.0