Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream
Date: Wed,  6 Mar 2019 23:16:15 +0300	[thread overview]
Message-ID: <166b45adc75c0753d36ea473d57ac452548fbbec.1551902962.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1551902962.git.georgy@tarantool.org>

Remove xstream dependency and use direct box interface to apply all
replication rows. This is refactoring before transactional replication.

Needed for: #2798
---
 src/box/applier.cc | 69 ++++++++++++++++++++++++++++++++++++++--------
 src/box/applier.h  |  9 +-----
 src/box/box.cc     | 68 ++++++++-------------------------------------
 3 files changed, 69 insertions(+), 77 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..a687d2bea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
 #include "fiber_cond.h"
 #include "coio.h"
 #include "coio_buf.h"
-#include "xstream.h"
 #include "wal.h"
 #include "xrow.h"
 #include "replication.h"
@@ -48,6 +47,9 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "schema.h"
+#include "txn.h"
+#include "box.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -167,6 +169,53 @@ applier_writer_f(va_list ap)
 	return 0;
 }
 
+static int
+apply_initial_join_row(struct xrow_header *row)
+{
+	struct request request;
+	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
+	struct space *space = space_cache_find_xc(request.space_id);
+	/* no access checks here - applier always works with admin privs */
+	return space_apply_initial_join_row(space, &request);
+}
+
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = txn_begin_stmt(NULL);
+	if (txn == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	if (request.type == IPROTO_NOP) {
+		if (process_nop(&request) != 0)
+			return -1;
+		return 0;
+	}
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	if (box_process_rw(&request, space, NULL) != 0) {
+		say_error("error applying row: %s", request_str(&request));
+		return -1;
+	}
+	return 0;
+}
+
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -309,13 +358,13 @@ applier_join(struct applier *applier)
 	/*
 	 * Receive initial data.
 	 */
-	assert(applier->join_stream != NULL);
 	uint64_t row_count = 0;
 	while (true) {
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			xstream_write_xc(applier->join_stream, &row);
+			if (apply_initial_join_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -357,7 +406,8 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (apply_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -385,8 +435,6 @@ applier_join(struct applier *applier)
 static void
 applier_subscribe(struct applier *applier)
 {
-	assert(applier->subscribe_stream != NULL);
-
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +598,7 @@ applier_subscribe(struct applier *applier)
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
+			int res = apply_row(&row);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/*
@@ -569,7 +617,7 @@ applier_subscribe(struct applier *applier)
 					nop.bodycnt = 0;
 					nop.replica_id = row.replica_id;
 					nop.lsn = row.lsn;
-					res = xstream_write(applier->subscribe_stream, &nop);
+					res = apply_row(&nop);
 				}
 			}
 			if (res != 0) {
@@ -731,8 +779,7 @@ applier_stop(struct applier *applier)
 }
 
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream)
+applier_new(const char *uri)
 {
 	struct applier *applier = (struct applier *)
 		calloc(1, sizeof(struct applier));
@@ -751,8 +798,6 @@ applier_new(const char *uri, struct xstream *join_stream,
 	assert(rc == 0 && applier->uri.service != NULL);
 	(void) rc;
 
-	applier->join_stream = join_stream;
-	applier->subscribe_stream = subscribe_stream;
 	applier->last_row_time = ev_monotonic_now(loop());
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
 
 #include "xrow.h"
 
-struct xstream;
-
 enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
 
 #define applier_STATE(_)                                             \
@@ -116,10 +114,6 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
-	/** xstream to process rows during initial JOIN */
-	struct xstream *join_stream;
-	/** xstream to process rows during final JOIN and SUBSCRIBE */
-	struct xstream *subscribe_stream;
 };
 
 /**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
  * @error   throws OutOfMemory exception if out of memory.
  */
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream);
+applier_new(const char *uri);
 
 /**
  * Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..190e5eae5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan;
 
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
 /**
  * The pool of fibers in the transaction processor thread
  * working on incoming messages from net, wal and other
@@ -203,22 +199,6 @@ box_process_rw(struct request *request, struct space *space,
 	return rc;
 }
 
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
-	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
-		return -1;
-	return txn_commit_stmt(txn, request);
-}
-
 void
 box_set_ro(bool ro)
 {
@@ -305,35 +285,24 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 	journal->vclock = v;
 }
 
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+static void
+apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
-	if (request.type == IPROTO_NOP) {
-		if (process_nop(&request) != 0)
+	if (request.type != IPROTO_NOP) {
+		struct space *space = space_cache_find_xc(request.space_id);
+		if (box_process_rw(&request, space, NULL) != 0) {
+			say_error("error applying row: %s", request_str(&request));
 			diag_raise();
-		return;
-	}
-	struct space *space = space_cache_find_xc(request.space_id);
-	if (box_process_rw(&request, space, NULL) != 0) {
-		say_error("error applying row: %s", request_str(&request));
-		diag_raise();
+		}
 	}
-}
-
-static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
-{
-	apply_row(stream, row);
-
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
 	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
-	 */
+	* Yield once in a while, but not too often,
+	* mostly to allow signal handling to take place.
+	*/
 	if (++xstream->rows % xstream->yield == 0)
 		fiber_sleep(0);
 }
@@ -352,17 +321,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
 	ctx->yield = (wal_max_rows >> 4)  + 1;
 }
 
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
-	(void) stream;
-	struct request request;
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
-	/* no access checks here - applier always works with admin privs */
-	space_apply_initial_join_row_xc(space, &request);
-}
-
 /* {{{ configuration bindings */
 
 static void
@@ -656,9 +614,7 @@ cfg_get_replication(int *p_count)
 
 	for (int i = 0; i < count; i++) {
 		const char *source = cfg_getarr_elem("replication", i);
-		struct applier *applier = applier_new(source,
-						      &join_stream,
-						      &subscribe_stream);
+		struct applier *applier = applier_new(source);
 		if (applier == NULL) {
 			/* Delete created appliers */
 			while (--i >= 0)
@@ -2131,8 +2087,6 @@ box_cfg_xc(void)
 	box_set_replication_sync_lag();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
-	xstream_create(&join_stream, apply_initial_join_row);
-	xstream_create(&subscribe_stream, apply_row);
 
 	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
 
-- 
2.21.0

  reply	other threads:[~2019-03-06 20:16 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
2019-03-06 20:16 ` Georgy Kirichenko [this message]
2019-03-07  9:31   ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Vladimir Davydov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
2019-03-07  9:46   ` Vladimir Davydov
2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
2019-03-07 10:53     ` Vladimir Davydov
2019-03-07 11:22       ` Konstantin Osipov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
2019-03-07 10:38   ` Vladimir Davydov
2019-03-07 10:40   ` [tarantool-patches] " Konstantin Osipov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=166b45adc75c0753d36ea473d57ac452548fbbec.1551902962.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox