From mboxrd@z Thu Jan 1 00:00:00 1970
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Date: Mon, 6 Dec 2021 06:03:23 +0300
Message-Id: <890ddf1e50a264e27baf75fca411fb548364410c.1638757827.git.sergepetrenko@tarantool.org>
X-Mailer: git-send-email 2.30.1 (Apple Git-130)
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH 4/4] Introduce applier thread
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org

It has been reported that in a master-slave setup replicas often have
higher CPU usage than their replication masters. Moreover, it's easy
for a replica to start falling behind the master.

The reason for that is the additional workload on the replica's tx
thread as compared to the master. While the master typically splits
request processing between two threads (iproto, which decodes the
requests, and tx, which applies them), the replica performs both tasks
in its tx thread. This is due to the replication architecture: the
replica handles the master connection with a single fiber in the tx
thread, which first decodes the incoming requests and then applies
them.

Make it possible to decode the incoming replication stream in a
separate thread, so that the tx thread doesn't waste processing time
on row decoding. Each applier thread may serve several appliers, and
the total number of applier threads is controlled by a new
configuration option, `replication_num_threads`, with a default value
of `1` (meaning a single thread is spawned to handle all the
appliers).

Closes #6329

@TarantoolBot document
Title: New configuration option - `replication_num_threads`
It's now possible to specify how many threads will be spawned to
decode incoming replication streams. Setting the value to anything
except 0 (which preserves the old behaviour of decoding in the tx
thread) makes Tarantool decode the streams listed in
`box.cfg.replication` in dedicated applier threads. This lowers the
tx thread CPU load and potentially helps the replica keep up with the
master. There are never more applier threads than
`replication_num_threads`, and when there are more peers in
`box.cfg.replication` than threads, one thread may handle multiple
data streams.
---
 .../unreleased/gh-6329-applier-in-thread.md |   5 +
 src/box/applier.cc                          | 840 ++++++++++++++++--
 src/box/applier.h                           |   4 +
 src/box/box.cc                              |   2 +-
 src/box/lua/load_cfg.lua                    |   2 +
 src/box/replication.cc                      |   5 +-
 src/box/replication.h                       |   7 +-
 7 files changed, 811 insertions(+), 54 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6329-applier-in-thread.md

diff --git a/changelogs/unreleased/gh-6329-applier-in-thread.md b/changelogs/unreleased/gh-6329-applier-in-thread.md
new file mode 100644
index 000000000..542b5adf1
--- /dev/null
+++ b/changelogs/unreleased/gh-6329-applier-in-thread.md
@@ -0,0 +1,5 @@
+## feature/replication
+
+* Make it possible to decode incoming replication data in a separate thread. Add
+  the `replication_num_threads` configuration option, which controls how many
+  threads may be spawned to do the task (gh-6329).
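As a usage sketch (illustration only, not part of the patch): a replica
could enable the new option like this, assuming two upstream masters
with hypothetical URIs and credentials.

    -- Hypothetical replica configuration enabling applier threads.
    -- With two peers and two threads each replication stream gets its
    -- own decoder thread; with more peers than threads, appliers are
    -- assigned to the threads in a round-robin manner.
    box.cfg{
        listen = 3303,
        replication = {
            'replicator:secret@203.0.113.1:3301',
            'replicator:secret@203.0.113.2:3301',
        },
        replication_num_threads = 2,
    }

Setting the option to 0 falls back to decoding rows in the tx thread,
as before this patch.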
diff --git a/src/box/applier.cc b/src/box/applier.cc index 393f0a2fe..ec6c7a6e6 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -57,6 +57,8 @@ #include "txn_limbo.h" #include "journal.h" #include "raft.h" +#include "tt_static.h" +#include "cbus.h" STRS(applier_state, applier_STATE); @@ -161,7 +163,9 @@ static int applier_writer_f(va_list ap) { struct applier *applier = va_arg(ap, struct applier *); - + bool check_sync = va_arg(ap, int); + bool *has_acks_to_send = va_arg(ap, bool *); + struct vclock *vclock = va_arg(ap, struct vclock *); /* ID is permanent while applier is alive */ uint32_t replica_id = applier->instance_id; @@ -171,7 +175,7 @@ applier_writer_f(va_list ap) * messages so we don't need to send ACKs every * replication_timeout seconds any more. */ - if (!applier->has_acks_to_send) { + if (!*has_acks_to_send) { if (applier->version_id >= version_id(1, 7, 7)) fiber_cond_wait_timeout(&applier->writer_cond, TIMEOUT_INFINITY); @@ -185,16 +189,18 @@ applier_writer_f(va_list ap) * update an applier status because the applier state could * yield and doesn't fit into a commit trigger. */ - applier_check_sync(applier); + if (check_sync) { + applier_check_sync(applier); + /* Send ACKs only when in FOLLOW mode ,*/ + if (applier->state != APPLIER_SYNC && + applier->state != APPLIER_FOLLOW) + continue; + } - /* Send ACKs only when in FOLLOW mode ,*/ - if (applier->state != APPLIER_SYNC && - applier->state != APPLIER_FOLLOW) - continue; try { - applier->has_acks_to_send = false; + *has_acks_to_send = false; struct xrow_header xrow; - xrow_encode_vclock(&xrow, &replicaset.vclock); + xrow_encode_vclock(&xrow, vclock); /* * For relay lag statistics we report last * written transaction timestamp in tm field. @@ -295,24 +301,30 @@ process_nop(struct request *request) } static int -apply_row(struct xrow_header *row) +apply_request(struct request *request) { - struct request request; - assert(!iproto_type_is_synchro_request(row->type)); - if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) - return -1; - if (request.type == IPROTO_NOP) - return process_nop(&request); - struct space *space = space_cache_find(request.space_id); + if (request->type == IPROTO_NOP) + return process_nop(request); + struct space *space = space_cache_find(request->space_id); if (space == NULL) return -1; - if (box_process_rw(&request, space, NULL) != 0) { - say_error("error applying row: %s", request_str(&request)); + if (box_process_rw(request, space, NULL) != 0) { + say_error("error applying row: %s", request_str(request)); return -1; } return 0; } +static int +apply_row(struct xrow_header *row) +{ + struct request request; + assert(!iproto_type_is_synchro_request(row->type)); + if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) + return -1; + return apply_request(&request); +} + /** * Connect to a remote host and authenticate the client. */ @@ -543,6 +555,15 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout); static int apply_final_join_tx(uint32_t replica_id, struct stailq *rows); +union applier_request { + struct request dml; + struct synchro_request synchro; + struct { + struct raft_request req; + struct vclock vclock; + } raft; +}; + /** * A helper struct to link xrow objects in a list. */ @@ -551,6 +572,15 @@ struct applier_tx_row { struct stailq_entry next; /* xrow_header struct for the current transaction row. */ struct xrow_header row; + /* The decoded request, if any. 
*/ + union applier_request decoded[]; +}; + +struct applier_tx { + /** A link in tx list. */ + struct stailq_entry next; + /** The transaction rows. */ + struct stailq rows; }; /** Defragment the input buffer: move its contents, if any, to its beginning. */ @@ -691,7 +721,6 @@ applier_read_tx_row(struct applier *applier, double timeout) if (tx_row == NULL) tnt_raise(OutOfMemory, size, "region_alloc_object", "tx_row"); - struct xrow_header *row = &tx_row->row; ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY); @@ -761,8 +790,9 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn) * * Speaking of buffer reallocation, it only happens during the "saturation" * phase, until the input buffer reaches the size big enough to hold a single - * transaction. Moreover, each next reallocation is exponentially less likely - * to happen, because the buffer size is doubled every time. + * transaction (or a batch of transactions). Moreover, each next reallocation + * is exponentially less likely to happen, because the buffer size is doubled + * every time. */ static uint64_t applier_read_tx(struct applier *applier, struct stailq *rows, double timeout) @@ -895,7 +925,7 @@ struct synchro_entry { * Async write journal completion. */ static void -apply_synchro_row_cb(struct journal_entry *entry) +apply_synchro_req_cb(struct journal_entry *entry) { assert(entry->complete_data != NULL); struct synchro_entry *synchro_entry = @@ -910,16 +940,9 @@ apply_synchro_row_cb(struct journal_entry *entry) fiber_wakeup(synchro_entry->owner); } -/** Process a synchro request. */ static int -apply_synchro_row(uint32_t replica_id, struct xrow_header *row) +apply_synchro_req(uint32_t replica_id, struct xrow_header *row, struct synchro_request *req) { - assert(iproto_type_is_synchro_request(row->type)); - - struct synchro_request req; - if (xrow_decode_synchro(row, &req) != 0) - goto err; - struct replica_cb_data rcb_data; struct synchro_entry entry; /* @@ -930,8 +953,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row) rows = entry.base.rows; rows[0] = row; journal_entry_create(&entry.base, 1, xrow_approx_len(row), - apply_synchro_row_cb, &entry); - entry.req = &req; + apply_synchro_req_cb, &entry); + entry.req = req; entry.owner = fiber(); rcb_data.replica_id = replica_id; @@ -968,26 +991,48 @@ err: return -1; } +/** Process a synchro request. 
*/ static int -applier_handle_raft(struct applier *applier, struct xrow_header *row) +apply_synchro_row(uint32_t replica_id, struct xrow_header *row) { - assert(iproto_type_is_raft_request(row->type)); + assert(iproto_type_is_synchro_request(row->type)); + + struct synchro_request req; + if (xrow_decode_synchro(row, &req) != 0) { + diag_log(); + return -1; + } + + return apply_synchro_req(replica_id, row, &req); +} + +static int +applier_handle_raft_request(struct applier *applier, struct raft_request *req) +{ + if (applier->instance_id == 0) { diag_set(ClientError, ER_PROTOCOL, "Can't apply a Raft request " "from an instance without an ID"); return -1; } + return box_raft_process(req, applier->instance_id); +} + +static int +applier_handle_raft(struct applier *applier, struct xrow_header *row) +{ + assert(iproto_type_is_raft_request(row->type)); struct raft_request req; struct vclock candidate_clock; if (xrow_decode_raft(row, &req, &candidate_clock) != 0) return -1; - return box_raft_process(&req, applier->instance_id); + return applier_handle_raft_request(applier, &req); } static int apply_plain_tx(uint32_t replica_id, struct stailq *rows, - bool skip_conflict, bool use_triggers) + bool skip_conflict, bool use_triggers, bool decoded) { /* * Explicitly begin the transaction so that we can @@ -1002,7 +1047,8 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows, stailq_foreach_entry(item, rows, next) { struct xrow_header *row = &item->row; - int res = apply_row(row); + int res = decoded ? apply_request(&item->decoded->dml) : + apply_row(row); if (res != 0 && skip_conflict) { struct error *e = diag_last_error(diag_get()); /* @@ -1101,7 +1147,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows) assert(first_row == last_row); rc = apply_synchro_row(replica_id, first_row); } else { - rc = apply_plain_tx(replica_id, rows, false, false); + rc = apply_plain_tx(replica_id, rows, false, false, false); } fiber_gc(); return rc; @@ -1115,7 +1161,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows) * The rows are replaced with NOPs to preserve the vclock consistency. */ static void -applier_synchro_filter_tx(struct stailq *rows) +applier_synchro_filter_tx(struct stailq *rows, bool decoded) { /* * XXX: in case raft is disabled, synchronous replication still works @@ -1160,6 +1206,8 @@ nopify:; * input. */ row->bodycnt = 0; + if (decoded) + item->decoded->dml.type = IPROTO_NOP; } } @@ -1169,7 +1217,7 @@ nopify:; * Return 0 for success or -1 in case of an error. */ static int -applier_apply_tx(struct applier *applier, struct stailq *rows) +applier_apply_tx(struct applier *applier, struct stailq *rows, bool decoded) { /* * Initially we've been filtering out data if it came from @@ -1187,8 +1235,10 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) * Finally we dropped such "sender" filtration and use transaction * "initiator" filtration via xrow->replica_id only. 
*/ - struct xrow_header *first_row = &stailq_first_entry(rows, - struct applier_tx_row, next)->row; + struct applier_tx_row *txr = stailq_first_entry(rows, + struct applier_tx_row, + next); + struct xrow_header *first_row = &txr->row; struct xrow_header *last_row; last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row; struct replica *replica = replica_by_id(first_row->replica_id); @@ -1225,7 +1275,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) } } } - applier_synchro_filter_tx(rows); + applier_synchro_filter_tx(rows, decoded); if (unlikely(iproto_type_is_synchro_request(first_row->type))) { /* * Synchro messages are not transactions, in terms @@ -1233,10 +1283,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) * each other. */ assert(first_row == last_row); - rc = apply_synchro_row(applier->instance_id, first_row); + if (decoded) { + rc = apply_synchro_req(applier->instance_id, &txr->row, + &txr->decoded->synchro); + } else { + rc = apply_synchro_row(applier->instance_id, first_row); + } } else { rc = apply_plain_tx(applier->instance_id, rows, - replication_skip_conflict, true); + replication_skip_conflict, true, decoded); } if (rc != 0) goto finish; @@ -1256,7 +1311,8 @@ finish: static inline void applier_signal_ack(struct applier *applier) { - fiber_cond_signal(&applier->writer_cond); + if (applier->writer != NULL) + fiber_cond_signal(&applier->writer_cond); applier->has_acks_to_send = true; } @@ -1290,6 +1346,682 @@ applier_on_rollback(struct trigger *trigger, void *event) return 0; } +/** The underlying thread behind a number of appliers. */ +struct applier_thread { + /** A link in allocated threads list. */ + struct rlist in_list; + struct cord cord; + /** The single thread endpoint. */ + struct cbus_endpoint endpoint; + /** A pre-allocated message emitted by the thread once it's exiting. */ + struct cmsg join_msg; + /** Count of appliers attached to this thread. */ + int n_appliers; + /** Whether the thread has no appliers and has to be joined. */ + bool is_exiting; +}; + +/** + * Notify tx that the applier thread has detached its last applier and has to + * be joined. + */ +static void +applier_thread_set_joinable(struct cmsg *msg) +{ + struct applier_thread *thread = container_of(msg, struct applier_thread, + join_msg); + thread->is_exiting = true; +} + +static const struct cmsg_hop join_msg_route[] = { + {applier_thread_set_joinable, NULL}, +}; + +struct applier_detach_msg { + struct cmsg base; + struct diag diag; +}; + +/** + * Propagate the exception causing the applier death to tx from applier + * thread. + */ +static void +applier_detach_thread(struct cmsg *base) +{ + struct applier_detach_msg *msg = (struct applier_detach_msg *)base; + assert(!diag_is_empty(&msg->diag)); + diag_move(&msg->diag, &fiber()->diag); + diag_raise(); +} + +static const struct cmsg_hop detach_msg_route[] = { + {applier_detach_thread, NULL}, +}; + +struct applier_msg { + struct cmsg base; + struct applier *applier; + /** + * The last known confirmed vclock to be used by applier ack writer + * fiber. + */ + struct vclock ack_vclock; + /** Whether the message has an updated ack_vclcok value. */ + bool has_ack; + /** A list of read up transactions to be processed in tx thread. */ + struct stailq txs; + /** This message's personal buffer, holding input rows. */ + struct ibuf ibuf; + /** This message's region used to allocate auxiliary structures. */ + struct region region; + int txn_cnt; +}; + +/* Applier thread related data. 
*/ +struct applier_thread_data { + /** A pointer to the thread handling this applier's data stream. */ + struct applier_thread *thread; + /** Applier's personal endpoint in the tx thread. */ + struct cbus_endpoint endpoint; + /** A pipe from the applier thread to tx. */ + struct cpipe tx_pipe; + /** A pipe from tx to the applier thread. */ + struct cpipe thread_pipe; + /** + * A pair of rotating messages. While one of the messages is processed + * in the tx thread the other is used to store incoming rows. + */ + struct applier_msg msgs[2]; + /** Usual message route. */ + struct cmsg_hop route[2]; + /** An index of the message currently filling up in applier thread. */ + int msg_ptr; + /** + * A preallocated message used to notify tx that the applier should exit + * due to a network error. + */ + struct applier_detach_msg exit_msg; + /** The reader fiber, reading and parsing incoming rows. */ + struct fiber *reader; + /** The writer fiber, writing acks to the replication master. */ + struct fiber *writer; + /** A trigger invoked on reader fiber stop. */ + struct trigger reader_on_stop; + /** A trigger invoked on writer fiber stop. */ + struct trigger writer_on_stop; + /** The latest known ack vclock to send to the replication master. */ + struct vclock ack_vclock; + /** Whether an ack should be sent or not. */ + bool has_ack; +}; + +/** + * The tx part of applier-in-thread machinery. Apply all the parsed transactions + * and notify the applier thread of new ack vclock value. + */ +static void +applier_process_batch(struct cmsg *base) +{ + struct applier_msg *msg = (struct applier_msg *)base; + struct applier *applier = msg->applier; + struct applier_tx *tx; + stailq_foreach_entry(tx, &msg->txs, next) { + struct xrow_header *first_row = + &stailq_first_entry(&tx->rows, struct applier_tx_row, + next)->row; + raft_process_heartbeat(box_raft(), applier->instance_id); + if (first_row->lsn == 0) { + if (unlikely(iproto_type_is_raft_request( + first_row->type))) { + if (applier_handle_raft(applier, + first_row) != 0) + diag_raise(); + } + applier_signal_ack(applier); + } else if (applier_apply_tx(applier, &tx->rows, true) != 0) { + diag_raise(); + } + } + if (applier->has_acks_to_send) { + applier_check_sync(applier); + vclock_copy(&msg->ack_vclock, &replicaset.vclock); + msg->has_ack = true; + applier->has_acks_to_send = false; + } +} + +/** The callback invoked on the message return to applier thread. */ +static void +applier_thread_return_batch(struct cmsg *base) +{ + struct applier_msg *msg = (struct applier_msg *) base; + struct ibuf *ibuf = &msg->ibuf; + struct applier *applier = msg->applier; + struct applier_thread_data *tdata = applier->tdata; + msg->txn_cnt = 0; + ibuf->rpos = ibuf->xpos; /* forget processed input */ + ibuf_defragment(ibuf); + region_reset(&msg->region); + stailq_create(&msg->txs); + if (tdata->reader != NULL && !fiber_is_dead(tdata->reader)) + fiber_wakeup(tdata->reader); + if (msg->has_ack) { + tdata->has_ack = true; + vclock_copy(&tdata->ack_vclock, &msg->ack_vclock); + if (tdata->writer != NULL && !fiber_is_dead(tdata->writer)) { + fiber_cond_signal(&applier->writer_cond); + } + msg->has_ack = false; + } +} + +/** + * Given a pair of input buffers, move the unparsed data from the buffer + * departing to tx thread to the buffer remaining available in applier thread. 
+ */ +static inline size_t +move_unparsed(struct ibuf *oldbuf, struct ibuf *newbuf) +{ + assert(ibuf_used(newbuf) == 0); /* cannot move on top of other data */ + size_t unparsed = ibuf_unparsed(oldbuf); + if (unparsed > 0) { + void *ptr = ibuf_alloc(newbuf, unparsed); + if (ptr == NULL) { + panic("Applier failed to allocate memory for incoming " + "transactions on ibuf"); + } + memcpy(ptr, oldbuf->xpos, unparsed); + oldbuf->wpos -= unparsed; + } + return unparsed; +} + +/** Get the message not containing any data ready for processing. */ +static struct applier_msg * +applier_thread_next_msg(struct applier_thread_data *tdata) +{ + int cur = tdata->msg_ptr; + struct applier_msg *msg = &tdata->msgs[cur]; + struct ibuf *ibuf = &msg->ibuf; + if (ibuf_unparsed(ibuf) == ibuf_used(ibuf)) { + /* + * The ibuf doesn't contain any parsed data. + * Use current mesage. + */ + return msg; + } + cur = (cur + 1) % 2; + msg = &tdata->msgs[cur]; + struct ibuf *other = ibuf; + ibuf = &msg->ibuf; + if (ibuf_used(ibuf) == 0) { + tdata->msg_ptr = cur; + move_unparsed(other, ibuf); + return msg; + } + return NULL; +} + +static void +applier_thread_push_batch(struct applier_thread_data *tdata, + struct applier_msg *msg, struct stailq *txs) +{ + assert(msg != NULL); + stailq_concat(&msg->txs, txs); + + cmsg_init(&msg->base, tdata->route); + cpipe_push(&tdata->tx_pipe, &msg->base); +} + +/** + * Read as much data as possible. Do not yield as long as at least one byte is + * available. + */ +static ssize_t +applier_read_ahead(struct applier *applier, struct ibuf *ibuf) +{ + return coio_breadn_timeout(&applier->io, ibuf, 1, + replication_disconnect_timeout()); +} + +static struct applier_tx * +applier_parse_tx(struct applier *applier, struct ibuf *ibuf, + struct region *region) +{ + const char *data = ibuf->xpos; + const char *end = ibuf->wpos; + uint64_t tsn = 0; + struct applier_tx *tx = region_alloc_object_xc(region, + struct applier_tx); + stailq_create(&tx->rows); + do { + if (data == end) + goto not_read; + if (mp_typeof(*data) != MP_UINT) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "packet length"); + } + if (mp_check_uint(data, end) > 0) { + goto not_read; + } + uint64_t len = mp_decode_uint(&data); + const char *reqend = data + len; + if (reqend > end) { + goto not_read; + } + struct applier_tx_row *tx_row; + size_t size = sizeof(*tx_row) + sizeof(*tx_row->decoded); + tx_row = (typeof(tx_row))region_aligned_alloc_xc(region, size, + alignof(*tx_row)); + memset(tx_row, 0, size); + struct xrow_header *row = &tx_row->row; + xrow_header_decode_xc(row, &data, reqend, true); + if (row->tm > 0) { + applier->lag = ev_now(loop()) - row->tm; + } + applier->last_row_time = ev_monotonic_now(loop()); + tsn = set_next_tx_row(&tx->rows, tx_row, tsn); + } while (tsn != 0); + + /* All txn row headers are parsed. Time to parse row bodies. */ + struct applier_tx_row *item; + stailq_foreach_entry(item, &tx->rows, next) { + struct xrow_header *row = &item->row; + uint16_t type = row->type; + if (iproto_type_is_dml(type)) { + if (xrow_decode_dml(row, &item->decoded->dml, + dml_request_key_map(type)) != 0) { + diag_raise(); + } + } else if (iproto_type_is_synchro_request(type)) { + if (xrow_decode_synchro(row, &item->decoded->synchro) != 0) { + diag_raise(); + } + } else if (iproto_type_is_raft_request(type)) { + if (xrow_decode_raft(row, &item->decoded->raft.req, + &item->decoded->raft.vclock) != 0) { + diag_raise(); + } + } else if (type == IPROTO_OK) { + /* Nothing to do. 
*/ + } else { + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, type); + } + } + size_t read; + read = data - ibuf->xpos; + ibuf->xpos += read; + return tx; +not_read: + /* Discard the preallocated messages. */ + region_reset(region); + return NULL; +} + +static int +applier_decode_txs(struct applier *applier, struct ibuf *ibuf, + struct region *region, struct stailq *txs) +{ + int tx_cnt = 0; + while (true) { + struct applier_tx *tx; + tx = applier_parse_tx(applier, ibuf, region); + if (tx == NULL) + break; + stailq_add_tail_entry(txs, tx, next); + ++tx_cnt; + } + return tx_cnt; +} + +/** Applier thread reader fiber function. */ +static int +applier_thread_reader_f(va_list ap) +{ + struct applier *applier = va_arg(ap, struct applier *); + while (!fiber_is_cancelled()) { + struct ibuf *ibuf; + struct region *region; + struct applier_msg *msg; + do { + msg = applier_thread_next_msg(applier->tdata); + if (msg == NULL) { + fiber_yield(); + if (fiber_is_cancelled()) + return 0; + } + } while (msg == NULL); + ibuf = &msg->ibuf; + region = &msg->region; + struct stailq txs; + int tx_cnt; + try { + applier_read_ahead(applier, ibuf); + stailq_create(&txs); + tx_cnt = applier_decode_txs(applier, ibuf, region, + &txs); + } catch (Exception *e) { + return -1; + } + if (tx_cnt > 0) + applier_thread_push_batch(applier->tdata, msg, &txs); + else if (tx_cnt < 0) + return -1; + } + return 0; +} + +/** The main applier thread fiber function. */ +static int +applier_thread_f(va_list ap) +{ + struct applier_thread *thread = va_arg(ap, typeof(thread)); + int rc = cbus_endpoint_create(&thread->endpoint, cord()->name, + fiber_schedule_cb, fiber()); + assert(rc == 0); + cmsg_init(&thread->join_msg, join_msg_route); + thread->n_appliers = 0; + + cbus_loop(&thread->endpoint); + + cbus_endpoint_destroy(&thread->endpoint, cbus_process); + assert(thread->n_appliers == 0); + return 0; +} + +/** Initialize and start the applier thread. */ +static int +applier_thread_create(struct applier_thread *thread) +{ + static int thread_id = 0; + const char *name = tt_sprintf("applier_%d", ++thread_id); + + thread->is_exiting = false; + if (cord_costart(&thread->cord, name, applier_thread_f, thread) != 0) { + return -1; + } + return thread_id; +} + +/** A memory pool used for allocating applier threads. */ +struct mempool applier_thread_pool; + +/** A count of currently live applier threads. */ +static int applier_thread_cnt = 0; + +/** Alive applier thread list. */ +static RLIST_HEAD(thread_list); + +/** + * A pointer to the thread which will accept appliers once thread count reaches + * the maximum configured value. + */ +static struct applier_thread *fill_thread = NULL; + +/** Allocate the applier thread structure. */ +static struct applier_thread * +applier_thread_alloc(void) +{ + if (!mempool_is_initialized(&applier_thread_pool)) { + mempool_create(&applier_thread_pool, &cord()->slabc, + sizeof(struct applier_thread)); + } + struct applier_thread *thread = + (struct applier_thread *)mempool_alloc(&applier_thread_pool); + if (thread == NULL) { + diag_set(OutOfMemory, sizeof(*thread), "mempool_alloc", + "applier thread"); + } + return thread; +} + +/** + * Get a working applier thread. Either create a new one or use one of the + * already initialized. 
+ */ +static struct applier_thread * +applier_thread_new(void) +{ + assert(replication_num_applier_threads > 0); + struct applier_thread *thread; + if (applier_thread_cnt < replication_num_applier_threads) { +alloc: thread = applier_thread_alloc(); + if (thread != NULL) { + if (applier_thread_create(thread) < 0) { + mempool_free(&applier_thread_pool, thread); + return NULL; + } + rlist_add_tail_entry(&thread_list, thread, in_list); + applier_thread_cnt++; + } + } else { + assert(!rlist_empty(&thread_list)); + if (fill_thread == NULL) { + fill_thread = rlist_first_entry(&thread_list, + struct applier_thread, + in_list); + } + thread = fill_thread; + /* + * Fill the threads in a round-robin manner. Each new applier + * goes to the next available thread. + */ + if (fill_thread == rlist_last_entry(&thread_list, + struct applier_thread, + in_list)) { + fill_thread = rlist_first_entry(&thread_list, + struct applier_thread, + in_list); + } else { + fill_thread = rlist_next_entry(fill_thread, in_list); + } + if (thread->is_exiting) + goto alloc; + } + return thread; +} + +/** Destroy the applier thread. */ +static void +applier_thread_free(struct applier_thread *thread) +{ + if (cord_cojoin(&thread->cord) < 0) { + panic("Can't join the applier thread."); + } + rlist_del_entry(thread, in_list); + mempool_free(&applier_thread_pool, thread); + applier_thread_cnt--; + if (fill_thread == thread) + fill_thread = NULL; +} + +/** Initialize applier thread messages. */ +static void +applier_thread_msgs_init(struct applier *applier) +{ + struct applier_thread_data *tdata = applier->tdata; + for (int i = 0; i < 2; i++) { + struct applier_msg *msg = &tdata->msgs[i]; + memset(msg, 0, sizeof(*msg)); + msg->applier = applier; + stailq_create(&msg->txs); + ibuf_create(&msg->ibuf, &cord()->slabc, 1024); + region_create(&msg->region, &cord()->slabc); + } + tdata->msg_ptr = 0; + + cmsg_init(&tdata->exit_msg.base, detach_msg_route); + diag_create(&tdata->exit_msg.diag); + + /* Initialize the default message route. */ + tdata->route[0].f = applier_process_batch; + tdata->route[0].pipe = &tdata->thread_pipe; + tdata->route[1].f = applier_thread_return_batch; + tdata->route[1].pipe = NULL; +} + +/** + * A trigger fired on each of applier thread fibers death. Propagates the + * exception with which the fiber exited to the tx thread. + */ +static int +applier_thread_fiber_on_stop(struct trigger *trigger, void *event) +{ + struct fiber *fiber = (struct fiber *)event; + assert(fiber == fiber()); + struct applier_thread_data *tdata = + (struct applier_thread_data *)trigger->data; + assert(fiber == tdata->reader || fiber == tdata->writer); + if (fiber->f_ret != 0 && !(fiber->flags & FIBER_IS_CANCELLED)) { + /* Notify the tx thread that its applier is dead. */ + assert(!diag_is_empty(&fiber->diag)); + diag_move(&fiber->diag, &tdata->exit_msg.diag); + cpipe_push(&tdata->tx_pipe, &tdata->exit_msg.base); + } + trigger_clear(trigger); + return 0; +} + +/** Initialize fibers needed for applier in thread operation. 
*/ +static inline void +applier_thread_fiber_init(struct applier *applier) +{ + struct applier_thread_data *tdata = applier->tdata; + tdata->reader = fiber_new_xc("reader", applier_thread_reader_f); + tdata->writer = fiber_new_xc("writer", applier_writer_f); + trigger_create(&tdata->reader_on_stop, applier_thread_fiber_on_stop, + tdata, NULL); + trigger_create(&tdata->writer_on_stop, applier_thread_fiber_on_stop, + tdata, NULL); + trigger_add(&tdata->reader->on_stop, &tdata->reader_on_stop); + trigger_add(&tdata->writer->on_stop, &tdata->writer_on_stop); + fiber_set_joinable(tdata->reader, true); + fiber_set_joinable(tdata->writer, true); + fiber_start(tdata->reader, applier); + fiber_start(tdata->writer, applier, false, &tdata->has_ack, + &tdata->ack_vclock); +} + +/** Notify the applier thread it has to serve yet another applier. */ +static void +applier_thread_attach_applier(void *data) +{ + struct applier *applier = (struct applier *)data; + struct applier_thread *thread = container_of(cord(), typeof(*thread), + cord); + + applier_thread_msgs_init(applier); + applier_thread_fiber_init(applier); + + ++thread->n_appliers; +} + +/** Notify the applier thread one of the appliers it served is dead. */ +static void +applier_thread_detach_applier(void *data) +{ + struct applier *applier = (struct applier *)data; + struct applier_thread_data *tdata = applier->tdata; + struct applier_thread *thread = container_of(cord(), typeof(*thread), + cord); + + fiber_cancel(tdata->reader); + fiber_cancel(tdata->writer); + /* + * We do not join the fibers, since we do not care about their return + * codes. The exceptions are propagated elewhere. + */ + if (--thread->n_appliers == 0) { + fiber_cancel(fiber()); + cpipe_push(&tdata->tx_pipe, &thread->join_msg); + } +} + +static void +applier_thread_data_free(struct applier *applier); + +struct mempool applier_thread_data_pool; + +/** + * Create and initialize the applier-in-thread data and notify the thread + * there's a new applier. + */ +static struct applier_thread_data * +applier_thread_data_new(struct applier *applier, struct applier_thread *thread) +{ + assert(thread != NULL); + if (!mempool_is_initialized(&applier_thread_data_pool)) { + mempool_create(&applier_thread_data_pool, &cord()->slabc, + sizeof(struct applier_thread_data)); + } + struct applier_thread_data *tdata = + (typeof(tdata))mempool_alloc(&applier_thread_data_pool); + if (tdata == NULL) { + diag_set(OutOfMemory, sizeof(*tdata), "mempool_alloc", + "applier thread data"); + return NULL; + } + + tdata->thread = thread; + const char *thread_name = thread->cord.name; + const char *name = tt_sprintf("applier_%p", applier); + int rc = cbus_endpoint_create(&tdata->endpoint, name, fiber_schedule_cb, + fiber()); + assert(rc == 0); + applier->tdata = tdata; + auto guard = make_scoped_guard([&]{ + applier_thread_data_free(applier); + }); + cbus_pair(thread_name, name, &tdata->thread_pipe, &tdata->tx_pipe, + applier_thread_attach_applier, applier, cbus_process); + guard.is_active = false; + return applier->tdata; +} + +/** + * Remove the applier from the thread and destroy the supporting data structure. 
+ */ +static void +applier_thread_data_free(struct applier *applier) +{ + struct applier_thread_data *tdata = applier->tdata; + assert(tdata != NULL); + + cbus_unpair(&tdata->thread_pipe, &tdata->tx_pipe, + applier_thread_detach_applier, applier, cbus_process); + cbus_endpoint_destroy(&tdata->endpoint, cbus_process); + + if (tdata->thread->is_exiting) + applier_thread_free(tdata->thread); + mempool_free(&applier_thread_data_pool, tdata); + applier->tdata = NULL; +} + +/** + * Subscribe to the replication stream. Use a separate decoding thread. + */ +static void +applier_thread_subscribe(struct applier *applier) +{ + struct applier_thread *thread = applier_thread_new(); + + applier->tdata = applier_thread_data_new(applier, thread); + if (applier->tdata == NULL) { + diag_raise(); + } + + struct applier_thread_data *tdata = applier->tdata; + + auto guard = make_scoped_guard([&]{ + applier_thread_data_free(applier); + }); + + cbus_loop(&tdata->endpoint); + + tnt_raise(FiberIsCancelled); +} + /** * Subscribe to the replication stream. Decode the incoming rows right in * applier fiber. @@ -1336,7 +2068,7 @@ applier_subscribe_f(struct applier *applier) diag_raise(); } applier_signal_ack(applier); - } else if (applier_apply_tx(applier, &rows) != 0) { + } else if (applier_apply_tx(applier, &rows, false) != 0) { diag_raise(); } @@ -1351,6 +2083,7 @@ applier_subscribe_f(struct applier *applier) ibuf_defragment(&applier->ibuf); } } + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -1448,7 +2181,8 @@ applier_subscribe(struct applier *applier) /* Re-enable warnings after successful execution of SUBSCRIBE */ applier->last_logged_errcode = 0; - if (applier->version_id >= version_id(1, 7, 4)) { + if (applier->version_id >= version_id(1, 7, 4) && + replication_num_applier_threads == 0) { /* Enable replication ACKs for newer servers */ assert(applier->writer == NULL); @@ -1458,7 +2192,8 @@ applier_subscribe(struct applier *applier) applier->writer = fiber_new_xc(name, applier_writer_f); fiber_set_joinable(applier->writer, true); - fiber_start(applier->writer, applier); + fiber_start(applier->writer, applier, true, + &applier->has_acks_to_send, &replicaset.vclock); } applier->lag = TIMEOUT_INFINITY; @@ -1486,7 +2221,10 @@ applier_subscribe(struct applier *applier) /* * Process a stream of rows from the binary log. */ - applier_subscribe_f(applier); + if (replication_num_applier_threads > 0) + applier_thread_subscribe(applier); + else + applier_subscribe_f(applier); } static inline void diff --git a/src/box/applier.h b/src/box/applier.h index 899dc053a..0658cbd25 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -71,6 +71,8 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ ENUM(applier_state, applier_STATE); extern const char *applier_state_strs[]; +struct applier_thread_data; + /** * State of a replication connection to the master */ @@ -132,6 +134,8 @@ struct applier { struct diag diag; /* Master's vclock at the time of SUBSCRIBE. */ struct vclock remote_vclock_at_subscribe; + /* The thread data used by this applier. 
*/ + struct applier_thread_data *tdata; }; /** diff --git a/src/box/box.cc b/src/box/box.cc index 0413cbf44..eeef3029a 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -3629,7 +3629,7 @@ box_cfg_xc(void) gc_init(); engine_init(); schema_init(); - replication_init(); + replication_init(cfg_geti_default("replication_num_threads", 1)); port_init(); iproto_init(cfg_geti("iproto_threads")); sql_init(); diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index f14c8de5d..946d3537d 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -103,6 +103,7 @@ local default_cfg = { replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, replication_anon = false, + replication_num_threads = 1, feedback_enabled = true, feedback_crashinfo = true, feedback_host = "https://feedback.tarantool.io", @@ -210,6 +211,7 @@ local template_cfg = { replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', replication_anon = 'boolean', + replication_num_threads = 'number', feedback_enabled = ifdef_feedback('boolean'), feedback_crashinfo = ifdef_feedback('boolean'), feedback_host = ifdef_feedback('string'), diff --git a/src/box/replication.cc b/src/box/replication.cc index 10b4ac915..028d9f054 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -56,6 +56,7 @@ double replication_synchro_timeout = 5.0; /* seconds */ double replication_sync_timeout = 300.0; /* seconds */ bool replication_skip_conflict = false; bool replication_anon = false; +int replication_num_applier_threads = 1; struct replicaset replicaset; @@ -84,7 +85,7 @@ replicaset_quorum(void) } void -replication_init(void) +replication_init(int num_threads) { memset(&replicaset, 0, sizeof(replicaset)); replica_hash_new(&replicaset.hash); @@ -101,6 +102,8 @@ replication_init(void) rlist_create(&replicaset.on_ack); diag_create(&replicaset.applier.diag); + + replication_num_applier_threads = num_threads; } void diff --git a/src/box/replication.h b/src/box/replication.h index 95563e811..881087ca6 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -155,6 +155,11 @@ extern bool replication_skip_conflict; */ extern bool replication_anon; +/** + * Whether this replica will receive replication stream in a separate thread. + */ +extern int replication_num_applier_threads; + /** * Wait for the given period of time before trying to reconnect * to a master. @@ -176,7 +181,7 @@ replication_disconnect_timeout(void) } void -replication_init(void); +replication_init(int num_threads); void replication_free(void); -- 2.30.1 (Apple Git-130)