From mboxrd@z Thu Jan 1 00:00:00 1970
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Date: Mon, 6 Dec 2021 06:03:21 +0300
X-Mailer: git-send-email 2.30.1 (Apple Git-130)
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org

Applier, the main user of coio_read_xrow(), had to save row bodies
elsewhere, because there was no guarantee that the input buffer wouldn't
be reused before the body is processed.

This changed in the previous commit (xrow: rework coio_read_xrow to keep
parsed input), so it's time to take advantage of this change in applier.

Stop saving row bodies to fiber gc region, store them right on ibuf
instead. Introduce the machinery needed to track ibuf reallocation.

Prerequisite #6329
---
 src/box/applier.cc | 86 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 63 insertions(+), 23 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6036c19d9..a8505c93a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -553,6 +553,22 @@ struct applier_tx_row {
 	struct xrow_header row;
 };

+/** Defragment the input buffer: move its contents, if any, to its beginning. */
+static inline void
+ibuf_defragment(struct ibuf *ibuf)
+{
+	size_t used = ibuf_used(ibuf);
+	if (used == 0) {
+		ibuf_reset(ibuf);
+	} else {
+		size_t cap = ibuf_capacity(ibuf);
+		/*
+		 * Defragment the buffer by reserving all the available space.
+		 */
+		ibuf_reserve(ibuf, cap - used);
+	}
+}
+
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
@@ -589,8 +605,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 		}
 		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
+		/* @sa applier_subscribe(). */
+		applier->ibuf.rpos = applier->ibuf.xpos;
+		ibuf_defragment(&applier->ibuf);
 	}

+	ibuf_reset(&applier->ibuf);
 	return row_count;
 }

@@ -676,7 +696,7 @@ applier_read_tx_row(struct applier *applier, double timeout)

 	ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);

-	coio_read_xrow_timeout_xc(io, ibuf, row, timeout);
+	coio_read_xrow_ex_timeout_xc(io, ibuf, row, timeout);

 	if (row->tm > 0)
 		applier->lag = ev_now(loop()) - row->tm;
@@ -722,20 +742,6 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
 	if (row->is_commit) {
 		/* Signal the caller that we've reached the tx end. */
 		tsn = 0;
-	} else if (row->bodycnt == 1) {
-		/*
-		 * Save row body to gc region. Not done for single-statement
-		 * transactions and the last row of multi-statement transactions
-		 * knowing that the input buffer will not be used while the
-		 * transaction is applied.
-		 */
-		void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
-		if (new_base == NULL)
-			tnt_raise(OutOfMemory, row->body->iov_len, "region",
-				  "xrow body");
-		memcpy(new_base, row->body->iov_base, row->body->iov_len);
-		/* Adjust row body pointers. */
-		row->body->iov_base = new_base;
 	}

 	stailq_add_tail(rows, &tx_row->next);
@@ -744,10 +750,19 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)

 /**
  * Read one transaction from network using applier's input buffer.
- * Transaction rows are placed onto fiber gc region.
- * We could not use applier input buffer to store rows because
- * rpos is adjusted as xrow is decoded and the corresponding
- * network input space is reused for the next xrow.
+ *
+ * The input buffer is reused to store row bodies. The only two problems to
+ * deal with are sporadic input buffer reallocation and defragmentation.
+ * We have to adjust row body pointers each time any of the two occur.
+ *
+ * Defragmentation is done manually in between the transaction reads, so it
+ * **never** happens inside this function (ibuf->rpos always points at
+ * the very beginning of the ibuf).
+ *
+ * Speaking of buffer reallocation, it only happens during the "saturation"
+ * phase, until the input buffer reaches the size big enough to hold a single
+ * transaction. Moreover, each next reallocation is exponentially less likely
+ * to happen, because the buffer size is doubled every time.
  */
 static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
@@ -757,8 +772,26 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
 	stailq_create(rows);

 	do {
+		const char *old_rpos = applier->ibuf.rpos;
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier,
 								    timeout);
+		/* Detect ibuf reallocation or defragmentation. */
+		ssize_t delta = applier->ibuf.rpos - old_rpos;
+		if (unlikely(delta != 0)) {
+			struct applier_tx_row *item;
+			stailq_foreach_entry(item, rows, next) {
+				struct xrow_header *row = &item->row;
+				if (row->bodycnt == 0)
+					continue;
+				/*
+				 * The row body's offset relative to ibuf->rpos
+				 * is constant, so they all were moved by the
+				 * same delta as rpos was.
+				 */
+				row->body->iov_base =
+					(char *)row->body->iov_base + delta;
+			}
+		}
 		tsn = set_next_tx_row(rows, tx_row, tsn);
 		++row_count;
 	} while (tsn != 0);
@@ -1123,8 +1156,8 @@ nopify:;
 		row = &item->row;
 		row->type = IPROTO_NOP;
 		/*
-		 * Row body is saved to fiber's region and will be freed
-		 * on next fiber_gc() call.
+		 * Row body will be discarded together with the remaining
+		 * input.
 		 */
 		row->bodycnt = 0;
 	}
@@ -1434,8 +1467,15 @@ applier_subscribe(struct applier *applier)
 			diag_raise();
 		}

-		if (ibuf_used(ibuf) == 0)
-			ibuf_reset(ibuf);
+		/* Discard processed input. */
+		ibuf->rpos = ibuf->xpos;
+		/*
+		 * Even though this is not necessary, defragment the buffer
+		 * explicitly. Otherwise the defragmentation would be triggered
+		 * by one of the row reads, resulting in moving a bigger memory
+		 * chunk.
+		 */
+		ibuf_defragment(&applier->ibuf);
 	}
 }

-- 
2.30.1 (Apple Git-130)
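
The pointer adjustment in applier_read_tx() can be illustrated outside of
Tarantool with a small standalone sketch. Everything below is hypothetical
and only mirrors the idea: struct toy_buf, toy_buf_reserve(),
toy_buf_append() and toy_demo() are made-up names, not the small library's
struct ibuf API. The sketch assumes a buffer that doubles its capacity on
growth and a caller that keeps raw pointers to previously stored row bodies.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical toy buffer; NOT Tarantool's struct ibuf. */
struct toy_buf {
	char *base;	/* start of the allocation */
	size_t used;	/* bytes currently stored */
	size_t cap;	/* allocated capacity */
};

/*
 * Make room for len more bytes. If the storage has to move, rebase every
 * saved pointer in bodies so that it keeps pointing at the same bytes.
 * Returns 0 on success, -1 on allocation failure.
 */
static int
toy_buf_reserve(struct toy_buf *b, size_t len, char **bodies, size_t nbodies)
{
	if (b->cap - b->used >= len)
		return 0;
	/* Double the capacity until the request fits. */
	size_t new_cap = b->cap != 0 ? b->cap : 16;
	while (new_cap - b->used < len)
		new_cap *= 2;
	char *new_base = malloc(new_cap);
	if (new_base == NULL)
		return -1;
	if (b->used > 0)
		memcpy(new_base, b->base, b->used);
	/*
	 * Each body keeps its offset from the buffer start, so all of them
	 * move by the same distance. Offsets are computed while the old
	 * storage is still alive.
	 */
	for (size_t i = 0; i < nbodies; i++)
		bodies[i] = new_base + (bodies[i] - b->base);
	free(b->base);
	b->base = new_base;
	b->cap = new_cap;
	return 0;
}

/* Append len bytes and return a pointer to the stored copy, NULL on OOM. */
static char *
toy_buf_append(struct toy_buf *b, const char *data, size_t len,
	       char **bodies, size_t nbodies)
{
	if (toy_buf_reserve(b, len, bodies, nbodies) != 0)
		return NULL;
	assert(b->cap - b->used >= len);
	char *dst = b->base + b->used;
	memcpy(dst, data, len);
	b->used += len;
	return dst;
}

/* Store three "row bodies"; return 0 if all saved pointers stay valid. */
static int
toy_demo(void)
{
	const char *rows[3] = {
		"first-row-body", "second-row-body", "third-row-body",
	};
	struct toy_buf buf = {NULL, 0, 0};
	char *bodies[3];
	for (size_t i = 0; i < 3; i++) {
		/* The second and third appends force the buffer to move. */
		char *p = toy_buf_append(&buf, rows[i], strlen(rows[i]) + 1,
					 bodies, i);
		if (p == NULL) {
			free(buf.base);
			return -1;
		}
		bodies[i] = p;
	}
	int ok = 1;
	for (size_t i = 0; i < 3; i++)
		ok = ok && strcmp(bodies[i], rows[i]) == 0;
	free(buf.base);
	return ok ? 0 : 1;
}
```

toy_demo() returns 0 when every saved body pointer still refers to its own
row after the buffer has grown. Unlike the patch, which derives a single
delta from ibuf->rpos after the read has completed, the toy version rebases
the pointers inside the reserve step, before the old storage is freed. This
is a design choice of the sketch to keep the pointer arithmetic within one
live allocation; the effect on the saved pointers is the same.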