From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id ED55D28267 for ; Wed, 6 Mar 2019 15:16:22 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id X2805lCEEg45 for ; Wed, 6 Mar 2019 15:16:22 -0500 (EST) Received: from smtp34.i.mail.ru (smtp34.i.mail.ru [94.100.177.94]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 7C47D284B5 for ; Wed, 6 Mar 2019 15:16:22 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Date: Wed, 6 Mar 2019 23:16:15 +0300 Message-Id: <166b45adc75c0753d36ea473d57ac452548fbbec.1551902962.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Remove xstream dependency and use direct box interface to apply all replication rows. This is refactoring before transactional replication. 
Needed for: #2798 --- src/box/applier.cc | 74 ++++++++++++++++++++++++++++++++++++++-------- src/box/applier.h | 9 +----- src/box/box.cc | 68 ++++++++------------------------------------- 3 files changed, 74 insertions(+), 77 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index e9addcb3e..a687d2bea 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -37,7 +37,6 @@ #include "fiber_cond.h" #include "coio.h" #include "coio_buf.h" -#include "xstream.h" #include "wal.h" #include "xrow.h" #include "replication.h" @@ -48,6 +47,9 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "schema.h" +#include "txn.h" +#include "box.h" STRS(applier_state, applier_STATE); @@ -167,6 +169,58 @@ applier_writer_f(va_list ap) return 0; } +/** + * Decode a row of the initial JOIN stream and apply it to its space. + * Returns -1 if the row cannot be decoded; otherwise the apply result. + */ +static int +apply_initial_join_row(struct xrow_header *row) +{ + struct request request; + if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) + return -1; + struct space *space = space_cache_find_xc(request.space_id); + /* no access checks here - applier always works with admin privs */ + return space_apply_initial_join_row(space, &request); +} + +/** + * Process a no-op request. + * + * A no-op request does not affect any space, but it + * promotes vclock and is written to WAL. 
+ */ +static int +process_nop(struct request *request) +{ + assert(request->type == IPROTO_NOP); + struct txn *txn = txn_begin_stmt(NULL); + if (txn == NULL) + return -1; + return txn_commit_stmt(txn, request); +} + +static int +apply_row(struct xrow_header *row) +{ + struct request request; + if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) + return -1; + if (request.type == IPROTO_NOP) { + if (process_nop(&request) != 0) + return -1; + return 0; + } + struct space *space = space_cache_find(request.space_id); + if (space == NULL) + return -1; + if (box_process_rw(&request, space, NULL) != 0) { + say_error("error applying row: %s", request_str(&request)); + return -1; + } + return 0; +} + /** * Connect to a remote host and authenticate the client. */ @@ -309,13 +358,13 @@ applier_join(struct applier *applier) /* * Receive initial data. */ - assert(applier->join_stream != NULL); uint64_t row_count = 0; while (true) { coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { - xstream_write_xc(applier->join_stream, &row); + if (apply_initial_join_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -357,7 +406,8 @@ applier_join(struct applier *applier) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); - xstream_write_xc(applier->subscribe_stream, &row); + if (apply_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -385,8 +435,6 @@ applier_join(struct applier *applier) static void applier_subscribe(struct applier *applier) { - assert(applier->subscribe_stream != NULL); - /* Send SUBSCRIBE request */ struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -550,7 +598,7 @@ 
applier_subscribe(struct applier *applier) */ latch_lock(latch); if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = xstream_write(applier->subscribe_stream, &row); + int res = apply_row(&row); if (res != 0) { struct error *e = diag_last_error(diag_get()); /* @@ -569,7 +617,7 @@ applier_subscribe(struct applier *applier) nop.bodycnt = 0; nop.replica_id = row.replica_id; nop.lsn = row.lsn; - res = xstream_write(applier->subscribe_stream, &nop); + res = apply_row(&nop); } } if (res != 0) { @@ -731,8 +779,7 @@ applier_stop(struct applier *applier) } struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream) +applier_new(const char *uri) { struct applier *applier = (struct applier *) calloc(1, sizeof(struct applier)); @@ -751,8 +798,6 @@ applier_new(const char *uri, struct xstream *join_stream, assert(rc == 0 && applier->uri.service != NULL); (void) rc; - applier->join_stream = join_stream; - applier->subscribe_stream = subscribe_stream; applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); fiber_cond_create(&applier->resume_cond); diff --git a/src/box/applier.h b/src/box/applier.h index d942b6fbb..5bff90031 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -45,8 +45,6 @@ #include "xrow.h" -struct xstream; - enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ #define applier_STATE(_) \ @@ -116,10 +114,6 @@ struct applier { bool is_paused; /** Condition variable signaled to resume the applier. */ struct fiber_cond resume_cond; - /** xstream to process rows during initial JOIN */ - struct xstream *join_stream; - /** xstream to process rows during final JOIN and SUBSCRIBE */ - struct xstream *subscribe_stream; }; /** @@ -152,8 +146,7 @@ applier_stop(struct applier *applier); * @error throws OutOfMemory exception if out of memory. 
*/ struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream); +applier_new(const char *uri); /** * Destroy and delete a applier. diff --git a/src/box/box.cc b/src/box/box.cc index a62595421..190e5eae5 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -122,10 +122,6 @@ static fiber_cond ro_cond; */ static bool is_orphan; -/* Use the shared instance of xstream for all appliers */ -static struct xstream join_stream; -static struct xstream subscribe_stream; - /** * The pool of fibers in the transaction processor thread * working on incoming messages from net, wal and other @@ -203,22 +199,6 @@ box_process_rw(struct request *request, struct space *space, return rc; } -/** - * Process a no-op request. - * - * A no-op request does not affect any space, but it - * promotes vclock and is written to WAL. - */ -static int -process_nop(struct request *request) -{ - assert(request->type == IPROTO_NOP); - struct txn *txn = txn_begin_stmt(NULL); - if (txn == NULL) - return -1; - return txn_commit_stmt(txn, request); -} - void box_set_ro(bool ro) { @@ -305,35 +285,24 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v) journal->vclock = v; } -static inline void -apply_row(struct xstream *stream, struct xrow_header *row) +static void +apply_wal_row(struct xstream *stream, struct xrow_header *row) { - (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); - if (request.type == IPROTO_NOP) { - if (process_nop(&request) != 0) + if (request.type != IPROTO_NOP) { + struct space *space = space_cache_find_xc(request.space_id); + if (box_process_rw(&request, space, NULL) != 0) { + say_error("error applying row: %s", request_str(&request)); diag_raise(); - return; - } - struct space *space = space_cache_find_xc(request.space_id); - if (box_process_rw(&request, space, NULL) != 0) { - say_error("error applying row: %s", request_str(&request)); - diag_raise(); + 
} } -} - -static void -apply_wal_row(struct xstream *stream, struct xrow_header *row) -{ - apply_row(stream, row); - struct wal_stream *xstream = container_of(stream, struct wal_stream, base); /** - * Yield once in a while, but not too often, - * mostly to allow signal handling to take place. - */ + * Yield once in a while, but not too often, + * mostly to allow signal handling to take place. + */ if (++xstream->rows % xstream->yield == 0) fiber_sleep(0); } @@ -352,17 +321,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows) ctx->yield = (wal_max_rows >> 4) + 1; } -static void -apply_initial_join_row(struct xstream *stream, struct xrow_header *row) -{ - (void) stream; - struct request request; - xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find_xc(request.space_id); - /* no access checks here - applier always works with admin privs */ - space_apply_initial_join_row_xc(space, &request); -} - /* {{{ configuration bindings */ static void @@ -656,9 +614,7 @@ cfg_get_replication(int *p_count) for (int i = 0; i < count; i++) { const char *source = cfg_getarr_elem("replication", i); - struct applier *applier = applier_new(source, - &join_stream, - &subscribe_stream); + struct applier *applier = applier_new(source); if (applier == NULL) { /* Delete created appliers */ while (--i >= 0) @@ -2131,8 +2087,6 @@ box_cfg_xc(void) box_set_replication_sync_lag(); box_set_replication_sync_timeout(); box_set_replication_skip_conflict(); - xstream_create(&join_stream, apply_initial_join_row); - xstream_create(&subscribe_stream, apply_row); struct gc_checkpoint *checkpoint = gc_last_checkpoint(); -- 2.21.0