Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation
@ 2019-11-19 16:04 Georgy Kirichenko
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
                   ` (7 more replies)
  0 siblings, 8 replies; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

This patchset contains six patches with some refactoring in
preparation for synchronous replication.

The first three patches refactor coio, recovery and xstream to
get rid of exceptions. This makes the corresponding facilities
C-compliant and enables their usage from a WAL source.
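
A minimal sketch of the error convention this refactoring moves to. The diagnostics area is stubbed out as a plain string here, and the names are simplified stand-ins modeled on the real `recovery_new()`, not the actual Tarantool code: instead of throwing `OutOfMemory`, a constructor returns NULL and records the error for the caller to inspect.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for Tarantool's diagnostics area (diag_set/diag_last_error). */
static const char *last_error = NULL;
#define diag_set_oom(what) (last_error = "OutOfMemory: " what)

struct recovery {
	char *wal_dirname;
};

/*
 * C-compliant constructor: on failure it sets the diagnostics
 * area and returns NULL instead of throwing, so it can be
 * called from pure C code such as the WAL thread.
 */
static struct recovery *
recovery_new(const char *wal_dirname)
{
	struct recovery *r = calloc(1, sizeof(*r));
	if (r == NULL) {
		diag_set_oom("struct recovery");
		return NULL;
	}
	r->wal_dirname = malloc(strlen(wal_dirname) + 1);
	if (r->wal_dirname == NULL) {
		diag_set_oom("wal_dirname");
		free(r);
		return NULL;
	}
	strcpy(r->wal_dirname, wal_dirname);
	return r;
}

static void
recovery_delete(struct recovery *r)
{
	free(r->wal_dirname);
	free(r);
}
```

C++ callers keep exception semantics by checking the return value and raising at the boundary, which is exactly the `if (recovery_new(...) == NULL) diag_raise();` pattern used throughout the patches below.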

The fourth patch fixes a rare vinyl error which manifests itself
during transactional recovery when there is no data change and the
vy_tx log is empty.

The fifth patch improves the recovery journal, making it able to
track the recovery vclock. This enables the last patch, which
implements transactional recovery (either of a local WAL, including
hot standby, or of a final join). Transactional recovery is
essential for synchronous replication because both sources (the WAL
and the final join stream) may contain written but not yet committed
transactions, and we must be able to recognize them.
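
The boundary recognition can be sketched as follows. The commit flag is loosely modeled on the `is_commit` marker carried by the last row of a transaction; the struct and function names here are illustrative, not Tarantool's actual API. Rows are accumulated until a commit boundary is seen, and an unterminated tail (written but not committed) is reported separately so the caller can discard it rather than apply it.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical WAL row with a transaction-boundary flag. */
struct row {
	long lsn;
	bool is_commit; /* true on the last row of a transaction */
};

/*
 * Count complete transactions in a row stream. Rows are (conceptually)
 * buffered until a commit boundary; *tail_rows receives the number of
 * trailing rows that belong to an unterminated transaction.
 */
static int
count_committed(const struct row *rows, int n, int *tail_rows)
{
	int txns = 0;
	int pending = 0;
	for (int i = 0; i < n; i++) {
		pending++;
		if (rows[i].is_commit) {
			txns++;
			pending = 0;
		}
	}
	*tail_rows = pending;
	return txns;
}
```

During recovery the same logic tells the journal where it is safe to advance the vclock: only at commit boundaries, never in the middle of a transaction.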

Branch:
https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-980-preparation
Issue:
https://github.com/tarantool/tarantool/issues/980

Georgy Kirichenko (6):
  recovery: do not throw an error
  coio: do not throw an exception
  xstream: get rid of an exception
  vinyl: do not insert vy_tx twice into writers list
  box: improve recovery journal
  recovery: follow transaction boundaries while recovery or join

 src/box/applier.cc                     | 140 ++++++++--------
 src/box/box.cc                         | 119 ++++++++++----
 src/box/recovery.cc                    |  87 ++++++----
 src/box/recovery.h                     |  14 +-
 src/box/relay.cc                       |  47 +++---
 src/box/vy_tx.c                        |   2 +-
 src/box/xrow_io.cc                     |  59 +++----
 src/box/xrow_io.h                      |  11 +-
 src/box/xstream.cc                     |   7 +-
 src/box/xstream.h                      |   2 +-
 src/lib/core/coio.cc                   | 212 +++++++++++++------------
 src/lib/core/coio.h                    |  13 +-
 src/lib/core/coio_buf.h                |   8 +
 test/xlog-py/big_lsn.result            |   4 +
 test/xlog-py/big_lsn.test.py           |  13 +-
 test/xlog-py/dup_key.result            |   8 +
 test/xlog-py/dup_key.test.py           |   7 +
 test/xlog/panic_on_broken_lsn.result   |   9 +-
 test/xlog/panic_on_broken_lsn.test.lua |   7 +-
 19 files changed, 449 insertions(+), 320 deletions(-)

-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-23 13:45   ` Vladislav Shpilevoy
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 2/6] coio: do not throw an exception Georgy Kirichenko
                   ` (6 subsequent siblings)
  7 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

Relaying from the C-written WAL requires recovery to be C-compliant,
so get rid of exceptions in the recovery interface.

Part of #980
---
 src/box/box.cc      | 16 ++++++---
 src/box/recovery.cc | 87 +++++++++++++++++++++++++++------------------
 src/box/recovery.h  | 14 ++++----
 src/box/relay.cc    | 15 ++++----
 4 files changed, 79 insertions(+), 53 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index b119c927b..a53b6e912 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1903,6 +1903,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	recovery = recovery_new(cfg_gets("wal_dir"),
 				cfg_geti("force_recovery"),
 				checkpoint_vclock);
+	if (recovery == NULL)
+		diag_raise();
 
 	/*
 	 * Make sure we report the actual recovery position
@@ -1911,6 +1913,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	box_vclock = &recovery->vclock;
 	auto guard = make_scoped_guard([&]{
 		box_vclock = &replicaset.vclock;
+		recovery_stop_local(recovery);
 		recovery_delete(recovery);
 	});
 
@@ -1920,7 +1923,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
+		diag_raise();
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
@@ -1963,7 +1967,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
 	engine_begin_final_recovery_xc();
-	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
+		diag_raise();
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -1981,8 +1986,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 				break;
 			fiber_sleep(0.1);
 		}
-		recovery_stop_local(recovery);
-		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (recovery_stop_local(recovery) != 0)
+			diag_raise();
+		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
+					   true) != 0)
+			diag_raise();
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..4693008f1 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 			calloc(1, sizeof(*r));
 
 	if (r == NULL) {
-		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
-			  "struct recovery");
+		diag_set(OutOfMemory, sizeof(*r), "malloc",
+			 "struct recovery");
+		return NULL;
 	}
 
-	auto guard = make_scoped_guard([=]{
-		free(r);
-	});
-
 	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
 	r->wal_dir.force_recovery = force_recovery;
@@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 	 * UUID, see replication/cluster.test for
 	 * details.
 	 */
-	xdir_check_xc(&r->wal_dir);
+	if (xdir_check(&r->wal_dir) != 0) {
+		xdir_destroy(&r->wal_dir);
+		free(r);
+		return NULL;
+	}
 
 	r->watcher = NULL;
 	rlist_create(&r->on_close_log);
 
-	guard.is_active = false;
 	return r;
 }
 
-void
+int
 recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	      struct vclock *gc_vclock)
 {
-	xdir_scan_xc(&r->wal_dir);
+	if (xdir_scan(&r->wal_dir) != 0)
+		return -1;
 
 	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
 	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
 		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
-		return;
+		return 0;
 	}
 
 	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
@@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	/* Scan the last xlog to find end vclock. */
 	struct xlog_cursor cursor;
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
-		return;
+		return 0;
 	struct xrow_header row;
 	while (xlog_cursor_next(&cursor, &row, true) == 0)
 		vclock_follow_xrow(end_vclock, &row);
 	xlog_cursor_close(&cursor, false);
+	return 0;
 }
 
 static inline void
@@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
 			 r->cursor.name);
 	}
 	xlog_cursor_close(&r->cursor, false);
-	trigger_run_xc(&r->on_close_log, NULL);
+	/* Suppress a trigger error if happened. */
+	trigger_run(&r->on_close_log, NULL);
 }
 
-static void
+static int
 recovery_open_log(struct recovery *r, const struct vclock *vclock)
 {
-	XlogGapError *e;
 	struct xlog_meta meta = r->cursor.meta;
 	enum xlog_cursor_state state = r->cursor.state;
 
 	recovery_close_log(r);
 
-	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
+	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
+			     &r->cursor) != 0)
+		return -1;
 
 	if (state == XLOG_CURSOR_NEW &&
 	    vclock_compare(vclock, &r->vclock) > 0) {
@@ -201,14 +205,14 @@ out:
 	 */
 	if (vclock_compare(&r->vclock, vclock) < 0)
 		vclock_copy(&r->vclock, vclock);
-	return;
+	return 0;
 
 gap_error:
-	e = tnt_error(XlogGapError, &r->vclock, vclock);
+	diag_set(XlogGapError, &r->vclock, vclock);
 	if (!r->wal_dir.force_recovery)
-		throw e;
+		return -1;
 	/* Ignore missing WALs if force_recovery is set. */
-	e->log();
+	diag_log();
 	say_warn("ignoring a gap in LSN");
 	goto out;
 }
@@ -216,8 +220,9 @@ gap_error:
 void
 recovery_delete(struct recovery *r)
 {
-	recovery_stop_local(r);
+	/* Recovery should be stopped before deleting. */
 
+	assert(r->watcher == NULL);
 	trigger_destroy(&r->on_close_log);
 	xdir_destroy(&r->wal_dir);
 	if (xlog_cursor_is_open(&r->cursor)) {
@@ -237,25 +242,26 @@ recovery_delete(struct recovery *r)
  * The reading will be stopped on reaching stop_vclock.
  * Use NULL for boundless recover
  */
-static void
+static int
 recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
 	uint64_t row_count = 0;
-	while (xlog_cursor_next_xc(&r->cursor, &row,
-				   r->wal_dir.force_recovery) == 0) {
+	int rc;
+	while ((rc = xlog_cursor_next(&r->cursor, &row,
+				      r->wal_dir.force_recovery)) == 0) {
 		/*
 		 * Read the next row from xlog file.
 		 *
-		 * xlog_cursor_next_xc() returns 1 when
+		 * xlog_cursor_next() returns 1 when
 		 * it can not read more rows. This doesn't mean
 		 * the file is fully read: it's fully read only
 		 * when EOF marker has been read, see i.eof_read
 		 */
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
-			return;
+			return 0;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
 			continue; /* already applied, skip */
@@ -279,13 +285,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 					 row_count / 1000000.);
 		} else {
 			if (!r->wal_dir.force_recovery)
-				diag_raise();
+				return -1;
 
 			say_error("skipping row {%u: %lld}",
 				  (unsigned)row.replica_id, (long long)row.lsn);
 			diag_log();
 		}
 	}
+	if (rc < 0)
+		return -1;
+	return 0;
 }
 
 /**
@@ -299,7 +308,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
  * This function will not close r->current_wal if
  * recovery was successful.
  */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir)
 {
@@ -340,21 +349,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
 			continue;
 		}
 
-		recovery_open_log(r, clock);
+		if (recovery_open_log(r, clock) != 0)
+			return -1;
 
 		say_info("recover from `%s'", r->cursor.name);
 
 recover_current_wal:
-		recover_xlog(r, stream, stop_vclock);
+		if (recover_xlog(r, stream, stop_vclock) != 0)
+			return -1;
 	}
 
 	if (xlog_cursor_is_eof(&r->cursor))
 		recovery_close_log(r);
 
-	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
-		tnt_raise(XlogGapError, &r->vclock, stop_vclock);
+	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
+		diag_set(XlogGapError, &r->vclock, stop_vclock);
+		return -1;
+	}
 
 	region_free(&fiber()->gc);
+	return 0;
 }
 
 void
@@ -478,7 +492,9 @@ hot_standby_f(va_list ap)
 		do {
 			start = vclock_sum(&r->vclock);
 
-			recover_remaining_wals(r, stream, NULL, scan_dir);
+			if (recover_remaining_wals(r, stream, NULL,
+						   scan_dir) != 0)
+				diag_raise();
 
 			end = vclock_sum(&r->vclock);
 			/*
@@ -526,7 +542,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
 	fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
 }
 
-void
+int
 recovery_stop_local(struct recovery *r)
 {
 	if (r->watcher) {
@@ -534,8 +550,9 @@ recovery_stop_local(struct recovery *r)
 		r->watcher = NULL;
 		fiber_cancel(f);
 		if (fiber_join(f) != 0)
-			diag_raise();
+			return -1;
 	}
+	return 0;
 }
 
 /* }}} */
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..145d9199e 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
  * @gc_vclock is set to the oldest vclock available in the
  * WAL directory.
  */
-void
+int
 recovery_scan(struct recovery *r,  struct vclock *end_vclock,
 	      struct vclock *gc_vclock);
 
@@ -82,16 +82,12 @@ void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
 		      const char *name, ev_tstamp wal_dir_rescan_delay);
 
-void
+int
 recovery_stop_local(struct recovery *r);
 
 void
 recovery_finalize(struct recovery *r);
 
-#if defined(__cplusplus)
-} /* extern "C" */
-#endif /* defined(__cplusplus) */
-
 /**
  * Find out if there are new .xlog files since the current
  * vclock, and read them all up.
@@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
  * This function will not close r->current_wal if
  * recovery was successful.
  */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir);
 
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
 #endif /* TARANTOOL_RECOVERY_H_INCLUDED */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index e849fcf4f..5c2b0067e 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -335,8 +335,9 @@ relay_final_join_f(va_list ap)
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
-	recover_remaining_wals(relay->r, &relay->stream,
-			       &relay->stop_vclock, true);
+	if (recover_remaining_wals(relay->r, &relay->stream,
+				   &relay->stop_vclock, true) != 0)
+		diag_raise();
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
 	return 0;
 }
@@ -492,11 +493,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		 */
 		return;
 	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
+	if (recover_remaining_wals(relay->r, &relay->stream, NULL,
+				   (events & WAL_EVENT_ROTATE) != 0) != 0) {
+		relay_set_error(relay, diag_last_error(diag_get()));
 		fiber_cancel(fiber());
 	}
 }
@@ -697,6 +696,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
 	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 			        replica_clock);
+	if (relay->r == NULL)
+		diag_raise();
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 2/6] coio: do not throw an exception
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-23 13:45   ` Vladislav Shpilevoy
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 3/6] xstream: get rid of " Georgy Kirichenko
                   ` (5 subsequent siblings)
  7 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

Relaying from the C-written WAL requires coio to be C-compliant, so
get rid of exceptions in the coio interface.

Part of #980
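
The shape of the converted coio helpers can be sketched like this. `read_exactly()` is a hypothetical stand-in for the real coio readers, not a function from this patch: it returns a byte count as ssize_t, or -1 on error, so that C code can check the result while C++ callers raise at the boundary with `diag_raise()`.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Read exactly sz bytes from fd. Returns the number of bytes read
 * (short only on EOF), or -1 on error. A real coio reader would
 * yield the fiber on EWOULDBLOCK; here we only retry on EINTR.
 */
static ssize_t
read_exactly(int fd, char *buf, size_t sz)
{
	size_t done = 0;
	while (done < sz) {
		ssize_t nrd = read(fd, buf + done, sz - done);
		if (nrd < 0) {
			if (errno == EINTR)
				continue; /* interrupted, retry */
			return -1; /* caller decides: diag_raise() in C++ */
		}
		if (nrd == 0)
			break; /* EOF */
		done += (size_t)nrd;
	}
	return (ssize_t)done;
}
```

This is why the headers below change the return types from `void` to `ssize_t`: the error no longer travels as an exception, only as a return code plus the diagnostics area.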
---
 src/box/applier.cc      |  52 ++++++----
 src/box/box.cc          |   9 +-
 src/box/relay.cc        |  11 ++-
 src/box/xrow_io.cc      |  59 +++++------
 src/box/xrow_io.h       |  11 ++-
 src/lib/core/coio.cc    | 212 +++++++++++++++++++++-------------------
 src/lib/core/coio.h     |  13 +--
 src/lib/core/coio_buf.h |   8 ++
 8 files changed, 207 insertions(+), 168 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a04d13564..294765195 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,8 +173,9 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
-			coio_write_xrow(&io, &xrow);
+			if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 ||
+			    coio_write_xrow(&io, &xrow) < 0)
+				diag_raise();
 		} catch (SocketError *e) {
 			/*
 			 * There is no point trying to send ACKs if
@@ -308,9 +309,11 @@ applier_connect(struct applier *applier)
 	 */
 	applier->addr_len = sizeof(applier->addrstorage);
 	applier_set_state(applier, APPLIER_CONNECT);
-	coio_connect(coio, uri, &applier->addr, &applier->addr_len);
+	if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0)
+		diag_raise();
 	assert(coio->fd >= 0);
-	coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
+	if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 
 	/* Decode instance version and name from greeting */
@@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
 	 * election on bootstrap.
 	 */
 	xrow_encode_vote(&row);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	if (row.type == IPROTO_OK) {
 		xrow_decode_ballot_xc(&row, &applier->ballot);
 	} else try {
@@ -374,8 +378,9 @@ applier_connect(struct applier *applier)
 	applier_set_state(applier, APPLIER_AUTH);
 	xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len, uri->login,
 			    uri->login_len, uri->password, uri->password_len);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 	if (row.type != IPROTO_OK)
 		xrow_decode_error_xc(&row); /* auth failed */
@@ -397,7 +402,8 @@ applier_join(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	xrow_encode_join_xc(&row, &INSTANCE_UUID);
-	coio_write_xrow(coio, &row);
+	if (coio_write_xrow(coio, &row) < 0)
+		diag_raise();
 
 	/**
 	 * Tarantool < 1.7.0: if JOIN is successful, there is no "OK"
@@ -405,7 +411,8 @@ applier_join(struct applier *applier)
 	 */
 	if (applier->version_id >= version_id(1, 7, 0)) {
 		/* Decode JOIN response */
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row); /* re-throw error */
 		} else if (row.type != IPROTO_OK) {
@@ -428,7 +435,8 @@ applier_join(struct applier *applier)
 	 */
 	uint64_t row_count = 0;
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			if (apply_initial_join_row(&row) != 0)
@@ -470,7 +478,8 @@ applier_join(struct applier *applier)
 	 * Receive final data.
 	 */
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
@@ -529,10 +538,13 @@ applier_read_tx_row(struct applier *applier)
 	 * from the master for quite a while the connection is
 	 * broken - the master might just be idle.
 	 */
-	if (applier->version_id < version_id(1, 7, 7))
-		coio_read_xrow(coio, ibuf, row);
-	else
-		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+	if (applier->version_id < version_id(1, 7, 7)) {
+		if (coio_read_xrow(coio, ibuf, row) < 0)
+			diag_raise();
+	} else {
+		if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0)
+			diag_raise();
+	}
 
 	applier->lag = ev_now(loop()) - row->tm;
 	applier->last_row_time = ev_monotonic_now(loop());
@@ -792,11 +804,13 @@ applier_subscribe(struct applier *applier)
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock);
-	coio_write_xrow(coio, &row);
+	if (coio_write_xrow(coio, &row) < 0)
+		diag_raise();
 
 	/* Read SUBSCRIBE response */
 	if (applier->version_id >= version_id(1, 6, 7)) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row);  /* error */
 		} else if (row.type != IPROTO_OK) {
@@ -933,7 +947,7 @@ applier_disconnect(struct applier *applier, enum applier_state state)
 		applier->writer = NULL;
 	}
 
-	coio_close(loop(), &applier->io);
+	coio_destroy(loop(), &applier->io);
 	/* Clear all unparsed input. */
 	ibuf_reinit(&applier->ibuf);
 	fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index a53b6e912..6323e5e6e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1497,7 +1497,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, &stop_vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Final stage: feed replica with WALs in range
@@ -1509,7 +1510,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Send end of WAL stream marker */
 	xrow_encode_vclock_xc(&row, &replicaset.vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Advance the WAL consumer state to the position where
@@ -1591,7 +1593,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	assert(self != NULL); /* the local registration is read-only */
 	row.replica_id = self->id;
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	say_info("subscribed replica %s at %s",
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 5c2b0067e..202620694 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -318,7 +318,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, vclock);
 	row.sync = sync;
-	coio_write_xrow(&relay->io, &row);
+	if (coio_write_xrow(&relay->io, &row) < 0)
+		diag_raise();
 
 	/* Send read view to the replica. */
 	engine_join_xc(&ctx, &relay->stream);
@@ -517,8 +518,9 @@ relay_reader_f(va_list ap)
 	try {
 		while (!fiber_is_cancelled()) {
 			struct xrow_header xrow;
-			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
+			if (coio_read_xrow_timeout(&io, &ibuf, &xrow,
+					replication_disconnect_timeout()) < 0)
+				diag_raise();
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
@@ -716,7 +718,8 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 
 	packet->sync = relay->sync;
 	relay->last_row_time = ev_monotonic_now(loop());
-	coio_write_xrow(&relay->io, packet);
+	if (coio_write_xrow(&relay->io, packet) < 0)
+		diag_raise();
 	fiber_gc();
 
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982b..f432c6b49 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -35,71 +35,74 @@
 #include "error.h"
 #include "msgpuck/msgpuck.h"
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
 {
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn(coio, in, 1);
+	if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
+		return -1;
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, ev_tstamp timeout)
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, ev_tstamp timeout)
 {
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn_timeout(coio, in, 1, delay);
+	if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
 {
 	struct iovec iov[XROW_IOVMAX];
-	int iovcnt = xrow_to_iovec_xc(row, iov);
-	coio_writev(coio, iov, iovcnt, 0);
+	int iovcnt = xrow_to_iovec(row, iov);
+	if (iovcnt < 0)
+		return -1;
+	return coio_writev(coio, iov, iovcnt, 0);
 }
 
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..365a70db7 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -30,6 +30,7 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include <unistd.h>
 #if defined(__cplusplus)
 extern "C" {
 #endif
@@ -38,14 +39,14 @@ struct ev_io;
 struct ibuf;
 struct xrow_header;
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, double timeout);
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, double timeout);
 
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
 
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index e88d724d5..96a529c2c 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -41,12 +41,6 @@
 #include "scoped_guard.h"
 #include "coio_task.h" /* coio_resolve() */
 
-struct CoioGuard {
-	struct ev_io *ev_io;
-	CoioGuard(struct ev_io *arg) :ev_io(arg) {}
-	~CoioGuard() { ev_io_stop(loop(), ev_io); }
-};
-
 typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int);
 
 /** Note: this function does not throw */
@@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay)
 	coio->data = fiber();
 	bool is_timedout = fiber_yield_timeout(delay);
 	coio->data = NULL;
+	if (is_timedout)
+		diag_set(TimedOut);
 	return is_timedout;
 }
 
 /**
  * Connect to a host with a specified timeout.
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 static int
@@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
 {
 	ev_loop *loop = loop();
 	if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
-		diag_raise();
-	auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
-	if (sio_connect(coio->fd, addr, len) == 0) {
-		coio_guard.is_active = false;
+		return -1;
+	if (sio_connect(coio->fd, addr, len) == 0)
 		return 0;
-	}
 	if (errno != EINPROGRESS)
-		diag_raise();
+		goto close;
 	/*
 	 * Wait until socket is ready for writing or
 	 * timed out.
 	 */
 	ev_io_set(coio, coio->fd, EV_WRITE);
 	ev_io_start(loop, coio);
-	bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
+	bool is_timedout;
+	is_timedout = coio_fiber_yield_timeout(coio, timeout);
 	ev_io_stop(loop, coio);
-	fiber_testcancel();
+	if (fiber_is_cancelled()) {
+		diag_set(FiberIsCancelled);
+		goto close;
+	}
 	if (is_timedout)
-		tnt_raise(TimedOut);
-	int error = EINPROGRESS;
-	socklen_t sz = sizeof(error);
+		goto close;
+	int error;
+	socklen_t sz;
+	error = EINPROGRESS;
+	sz = sizeof(error);
 	if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
 		       &error, &sz))
-		diag_raise();
+		goto close;
 	if (error != 0) {
 		errno = error;
-		tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
+		diag_set(SocketError, sio_socketname(coio->fd), "connect");
+		goto close;
 	}
-	coio_guard.is_active = false;
 	return 0;
+
+close:
+	evio_close(loop, coio);
+	return -1;
 }
 
 void
@@ -152,7 +155,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host,
  * This function also supports UNIX domain sockets if uri->path is not NULL and
  * uri->service is NULL.
  *
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 int
@@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	    hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
 	    hints.ai_protocol = 0;
 	    int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
-	    if (rc != 0) {
-		    diag_raise();
-		    panic("unspecified getaddrinfo error");
-	    }
+	    if (rc != 0)
+			return -1;
 	}
-	auto addrinfo_guard = make_scoped_guard([=] {
-		if (!uri->host_hint) freeaddrinfo(ai);
-		else free(ai_local.ai_addr);
-	});
+	struct addrinfo *first_ai = ai;
 	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
-	while (ai) {
-		try {
-			if (coio_connect_addr(coio, ai->ai_addr,
-					      ai->ai_addrlen, delay))
-				return -1;
+	while (ai && delay >= 0) {
+		if (coio_connect_addr(coio, ai->ai_addr,
+				      ai->ai_addrlen, delay) == 0) {
 			if (addr != NULL) {
 				assert(addr_len != NULL);
 				*addr_len = MIN(ai->ai_addrlen, *addr_len);
 				memcpy(addr, ai->ai_addr, *addr_len);
 			}
 			return 0; /* connected */
-		} catch (SocketError *e) {
-			if (ai->ai_next == NULL)
-				throw;
-			/* ignore exception and try the next address */
 		}
-		ai = ai->ai_next;
 		ev_now_update(loop);
 		coio_timeout_update(&start, &delay);
+		ai = ai->ai_next;
 	}
 
-	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
+	/* Set an error if not timedout. */
+	if (delay >= 0)
+		diag_set(SocketError, sio_socketname(coio->fd), "connection failed");
+	if (!uri->host_hint)
+		freeaddrinfo(first_ai);
+	else
+		free(ai_local.ai_addr);
+	return -1;
 }
 
 /**
@@ -249,8 +248,6 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/* Assume that there are waiting clients
 		 * available */
@@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 			if (evio_setsockopt_client(fd, addr->sa_family,
 						   SOCK_STREAM) != 0) {
 				close(fd);
-				diag_raise();
+				return -1;
 			}
 			return fd;
 		}
 		if (! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		/* The socket is not ready, yield */
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_READ);
@@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 		 * timeout is reached.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
@@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 
 	ssize_t to_read = (ssize_t) sz;
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: assume the socket is ready: since
@@ -320,9 +320,8 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		} else if (nrd == 0) {
 			errno = 0;
 			return sz - to_read;
-		} else if (! sio_wouldblock(errno)) {
-			diag_raise();
-		}
+		} else if (! sio_wouldblock(errno))
+			return -1;
 
 		/* The socket is not ready, yield */
 		if (! ev_is_active(coio)) {
@@ -333,19 +332,23 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		 * Yield control to other fibers until the
 		 * timeout is being reached.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
  * Read at least sz bytes, with readahead.
  *
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
  *
  * @retval the number of bytes read, > 0.
  */
@@ -355,8 +358,9 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
 	ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz);
 	if (nrd < (ssize_t)sz) {
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -364,7 +368,7 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
 /**
  * Read at least sz bytes, with readahead and timeout.
  *
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
  *
  * @retval the number of bytes read, > 0.
  */
@@ -375,8 +379,9 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz
 	ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout);
 	if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -399,8 +404,6 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: write as much data as possible,
@@ -413,28 +416,28 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 				return sz;
 			towrite -= nwr;
 			buf = (char *) buf + nwr;
-		} else if (nwr < 0 && !sio_wouldblock(errno)) {
-			diag_raise();
-		}
+		} else if (nwr < 0 && !sio_wouldblock(errno))
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
 		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
-
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /*
@@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
 {
 	sio_add_to_iov(iov, -offset);
 	ssize_t nwr = sio_writev(fd, iov, iovcnt);
+	if (nwr < 0 && !sio_wouldblock(errno))
+		return -1;
 	sio_add_to_iov(iov, offset);
-	if (nwr < 0 && ! sio_wouldblock(errno))
-		diag_raise();
+	if (nwr < 0)
+		return 0;
 	return nwr;
 }
 
@@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 	struct iovec *end = iov + iovcnt;
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
-	CoioGuard coio_guard(coio);
 
 	/* Avoid a syscall in case of 0 iovcnt. */
 	while (iov < end) {
 		/* Write as much data as possible. */
 		ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
 					 end - iov);
-		if (nwr >= 0) {
+		if (nwr < 0)
+			return -1;
+		if (nwr > 0) {
 			total += nwr;
 			/*
 			 * If there was a hint for the total size
@@ -487,18 +493,19 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
 		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
-
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			return -1;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			return -1;
 		coio_timeout_update(&start, &delay);
 	}
 	return total;
@@ -518,8 +525,6 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: write as much data as possible,
@@ -530,7 +535,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		if (nwr > 0)
 			return nwr;
 		if (nwr < 0 && ! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
@@ -540,13 +545,17 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
@@ -563,8 +572,6 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Read as much data as possible,
@@ -575,7 +582,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		if (nrd >= 0)
 			return nrd;
 		if (! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_READ);
 			ev_io_start(loop(), coio);
@@ -585,13 +592,17 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 static int
@@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name,
 	service->handler_param = handler_param;
 }
 
-void
+int
 coio_service_start(struct evio_service *service, const char *uri)
 {
 	if (evio_service_bind(service, uri) != 0 ||
 	    evio_service_listen(service) != 0)
-		diag_raise();
+		return -1;
+	return 0;
 }
 
 void
@@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
 	coio_timeout_init(&start, &delay, timeout);
 	fiber_yield_timeout(delay);
 	ev_stat_stop(loop(), stat);
-	fiber_testcancel();
 }
 
 typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
@@ -689,7 +700,6 @@ coio_waitpid(pid_t pid)
 	fiber_set_cancellable(allow_cancel);
 	ev_child_stop(loop(), &cw);
 	int status = cw.rstatus;
-	fiber_testcancel();
 	return status;
 }
 
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 6a2337689..d557f2869 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -33,6 +33,9 @@
 #include "fiber.h"
 #include "trivia/util.h"
 #if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
 #include "evio.h"
 
 /**
@@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY);
 }
 
-void
-coio_bind(struct ev_io *coio, struct sockaddr *addr,
-	  socklen_t addrlen);
-
 int
 coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
 	    ev_tstamp timeout);
@@ -71,7 +70,7 @@ void
 coio_create(struct ev_io *coio, int fd);
 
 static inline void
-coio_close(ev_loop *loop, struct ev_io *coio)
+coio_destroy(ev_loop *loop, struct ev_io *coio)
 {
 	return evio_close(loop, coio);
 }
@@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name,
 		  fiber_func handler, void *handler_param);
 
 /** Wait until the service binds to the port. */
-void
+int
 coio_service_start(struct evio_service *service, const char *uri);
 
 void
@@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
 int
 coio_waitpid(pid_t pid);
 
-extern "C" {
-#endif /* defined(__cplusplus) */
 
 /** \cond public */
 
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 1ad104985..3a83f8fe1 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                    timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                     timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 3/6] xstream: get rid of an exception
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 2/6] coio: do not throw an exception Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-23 13:45   ` Vladislav Shpilevoy
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 4/6] vinyl: do not insert vy_tx twice into writers list Georgy Kirichenko
                   ` (4 subsequent siblings)
  7 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

Refactoring: make xstream C-compliant

Part of #380
---
 src/box/box.cc     |  5 +++--
 src/box/relay.cc   | 23 +++++++++++++----------
 src/box/xstream.cc |  7 +------
 src/box/xstream.h  |  2 +-
 4 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 6323e5e6e..f41ef9ce8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -321,7 +321,7 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 	journal->vclock = v;
 }
 
-static void
+static int
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct request request;
@@ -330,7 +330,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 		struct space *space = space_cache_find_xc(request.space_id);
 		if (box_process_rw(&request, space, NULL) != 0) {
 			say_error("error applying row: %s", request_str(&request));
-			diag_raise();
+			return -1;
 		}
 	}
 	struct wal_stream *xstream =
@@ -341,6 +341,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	 */
 	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
 		fiber_sleep(0);
+	return 0;
 }
 
 static void
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 202620694..fe5e0cfc9 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -165,11 +165,11 @@ relay_last_row_time(const struct relay *relay)
 	return relay->last_row_time;
 }
 
-static void
+static int
 relay_send(struct relay *relay, struct xrow_header *packet);
-static void
+static int
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
-static void
+static int
 relay_send_row(struct xstream *stream, struct xrow_header *row);
 
 struct relay *
@@ -192,7 +192,7 @@ relay_new(struct replica *replica)
 
 static void
 relay_start(struct relay *relay, int fd, uint64_t sync,
-	     void (*stream_write)(struct xstream *, struct xrow_header *))
+	     int (*stream_write)(struct xstream *, struct xrow_header *))
 {
 	xstream_create(&relay->stream, stream_write);
 	/*
@@ -711,7 +711,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		diag_raise();
 }
 
-static void
+static int
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
 	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
@@ -719,15 +719,16 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 	packet->sync = relay->sync;
 	relay->last_row_time = ev_monotonic_now(loop());
 	if (coio_write_xrow(&relay->io, packet) < 0)
-		diag_raise();
+		return -1;
 	fiber_gc();
 
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
+	return 0;
 }
 
-static void
+static int
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
@@ -736,11 +737,12 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 	 * vclock while sending a snapshot.
 	 */
 	if (row->group_id != GROUP_LOCAL)
-		relay_send(relay, row);
+		return relay_send(relay, row);
+	return 0;
 }
 
 /** Send a single row to the client. */
-static void
+static int
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
@@ -778,6 +780,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
-		relay_send(relay, packet);
+		return relay_send(relay, packet);
 	}
+	return 0;
 }
diff --git a/src/box/xstream.cc b/src/box/xstream.cc
index c77e4360e..80f3030d0 100644
--- a/src/box/xstream.cc
+++ b/src/box/xstream.cc
@@ -35,10 +35,5 @@
 int
 xstream_write(struct xstream *stream, struct xrow_header *row)
 {
-	try {
-		stream->write(stream, row);
-	} catch (Exception *e) {
-		return -1;
-	}
-	return 0;
+	return stream->write(stream, row);
 }
diff --git a/src/box/xstream.h b/src/box/xstream.h
index d29ff4213..dbeea3d5b 100644
--- a/src/box/xstream.h
+++ b/src/box/xstream.h
@@ -41,7 +41,7 @@ extern "C" {
 struct xrow_header;
 struct xstream;
 
-typedef void (*xstream_write_f)(struct xstream *, struct xrow_header *);
+typedef int (*xstream_write_f)(struct xstream *, struct xrow_header *);
 
 struct xstream {
 	xstream_write_f write;
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 4/6] vinyl: do not insert vy_tx twice into writers list
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 3/6] xstream: get rid of " Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 5/6] box: improve recovery journal Georgy Kirichenko
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

In some cases (e.g. when there is no data update during recovery) a
vy_tx could be inserted twice into the corresponding writers list
because its log is empty. So check whether the vy_tx is already in
the list before inserting it.
This went undetected before because recovery did not preserve
transaction boundaries.

Part of #980
---
 src/box/vy_tx.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index d092e0cdb..990c0df85 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -895,7 +895,7 @@ vy_tx_begin_statement(struct vy_tx *tx, struct space *space, void **savepoint)
 	}
 	assert(tx->state == VINYL_TX_READY);
 	tx->last_stmt_space = space;
-	if (stailq_empty(&tx->log))
+	if (stailq_empty(&tx->log) && rlist_empty(&tx->in_writers))
 		rlist_add_entry(&tx->xm->writers, tx, in_writers);
 	*savepoint = stailq_last(&tx->log);
 	return 0;
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 5/6] box: improve recovery journal
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 4/6] vinyl: do not insert vy_tx twice into writers list Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-23 13:46   ` Vladislav Shpilevoy
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join Georgy Kirichenko
                   ` (2 subsequent siblings)
  7 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

Refactoring: track the vclock inside the recovery journal instead of
using the recovery one. The replicaset vclock now relies on the
recovery stream content instead of the wal directory content (xlog
names and meta). This enables the applier to use this journal and
generalizes wal recovery and applier final join handling.

Part of #980
---
 src/box/box.cc               | 39 +++++++++++++++++++++++-------------
 test/xlog-py/big_lsn.result  |  4 ++++
 test/xlog-py/big_lsn.test.py | 13 ++++++------
 test/xlog-py/dup_key.result  |  8 ++++++++
 test/xlog-py/dup_key.test.py |  7 +++++++
 5 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index f41ef9ce8..71822551e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -296,7 +296,7 @@ struct wal_stream {
  */
 struct recovery_journal {
 	struct journal base;
-	struct vclock *vclock;
+	struct vclock vclock;
 };
 
 /**
@@ -309,16 +309,22 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
-	entry->res = vclock_sum(journal->vclock);
+	for (struct xrow_header **row = entry->rows;
+	     row < entry->rows + entry->n_rows; ++row) {
+		vclock_follow_xrow(&journal->vclock, *row);
+	}
+	entry->res = vclock_sum(&journal->vclock);
+	/* Assume the entry was committed and adjust replicaset vclock. */
+	vclock_copy(&replicaset.vclock, &journal->vclock);
 	journal_entry_complete(entry);
 	return 0;
 }
 
 static inline void
-recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
+recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
-	journal->vclock = v;
+	vclock_copy(&journal->vclock, v);
 }
 
 static int
@@ -332,6 +338,15 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 			say_error("error applying row: %s", request_str(&request));
 			return -1;
 		}
+	} else {
+		struct txn *txn = txn_begin();
+		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
+		    txn_commit_stmt(txn, &request) != 0) {
+			txn_rollback(txn);
+			return -1;
+		}
+		if (txn_commit(txn) != 0)
+			return -1;
 	}
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
@@ -1956,11 +1971,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	struct memtx_engine *memtx;
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
-
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &recovery->vclock);
-	journal_set(&journal.base);
-
+	vclock_copy(&replicaset.vclock, checkpoint_vclock);
 	/*
 	 * We explicitly request memtx to recover its
 	 * snapshot as a separate phase since it contains
@@ -1970,6 +1981,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 */
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
+	vclock_copy(&replicaset.vclock, checkpoint_vclock);
+	struct recovery_journal journal;
+	recovery_journal_create(&journal, &recovery->vclock);
+	journal_set(&journal.base);
+
 	engine_begin_final_recovery_xc();
 	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
 		diag_raise();
@@ -1995,11 +2011,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
 					   true) != 0)
 			diag_raise();
-		/*
-		 * Advance replica set vclock to reflect records
-		 * applied in hot standby mode.
-		 */
-		vclock_copy(&replicaset.vclock, &recovery->vclock);
 		box_listen();
 		box_sync_replication(false);
 	}
diff --git a/test/xlog-py/big_lsn.result b/test/xlog-py/big_lsn.result
index b370773f2..6c10f6957 100644
--- a/test/xlog-py/big_lsn.result
+++ b/test/xlog-py/big_lsn.result
@@ -5,6 +5,10 @@ box.info.lsn
 box.space._schema:delete('dummy')
 ---
 ...
+box.snapshot()
+---
+- ok
+...
 box.info.lsn
 ---
 - 123456789123
diff --git a/test/xlog-py/big_lsn.test.py b/test/xlog-py/big_lsn.test.py
index c6a31d971..bdc84e012 100644
--- a/test/xlog-py/big_lsn.test.py
+++ b/test/xlog-py/big_lsn.test.py
@@ -9,21 +9,22 @@ server.stop()
 server.deploy()
 server.admin("box.info.lsn")
 server.admin("box.space._schema:delete('dummy')")
+server.admin("box.snapshot()")
 server.stop()
 
-# Bump the instance vclock by tweaking the last xlog.
+# Bump the instance vclock by tweaking the checkpoint.
 old_lsn = 1
 new_lsn = 123456789123
-wal_dir = os.path.join(server.vardir, server.name)
-old_wal = os.path.join(wal_dir, "%020d.xlog" % old_lsn)
-new_wal = os.path.join(wal_dir, "%020d.xlog" % new_lsn)
-with open(old_wal, "r+") as f:
+snap_dir = os.path.join(server.vardir, server.name)
+old_snap = os.path.join(snap_dir, "%020d.snap" % old_lsn)
+new_snap = os.path.join(snap_dir, "%020d.snap" % new_lsn)
+with open(old_snap, "r+") as f:
     s = f.read()
     s = s.replace("VClock: {1: %d}" % old_lsn,
                   "VClock: {1: %d}" % new_lsn)
     f.seek(0)
     f.write(s)
-os.rename(old_wal, new_wal)
+os.rename(old_snap, new_snap)
 
 # Recover and make a snapshot.
 server.start()
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..0d00dfb97 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -17,6 +17,14 @@ box.space.test:insert{2, 'second tuple'}
 - [2, 'second tuple']
 ...
 .xlog exists
+box.space.test:insert{1, 'nop'}
+---
+- [1, 'nop']
+...
+box.space.test:delete{1}
+---
+- [1, 'nop']
+...
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 7609c9555..cb834747f 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -29,12 +29,19 @@ if os.access(wal, os.F_OK):
     print ".xlog exists"
     os.rename(wal, wal_old)
 
+# Write an intermediate wal between wal#1 and wal#2
+server.start()
+server.admin("box.space.test:insert{1, 'nop'}")
+server.admin("box.space.test:delete{1}")
+server.stop()
+
 # Write wal#2
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
+os.unlink(wal)
 # Restore wal#1
 if not os.access(wal, os.F_OK):
     print ".xlog does not exist"
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 5/6] box: improve recovery journal Georgy Kirichenko
@ 2019-11-19 16:04 ` Georgy Kirichenko
  2019-11-23 13:46   ` Vladislav Shpilevoy
  2019-11-20 17:15 ` [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Konstantin Osipov
  2019-11-23 13:45 ` Vladislav Shpilevoy
  7 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-11-19 16:04 UTC (permalink / raw)
  To: tarantool-patches

Do not start a transaction for each row of a local journal or a final
join stream; follow transaction boundaries instead.

Part of #980
---
 src/box/applier.cc                     | 92 +++++++++++++-------------
 src/box/box.cc                         | 72 ++++++++++++++------
 test/xlog/panic_on_broken_lsn.result   |  9 ++-
 test/xlog/panic_on_broken_lsn.test.lua |  7 +-
 4 files changed, 107 insertions(+), 73 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 294765195..d00b1b04a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -268,23 +268,6 @@ apply_row(struct xrow_header *row)
 	return 0;
 }
 
-static int
-apply_final_join_row(struct xrow_header *row)
-{
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
-	if (apply_row(row) != 0) {
-		txn_rollback(txn);
-		fiber_gc();
-		return -1;
-	}
-	if (txn_commit(txn) != 0)
-		return -1;
-	fiber_gc();
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -391,6 +374,22 @@ done:
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows);
+
+static int
+applier_apply_tx(struct stailq *rows);
+
 /**
  * Execute and process JOIN request (bootstrap the instance).
  */
@@ -478,27 +477,29 @@ applier_join(struct applier *applier)
 	 * Receive final data.
 	 */
 	while (true) {
-		if (coio_read_xrow(coio, ibuf, &row) < 0)
-			diag_raise();
-		applier->last_row_time = ev_monotonic_now(loop());
-		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_final_join_row(&row) != 0)
-				diag_raise();
-			if (++row_count % 100000 == 0)
-				say_info("%.1fM rows received", row_count / 1e6);
-		} else if (row.type == IPROTO_OK) {
-			/*
-			 * Current vclock. This is not used now,
-			 * ignore.
-			 */
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
+		struct xrow_header *first_row =
+			&(stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row);
+		if (first_row->type == IPROTO_OK) {
+			if (applier->version_id < version_id(1, 7, 0)) {
+				/*
+				 * This is the start vclock if the
+				 * server is 1.6. Since we have
+				 * not initialized replication
+				 * vclock yet, do it now. In 1.7+
+				 * this vclock is not used.
+				 */
+				xrow_decode_vclock_xc(first_row, &replicaset.vclock);
+			}
 			break; /* end of stream */
-		} else if (iproto_type_is_error(row.type)) {
-			xrow_decode_error_xc(&row);  /* rethrow error */
-		} else {
-			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
-				  (uint32_t) row.type);
 		}
+		if (applier_apply_tx(&rows) != 0)
+			diag_raise();
+		if (ibuf_used(ibuf) == 0)
+			ibuf_reset(ibuf);
+		fiber_gc();
 	}
 	say_info("final data received");
 
@@ -506,16 +507,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
 	struct xrow_header *row = &tx_row->row;
 
 	double timeout = replication_disconnect_timeout();
+	/* We check timeout only in case of subscribe. */
+	if (applier->state == APPLIER_FINAL_JOIN)
+		timeout = TIMEOUT_INFINITY;
 	/*
 	 * Tarantool < 1.7.7 does not send periodic heartbeat
 	 * messages so we can't assume that if we haven't heard
@@ -568,6 +562,12 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
 		struct xrow_header *row = &tx_row->row;
 
+		if (row->type == IPROTO_OK) {
+			stailq_add_tail(rows, &tx_row->next);
+			assert(tx_row->row.is_commit);
+			break;
+		}
+
 		if (iproto_type_is_error(row->type))
 			xrow_decode_error_xc(row);
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 71822551e..9464eee63 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -286,6 +286,8 @@ struct wal_stream {
 	struct xstream base;
 	/** How many rows have been recovered so far. */
 	size_t rows;
+	/** Current transaction. */
+	struct txn *txn;
 };
 
 /**
@@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
 }
 
 static inline void
-recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
+recovery_journal_create(struct recovery_journal *journal,
+			const struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
 	vclock_copy(&journal->vclock, v);
@@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
 static int
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	struct wal_stream *wal_stream =
+		container_of(stream, struct wal_stream, base);
+	if (wal_stream->txn == NULL) {
+		wal_stream->txn = txn_begin();
+		if (wal_stream->txn == NULL)
+			return -1;
+	}
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	int rc = 0;
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
-		if (box_process_rw(&request, space, NULL) != 0) {
+		rc = box_process_rw(&request, space, NULL);
+		if (rc != 0)
 			say_error("error applying row: %s", request_str(&request));
-			return -1;
-		}
 	} else {
-		struct txn *txn = txn_begin();
-		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
-		    txn_commit_stmt(txn, &request) != 0) {
-			txn_rollback(txn);
+		struct txn *txn = in_txn();
+		rc = txn_begin_stmt(txn, NULL);
+		if (rc == 0)
+			rc = txn_commit_stmt(txn, &request);
+	}
+	if (row->is_commit) {
+		if (txn_commit(wal_stream->txn) != 0) {
+			wal_stream->txn = NULL;
 			return -1;
 		}
-		if (txn_commit(txn) != 0)
-			return -1;
+		wal_stream->txn = NULL;
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
 	/**
 	 * Yield once in a while, but not too often,
 	 * mostly to allow signal handling to take place.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
+	if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
+	    wal_stream->txn == NULL) {
+		wal_stream->rows -= WAL_ROWS_PER_YIELD;
 		fiber_sleep(0);
-	return 0;
+	}
+	return rc;
 }
 
 static void
@@ -364,6 +378,21 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, apply_wal_row);
 	ctx->rows = 0;
+	ctx->txn = NULL;
+}
+
+static int
+wal_stream_destroy(struct wal_stream *ctx)
+{
+	if (ctx->txn != NULL) {
+		/* The last processed row does not have a commit flag set. */
+		txn_rollback(ctx->txn);
+		ctx->txn = NULL;
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "recovery", "not finished transactions");
+		return -1;
+	}
+	return 0;
 }
 
 /* {{{ configuration bindings */
@@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto wal_stream_guard = make_scoped_guard([&]{
+		wal_stream_destroy(&wal_stream);
+	});
 
 	struct recovery *recovery;
 	recovery = recovery_new(cfg_gets("wal_dir"),
@@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	if (recovery == NULL)
 		diag_raise();
 
-	/*
-	 * Make sure we report the actual recovery position
-	 * in box.info while local recovery is in progress.
-	 */
-	box_vclock = &recovery->vclock;
 	auto guard = make_scoped_guard([&]{
-		box_vclock = &replicaset.vclock;
 		recovery_stop_local(recovery);
 		recovery_delete(recovery);
 	});
@@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 */
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
-	vclock_copy(&replicaset.vclock, checkpoint_vclock);
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &recovery->vclock);
 	journal_set(&journal.base);
 
 	engine_begin_final_recovery_xc();
+
 	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
 		diag_raise();
 	engine_end_recovery_xc();
@@ -2015,7 +2041,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
-
+	wal_stream_guard.is_active = false;
+	if (wal_stream_destroy(&wal_stream))
+		diag_raise();
 	/*
 	 * We must enable WAL before finalizing engine recovery,
 	 * because an engine may start writing to WAL right after
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index 1e62680eb..e209374b6 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
 _ = fiber.create(function()
     test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
     lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+    box.begin()
     box.space.test:auto_increment{'v1'}
+    box.space.test:auto_increment{'v1'}
+    box.commit()
     box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
 end);
 ---
@@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 - ok
 ...
 -- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
 ---
-- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [2, "v1"]}'
+- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [3, "v1"]}'
 ...
 test_run:cmd('cleanup server replica')
 ---
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index 80cccd918..a1d62cee5 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -67,8 +67,11 @@ test_run:cmd("setopt delimiter ';'")
 _ = fiber.create(function()
     test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
     lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+    box.begin()
     box.space.test:auto_increment{'v1'}
+    box.space.test:auto_increment{'v1'}
+    box.commit()
     box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
 end);
 test_run:cmd("setopt delimiter ''");
@@ -78,7 +81,7 @@ test_run:cmd('start server replica with crash_expected=True')
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 
 -- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
 
 test_run:cmd('cleanup server replica')
 test_run:cmd('delete server replica')
-- 
2.24.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
                   ` (5 preceding siblings ...)
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join Georgy Kirichenko
@ 2019-11-20 17:15 ` Konstantin Osipov
  2019-11-23 13:45 ` Vladislav Shpilevoy
  7 siblings, 0 replies; 14+ messages in thread
From: Konstantin Osipov @ 2019-11-20 17:15 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

* Georgy Kirichenko <georgy@tarantool.org> [19/11/19 19:06]:
> This patchset contains 6 patches and includes some refactoring
> and synchronous replication preparation.
> 
> The first three patches provide coio, recovery and xstream
> refactoring which gets rid of exceptions. This makes the
> corresponding facilities C-compliant and enables their usage
> from a wal source.
> 
> The fourth patch fixes a rare vinyl error which manifests itself
> during transactional recovery when there is no data change and
> the vy_tx log tends to be empty.
> 
> The fifth patch improves the recovery journal, making it able to
> track the recovery vclock. This enables the last patch, which
> implements transactional recovery (either the local wal, including
> hot-standby, or final join). Transactional recovery is essential
> for synchronous replication because both sources (the wal and the
> final join stream) would contain written but not yet committed
> transactions, and we are obliged to recognize them.

LGTM for the design decisions of the entire series. That is,
I agree that the premise and the implementation of the changes
in the patch set are good.

I haven't reviewed the patch set thoroughly, so there either has
to be a second review or I will need to go over the details more
carefully.


-- 
Konstantin Osipov, Moscow, Russia


* Re: [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation
  2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
                   ` (6 preceding siblings ...)
  2019-11-20 17:15 ` [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Konstantin Osipov
@ 2019-11-23 13:45 ` Vladislav Shpilevoy
  7 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:45 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Hi!

You have 9 commits on top of this branch. What
should I review? Please, drop the commits
not related to this patchset from the branch.

Look at CI, it is broken:

https://travis-ci.org/tarantool/tarantool/builds/614719290?utm_source=github_status&utm_medium=notification

box-py/iproto.test.py does not pass.

> Georgy Kirichenko (6):
>   recovery: do not throw an error
>   coio: do not htrow an exception
>   xstream: get rid of an exception
>   vinyl: do not insert vy_tx twice into writers list
>   box: improve recovery journal
>   recovery: follow transaction boundaries while recovery or join
> 
>  src/box/applier.cc                     | 140 ++++++++--------
>  src/box/box.cc                         | 119 ++++++++++----
>  src/box/recovery.cc                    |  87 ++++++----
>  src/box/recovery.h                     |  14 +-
>  src/box/relay.cc                       |  47 +++---
>  src/box/vy_tx.c                        |   2 +-
>  src/box/xrow_io.cc                     |  59 +++----
>  src/box/xrow_io.h                      |  11 +-
>  src/box/xstream.cc                     |   7 +-
>  src/box/xstream.h                      |   2 +-
>  src/lib/core/coio.cc                   | 212 +++++++++++++------------
>  src/lib/core/coio.h                    |  13 +-
>  src/lib/core/coio_buf.h                |   8 +
>  test/xlog-py/big_lsn.result            |   4 +
>  test/xlog-py/big_lsn.test.py           |  13 +-
>  test/xlog-py/dup_key.result            |   8 +
>  test/xlog-py/dup_key.test.py           |   7 +
>  test/xlog/panic_on_broken_lsn.result   |   9 +-
>  test/xlog/panic_on_broken_lsn.test.lua |   7 +-
>  19 files changed, 449 insertions(+), 320 deletions(-)
> 


* Re: [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
@ 2019-11-23 13:45   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:45 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 9 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Relaying from the C-written wal requires recovery to be
> C-compliant, so get rid of exceptions in the recovery interface.
> 
> Part of #980
> ---
>  src/box/box.cc      | 16 ++++++---
>  src/box/recovery.cc | 87 +++++++++++++++++++++++++++------------------
>  src/box/recovery.h  | 14 ++++----
>  src/box/relay.cc    | 15 ++++----
>  4 files changed, 79 insertions(+), 53 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index b119c927b..a53b6e912 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1911,6 +1913,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	box_vclock = &recovery->vclock;
>  	auto guard = make_scoped_guard([&]{
>  		box_vclock = &replicaset.vclock;
> +		recovery_stop_local(recovery);

1. AFAIU the patch is pure refactoring. Why is this functional change
here?

>  		recovery_delete(recovery);
>  	});
>  
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..4693008f1 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  			calloc(1, sizeof(*r));

2. src/box/relay.cc:359 does not check for
recovery_new() == NULL.

>  
>  	if (r == NULL) {
> -		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
> -			  "struct recovery");
> +		diag_set(OutOfMemory, sizeof(*r), "malloc",
> +			 "struct recovery");
> +		return NULL;
>  	}
>  
> -	auto guard = make_scoped_guard([=]{
> -		free(r);
> -	});
> -
>  	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
>  		    &xlog_opts_default);
>  	r->wal_dir.force_recovery = force_recovery;
> @@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
>  			 r->cursor.name);
>  	}
>  	xlog_cursor_close(&r->cursor, false);
> -	trigger_run_xc(&r->on_close_log, NULL);
> +	/* Suppress a trigger error if happened. */
> +	trigger_run(&r->on_close_log, NULL);

3. Why do you suppress it? It was not so before your
patch, and it has nothing to do with exceptions removal.

>  }
>  
> -static void
> +static int
>  recovery_open_log(struct recovery *r, const struct vclock *vclock)
>  {
> -	XlogGapError *e;
>  	struct xlog_meta meta = r->cursor.meta;
>  	enum xlog_cursor_state state = r->cursor.state;
>  
>  	recovery_close_log(r);
>  
> -	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);

4. xdir_open_cursor_xc() is now unused and can be dropped.

> +	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
> +			     &r->cursor) != 0)
> +		return -1;
>  
>  	if (state == XLOG_CURSOR_NEW &&
>  	    vclock_compare(vclock, &r->vclock) > 0) {
> @@ -216,8 +220,9 @@ gap_error:
>  void
>  recovery_delete(struct recovery *r)
>  {
> -	recovery_stop_local(r);
> +	/* Recovery should be stopped before deleting. */

5. It should be, perhaps. But how is it related to the
exceptions removal? If that part is broken, then please,
move it to a different commit, and add a test.

>  
> +	assert(r->watcher == NULL);
>  	trigger_destroy(&r->on_close_log);
>  	xdir_destroy(&r->wal_dir);
>  	if (xlog_cursor_is_open(&r->cursor)) {
> @@ -237,25 +242,26 @@ recovery_delete(struct recovery *r)
>   * The reading will be stopped on reaching stop_vclock.
>   * Use NULL for boundless recover
>   */
> -static void
> +static int
>  recover_xlog(struct recovery *r, struct xstream *stream,
>  	     const struct vclock *stop_vclock)
>  {
>  	struct xrow_header row;
>  	uint64_t row_count = 0;
> -	while (xlog_cursor_next_xc(&r->cursor, &row,
> -				   r->wal_dir.force_recovery) == 0) {

6. xlog_cursor_next_xc() is now unused and can be dropped.

> +	int rc;
> +	while ((rc = xlog_cursor_next(&r->cursor, &row,
> +				      r->wal_dir.force_recovery)) == 0) {
>  		/*
>  		 * Read the next row from xlog file.
>  		 *
> -		 * xlog_cursor_next_xc() returns 1 when
> +		 * xlog_cursor_next() returns 1 when
>  		 * it can not read more rows. This doesn't mean
>  		 * the file is fully read: it's fully read only
>  		 * when EOF marker has been read, see i.eof_read
>  		 */
>  		if (stop_vclock != NULL &&
>  		    r->vclock.signature >= stop_vclock->signature)
> -			return;
> +			return 0;
>  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
>  		if (row.lsn <= current_lsn)
>  			continue; /* already applied, skip */
> @@ -279,13 +285,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>  					 row_count / 1000000.);
>  		} else {
>  			if (!r->wal_dir.force_recovery)
> -				diag_raise();
> +				return -1;
>  
>  			say_error("skipping row {%u: %lld}",
>  				  (unsigned)row.replica_id, (long long)row.lsn);
>  			diag_log();
>  		}
>  	}
> +	if (rc < 0)
> +		return -1;
> +	return 0;

7. xlog_cursor_next() returns either -1 or 0. The cycle won't
stop until it returns a non-zero value. It means that here rc can't
be anything except -1, and you can do 'return rc;' or 'return -1;'
instead, without the 'if'.
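If useful, the collapse can be seen on a tiny self-contained model
(the names below are stand-ins, not the real tarantool API; the stub
only models the claim that the cursor call returns 0 or -1):

```c
/* Stand-in for xlog_cursor_next(): returns 0 while rows remain
 * and -1 once no more rows can be read. The real function has a
 * different signature; this only models the control flow. */
static int cursor_next_stub(int *rows_left)
{
	if (*rows_left > 0) {
		--*rows_left;
		return 0;
	}
	return -1;
}

/* The loop exits only when the stub returns non-zero, and the
 * only non-zero value is -1, so the tail can simply be
 * 'return rc;' without the trailing 'if (rc < 0)'. */
static int recover_sketch(int rows)
{
	int rc;
	while ((rc = cursor_next_stub(&rows)) == 0) {
		/* ... apply the row here ... */
	}
	return rc;
}
```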

>  }
>  
>  /**
> @@ -299,7 +308,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>   * This function will not close r->current_wal if
>   * recovery was successful.
>   */
> -void
> +int
>  recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  		       const struct vclock *stop_vclock, bool scan_dir)
>  {

8. The function still makes a non-exception-safe call, xdir_scan_xc().
After you drop xdir_scan_xc() from there, it will become unused,
so you can drop the whole xdir_scan_xc() function.

9. There is one more function using exceptions: hot_standby_f(). I
propose converting it to be exception-safe, converting class
WalSubscription to a struct, and turning this file into .c (in a
separate commit).


* Re: [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception Georgy Kirichenko
@ 2019-11-23 13:45   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:45 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 23 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Relaying from the C-written wal requires coio to be C-compliant,
> so get rid of exceptions in the coio interface.
> 
> Part of #980
> ---
>  src/box/applier.cc      |  52 ++++++----
>  src/box/box.cc          |   9 +-
>  src/box/relay.cc        |  11 ++-
>  src/box/xrow_io.cc      |  59 +++++------
>  src/box/xrow_io.h       |  11 ++-
>  src/lib/core/coio.cc    | 212 +++++++++++++++++++++-------------------
>  src/lib/core/coio.h     |  13 +--
>  src/lib/core/coio_buf.h |   8 ++
>  8 files changed, 207 insertions(+), 168 deletions(-)
> > @@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
>  	 * election on bootstrap.
>  	 */
>  	xrow_encode_vote(&row);
> -	coio_write_xrow(coio, &row);
> -	coio_read_xrow(coio, ibuf, &row);
> +	if (coio_write_xrow(coio, &row) < 0 ||
> +	    coio_read_xrow(coio, ibuf, &row) < 0)

1. coio_read_xrow() uses coio_breadn(), which calls ibuf_reserve_xc().
It is not exception safe.

> +		diag_raise();
>  	if (row.type == IPROTO_OK) {
>  		xrow_decode_ballot_xc(&row, &applier->ballot);
>  	} else try {> diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
> index 48707982b..f432c6b49 100644
> --- a/src/box/xrow_io.cc
> +++ b/src/box/xrow_io.cc
> @@ -35,71 +35,74 @@
>  #include "error.h"
>  #include "msgpuck/msgpuck.h"
>  
> -void
> +ssize_t
>  coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
>  {
>  	/* Read fixed header */
> -	if (ibuf_used(in) < 1)
> -		coio_breadn(coio, in, 1);
> +	if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
> +		return -1;
>  
>  	/* Read length */
>  	if (mp_typeof(*in->rpos) != MP_UINT) {
> -		tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -			  "packet length");
> +		diag_set(ClientError, ER_INVALID_MSGPACK,
> +			 "packet length");
> +		return -1;
>  	}
>  	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> -	if (to_read > 0)
> -		coio_breadn(coio, in, to_read);
> +	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
> +		return -1;
>  
>  	uint32_t len = mp_decode_uint((const char **) &in->rpos);
>  
>  	/* Read header and body */
>  	to_read = len - ibuf_used(in);
> -	if (to_read > 0)
> -		coio_breadn(coio, in, to_read);
> +	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
> +		return -1;
>  
> -	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
> -			      true);

2. xrow_header_decode_xc() is now unused and can be dropped.

> +	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
> +				  true);
>  }
>  
> -void
> -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> -			  struct xrow_header *row, ev_tstamp timeout)
> +ssize_t
> +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
> +		       struct xrow_header *row, ev_tstamp timeout)
>  {
>  	ev_tstamp start, delay;
>  	coio_timeout_init(&start, &delay, timeout);
>  	/* Read fixed header */
> -	if (ibuf_used(in) < 1)
> -		coio_breadn_timeout(coio, in, 1, delay);
> +	if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
> +		return -1;
>  	coio_timeout_update(&start, &delay);
>  
>  	/* Read length */
>  	if (mp_typeof(*in->rpos) != MP_UINT) {
> -		tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -			  "packet length");
> +		diag_set(ClientError, ER_INVALID_MSGPACK,
> +			 "packet length");
> +		return -1;
>  	}
>  	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> -	if (to_read > 0)
> -		coio_breadn_timeout(coio, in, to_read, delay);
> +	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
> +		return -1;
>  	coio_timeout_update(&start, &delay);
>  
>  	uint32_t len = mp_decode_uint((const char **) &in->rpos);
>  
>  	/* Read header and body */
>  	to_read = len - ibuf_used(in);
> -	if (to_read > 0)
> -		coio_breadn_timeout(coio, in, to_read, delay);
> +	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
> +		return -1;
>  
> -	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
> -			      true);
> +	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
> +				  true);
>  }
>  
> -
> -void
> +ssize_t
>  coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
>  {
>  	struct iovec iov[XROW_IOVMAX];
> -	int iovcnt = xrow_to_iovec_xc(row, iov);

3. xrow_to_iovec_xc() is now unused and can be dropped.

> -	coio_writev(coio, iov, iovcnt, 0);
> +	int iovcnt = xrow_to_iovec(row, iov);
> +	if (iovcnt < 0)
> +		return -1;
> +	return coio_writev(coio, iov, iovcnt, 0);
>  }
>  

4. The whole xrow_io.cc is now exception safe. I propose you
rename it to .c. Perhaps it would be good to rename all the
exception-safe .cc files to .c in one commit.

> diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
> index e88d724d5..96a529c2c 100644
> --- a/src/lib/core/coio.cc
> +++ b/src/lib/core/coio.cc
> @@ -41,12 +41,6 @@
>  #include "scoped_guard.h"

5. scoped guard header is not needed anymore here.

>  #include "coio_task.h" /* coio_resolve() */
>  
> @@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
>  {
>  	ev_loop *loop = loop();
>  	if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
> -		diag_raise();
> -	auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
> -	if (sio_connect(coio->fd, addr, len) == 0) {
> -		coio_guard.is_active = false;
> +		return -1;
> +	if (sio_connect(coio->fd, addr, len) == 0)
>  		return 0;
> -	}
>  	if (errno != EINPROGRESS)
> -		diag_raise();
> +		goto close;
>  	/*
>  	 * Wait until socket is ready for writing or
>  	 * timed out.
>  	 */
>  	ev_io_set(coio, coio->fd, EV_WRITE);
>  	ev_io_start(loop, coio);
> -	bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
> +	bool is_timedout;
> +	is_timedout = coio_fiber_yield_timeout(coio, timeout);

6. Why did you break the line?

> -	int error = EINPROGRESS;
> -	socklen_t sz = sizeof(error);
> +	int error;
> +	socklen_t sz;
> +	error = EINPROGRESS;
> +	sz = sizeof(error);

7. Why did you break these lines? They were perfectly fine.

>  	if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
>  		       &error, &sz))
> -		diag_raise();
> +		goto close;
>  	if (error != 0) {
>  		errno = error;
> -		tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
> +		diag_set(SocketError, sio_socketname(coio->fd), "connect");
> +		goto close;
>  	}
> -	coio_guard.is_active = false;
>  	return 0;
> +
> +close:

8. I propose renaming this label to 'fail:' or something else that
indicates an error, since this is not just a normal close; it
happens only on error.

> +	evio_close(loop, coio);
> +	return -1;
>  }
>  
>  void
> @@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
>  	    hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
>  	    hints.ai_protocol = 0;
>  	    int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
> -	    if (rc != 0) {
> -		    diag_raise();
> -		    panic("unspecified getaddrinfo error");
> -	    }
> +	    if (rc != 0)
> +			return -1;

9. Double indentation.

10. You can drop 'rc' and check coio_getaddrinfo() result
directly:

    if (coio_getaddrinfo(...) != 0)
            return -1;

>  	}
> -	auto addrinfo_guard = make_scoped_guard([=] {
> -		if (!uri->host_hint) freeaddrinfo(ai);
> -		else free(ai_local.ai_addr);
> -	});
> +	struct addrinfo *first_ai = ai;
>  	evio_timeout_update(loop(), &start, &delay);
>  
>  	coio_timeout_init(&start, &delay, timeout);
>  	assert(! evio_has_fd(coio));
> -	while (ai) {
> -		try {
> -			if (coio_connect_addr(coio, ai->ai_addr,
> -					      ai->ai_addrlen, delay))
> -				return -1;
> +	while (ai && delay >= 0) {
> +		if (coio_connect_addr(coio, ai->ai_addr,
> +				      ai->ai_addrlen, delay) == 0) {
>  			if (addr != NULL) {
>  				assert(addr_len != NULL);
>  				*addr_len = MIN(ai->ai_addrlen, *addr_len);
>  				memcpy(addr, ai->ai_addr, *addr_len);
>  			}
>  			return 0; /* connected */

11. struct addrinfo *ai leaks here.
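A minimal model of the fix this comment asks for (struct addrinfo
and freeaddrinfo are replaced here by a plain malloc'ed list; the
real patch would instead free first_ai on the success path too):

```c
#include <stdlib.h>

struct ai_node {
	struct ai_node *next;
};

/* Walk the list looking for target, then free the whole list
 * before returning, on the success path as well as on failure.
 * Keeping a separate 'first' pointer mirrors first_ai in the
 * patch: the list head stays reachable after the cursor moved. */
static int walk_and_free(struct ai_node *head, struct ai_node *target)
{
	struct ai_node *first = head; /* like first_ai in the patch */
	int found = 0;
	for (struct ai_node *n = head; n != NULL; n = n->next) {
		if (n == target) {
			found = 1;
			break; /* success: still fall through to cleanup */
		}
	}
	while (first != NULL) {
		struct ai_node *next = first->next;
		free(first);
		first = next;
	}
	return found ? 0 : -1;
}
```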

> -		} catch (SocketError *e) {
> -			if (ai->ai_next == NULL)
> -				throw;
> -			/* ignore exception and try the next address */
>  		}
> -		ai = ai->ai_next;
>  		ev_now_update(loop);
>  		coio_timeout_update(&start, &delay);
> +		ai = ai->ai_next;

12. Why did you move this line?

13. Before your patch 'SocketError' was allowed in case there are
more addresses to try. Other errors were not tolerated. Why
do you tolerate them now?

>  	}
>  
> -	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
> +	/* Set an error if not timedout. */
> +	if (delay >= 0)
> +		diag_set(SocketError, sio_socketname(coio->fd), "connection failed");

14. Please, don't make functional changes.

> +	if (!uri->host_hint)
> +		freeaddrinfo(first_ai);
> +	else
> +		free(ai_local.ai_addr);
> +	return -1;
>  }
>  
>  /**
> @@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
>  			if (evio_setsockopt_client(fd, addr->sa_family,
>  						   SOCK_STREAM) != 0) {
>  				close(fd);
> -				diag_raise();
> +				return -1;

15. CoioGuard did ev_io_stop() here. Why did you remove it?

>  			}
>  			return fd;
>  		}
>  		if (! sio_wouldblock(errno))
> -			diag_raise();
> +			return -1;

16. The same.

>  		/* The socket is not ready, yield */
>  		if (! ev_is_active(coio)) {
>  			ev_io_set(coio, coio->fd, EV_READ);
> @@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
>  		 * timeout is reached.
>  		 */
>  		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
> -		fiber_testcancel();
> +		ev_io_stop(loop(), coio);

17. Why did you stop it unconditionally? Before your patch it was
stopped only in case of timeout or fiber cancellation. Otherwise
the cycle kept working without stopping the coio.

> +		if (fiber_is_cancelled()) {
> +			diag_set(FiberIsCancelled);
> +			break;
> +		}
>  		if (is_timedout)
> -			tnt_raise(TimedOut);
> +			break;
>  		coio_timeout_update(&start, &delay);
>  	}
> +	return -1;
>  }
>  
>  /**
> @@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
>  
>  	ssize_t to_read = (ssize_t) sz;
>  
> -	CoioGuard coio_guard(coio);

18. The same comments as for the previous guard apply here, and in
all the other 'guarded' places too.

> -
>  	while (true) {
>  		/*
>  		 * Sic: assume the socket is ready: since
> @@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
>  {
>  	sio_add_to_iov(iov, -offset);
>  	ssize_t nwr = sio_writev(fd, iov, iovcnt);
> +	if (nwr < 0 && !sio_wouldblock(errno))
> +		return -1;
>  	sio_add_to_iov(iov, offset);
> -	if (nwr < 0 && ! sio_wouldblock(errno))
> -		diag_raise();
> +	if (nwr < 0)
> +		return 0;

19. Why? Please, don't make functional changes in a refactoring
patch.

>  	return nwr;
>  }
>  
> @@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
>  	struct iovec *end = iov + iovcnt;
>  	ev_tstamp start, delay;
>  	coio_timeout_init(&start, &delay, timeout);
> -	CoioGuard coio_guard(coio);
>  
>  	/* Avoid a syscall in case of 0 iovcnt. */
>  	while (iov < end) {
>  		/* Write as much data as possible. */
>  		ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
>  					 end - iov);
> -		if (nwr >= 0) {
> +		if (nwr < 0)
> +			return -1;

20. Why do you return -1? Before your patch it went to
testcancel() below.

> +		if (nwr > 0) {
>  			total += nwr;
>  			/*
>  			 * If there was a hint for the total size
> @@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name,
>  	service->handler_param = handler_param;
>  }
>  
> -void
> +int
>  coio_service_start(struct evio_service *service, const char *uri)
>  {
>  	if (evio_service_bind(service, uri) != 0 ||
>  	    evio_service_listen(service) != 0)
> -		diag_raise();
> +		return -1;
> +	return -0;

21. -0 ?

>  }
>  
>  void
> @@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
>  	coio_timeout_init(&start, &delay, timeout);
>  	fiber_yield_timeout(delay);
>  	ev_stat_stop(loop(), stat);
> -	fiber_testcancel();

22. Testcancel was supposed to throw an error if the fiber
is cancelled. Now you ignore cancellation. Please, don't.
In other places too.

>  }
>  
>  typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
> @@ -689,7 +700,6 @@ coio_waitpid(pid_t pid)
>  	fiber_set_cancellable(allow_cancel);
>  	ev_child_stop(loop(), &cw);
>  	int status = cw.rstatus;
> -	fiber_testcancel();
>  	return status;
>  }
>  
> diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
> index 6a2337689..d557f2869 100644
> --- a/src/lib/core/coio.h
> +++ b/src/lib/core/coio.h
> @@ -71,7 +70,7 @@ void
>  coio_create(struct ev_io *coio, int fd);
>  
>  static inline void
> -coio_close(ev_loop *loop, struct ev_io *coio)
> +coio_destroy(ev_loop *loop, struct ev_io *coio)

23. Please, avoid changes not related to the patch
purpose.


* Re: [Tarantool-patches] [PATCH 3/6] xstream: get rid of an exception
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 3/6] xstream: get rid of " Georgy Kirichenko
@ 2019-11-23 13:45   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:45 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 3 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Refactoring: make xstream C-compliant
> 
> Part of #380
> ---
>  src/box/box.cc     |  5 +++--
>  src/box/relay.cc   | 23 +++++++++++++----------
>  src/box/xstream.cc |  7 +------
>  src/box/xstream.h  |  2 +-
>  4 files changed, 18 insertions(+), 19 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6323e5e6e..f41ef9ce8 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -321,7 +321,7 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
>  	journal->vclock = v;
>  }
>  
> -static void
> +static int
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
>  	struct request request;
> @@ -330,7 +330,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  		struct space *space = space_cache_find_xc(request.space_id);
>  		if (box_process_rw(&request, space, NULL) != 0) {
>  			say_error("error applying row: %s", request_str(&request));
> -			diag_raise();
> +			return -1;

1. apply_wal_row() still throws exceptions. It uses xrow_decode_dml_xc() and
space_cache_find_xc().

>  		}
>  	}
>  	struct wal_stream *xstream =
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 202620694..fe5e0cfc9 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -165,11 +165,11 @@ relay_last_row_time(const struct relay *relay)
>  	return relay->last_row_time;
>  }
>  
> -static void
> +static int
>  relay_send(struct relay *relay, struct xrow_header *packet);

2. relay_send_heartbeat() still assumes that relay_send() throws.

> -static void
> +static int
>  relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
> -static void
> +static int
>  relay_send_row(struct xstream *stream, struct xrow_header *row);
>  
>  struct relay *
> diff --git a/src/box/xstream.cc b/src/box/xstream.cc
> index c77e4360e..80f3030d0 100644
> --- a/src/box/xstream.cc
> +++ b/src/box/xstream.cc
> @@ -35,10 +35,5 @@
>  int
>  xstream_write(struct xstream *stream, struct xrow_header *row)
>  {
> -	try {
> -		stream->write(stream, row);
> -	} catch (Exception *e) {
> -		return -1;
> -	}
> -	return 0;
> +	return stream->write(stream, row);
>  }

3. Now you can drop xstream.cc and make xstream_write() a static
inline function in the xstream.h header.
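The proposed end state could look roughly like this (a hedged
sketch; the real struct xstream has more fields, and demo_write /
demo_call are made-up helpers used only to exercise the forwarder):

```c
struct xrow_header;

/* Minimal model of the interface after the change: the forwarder
 * lives in the header as a static inline, so xstream.cc goes away. */
struct xstream {
	int (*write)(struct xstream *stream, struct xrow_header *row);
};

static inline int
xstream_write(struct xstream *stream, struct xrow_header *row)
{
	return stream->write(stream, row);
}

/* Tiny demo callback and driver to check the forwarding. */
static int demo_write(struct xstream *stream, struct xrow_header *row)
{
	(void)stream;
	(void)row;
	return 7;
}

static int demo_call(void)
{
	struct xstream s = { demo_write };
	return xstream_write(&s, 0);
}
```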

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 5/6] box: improve recovery journal
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 5/6] box: improve recovery journal Georgy Kirichenko
@ 2019-11-23 13:46   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:46 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Hi!

I still don't understand replication and recovery
code at all, so most of my comments are actually
questions.

See 5 of them below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Refactoring: track the recovery journal vclock instead of using the
> recovery one. Now the replicaset vclock relies on the recovery stream
> content instead of the wal directory content (xlog names and meta).
> This enables the applier to use this journal and generalizes wal
> recovery and applier final join handling.
> 
> Part of #980
> ---
>  src/box/box.cc               | 39 +++++++++++++++++++++++-------------
>  test/xlog-py/big_lsn.result  |  4 ++++
>  test/xlog-py/big_lsn.test.py | 13 ++++++------
>  test/xlog-py/dup_key.result  |  8 ++++++++
>  test/xlog-py/dup_key.test.py |  7 +++++++
>  5 files changed, 51 insertions(+), 20 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f41ef9ce8..71822551e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,16 +309,22 @@ recovery_journal_write(struct journal *base,
>  		       struct journal_entry *entry)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> -	entry->res = vclock_sum(journal->vclock);
> +	for (struct xrow_header **row = entry->rows;
> +	     row < entry->rows + entry->n_rows; ++row) {
> +		vclock_follow_xrow(&journal->vclock, *row);
> +	}
> +	entry->res = vclock_sum(&journal->vclock);
> +	/* Assume the entry was committed and adjust replicaset vclock. */

1. What if it was not committed?

> +	vclock_copy(&replicaset.vclock, &journal->vclock);
>  	journal_entry_complete(entry);
>  	return 0;
>  }
> @@ -332,6 +338,15 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  			say_error("error applying row: %s", request_str(&request));
>  			return -1;
>  		}
> +	} else {
> +		struct txn *txn = txn_begin();
> +		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> +		    txn_commit_stmt(txn, &request) != 0) {
> +			txn_rollback(txn);

2. If txn == NULL, txn_rollback() will crash.
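
A sketch of the guarded flow this comment implies: txn_rollback() is reached only when txn_begin() actually produced a transaction. The stubs below are simplified stand-ins for the real txn API:

```c
#include <assert.h>
#include <stddef.h>

struct txn { int open; };

static int rollback_calls;

static struct txn *
txn_begin_stub(int fail)
{
	static struct txn tx;
	if (fail)
		return NULL;
	tx.open = 1;
	return &tx;
}

static void
txn_rollback_stub(struct txn *txn)
{
	/* The real txn_rollback() dereferences txn, so NULL crashes. */
	assert(txn != NULL);
	txn->open = 0;
	++rollback_calls;
}

/* Fixed flow: a failed txn_begin() leaves nothing to roll back, while
 * a later failure rolls back the live transaction. */
static int
apply_nop_row_stub(int begin_fails, int stmt_fails)
{
	struct txn *txn = txn_begin_stub(begin_fails);
	if (txn == NULL)
		return -1;
	if (stmt_fails) {
		txn_rollback_stub(txn);
		return -1;
	}
	return 0;
}
```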

> +			return -1;
> +		}
> +		if (txn_commit(txn) != 0)
> +			return -1;
>  	}

3. What is it? It does not look like refactoring. Why did you add
this whole code block?

>  	struct wal_stream *xstream =
>  		container_of(stream, struct wal_stream, base);
> @@ -1970,6 +1981,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 */
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
> +	vclock_copy(&replicaset.vclock, checkpoint_vclock);
> +	struct recovery_journal journal;
> +	recovery_journal_create(&journal, &recovery->vclock);
> +	journal_set(&journal.base);

4. Why did you move it? Why is vclock_copy() called 2 times?

> +
>  	engine_begin_final_recovery_xc();
>  	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
>  		diag_raise();
> diff --git a/test/xlog-py/big_lsn.test.py b/test/xlog-py/big_lsn.test.py
> index c6a31d971..bdc84e012 100644
> --- a/test/xlog-py/big_lsn.test.py
> +++ b/test/xlog-py/big_lsn.test.py
> @@ -9,21 +9,22 @@ server.stop()
>  server.deploy()
>  server.admin("box.info.lsn")
>  server.admin("box.space._schema:delete('dummy')")
> +server.admin("box.snapshot()")
>  server.stop()
>  
> -# Bump the instance vclock by tweaking the last xlog.
> +# Bump the instance vclock by tweaking the checkpoint.
>  old_lsn = 1
>  new_lsn = 123456789123
> -wal_dir = os.path.join(server.vardir, server.name)
> -old_wal = os.path.join(wal_dir, "%020d.xlog" % old_lsn)
> -new_wal = os.path.join(wal_dir, "%020d.xlog" % new_lsn)
> -with open(old_wal, "r+") as f:
> +snap_dir = os.path.join(server.vardir, server.name)
> +old_snap = os.path.join(snap_dir, "%020d.snap" % old_lsn)
> +new_snap = os.path.join(snap_dir, "%020d.snap" % new_lsn)
> +with open(old_snap, "r+") as f:
>      s = f.read()
>      s = s.replace("VClock: {1: %d}" % old_lsn,
>                    "VClock: {1: %d}" % new_lsn)
>      f.seek(0)
>      f.write(s)
> -os.rename(old_wal, new_wal)
> +os.rename(old_snap, new_snap)

5. Why did you change the tests if this commit is just
refactoring?

>  
>  # Recover and make a snapshot.
>  server.start()

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join
  2019-11-19 16:04 ` [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join Georgy Kirichenko
@ 2019-11-23 13:46   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2019-11-23 13:46 UTC (permalink / raw)
  To: Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 13 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Do not start a transaction for each local journal or final join row
> but follow transaction boundaries instead.

1. Why don't we already have boundaries on join? I thought
that transactional replication was done.

> 
> Part of #980
> ---
>  src/box/applier.cc                     | 92 +++++++++++++-------------
>  src/box/box.cc                         | 72 ++++++++++++++------
>  test/xlog/panic_on_broken_lsn.result   |  9 ++-
>  test/xlog/panic_on_broken_lsn.test.lua |  7 +-
>  4 files changed, 107 insertions(+), 73 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 294765195..d00b1b04a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -478,27 +477,29 @@ applier_join(struct applier *applier)
>  	 * Receive final data.
>  	 */
>  	while (true) {
> -		if (coio_read_xrow(coio, ibuf, &row) < 0)
> -			diag_raise();
> -		applier->last_row_time = ev_monotonic_now(loop());
> -		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> -				diag_raise();
> -			if (++row_count % 100000 == 0)
> -				say_info("%.1fM rows received", row_count / 1e6);
> -		} else if (row.type == IPROTO_OK) {
> -			/*
> -			 * Current vclock. This is not used now,
> -			 * ignore.
> -			 */
> +		struct stailq rows;
> +		applier_read_tx(applier, &rows);
> +		struct xrow_header *first_row =
> +			&(stailq_first_entry(&rows, struct applier_tx_row,
> +					    next)->row);
> +		if (first_row->type == IPROTO_OK) {
> +			if (applier->version_id < version_id(1, 7, 0)) {
> +				/*
> +				 * This is the start vclock if the
> +				 * server is 1.6. Since we have
> +				 * not initialized replication
> +				 * vclock yet, do it now. In 1.7+
> +				 * this vclock is not used.
> +				 */
> +				xrow_decode_vclock_xc(first_row, &replicaset.vclock);
> +			}
>  			break; /* end of stream */
> -		} else if (iproto_type_is_error(row.type)) {
> -			xrow_decode_error_xc(&row);  /* rethrow error */
> -		} else {
> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
> -				  (uint32_t) row.type);
>  		}
> +		if (applier_apply_tx(&rows) != 0)
> +			diag_raise();
> +		if (ibuf_used(ibuf) == 0)
> +			ibuf_reset(ibuf);

2. I propose moving the ibuf reset and fiber_gc() into
applier_apply_tx(). It looks confusing that the loop does not
touch the ibuf or fiber->gc directly anywhere, yet you reset
them on each iteration.

And why do you reset the ibuf only when used == 0? It is filled
with the transaction's data, right? After applier_apply_tx() the
data should not be needed anymore, regardless of what is in
the ibuf.

> +		fiber_gc();
>  	}
>  	say_info("final data received");
>  
> @@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
>  	struct xrow_header *row = &tx_row->row;
>  
>  	double timeout = replication_disconnect_timeout();
> +	/* We check timeout only in case of subscribe. */
> +	if (applier->state == APPLIER_FINAL_JOIN)
> +		timeout = TIMEOUT_INFINITY;

3. Why? And how is it related to transactionality of
recovery and join?

>  	/*
>  	 * Tarantool < 1.7.7 does not send periodic heartbeat
>  	 * messages so we can't assume that if we haven't heard
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 71822551e..9464eee63 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -286,6 +286,8 @@ struct wal_stream {
>  	struct xstream base;
>  	/** How many rows have been recovered so far. */
>  	size_t rows;
> +	/** Current transaction.*/

4. Please put a space before '*/'.

> +	struct txn *txn;
>  };
>  
>  /**
> @@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
>  }
>  
>  static inline void
> -recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
> +recovery_journal_create(struct recovery_journal *journal,
> +			const struct vclock *v)

5. Unnecessary change.

>  {
>  	journal_create(&journal->base, recovery_journal_write, NULL);
>  	vclock_copy(&journal->vclock, v);
> @@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
>  static int
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> +	struct wal_stream *wal_stream =
> +		container_of(stream, struct wal_stream, base);
> +	if (wal_stream->txn == NULL) {
> +		wal_stream->txn = txn_begin();
> +		if (wal_stream->txn == NULL)
> +			return -1;
> +	}
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	int rc = 0;
>  	if (request.type != IPROTO_NOP) {
>  		struct space *space = space_cache_find_xc(request.space_id);
> -		if (box_process_rw(&request, space, NULL) != 0) {
> +		rc = box_process_rw(&request, space, NULL);
> +		if (rc != 0)
>  			say_error("error applying row: %s", request_str(&request));
> -			return -1;

6. So now you can apply a row, get an error, and then still
commit the broken transaction below.
Please don't.
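
The fix this comment asks for is to short-circuit before the commit branch when a statement failed. A minimal state-machine sketch of that control flow (stubbed, not the real txn API):

```c
#include <assert.h>

enum txn_state { TXN_NONE, TXN_OPEN, TXN_COMMITTED, TXN_ROLLED_BACK };

static enum txn_state state = TXN_NONE;

/* On a statement error the multi-row transaction is rolled back and
 * -1 is returned, so a row with is_commit set can never commit a
 * transaction that already contains a broken statement. */
static int
apply_wal_row_fixed(int row_fails, int is_commit)
{
	if (state != TXN_OPEN)
		state = TXN_OPEN;		/* txn_begin() */
	if (row_fails) {
		state = TXN_ROLLED_BACK;	/* txn_rollback() */
		return -1;
	}
	if (is_commit)
		state = TXN_COMMITTED;		/* txn_commit() */
	return 0;
}
```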

> -		}
>  	} else {
> -		struct txn *txn = txn_begin();
> -		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> -		    txn_commit_stmt(txn, &request) != 0) {
> -			txn_rollback(txn);
> +		struct txn *txn = in_txn();
> +		rc = txn_begin_stmt(txn, NULL);

7. Why do you need this empty statement?

> +		if (rc == 0)
> +			rc = txn_commit_stmt(txn, &request);
> +	}
> +	if (row->is_commit) {
> +		if (txn_commit(wal_stream->txn) != 0) {
> +			wal_stream->txn = NULL;
>  			return -1;
>  		}
> -		if (txn_commit(txn) != 0)
> -			return -1;
> +		wal_stream->txn = NULL;
>  	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
>  	/**
>  	 * Yield once in a while, but not too often,
>  	 * mostly to allow signal handling to take place.
>  	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> +	if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
> +	    wal_stream->txn == NULL) {
> +		wal_stream->rows -= WAL_ROWS_PER_YIELD;

8. Why did you change this?

>  		fiber_sleep(0);
> -	return 0;
> +	}
> +	return rc;
>  }
>  
>  static void
> @@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  
>  	struct wal_stream wal_stream;
>  	wal_stream_create(&wal_stream);
> +	auto wal_stream_guard = make_scoped_guard([&]{
> +		wal_stream_destroy(&wal_stream);

9. You ignore the wal_stream_destroy() error here.

> +	});
>  
>  	struct recovery *recovery;
>  	recovery = recovery_new(cfg_gets("wal_dir"),
> @@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	if (recovery == NULL)
>  		diag_raise();
>  
> -	/*
> -	 * Make sure we report the actual recovery position
> -	 * in box.info while local recovery is in progress.
> -	 */
> -	box_vclock = &recovery->vclock;

10. So now you don't report it?

>  	auto guard = make_scoped_guard([&]{
> -		box_vclock = &replicaset.vclock;
>  		recovery_stop_local(recovery);
>  		recovery_delete(recovery);
>  	});
> @@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 */
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
> -	vclock_copy(&replicaset.vclock, checkpoint_vclock);
>  	struct recovery_journal journal;
>  	recovery_journal_create(&journal, &recovery->vclock);
>  	journal_set(&journal.base);
>  
>  	engine_begin_final_recovery_xc();
> +

11. Unnecessary change.

> diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
> index 1e62680eb..e209374b6 100644
> --- a/test/xlog/panic_on_broken_lsn.result
> +++ b/test/xlog/panic_on_broken_lsn.result
> @@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
>  _ = fiber.create(function()
>      test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
>      lsn = box.info.vclock[1]
> -    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
> +    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
> +    box.begin()
>      box.space.test:auto_increment{'v1'}
> +    box.space.test:auto_increment{'v1'}
> +    box.commit()

12. Why?

If you want to test transactional recovery/join, then please add
a new test case. Try not to change the old ones except when they
get broken or the change is very trivial.

If this is not a test for transactional recovery/join, then
please add one. I think this is testable, since it is not
just refactoring.

>      box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
>  end);
>  ---
> @@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
>  - ok
>  ...
>  -- Check that log contains the mention of broken LSN and the request printout
> -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
> +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)

13. Why +1?

^ permalink raw reply	[flat|nested] 14+ messages in thread

end of thread, other threads:[~2019-11-23 13:39 UTC | newest]

Thread overview: 14+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
2019-11-23 13:45   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception Georgy Kirichenko
2019-11-23 13:45   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 3/6] xstream: get rid of " Georgy Kirichenko
2019-11-23 13:45   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 4/6] vinyl: do not insert vy_tx twice into writers list Georgy Kirichenko
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 5/6] box: improve recovery journal Georgy Kirichenko
2019-11-23 13:46   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join Georgy Kirichenko
2019-11-23 13:46   ` Vladislav Shpilevoy
2019-11-20 17:15 ` [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Konstantin Osipov
2019-11-23 13:45 ` Vladislav Shpilevoy
