From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
Date: Sun, 9 Jun 2019 23:44:42 +0300
Message-Id: <58cc2b3d0c6ee62b3b45d204165f6e107976c35c.1560112747.git.georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

The applier now uses asynchronous transactions to batch journal
writes. A sequencer orders transaction execution and checks write
results while the applier fiber keeps reading from the network.

Closes: #1254
---
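Note for reviewers (below the cut, so it won't land in the commit):
the core of the dedup/ordering logic is sequencer_submit() taking a
per-replica latch and comparing the incoming LSN against the vclock of
already submitted transactions, so the same transaction arriving via
two appliers in a full-mesh topology is applied exactly once and in
order. Below is a standalone model of just that check; pthread_mutex_t
stands in for struct latch and a plain int64_t array for struct
vclock, so none of these names are the real Tarantool APIs:

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define VCLOCK_MAX 32

/* One lock per replica id, standing in for replica->order_latch. */
static pthread_mutex_t order_latch[VCLOCK_MAX];
/* Highest LSN already submitted per replica id, like net_vclock. */
static int64_t net_vclock[VCLOCK_MAX];

/* Return 1 if the tx was applied, 0 if it was a duplicate. */
static int
submit(uint32_t replica_id, int64_t lsn)
{
	pthread_mutex_lock(&order_latch[replica_id]);
	if (net_vclock[replica_id] >= lsn) {
		/* Seen already, e.g. via another applier: skip. */
		pthread_mutex_unlock(&order_latch[replica_id]);
		return 0;
	}
	/* ... apply the rows and submit them to the WAL here ... */
	net_vclock[replica_id] = lsn;
	pthread_mutex_unlock(&order_latch[replica_id]);
	return 1;
}

int
main(void)
{
	for (int i = 0; i < VCLOCK_MAX; i++)
		pthread_mutex_init(&order_latch[i], NULL);
	/* The same tx delivered twice in a full mesh. */
	printf("first copy applied: %d\n", submit(1, 10));
	printf("second copy applied: %d\n", submit(1, 10));
	return 0;
}

In the patch itself net_vclock is advanced only after txn_write()
succeeds, and the collector fiber re-syncs it from replicaset.vclock
after a failure, so a failed write never bumps the submitted vclock.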
 src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
 src/box/applier.h  |   4 +
 src/box/box.cc     |   1 +
 3 files changed, 251 insertions(+), 126 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..9f0efda5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -53,6 +53,231 @@ STRS(applier_state, applier_STATE);
 
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	if (request.type == IPROTO_NOP)
+		return process_nop(&request);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	if (box_process_rw(&request, space, NULL) != 0) {
+		say_error("error applying row: %s", request_str(&request));
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+struct sequencer {
+	struct stailq txn_queue;
+	struct fiber_cond txn_queue_cond;
+	struct vclock net_vclock;
+	struct fiber_cond tx_vclock_cond;
+	struct diag diag;
+	struct rlist on_fail;
+};
+
+static struct sequencer sequencer;
+
+static int
+sequencer_collect_f(va_list ap)
+{
+	(void) ap;
+	while (!fiber_is_cancelled()) {
+		while (stailq_empty(&sequencer.txn_queue)) {
+			if (!diag_is_empty(&sequencer.diag)) {
+				diag_clear(&sequencer.diag);
+				vclock_copy(&sequencer.net_vclock,
+					    &replicaset.vclock);
+			}
+			fiber_cond_wait(&sequencer.txn_queue_cond);
+			continue;
+		}
+		struct txn *txn =
+			stailq_shift_entry(&sequencer.txn_queue, struct txn,
+					   in_txn_cache);
+		if (txn_wait(txn) == 0) {
+			continue;
+		}
+		if (diag_is_empty(&sequencer.diag)) {
+			diag_move(&fiber()->diag, &sequencer.diag);
+			trigger_run(&sequencer.on_fail, NULL);
+		}
+	}
+	return 0;
+}
+
+void
+applier_init()
+{
+	stailq_create(&sequencer.txn_queue);
+	fiber_cond_create(&sequencer.txn_queue_cond);
+
+	rlist_create(&sequencer.on_fail);
+
+	vclock_create(&sequencer.net_vclock);
+	fiber_cond_create(&sequencer.tx_vclock_cond);
+	diag_create(&sequencer.diag);
+	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
+	if (collector == NULL)
+		panic("Failed to create a sequencer collector fiber");
+	fiber_start(collector, NULL);
+}
+
+static inline void
+sequencer_on_fail(struct trigger *on_fail)
+{
+	trigger_add(&sequencer.on_fail, on_fail);
+}
+
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+
+	diag_set(ClientError, ER_WAL_IO);
+	diag_move(&fiber()->diag, &sequencer.diag);
+	trigger_run(&sequencer.on_fail, &sequencer);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
+}
+
+static inline int
+sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
+{
+	struct replica *replica = replica_by_id(replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			       &replicaset.applier.order_latch);
+
+	latch_lock(latch);
+	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
+		/* Nothing to do. */
+		latch_unlock(latch);
+		return 0;
+	}
+
+	struct trigger *on_rollback;
+	struct trigger *on_commit;
+	/**
+	 * Explicitly begin the transaction so that we can
+	 * control fiber->gc life cycle and, in case of an apply
+	 * conflict, safely access the failed xrow object and
+	 * allocate IPROTO_NOP on gc.
+	 */
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		goto fail;
+	struct applier_tx_row *item;
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		int res = apply_row(row);
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			goto rollback;
+	}
+	/*
+	 * We are going to commit so it's high time to check if
+	 * the current transaction has non-local effects.
+	 */
+	if (txn_is_distributed(txn)) {
+		/*
+		 * A transaction mixes remote and local rows.
+		 * Local rows must be replicated back, which
+		 * doesn't make sense since the master likely has
+		 * new changes which local rows may overwrite.
+		 * Raise an error.
+		 */
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "distributed transactions");
+		goto rollback;
+	}
+
+	/* We are ready to submit txn to WAL. */
+	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+
+	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+	txn_on_commit(txn, on_commit);
+
+	if (txn_write(txn) != 0)
+		goto fail;
+
+	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
+	latch_unlock(latch);
+	return 0;
+
+rollback:
+	txn_rollback(txn);
+
+fail:
+	latch_unlock(latch);
+	if (diag_is_empty(&sequencer.diag)) {
+		diag_add_error(&sequencer.diag,
+			       diag_last_error(&fiber()->diag));
+		trigger_run(&sequencer.on_fail, NULL);
+	}
+	return -1;
+}
+
+
 static inline void
 applier_set_state(struct applier *applier, enum applier_state state)
 {
@@ -194,40 +419,6 @@ rollback:
 	return -1;
 }
 
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
-	assert(request->type == IPROTO_NOP);
-	struct txn *txn = in_txn();
-	if (txn_begin_stmt(txn, NULL) == NULL)
-		return -1;
-	return txn_commit_stmt(txn, request);
-}
-
-static int
-apply_row(struct xrow_header *row)
-{
-	struct request request;
-	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
-		return -1;
-	if (request.type == IPROTO_NOP)
-		return process_nop(&request);
-	struct space *space = space_cache_find(request.space_id);
-	if (space == NULL)
-		return -1;
-	if (box_process_rw(&request, space, NULL) != 0) {
-		say_error("error applying row: %s", request_str(&request));
-		return -1;
-	}
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -450,16 +641,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -565,70 +746,14 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 			   next)->row.is_commit);
 }
 
-/**
- * Apply all rows in the rows queue as a single transaction.
- *
- * Return 0 for success or -1 in case of an error.
- */
-static int
-applier_apply_tx(struct stailq *rows)
+static void
+applier_on_fail(struct trigger *trigger, void *event)
 {
-	/**
-	 * Explicitly begin the transaction so that we can
-	 * control fiber->gc life cycle and, in case of apply
-	 * conflict safely access failed xrow object and allocate
-	 * IPROTO_NOP on gc.
-	 */
-	struct txn *txn = txn_begin();
-	struct applier_tx_row *item;
-	if (txn == NULL)
-		diag_raise();
-	stailq_foreach_entry(item, rows, next) {
-		struct xrow_header *row = &item->row;
-		int res = apply_row(row);
-		if (res != 0) {
-			struct error *e = diag_last_error(diag_get());
-			/*
-			 * In case of ER_TUPLE_FOUND error and enabled
-			 * replication_skip_conflict configuration
-			 * option, skip applying the foreign row and
-			 * replace it with NOP in the local write ahead
-			 * log.
-			 */
-			if (e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				diag_clear(diag_get());
-				row->type = IPROTO_NOP;
-				row->bodycnt = 0;
-				res = apply_row(row);
-			}
-		}
-		if (res != 0)
-			goto rollback;
-	}
-	/*
-	 * We are going to commit so it's a high time to check if
-	 * the current transaction has non-local effects.
-	 */
-	if (txn_is_distributed(txn)) {
-		/*
-		 * A transaction mixes remote and local rows.
-		 * Local rows must be replicated back, which
-		 * doesn't make sense since the master likely has
-		 * new changes which local rows may overwrite.
-		 * Raise an error.
-		 */
-		diag_set(ClientError, ER_UNSUPPORTED,
-			 "Replication", "distributed transactions");
-		goto rollback;
-	}
-	return txn_commit(txn);
-
-rollback:
-	txn_rollback(txn);
-	fiber_gc();
-	return -1;
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	if (!diag_is_empty(&sequencer.diag))
+		diag_add_error(&applier->diag, diag_last_error(&sequencer.diag));
+	fiber_cancel(applier->reader);
 }
 
 /**
@@ -735,6 +860,10 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
+	struct trigger on_fail;
+	trigger_create(&on_fail, applier_on_fail, applier, NULL);
+	sequencer_on_fail(&on_fail);
+
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -763,31 +892,16 @@ applier_subscribe(struct applier *applier)
 		struct stailq rows;
 		applier_read_tx(applier, &rows);
 
+		fiber_cond_signal(&applier->writer_cond);
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(first_row->replica_id);
-		struct latch *latch = (replica ? &replica->order_latch :
-				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set of changes
-		 * may arrive via two concurrently running appliers.
-		 * Hence we need a latch to strictly order all changes
-		 * that belong to the same server id.
-		 */
-		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
-		    first_row->lsn &&
-		    applier_apply_tx(&rows) != 0) {
-			latch_unlock(latch);
+		if (sequencer_submit(first_row->replica_id,
+				     first_row->lsn, &rows) != 0) {
+			trigger_clear(&on_fail);
 			diag_raise();
 		}
-		latch_unlock(latch);
-
-		if (applier->state == APPLIER_SYNC ||
-		    applier->state == APPLIER_FOLLOW)
-			fiber_cond_signal(&applier->writer_cond);
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
 		fiber_gc();
@@ -872,6 +986,11 @@ applier_f(va_list ap)
 			return -1;
 		}
 	} catch (FiberIsCancelled *e) {
+		if (!diag_is_empty(&applier->diag)) {
+			diag_move(&applier->diag, &fiber()->diag);
+			applier_disconnect(applier, APPLIER_STOPPED);
+			break;
+		}
 		applier_disconnect(applier, APPLIER_OFF);
 		break;
 	} catch (SocketError *e) {
@@ -960,6 +1079,7 @@ applier_new(const char *uri)
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
 	fiber_cond_create(&applier->writer_cond);
+	diag_create(&applier->diag);
 	return applier;
 }
 
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b0e56add6 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,12 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
+	struct diag diag;
 };
 
+void
+applier_init();
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/box.cc b/src/box/box.cc
index 510f3fc99..49f8f24af 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2158,6 +2158,7 @@ box_cfg_xc(void)
 	port_init();
 	iproto_init();
 	sql_init();
+	applier_init();
 	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
-- 
2.21.0
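
P.S. The collector fiber above is a classic single-consumer completion
queue: the commit/rollback triggers enqueue a finished transaction and
signal a condition variable, and sequencer_collect_f() drains the queue
and latches the first error it sees. A self-contained model of that
shape, with pthreads instead of Tarantool fibers; every name below is
an illustrative stand-in, not a Tarantool API:

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct tx {
	struct tx *next;
	bool failed;
};

static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static struct tx *queue_head, *queue_tail;
static bool have_error; /* plays the role of sequencer.diag */

/* Analogue of sequencer_commit_cb()/sequencer_rollback_cb(). */
static void
tx_complete(struct tx *tx)
{
	pthread_mutex_lock(&mu);
	tx->next = NULL;
	if (queue_tail != NULL)
		queue_tail->next = tx;
	else
		queue_head = tx;
	queue_tail = tx;
	pthread_cond_signal(&cond);
	pthread_mutex_unlock(&mu);
}

/* Analogue of sequencer_collect_f(); stops after two txs for the demo. */
static void *
collector_f(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&mu);
	for (int seen = 0; seen < 2; seen++) {
		while (queue_head == NULL)
			pthread_cond_wait(&cond, &mu);
		struct tx *tx = queue_head;
		queue_head = tx->next;
		if (queue_head == NULL)
			queue_tail = NULL;
		if (tx->failed && !have_error) {
			have_error = true; /* only the first error is kept */
			fprintf(stderr, "collector: tx failed\n");
		}
	}
	pthread_mutex_unlock(&mu);
	return NULL;
}

int
main(void)
{
	pthread_t collector;
	pthread_create(&collector, NULL, collector_f, NULL);
	struct tx ok = { NULL, false }, bad = { NULL, true };
	tx_complete(&ok);
	tx_complete(&bad);
	pthread_join(collector, NULL);
	return have_error ? 1 : 0;
}

The real sequencer does one more thing this sketch omits: on error it
runs the on_fail trigger list, which lets each applier cancel its
reader fiber and surface the saved diagnostics to the user.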