[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
Serge Petrenko
sergepetrenko at tarantool.org
Wed Mar 24 15:24:15 MSK 2021
Now the applier assembles rows into transactions not only during the
subscribe stage, but also during final join / register.
This is necessary for correct handling of rolled-back synchronous
transactions in the final join stream.
Part of #5566
---
src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
1 file changed, 61 insertions(+), 65 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index d53f13711..9a8b0f0fc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -292,34 +292,6 @@ apply_row(struct xrow_header *row)
return 0;
}
-static int
-apply_final_join_row(struct xrow_header *row)
-{
- /*
- * Confirms are ignored during join. All the data master
- * sends us is valid.
- */
- if (iproto_type_is_synchro_request(row->type))
- return 0;
- struct txn *txn = txn_begin();
- if (txn == NULL)
- return -1;
- /*
- * Do not wait for confirmation while processing final
- * join rows. See apply_snapshot_row().
- */
- txn_set_flags(txn, TXN_FORCE_ASYNC);
- if (apply_row(row) != 0) {
- txn_rollback(txn);
- fiber_gc();
- return -1;
- }
- if (txn_commit(txn) != 0)
- return -1;
- fiber_gc();
- return 0;
-}
-
/**
* Connect to a remote host and authenticate the client.
*/
@@ -505,13 +477,26 @@ applier_fetch_snapshot(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/*
+ * Forward declarations: applier_wait_register() below uses these
+ * helpers, but they are defined further down in this file.
+ */
+
+/**
+ * Read rows from the master via applier_read_tx_row() and link
+ * them into @a rows until set_next_tx_row() reports the end of
+ * the transaction (tsn == 0). If @a row_count is not NULL, it is
+ * incremented once per row read and progress is logged every
+ * 100000 rows.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows,
+ uint64_t *row_count);
+
+/**
+ * Apply one transaction received during final join / register.
+ * Returns 0 on success and non-zero on failure.
+ */
+static int
+apply_final_join_tx(struct stailq *rows);
+
+/**
+ * A helper struct to link xrow objects in a list.
+ * applier_read_tx() collects the rows of one transaction into a
+ * stailq of these entries.
+ */
+struct applier_tx_row {
+ /** Link to the next row of the same transaction. */
+ struct stailq_entry next;
+ /** xrow_header struct for the current transaction row. */
+ struct xrow_header row;
+};
+
static uint64_t
applier_wait_register(struct applier *applier, uint64_t row_count)
{
- struct ev_io *coio = &applier->io;
- struct ibuf *ibuf = &applier->ibuf;
- struct xrow_header row;
-
/*
* Tarantool < 1.7.0: there is no "final join" stage.
* Proceed to "subscribe" and do not finish bootstrap
@@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
* Receive final data.
*/
while (true) {
- coio_read_xrow(coio, ibuf, &row);
- applier->last_row_time = ev_monotonic_now(loop());
- if (iproto_type_is_dml(row.type)) {
- vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_final_join_row(&row) != 0)
- diag_raise();
- if (++row_count % 100000 == 0)
- say_info("%.1fM rows received", row_count / 1e6);
- } else if (row.type == IPROTO_OK) {
- /*
- * Current vclock. This is not used now,
- * ignore.
- */
- ++row_count;
- break; /* end of stream */
- } else if (iproto_type_is_error(row.type)) {
- xrow_decode_error_xc(&row); /* rethrow error */
- } else {
- tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
- (uint32_t) row.type);
+ struct stailq rows;
+ applier_read_tx(applier, &rows, &row_count);
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ if (first_row->type == IPROTO_OK) {
+ assert(first_row ==
+ &stailq_last_entry(&rows, struct applier_tx_row,
+ next)->row);
+ break;
}
+ if (apply_final_join_tx(&rows) != 0)
+ diag_raise();
}
return row_count;
@@ -616,16 +593,6 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
- /* Next transaction row. */
- struct stailq_entry next;
- /* xrow_header struct for the current transaction row. */
- struct xrow_header row;
-};
-
static struct applier_tx_row *
applier_read_tx_row(struct applier *applier)
{
@@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
* messages so we can't assume that if we haven't heard
* from the master for quite a while the connection is
* broken - the master might just be idle.
+ * Also there are no timeouts during final join and register.
*/
- if (applier->version_id < version_id(1, 7, 7))
+ if (applier->version_id < version_id(1, 7, 7) ||
+ applier->state == APPLIER_FINAL_JOIN ||
+ applier->state == APPLIER_REGISTER)
coio_read_xrow(coio, ibuf, row);
else
coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
@@ -723,7 +693,8 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
* network input space is reused for the next xrow.
*/
static void
-applier_read_tx(struct applier *applier, struct stailq *rows)
+applier_read_tx(struct applier *applier, struct stailq *rows,
+ uint64_t *row_count)
{
int64_t tsn = 0;
@@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
do {
struct applier_tx_row *tx_row = applier_read_tx_row(applier);
tsn = set_next_tx_row(rows, tx_row, tsn);
+
+ if (row_count != NULL && ++*row_count % 100000 == 0)
+ say_info("%.1fM rows received", *row_count / 1e6);
} while (tsn != 0);
}
@@ -988,6 +962,28 @@ fail:
return -1;
}
+/**
+ * Apply one transaction received during final join / register.
+ * This is a simpler version of applier_apply_tx() for the final
+ * join stage.
+ *
+ * @param rows Rows of a single transaction, linked through
+ *             struct applier_tx_row::next (as assembled by
+ *             applier_read_tx()). Must not be empty.
+ * @retval 0      Success.
+ * @retval non-0  Failure. applier_wait_register() reacts to this
+ *                by calling diag_raise().
+ *
+ * The fiber region is released with fiber_gc() on every path.
+ */
+static int
+apply_final_join_tx(struct stailq *rows)
+{
+ struct xrow_header *first_row =
+ &stailq_first_entry(rows, struct applier_tx_row, next)->row;
+ struct xrow_header *last_row =
+ &stailq_last_entry(rows, struct applier_tx_row, next)->row;
+ int rc = 0;
+ /*
+  * WAL isn't enabled yet, so follow vclock manually.
+  * Only the last row of the transaction is followed here.
+  */
+ vclock_follow_xrow(&replicaset.vclock, last_row);
+ /*
+  * A synchronous replication request is expected to be the only
+  * row of its transaction (asserted below).
+  */
+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+ assert(first_row == last_row);
+ rc = apply_synchro_row(first_row);
+ goto end;
+ }
+
+ /*
+  * NOTE(review): the meaning of the two `false` flags is defined
+  * by apply_plain_tx(); confirm against its declaration. The old
+  * apply_final_join_row() set TXN_FORCE_ASYNC so as not to wait
+  * for confirmation during final join; confirm that
+  * apply_plain_tx() does the equivalent here.
+  */
+ rc = apply_plain_tx(rows, false, false);
+end:
+ fiber_gc();
+ return rc;
+}
/**
* Apply all rows in the rows queue as a single transaction.
@@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
}
struct stailq rows;
- applier_read_tx(applier, &rows);
+ applier_read_tx(applier, &rows, NULL);
/*
* In case of an heartbeat message wake a writer up
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list