From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 20FCF28A2F for ; Sun, 3 Mar 2019 15:26:25 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id q6Ace72dyVLz for ; Sun, 3 Mar 2019 15:26:24 -0500 (EST) Received: from smtp10.mail.ru (smtp10.mail.ru [94.100.181.92]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6A4EB2895C for ; Sun, 3 Mar 2019 15:26:24 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Date: Sun, 3 Mar 2019 23:26:17 +0300 Message-Id: <1c0402648b0ce2f9a74bdd9cf4b95fb26e0fd297.1551644303.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Use apply_row for bot initial join and subscribe. Refactoring: add memtx and vinyl engine static variable, get rid of engine_by_name at box.cc Needed for: 2798 --- src/box/applier.cc | 2 +- src/box/box.cc | 51 ++++++++++++++++------------------------------ src/box/box.h | 9 -------- 3 files changed, 19 insertions(+), 43 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index fd98b733d..3222b041d 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -314,7 +314,7 @@ applier_join(struct applier *applier) coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { - if (apply_initial_join_row(&row) != 0) + if (apply_row(&row) != 0) diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); diff --git a/src/box/box.cc b/src/box/box.cc index 22cd52b04..e62f2ea12 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -137,6 +137,15 @@ static struct fiber_pool tx_fiber_pool; */ static struct cbus_endpoint tx_prio_endpoint; +/** + * Memtx engine instance + */ +static struct memtx_engine *memtx = NULL; +/** + * Vinyl engine instance + */ +static struct vinyl_engine *vinyl = NULL; + static int box_check_writable(void) { @@ -306,6 +315,15 @@ apply_row(struct xrow_header *row) { struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); + + assert(memtx != NULL); + if (memtx->state == MEMTX_INITIAL_RECOVERY) { + /* Snapshot recovery or initial join */ + struct space *space = space_cache_find_xc(request.space_id); + /* no access checks here - applier always works with admin privs */ + return space_apply_initial_join_row(space, &request); + } + if (request.type == IPROTO_NOP) { if (process_nop(&request) != 0) return -1; @@ -349,16 +367,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows) ctx->yield = (wal_max_rows >> 4) + 1; } -int -apply_initial_join_row(struct xrow_header *row) -{ - struct request request; - xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find_xc(request.space_id); - /* no access checks here - applier always works with admin privs */ - return space_apply_initial_join_row(space, &request); -} - /* {{{ configuration bindings */ static void @@ -784,13 +792,9 @@ box_set_io_collect_interval(void) void box_set_snap_io_rate_limit(void) { - struct memtx_engine *memtx; - memtx = (struct memtx_engine *)engine_by_name("memtx"); assert(memtx != NULL); memtx_engine_set_snap_io_rate_limit(memtx, cfg_getd("snap_io_rate_limit")); - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_snap_io_rate_limit(vinyl, cfg_getd("snap_io_rate_limit")); @@ -799,8 +803,6 @@ box_set_snap_io_rate_limit(void) void box_set_memtx_memory(void) { - struct memtx_engine *memtx; - memtx = (struct memtx_engine *)engine_by_name("memtx"); assert(memtx != NULL); memtx_engine_set_memory_xc(memtx, box_check_memtx_memory(cfg_geti64("memtx_memory"))); @@ -809,8 +811,6 @@ box_set_memtx_memory(void) void box_set_memtx_max_tuple_size(void) { - struct memtx_engine *memtx; - memtx = (struct memtx_engine *)engine_by_name("memtx"); assert(memtx != NULL); memtx_engine_set_max_tuple_size(memtx, cfg_geti("memtx_max_tuple_size")); @@ -821,8 +821,6 @@ box_set_too_long_threshold(void) { too_long_threshold = cfg_getd("too_long_threshold"); - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_too_long_threshold(vinyl, too_long_threshold); } @@ -860,8 +858,6 @@ box_set_checkpoint_wal_threshold(void) void box_set_vinyl_memory(void) { - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_memory_xc(vinyl, box_check_vinyl_memory(cfg_geti64("vinyl_memory"))); @@ -870,8 +866,6 @@ box_set_vinyl_memory(void) void box_set_vinyl_max_tuple_size(void) { - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_max_tuple_size(vinyl, cfg_geti("vinyl_max_tuple_size")); @@ -880,8 +874,6 @@ box_set_vinyl_max_tuple_size(void) void box_set_vinyl_cache(void) { - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_cache(vinyl, cfg_geti64("vinyl_cache")); } @@ -889,8 +881,6 @@ box_set_vinyl_cache(void) void box_set_vinyl_timeout(void) { - struct vinyl_engine *vinyl; - vinyl = (struct vinyl_engine *)engine_by_name("vinyl"); assert(vinyl != NULL); vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout")); } @@ -1703,7 +1693,6 @@ engine_init() * in checkpoints (in enigne_foreach order), * so it must be registered first. */ - struct memtx_engine *memtx; memtx = memtx_engine_new_xc(cfg_gets("memtx_dir"), cfg_geti("force_recovery"), cfg_getd("memtx_memory"), @@ -1718,7 +1707,6 @@ engine_init() struct engine *blackhole = blackhole_engine_new_xc(); engine_register(blackhole); - struct vinyl_engine *vinyl; vinyl = vinyl_engine_new_xc(cfg_gets("vinyl_dir"), cfg_geti64("vinyl_memory"), cfg_geti("vinyl_read_threads"), @@ -1963,9 +1951,6 @@ local_recovery(const struct tt_uuid *instance_uuid, */ engine_begin_initial_recovery_xc(&recovery->vclock); - struct memtx_engine *memtx; - memtx = (struct memtx_engine *)engine_by_name("memtx"); - assert(memtx != NULL); struct recovery_journal journal; recovery_journal_create(&journal, &recovery->vclock); diff --git a/src/box/box.h b/src/box/box.h index 8d76b723d..4f9b27264 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -218,15 +218,6 @@ extern "C" { typedef struct tuple box_tuple_t; -/* - * Apply row while bootstraping. - * Return codes: - * 0 for Ok - * -1 in case of an error. - */ -int -apply_initial_join_row(struct xrow_header *row); - /* * Apply row after bootstrap is done (e.g. final join, subscribe. * Return codes: -- 2.21.0