[tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream

Georgy Kirichenko georgy at tarantool.org
Sun Mar 3 23:26:16 MSK 2019


Remove the xstream dependency and use the box interface directly to apply
all replication rows. This is a refactoring step before transactional replication.

Needed for: #2798
---
 src/box/applier.cc | 20 ++++++++------------
 src/box/applier.h  |  9 +--------
 src/box/box.cc     | 32 ++++++++++++--------------------
 src/box/box.h      | 18 ++++++++++++++++++
 4 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..fd98b733d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
 #include "fiber_cond.h"
 #include "coio.h"
 #include "coio_buf.h"
-#include "xstream.h"
 #include "wal.h"
 #include "xrow.h"
 #include "replication.h"
@@ -48,6 +47,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "box.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -309,13 +309,13 @@ applier_join(struct applier *applier)
 	/*
 	 * Receive initial data.
 	 */
-	assert(applier->join_stream != NULL);
 	uint64_t row_count = 0;
 	while (true) {
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			xstream_write_xc(applier->join_stream, &row);
+			if (apply_initial_join_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -357,7 +357,8 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (apply_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -385,8 +386,6 @@ applier_join(struct applier *applier)
 static void
 applier_subscribe(struct applier *applier)
 {
-	assert(applier->subscribe_stream != NULL);
-
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +549,7 @@ applier_subscribe(struct applier *applier)
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
+			int res = apply_row(&row);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/*
@@ -569,7 +568,7 @@ applier_subscribe(struct applier *applier)
 					nop.bodycnt = 0;
 					nop.replica_id = row.replica_id;
 					nop.lsn = row.lsn;
-					res = xstream_write(applier->subscribe_stream, &nop);
+					res = apply_row(&nop);
 				}
 			}
 			if (res != 0) {
@@ -731,8 +730,7 @@ applier_stop(struct applier *applier)
 }
 
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream)
+applier_new(const char *uri)
 {
 	struct applier *applier = (struct applier *)
 		calloc(1, sizeof(struct applier));
@@ -751,8 +749,6 @@ applier_new(const char *uri, struct xstream *join_stream,
 	assert(rc == 0 && applier->uri.service != NULL);
 	(void) rc;
 
-	applier->join_stream = join_stream;
-	applier->subscribe_stream = subscribe_stream;
 	applier->last_row_time = ev_monotonic_now(loop());
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
 
 #include "xrow.h"
 
-struct xstream;
-
 enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
 
 #define applier_STATE(_)                                             \
@@ -116,10 +114,6 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
-	/** xstream to process rows during initial JOIN */
-	struct xstream *join_stream;
-	/** xstream to process rows during final JOIN and SUBSCRIBE */
-	struct xstream *subscribe_stream;
 };
 
 /**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
  * @error   throws OutOfMemory exception if out of memory.
  */
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream);
+applier_new(const char *uri);
 
 /**
  * Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..22cd52b04 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan;
 
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
 /**
  * The pool of fibers in the transaction processor thread
  * working on incoming messages from net, wal and other
@@ -305,28 +301,29 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 	journal->vclock = v;
 }
 
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+int
+apply_row(struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type == IPROTO_NOP) {
 		if (process_nop(&request) != 0)
-			diag_raise();
-		return;
+			return -1;
+		return 0;
 	}
 	struct space *space = space_cache_find_xc(request.space_id);
 	if (box_process_rw(&request, space, NULL) != 0) {
 		say_error("error applying row: %s", request_str(&request));
-		diag_raise();
+		return -1;
 	}
+	return 0;
 }
 
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
-	apply_row(stream, row);
+	if (apply_row(row) != 0)
+		diag_raise();
 
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
@@ -352,15 +349,14 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
 	ctx->yield = (wal_max_rows >> 4)  + 1;
 }
 
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
+int
+apply_initial_join_row(struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	struct space *space = space_cache_find_xc(request.space_id);
 	/* no access checks here - applier always works with admin privs */
-	space_apply_initial_join_row_xc(space, &request);
+	return space_apply_initial_join_row(space, &request);
 }
 
 /* {{{ configuration bindings */
@@ -656,9 +652,7 @@ cfg_get_replication(int *p_count)
 
 	for (int i = 0; i < count; i++) {
 		const char *source = cfg_getarr_elem("replication", i);
-		struct applier *applier = applier_new(source,
-						      &join_stream,
-						      &subscribe_stream);
+		struct applier *applier = applier_new(source);
 		if (applier == NULL) {
 			/* Delete created appliers */
 			while (--i >= 0)
@@ -2131,8 +2125,6 @@ box_cfg_xc(void)
 	box_set_replication_sync_lag();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
-	xstream_create(&join_stream, apply_initial_join_row);
-	xstream_create(&subscribe_stream, apply_row);
 
 	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
 
diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab71..8d76b723d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -218,6 +218,24 @@ extern "C" {
 
 typedef struct tuple box_tuple_t;
 
+/*
+ * Apply a row while bootstrapping.
+ * Return codes:
+ * 0 for Ok
+ * -1 in case of an error.
+ */
+int
+apply_initial_join_row(struct xrow_header *row);
+
+/*
+ * Apply a row after bootstrap is done (e.g. final join, subscribe).
+ * Return codes:
+ * 0 for Ok
+ * -1 in case of an error.
+ */
+int
+apply_row(struct xrow_header *row);
+
 /* box_select is private and used only by FFI */
 API_EXPORT int
 box_select(uint32_t space_id, uint32_t index_id,
-- 
2.21.0





More information about the Tarantool-patches mailing list