From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lj1-f195.google.com (mail-lj1-f195.google.com [209.85.208.195]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 21B1E445320 for ; Wed, 22 Jul 2020 18:34:50 +0300 (MSK) Received: by mail-lj1-f195.google.com with SMTP id z24so2946463ljn.8 for ; Wed, 22 Jul 2020 08:34:50 -0700 (PDT) From: Cyrill Gorcunov Date: Wed, 22 Jul 2020 18:33:58 +0300 Message-Id: <20200722153359.134718-5-gorcunov@gmail.com> In-Reply-To: <20200722153359.134718-1-gorcunov@gmail.com> References: <20200722153359.134718-1-gorcunov@gmail.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 4/5] qsync: implement direct write of confirm/rollback into a journal List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tml Cc: Vladislav Shpilevoy When we need to write CONFIRM or ROLLBACK message (which is just a binary record in msgpack format) into a journal we use txn code to allocate a new transaction, encode there a message and pass it to walk the long txn path before it hit the journal. This is not only resource wasting but also somehow strange from arhitectural point of view. Instead lets encode a record on the stack and write it directly to the journal. Closes #5129 Signed-off-by: Cyrill Gorcunov --- src/box/iproto_constants.h | 24 ++++++++++ src/box/txn_limbo.c | 93 +++++++++++++++++++++----------------- src/box/xrow.c | 46 ++++--------------- src/box/xrow.h | 31 ++++--------- 4 files changed, 94 insertions(+), 100 deletions(-) diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 6b850f101..8f0c06981 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -328,6 +328,30 @@ iproto_type_is_synchro_request(uint32_t type) return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK; } +/** CONFIRM/ROLLBACK entries encoded in MsgPack. */ +struct PACKED request_synchro_body { + uint8_t m_body; + uint8_t k_replica_id; + uint8_t m_replica_id; + uint32_t v_replica_id; + uint8_t k_lsn; + uint8_t m_lsn; + uint64_t v_lsn; +}; + +static inline void +request_synchro_body_create(struct request_synchro_body *body, + uint32_t replica_id, int64_t lsn) +{ + body->m_body = 0x80 | 2; + body->k_replica_id = IPROTO_REPLICA_ID; + body->m_replica_id = 0xce; + body->v_replica_id = mp_bswap_u32(replica_id); + body->k_lsn = IPROTO_LSN; + body->m_lsn = 0xcf; + body->v_lsn = mp_bswap_u64(lsn); +} + /** This is an error. */ static inline bool iproto_type_is_error(uint32_t type) diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index a3936c569..de043f53d 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -32,6 +32,9 @@ #include "txn_limbo.h" #include "replication.h" +#include "iproto_constants.h" +#include "journal.h" + struct txn_limbo txn_limbo; static inline void @@ -238,62 +241,70 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) return 0; } +static void +txn_limbo_write_cb(struct journal_entry *entry) +{ + /* + * Since we don't know from which context + * we will be called (real wal engine or + * some other non-context switching) we + * might not need to wake up. + */ + if (entry->complete_data != fiber()) + fiber_wakeup(entry->complete_data); +} + +/** + * Write CONFIRM or ROLLBACK message to a journal directly + * without involving transaction engine because using txn + * engine is far from being cheap while we only need to + * write a small message. + */ static int -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, - bool is_confirm) +txn_limbo_write(uint32_t replica_id, int64_t lsn, int type) { + assert(replica_id != REPLICA_ID_NIL); + assert(type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK); assert(lsn > 0); - struct xrow_header row; - struct request request = { - .header = &row, - }; + char buf[sizeof(struct journal_entry) + + sizeof(struct xrow_header *) + + sizeof(struct xrow_header)]; - struct txn *txn = txn_begin(); - if (txn == NULL) - return -1; + struct journal_entry *entry = (void *)buf; + struct xrow_header *row = (void *)&entry->rows[1]; + entry->rows[0] = row; - int res = 0; - if (is_confirm) { - res = xrow_encode_confirm(&row, &txn->region, - limbo->instance_id, lsn); - } else { - /* - * This LSN is the first to be rolled back, so - * the last "safe" lsn is lsn - 1. - */ - res = xrow_encode_rollback(&row, &txn->region, - limbo->instance_id, lsn); + struct request_synchro_body body; + xrow_encode_confirm_rollback(row, &body, replica_id, + lsn, type); + + journal_entry_create(entry, 1, xrow_approx_len(row), + txn_limbo_write_cb, fiber()); + + if (journal_write(entry) != 0) { + diag_set(ClientError, ER_WAL_IO); + diag_log(); + return -1; } - if (res == -1) - goto rollback; - /* - * This is not really a transaction. It just uses txn API - * to put the data into WAL. And obviously it should not - * go to the limbo and block on the very same sync - * transaction which it tries to confirm now. - */ - txn_set_flag(txn, TXN_FORCE_ASYNC); - if (txn_begin_stmt(txn, NULL) != 0) - goto rollback; - if (txn_commit_stmt(txn, &request) != 0) - goto rollback; + if (entry->res < 0) { + diag_set(ClientError, ER_WAL_IO); + diag_log(); + return -1; + } - return txn_commit(txn); -rollback: - txn_rollback(txn); - return -1; + return 0; } /** * Write a confirmation entry to WAL. After it's written all the * transactions waiting for confirmation may be finished. */ -static int +static inline int txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) { - return txn_limbo_write_confirm_rollback(limbo, lsn, true); + return txn_limbo_write(limbo->instance_id, lsn, IPROTO_CONFIRM); } void @@ -339,10 +350,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) * transactions following the current one and waiting for * confirmation must be rolled back. */ -static int +static inline int txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) { - return txn_limbo_write_confirm_rollback(limbo, lsn, false); + return txn_limbo_write(limbo->instance_id, lsn, IPROTO_ROLLBACK); } void diff --git a/src/box/xrow.c b/src/box/xrow.c index 0c797a9d5..bba4ea9e2 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -893,51 +893,23 @@ xrow_encode_dml(const struct request *request, struct region *region, return iovcnt; } -static int -xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn, int type) +void +xrow_encode_confirm_rollback(struct xrow_header *row, + struct request_synchro_body *body, + uint32_t replica_id, int64_t lsn, + int type) { - size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) + - mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) + - mp_sizeof_uint(lsn); - char *buf = (char *)region_alloc(region, len); - if (buf == NULL) { - diag_set(OutOfMemory, len, "region_alloc", "buf"); - return -1; - } - char *pos = buf; + assert(type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK); - pos = mp_encode_map(pos, 2); - pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); - pos = mp_encode_uint(pos, replica_id); - pos = mp_encode_uint(pos, IPROTO_LSN); - pos = mp_encode_uint(pos, lsn); + request_synchro_body_create(body, replica_id, lsn); memset(row, 0, sizeof(*row)); - row->body[0].iov_base = buf; - row->body[0].iov_len = len; + row->body[0].iov_base = body; + row->body[0].iov_len = sizeof(*body); row->bodycnt = 1; row->type = type; - - return 0; -} - -int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_CONFIRM); -} - -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_ROLLBACK); } static int diff --git a/src/box/xrow.h b/src/box/xrow.h index e21ede5a3..68fb4e8ef 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -54,6 +54,7 @@ enum { IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7, }; +struct request_synchro_body; struct region; struct xrow_header { @@ -216,18 +217,18 @@ xrow_encode_dml(const struct request *request, struct region *region, struct iovec *iov); /** - * Encode the CONFIRM to row body and set row type to - * IPROTO_CONFIRM. + * Encode the CONFIRM or ROLLBACK to row body. * @param row xrow header. - * @param region Region to use to encode the confirmation body. + * @param body encoded body. * @param replica_id master's instance id. * @param lsn last confirmed lsn. - * @retval -1 on error. - * @retval 0 success. + * @param type IPROTO_CONFIRM or IPROTO_ROLLBACK. */ -int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); +void +xrow_encode_confirm_rollback(struct xrow_header *row, + struct request_synchro_body *body, + uint32_t replica_id, int64_t lsn, + int type); /** * Decode the CONFIRM request body. @@ -240,20 +241,6 @@ xrow_encode_confirm(struct xrow_header *row, struct region *region, int xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); -/** - * Encode the ROLLBACK row body and set row type to - * IPROTO_ROLLBACK. - * @param row xrow header. - * @param region Region to use to encode the rollback body. - * @param replica_id master's instance id. - * @param lsn lsn to rollback from, including it. - * @retval -1 on error. - * @retval 0 success. - */ -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); - /** * Decode the ROLLBACK row body. * @param row xrow header. -- 2.26.2