From: Georgy Kirichenko <georgy@tarantool.org> To: tarantool-patches@freelists.org Cc: Georgy Kirichenko <georgy@tarantool.org> Subject: [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Date: Sun, 3 Mar 2019 23:26:16 +0300 [thread overview] Message-ID: <a3c98f644efeeda6f829ad68cc0a839d9dfa7306.1551644303.git.georgy@tarantool.org> (raw) In-Reply-To: <cover.1551644303.git.georgy@tarantool.org> Remove xstream dependency and use direct box interface to apply all replication rows. This is refactoring before transactional replication. Needed for: #2798 --- src/box/applier.cc | 20 ++++++++------------ src/box/applier.h | 9 +-------- src/box/box.cc | 32 ++++++++++++-------------------- src/box/box.h | 18 ++++++++++++++++++ 4 files changed, 39 insertions(+), 40 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index e9addcb3e..fd98b733d 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -37,7 +37,6 @@ #include "fiber_cond.h" #include "coio.h" #include "coio_buf.h" -#include "xstream.h" #include "wal.h" #include "xrow.h" #include "replication.h" @@ -48,6 +47,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "box.h" STRS(applier_state, applier_STATE); @@ -309,13 +309,13 @@ applier_join(struct applier *applier) /* * Receive initial data. 
*/ - assert(applier->join_stream != NULL); uint64_t row_count = 0; while (true) { coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { - xstream_write_xc(applier->join_stream, &row); + if (apply_initial_join_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -357,7 +357,8 @@ applier_join(struct applier *applier) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); - xstream_write_xc(applier->subscribe_stream, &row); + if (apply_row(&row) != 0) + diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -385,8 +386,6 @@ applier_join(struct applier *applier) static void applier_subscribe(struct applier *applier) { - assert(applier->subscribe_stream != NULL); - /* Send SUBSCRIBE request */ struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -550,7 +549,7 @@ applier_subscribe(struct applier *applier) */ latch_lock(latch); if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = xstream_write(applier->subscribe_stream, &row); + int res = apply_row(&row); if (res != 0) { struct error *e = diag_last_error(diag_get()); /* @@ -569,7 +568,7 @@ applier_subscribe(struct applier *applier) nop.bodycnt = 0; nop.replica_id = row.replica_id; nop.lsn = row.lsn; - res = xstream_write(applier->subscribe_stream, &nop); + res = apply_row(&nop); } } if (res != 0) { @@ -731,8 +730,7 @@ applier_stop(struct applier *applier) } struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream) +applier_new(const char *uri) { struct applier *applier = (struct applier *) calloc(1, sizeof(struct applier)); @@ -751,8 +749,6 @@ applier_new(const char *uri, struct xstream *join_stream, 
assert(rc == 0 && applier->uri.service != NULL); (void) rc; - applier->join_stream = join_stream; - applier->subscribe_stream = subscribe_stream; applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); fiber_cond_create(&applier->resume_cond); diff --git a/src/box/applier.h b/src/box/applier.h index d942b6fbb..5bff90031 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -45,8 +45,6 @@ #include "xrow.h" -struct xstream; - enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ #define applier_STATE(_) \ @@ -116,10 +114,6 @@ struct applier { bool is_paused; /** Condition variable signaled to resume the applier. */ struct fiber_cond resume_cond; - /** xstream to process rows during initial JOIN */ - struct xstream *join_stream; - /** xstream to process rows during final JOIN and SUBSCRIBE */ - struct xstream *subscribe_stream; }; /** @@ -152,8 +146,7 @@ applier_stop(struct applier *applier); * @error throws OutOfMemory exception if out of memory. */ struct applier * -applier_new(const char *uri, struct xstream *join_stream, - struct xstream *subscribe_stream); +applier_new(const char *uri); /** * Destroy and delete a applier. 
diff --git a/src/box/box.cc b/src/box/box.cc index a62595421..22cd52b04 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -122,10 +122,6 @@ static fiber_cond ro_cond; */ static bool is_orphan; -/* Use the shared instance of xstream for all appliers */ -static struct xstream join_stream; -static struct xstream subscribe_stream; - /** * The pool of fibers in the transaction processor thread * working on incoming messages from net, wal and other @@ -305,28 +301,29 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v) journal->vclock = v; } -static inline void -apply_row(struct xstream *stream, struct xrow_header *row) +int +apply_row(struct xrow_header *row) { - (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type == IPROTO_NOP) { if (process_nop(&request) != 0) - diag_raise(); - return; + return -1; + return 0; } struct space *space = space_cache_find_xc(request.space_id); if (box_process_rw(&request, space, NULL) != 0) { say_error("error applying row: %s", request_str(&request)); - diag_raise(); + return -1; } + return 0; } static void apply_wal_row(struct xstream *stream, struct xrow_header *row) { - apply_row(stream, row); + if (apply_row(row) != 0) + diag_raise(); struct wal_stream *xstream = container_of(stream, struct wal_stream, base); @@ -352,15 +349,14 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows) ctx->yield = (wal_max_rows >> 4) + 1; } -static void -apply_initial_join_row(struct xstream *stream, struct xrow_header *row) +int +apply_initial_join_row(struct xrow_header *row) { - (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); struct space *space = space_cache_find_xc(request.space_id); /* no access checks here - applier always works with admin privs */ - space_apply_initial_join_row_xc(space, &request); + return space_apply_initial_join_row(space, &request); } /* {{{ configuration 
bindings */ @@ -656,9 +652,7 @@ cfg_get_replication(int *p_count) for (int i = 0; i < count; i++) { const char *source = cfg_getarr_elem("replication", i); - struct applier *applier = applier_new(source, - &join_stream, - &subscribe_stream); + struct applier *applier = applier_new(source); if (applier == NULL) { /* Delete created appliers */ while (--i >= 0) @@ -2131,8 +2125,6 @@ box_cfg_xc(void) box_set_replication_sync_lag(); box_set_replication_sync_timeout(); box_set_replication_skip_conflict(); - xstream_create(&join_stream, apply_initial_join_row); - xstream_create(&subscribe_stream, apply_row); struct gc_checkpoint *checkpoint = gc_last_checkpoint(); diff --git a/src/box/box.h b/src/box/box.h index 53d88ab71..8d76b723d 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -218,6 +218,24 @@ extern "C" { typedef struct tuple box_tuple_t; +/* + * Apply row while bootstrapping. + * Return codes: + * 0 for Ok + * -1 in case of an error. + */ +int +apply_initial_join_row(struct xrow_header *row); + +/* + * Apply row after bootstrap is done (e.g. final join, subscribe). + * Return codes: + * 0 for Ok + * -1 in case of an error. + */ +int +apply_row(struct xrow_header *row); + /* box_select is private and used only by FFI */ API_EXPORT int box_select(uint32_t space_id, uint32_t index_id, -- 2.21.0
next prev parent reply other threads:[~2019-03-03 20:26 UTC|newest] Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-03-03 20:26 [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier Georgy Kirichenko 2019-03-03 20:26 ` Georgy Kirichenko [this message] 2019-03-05 8:52 ` [tarantool-patches] Re: [PATCH 1/3] Applier gets rid of a xstream Konstantin Osipov 2019-03-05 10:19 ` [tarantool-patches] " Vladimir Davydov 2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko 2019-03-05 9:06 ` [tarantool-patches] " Konstantin Osipov 2019-03-05 10:26 ` [tarantool-patches] " Vladimir Davydov 2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko 2019-03-05 9:11 ` [tarantool-patches] " Konstantin Osipov 2019-03-05 10:28 ` Vladimir Davydov [not found] ` <20190305112333.GA30697@chai> 2019-03-05 11:27 ` Vladimir Davydov 2019-03-05 9:13 ` Konstantin Osipov 2019-03-05 9:25 ` Konstantin Osipov 2019-03-05 9:28 ` Konstantin Osipov 2019-03-05 11:59 ` [tarantool-patches] " Vladimir Davydov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=a3c98f644efeeda6f829ad68cc0a839d9dfa7306.1551644303.git.georgy@tarantool.org \ --to=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.