From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lf1-f68.google.com (mail-lf1-f68.google.com [209.85.167.68]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 98BDB430411 for ; Sat, 15 Aug 2020 00:16:10 +0300 (MSK) Received: by mail-lf1-f68.google.com with SMTP id m15so5500221lfp.7 for ; Fri, 14 Aug 2020 14:16:10 -0700 (PDT) From: Cyrill Gorcunov Date: Sat, 15 Aug 2020 00:14:41 +0300 Message-Id: <20200814211442.667099-8-gorcunov@gmail.com> In-Reply-To: <20200814211442.667099-1-gorcunov@gmail.com> References: <20200814211442.667099-1-gorcunov@gmail.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tml Cc: Vladislav Shpilevoy Transaction processing code is very heavy simply because transactions are carrying various data and involve a number of other mechanisms to proceed. In turn, when we receive a confirm or rollback packet from another node in a cluster we just need to inspect the limbo queue and write this packet into a WAL journal. So calling a bunch of txn engine helpers is simply a waste of cycles. Thus let's rather handle them in a special light way: - allocate a synchro_entry structure which would carry the journal entry itself and the encoded message - process the limbo queue to mark confirmed/rollback'ed messages - finally write this synchro_entry into a journal Which is a way simpler. 
Part-of #5129 Suggedsted-by: Vladislav Shpilevoy Signed-off-by: Cyrill Gorcunov --- src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 172 insertions(+), 7 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index a71516282..a1ce7a23f 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -51,8 +51,10 @@ #include "schema.h" #include "txn.h" #include "box.h" +#include "xrow.h" #include "scoped_guard.h" #include "txn_limbo.h" +#include "journal.h" STRS(applier_state, applier_STATE); @@ -841,6 +843,151 @@ applier_unlock(struct latch *latch) latch_unlock(latch); } +struct synchro_entry { + /** An applier initiated the syncho request. */ + struct applier *applier; + + /** Encoded form of a synchro record. */ + struct synchro_body_bin body_bin; + + /** xrow to write, used by the journal engine. */ + struct xrow_header row; + + /** + * The journal entry itself. Note since + * it has unsized array it must be the + * last entry in the structure. + */ + struct journal_entry journal_entry; +}; + +static void +synchro_entry_delete(struct synchro_entry *entry) +{ + free(entry); +} + +/** + * Async write journal completion. + */ +static void +apply_synchro_row_cb(struct journal_entry *entry) +{ + assert(entry->complete_data != NULL); + struct synchro_entry *synchro_entry = + (struct synchro_entry *)entry->complete_data; + struct applier *applier = synchro_entry->applier; + + /* + * We can reuse triggers, they are allocated when + * applier get subscribed and since packets handling + * is processed after the subscribtion phase the triggers + * will be alive. + */ + if (entry->res < 0) { + trigger_run(&replicaset.applier.on_rollback, applier); + /* + * Restore the last written vlock value. 
+ */ + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); + diag_set(ClientError, ER_WAL_IO); + diag_log(); + } else { + trigger_run(&replicaset.applier.on_wal_write, applier); + } + + synchro_entry_delete(synchro_entry); +} + +/** + * Allocate a new synchro_entry to be passed to + * the journal engine in async write way. + */ +static struct synchro_entry * +synchro_entry_new(struct applier *applier, + struct xrow_header *applier_row, + struct synchro_request *req) +{ + struct synchro_entry *entry; + size_t size = sizeof(*entry) + sizeof(struct xrow_header *); + + /* + * For simplicity we use malloc here but + * probably should provide some cache similar + * to txn cache. + */ + entry = (struct synchro_entry *)malloc(size); + if (entry == NULL) { + diag_set(OutOfMemory, size, "malloc", "synchro_entry"); + return NULL; + } + + struct journal_entry *journal_entry = &entry->journal_entry; + struct synchro_body_bin *body_bin = &entry->body_bin; + struct xrow_header *row = &entry->row; + + entry->applier = applier; + journal_entry->rows[0] = row; + + xrow_encode_synchro(row, body_bin, req); + + row->lsn = applier_row->lsn; + row->replica_id = applier_row->replica_id; + + journal_entry_create(journal_entry, 1, xrow_approx_len(row), + apply_synchro_row_cb, entry); + return entry; +} + +/* + * Process a synchro request from incoming applier packet + * without using txn engine, for a speed sake. 
+ */ +static int +apply_synchro_row(struct applier *applier, struct xrow_header *row) +{ + assert(iproto_type_is_synchro_request(row->type)); + + struct latch *latch = applier_lock(row->replica_id); + if (vclock_get(&replicaset.applier.vclock, + row->replica_id) >= row->lsn) { + applier_unlock(latch); + return 0; + } + + struct synchro_request req; + if (xrow_decode_synchro(row, &req) != 0) + goto out; + + if (txn_limbo_process(&txn_limbo, &req)) + goto out; + + struct synchro_entry *entry; + entry = synchro_entry_new(applier, row, &req); + if (entry == NULL) + goto out; + + if (journal_write_async(&entry->journal_entry) != 0) { + diag_set(ClientError, ER_WAL_IO); + goto out; + } + + /* + * In case if something get wrong the journal completion + * handler will set the applier's vclock back to last + * successfully WAL written value. + */ + vclock_follow(&replicaset.applier.vclock, + row->replica_id, row->lsn); + applier_unlock(latch); + return 0; + +out: + diag_log(); + applier_unlock(latch); + return -1; +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -1118,7 +1265,13 @@ applier_subscribe(struct applier *applier) applier->lag = TIMEOUT_INFINITY; - /* Register triggers to handle WAL writes and rollbacks. */ + /* + * Register triggers to handle WAL writes and rollbacks. + * + * Note we use them for syncronous packets handling as well + * thus when changing make sure that synchro handling won't + * be broken. + */ struct trigger on_wal_write; trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL); trigger_add(&replicaset.applier.on_wal_write, &on_wal_write); @@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier) applier_read_tx(applier, &rows); applier->last_row_time = ev_monotonic_now(loop()); + struct xrow_header *row = applier_first_row(&rows); - /* - * In case of an heartbeat message wake a writer up - * and check applier state. 
- */ - if (applier_first_row(&rows)->lsn == 0) + if (row->lsn == 0) { + /* + * In case of an heartbeat message + * wake a writer up and check + * the applier state. + */ applier_signal_ack(applier); - else if (applier_apply_tx(&rows) != 0) + } else if (iproto_type_is_synchro_request(row->type)) { + /* + * Make sure synchro messages are never reached + * in a batch (this is by design for simplicity + * sake). + */ + assert(stailq_first(&rows) == stailq_last(&rows)); + if (apply_synchro_row(applier, row) != 0) + diag_raise(); + } else if (applier_apply_tx(&rows) != 0) { diag_raise(); + } if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); -- 2.26.2