From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 28C8328A59 for ; Sun, 3 Mar 2019 15:26:25 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id py7Pz0H9_4sl for ; Sun, 3 Mar 2019 15:26:25 -0500 (EST) Received: from smtp10.mail.ru (smtp10.mail.ru [94.100.181.92]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6B62B289BD for ; Sun, 3 Mar 2019 15:26:24 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Date: Sun, 3 Mar 2019 23:26:16 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Remove the xstream dependency and use the box interface directly to apply all replication rows. This is a refactoring step in preparation for transactional replication.
Needed for: #2798 --- src/box/applier.cc | 20 ++++++++------------ src/box/applier.h | 9 +-------- src/box/box.cc | 32 ++++++++++++-------------------- src/box/box.h | 18 ++++++++++++++++++ 4 files changed, 39 insertions(+), 40 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index e9addcb3e..fd98b733d 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -37,7 +37,6 @@ #include "fiber_cond.h" #include "coio.h" #include "coio_buf.h" -#include "xstream.h" #include "wal.h" #include "xrow.h" #include "replication.h" @@ -48,6 +47,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "box.h" STRS(applier_state, applier_STATE); @@ -309,13 +309,13 @@ applier_join(struct applier *applier) /* * Receive initial data. */ - assert(applier->join_stream != NULL); uint64_t row_count = 0; while (true) { coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { - xstream_write_xc(applier->join_stream, &row); + if (apply_initial_join_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -357,7 +357,8 @@ applier_join(struct applier *applier) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); - xstream_write_xc(applier->subscribe_stream, &row); + if (apply_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -385,8 +386,6 @@ applier_join(struct applier *applier) static void applier_subscribe(struct applier *applier) { - assert(applier->subscribe_stream != NULL); - /* Send SUBSCRIBE request */ struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -550,7 +549,7 @@ applier_subscribe(struct applier *applier) */ latch_lock(latch); if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { 
- int res = xstream_write(applier->subscribe_stream, &row); + int res = apply_row(&row); if (res != 0) { struct error *e = diag_last_error(diag_get()); /* @@ -569,7 +568,7 @@ applier_subscribe(struct applier *applier) nop.bodycnt = 0; nop.replica_id = row.replica_id; nop.lsn = row.lsn; - res = xstream_write(applier->subscribe_stream, &nop); + res = apply_row(&nop); } } if (res != 0) { @@ -731,8 +730,7 @@ applier_stop(struct applier *applier) } struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream) +applier_new(const char *uri) { struct applier *applier = (struct applier *) calloc(1, sizeof(struct applier)); @@ -751,8 +749,6 @@ applier_new(const char *uri, struct xstream *join_stream, assert(rc == 0 && applier->uri.service != NULL); (void) rc; - applier->join_stream = join_stream; - applier->subscribe_stream = subscribe_stream; applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); fiber_cond_create(&applier->resume_cond); diff --git a/src/box/applier.h b/src/box/applier.h index d942b6fbb..5bff90031 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -45,8 +45,6 @@ #include "xrow.h" -struct xstream; - enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ #define applier_STATE(_) \ @@ -116,10 +114,6 @@ struct applier { bool is_paused; /** Condition variable signaled to resume the applier. */ struct fiber_cond resume_cond; - /** xstream to process rows during initial JOIN */ - struct xstream *join_stream; - /** xstream to process rows during final JOIN and SUBSCRIBE */ - struct xstream *subscribe_stream; }; /** @@ -152,8 +146,7 @@ applier_stop(struct applier *applier); * @error throws OutOfMemory exception if out of memory. */ struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream); +applier_new(const char *uri); /** * Destroy and delete a applier. 
diff --git a/src/box/box.cc b/src/box/box.cc index a62595421..22cd52b04 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -122,10 +122,6 @@ static fiber_cond ro_cond; */ static bool is_orphan; -/* Use the shared instance of xstream for all appliers */ -static struct xstream join_stream; -static struct xstream subscribe_stream; - /** * The pool of fibers in the transaction processor thread * working on incoming messages from net, wal and other @@ -305,28 +301,29 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v) journal->vclock = v; } -static inline void -apply_row(struct xstream *stream, struct xrow_header *row) +int +apply_row(struct xrow_header *row) { - (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type == IPROTO_NOP) { if (process_nop(&request) != 0) - diag_raise(); - return; + return -1; + return 0; } struct space *space = space_cache_find_xc(request.space_id); if (box_process_rw(&request, space, NULL) != 0) { say_error("error applying row: %s", request_str(&request)); - diag_raise(); + return -1; } + return 0; } static void apply_wal_row(struct xstream *stream, struct xrow_header *row) { - apply_row(stream, row); + if (apply_row(row) != 0) + diag_raise(); struct wal_stream *xstream = container_of(stream, struct wal_stream, base); @@ -352,15 +349,14 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows) ctx->yield = (wal_max_rows >> 4) + 1; } -static void -apply_initial_join_row(struct xstream *stream, struct xrow_header *row) +int +apply_initial_join_row(struct xrow_header *row) { - (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); struct space *space = space_cache_find_xc(request.space_id); /* no access checks here - applier always works with admin privs */ - space_apply_initial_join_row_xc(space, &request); + return space_apply_initial_join_row(space, &request); } /* {{{ configuration 
bindings */ @@ -656,9 +652,7 @@ cfg_get_replication(int *p_count) for (int i = 0; i < count; i++) { const char *source = cfg_getarr_elem("replication", i); - struct applier *applier = applier_new(source, - &join_stream, - &subscribe_stream); + struct applier *applier = applier_new(source); if (applier == NULL) { /* Delete created appliers */ while (--i >= 0) @@ -2131,8 +2125,6 @@ box_cfg_xc(void) box_set_replication_sync_lag(); box_set_replication_sync_timeout(); box_set_replication_skip_conflict(); - xstream_create(&join_stream, apply_initial_join_row); - xstream_create(&subscribe_stream, apply_row); struct gc_checkpoint *checkpoint = gc_last_checkpoint(); diff --git a/src/box/box.h b/src/box/box.h index 53d88ab71..8d76b723d 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -218,6 +218,24 @@ extern "C" { typedef struct tuple box_tuple_t; +/* + * Apply row while bootstrapping. + * Return codes: + * 0 for Ok + * -1 in case of an error. + */ +int +apply_initial_join_row(struct xrow_header *row); + +/* + * Apply row after bootstrap is done (e.g. final join, subscribe). + * Return codes: + * 0 for Ok + * -1 in case of an error. + */ +int +apply_row(struct xrow_header *row); + /* box_select is private and used only by FFI */ API_EXPORT int box_select(uint32_t space_id, uint32_t index_id, -- 2.21.0