[tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream
Georgy Kirichenko
georgy at tarantool.org
Wed Mar 6 23:16:15 MSK 2019
Remove xstream dependency and use direct box interface to apply all
replication rows. This is refactoring before transactional replication.
Needed for: #2798
---
src/box/applier.cc | 69 ++++++++++++++++++++++++++++++++++++++--------
src/box/applier.h | 9 +-----
src/box/box.cc | 68 ++++++++-------------------------------------
3 files changed, 69 insertions(+), 77 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..a687d2bea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
#include "fiber_cond.h"
#include "coio.h"
#include "coio_buf.h"
-#include "xstream.h"
#include "wal.h"
#include "xrow.h"
#include "replication.h"
@@ -48,6 +47,9 @@
#include "error.h"
#include "session.h"
#include "cfg.h"
+#include "schema.h"
+#include "txn.h"
+#include "box.h"
STRS(applier_state, applier_STATE);
@@ -167,6 +169,53 @@ applier_writer_f(va_list ap)
return 0;
}
+static int
+apply_initial_join_row(struct xrow_header *row)
+{
+ struct request request;
+ xrow_decode_dml(row, &request, dml_request_key_map(row->type));
+ struct space *space = space_cache_find_xc(request.space_id);
+ /* no access checks here - applier always works with admin privs */
+ return space_apply_initial_join_row(space, &request);
+}
+
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+ assert(request->type == IPROTO_NOP);
+ struct txn *txn = txn_begin_stmt(NULL);
+ if (txn == NULL)
+ return -1;
+ return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+ struct request request;
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ if (request.type == IPROTO_NOP) {
+ if (process_nop(&request) != 0)
+ return -1;
+ return 0;
+ }
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
+ if (box_process_rw(&request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(&request));
+ return -1;
+ }
+ return 0;
+}
+
/**
* Connect to a remote host and authenticate the client.
*/
@@ -309,13 +358,13 @@ applier_join(struct applier *applier)
/*
* Receive initial data.
*/
- assert(applier->join_stream != NULL);
uint64_t row_count = 0;
while (true) {
coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
- xstream_write_xc(applier->join_stream, &row);
+ if (apply_initial_join_row(&row) != 0)
+ diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -357,7 +406,8 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- xstream_write_xc(applier->subscribe_stream, &row);
+ if (apply_row(&row) != 0)
+ diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -385,8 +435,6 @@ applier_join(struct applier *applier)
static void
applier_subscribe(struct applier *applier)
{
- assert(applier->subscribe_stream != NULL);
-
/* Send SUBSCRIBE request */
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +598,7 @@ applier_subscribe(struct applier *applier)
*/
latch_lock(latch);
if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = xstream_write(applier->subscribe_stream, &row);
+ int res = apply_row(&row);
if (res != 0) {
struct error *e = diag_last_error(diag_get());
/*
@@ -569,7 +617,7 @@ applier_subscribe(struct applier *applier)
nop.bodycnt = 0;
nop.replica_id = row.replica_id;
nop.lsn = row.lsn;
- res = xstream_write(applier->subscribe_stream, &nop);
+ res = apply_row(&nop);
}
}
if (res != 0) {
@@ -731,8 +779,7 @@ applier_stop(struct applier *applier)
}
struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
- struct xstream *subscribe_stream)
+applier_new(const char *uri)
{
struct applier *applier = (struct applier *)
calloc(1, sizeof(struct applier));
@@ -751,8 +798,6 @@ applier_new(const char *uri, struct xstream *join_stream,
assert(rc == 0 && applier->uri.service != NULL);
(void) rc;
- applier->join_stream = join_stream;
- applier->subscribe_stream = subscribe_stream;
applier->last_row_time = ev_monotonic_now(loop());
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
#include "xrow.h"
-struct xstream;
-
enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
#define applier_STATE(_) \
@@ -116,10 +114,6 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
- /** xstream to process rows during initial JOIN */
- struct xstream *join_stream;
- /** xstream to process rows during final JOIN and SUBSCRIBE */
- struct xstream *subscribe_stream;
};
/**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
* @error throws OutOfMemory exception if out of memory.
*/
struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
- struct xstream *subscribe_stream);
+applier_new(const char *uri);
/**
* Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..190e5eae5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
*/
static bool is_orphan;
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
/**
* The pool of fibers in the transaction processor thread
* working on incoming messages from net, wal and other
@@ -203,22 +199,6 @@ box_process_rw(struct request *request, struct space *space,
return rc;
}
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
- assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
- return -1;
- return txn_commit_stmt(txn, request);
-}
-
void
box_set_ro(bool ro)
{
@@ -305,35 +285,24 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
journal->vclock = v;
}
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+static void
+apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
- (void) stream;
struct request request;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
- if (request.type == IPROTO_NOP) {
- if (process_nop(&request) != 0)
+ if (request.type != IPROTO_NOP) {
+ struct space *space = space_cache_find_xc(request.space_id);
+ if (box_process_rw(&request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(&request));
diag_raise();
- return;
- }
- struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
- say_error("error applying row: %s", request_str(&request));
- diag_raise();
+ }
}
-}
-
-static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
-{
- apply_row(stream, row);
-
struct wal_stream *xstream =
container_of(stream, struct wal_stream, base);
/**
- * Yield once in a while, but not too often,
- * mostly to allow signal handling to take place.
- */
+ * Yield once in a while, but not too often,
+ * mostly to allow signal handling to take place.
+ */
if (++xstream->rows % xstream->yield == 0)
fiber_sleep(0);
}
@@ -352,17 +321,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
ctx->yield = (wal_max_rows >> 4) + 1;
}
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
- (void) stream;
- struct request request;
- xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
- /* no access checks here - applier always works with admin privs */
- space_apply_initial_join_row_xc(space, &request);
-}
-
/* {{{ configuration bindings */
static void
@@ -656,9 +614,7 @@ cfg_get_replication(int *p_count)
for (int i = 0; i < count; i++) {
const char *source = cfg_getarr_elem("replication", i);
- struct applier *applier = applier_new(source,
- &join_stream,
- &subscribe_stream);
+ struct applier *applier = applier_new(source);
if (applier == NULL) {
/* Delete created appliers */
while (--i >= 0)
@@ -2131,8 +2087,6 @@ box_cfg_xc(void)
box_set_replication_sync_lag();
box_set_replication_sync_timeout();
box_set_replication_skip_conflict();
- xstream_create(&join_stream, apply_initial_join_row);
- xstream_create(&subscribe_stream, apply_row);
struct gc_checkpoint *checkpoint = gc_last_checkpoint();
--
2.21.0
More information about the Tarantool-patches
mailing list