* [tarantool-patches] [PATCH v5 1/7] txn: unref statement at txn_free
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure Georgy Kirichenko
` (6 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Refactoring: move the txn statement unref code into the transaction free function.
---
src/box/txn.c | 12 ++++--------
1 file changed, 4 insertions(+), 8 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 7a2c8cdaf..9aa460f50 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -168,6 +168,10 @@ txn_new()
inline static void
txn_free(struct txn *txn)
{
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next)
+ txn_stmt_unref_tuples(stmt);
+
/* Truncate region up to struct txn size. */
region_truncate(&txn->region, sizeof(struct txn));
stailq_add(&txn_cache, &txn->in_txn_cache);
@@ -448,10 +452,6 @@ txn_commit(struct txn *txn)
panic("commit trigger failed");
}
- struct txn_stmt *stmt;
- stailq_foreach_entry(stmt, &txn->stmts, next)
- txn_stmt_unref_tuples(stmt);
-
fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
@@ -489,10 +489,6 @@ txn_rollback()
panic("rollback trigger failed");
}
- struct txn_stmt *stmt;
- stailq_foreach_entry(stmt, &txn->stmts, next)
- txn_stmt_unref_tuples(stmt);
-
/** Free volatile txn memory. */
fiber_gc();
fiber_set_txn(fiber(), NULL);
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 1/7] txn: unref statement at txn_free Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 3/7] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
` (5 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Move transaction auto-start and auto-commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger
transaction refactoring needed to implement detachable transactions and a
parallel applier.
Prerequisites: #1254
---
src/box/applier.cc | 43 ++++++++++++++++++++++----
src/box/box.cc | 70 +++++++++++++++++++++++++++---------------
src/box/index.cc | 10 +++---
src/box/memtx_engine.c | 10 ++++--
src/box/memtx_space.c | 6 ++--
src/box/sql.c | 2 +-
src/box/txn.c | 52 +++++++++++--------------------
src/box/txn.h | 16 +++-------
src/box/vy_scheduler.c | 6 ++--
9 files changed, 126 insertions(+), 89 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..d12a835d0 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,26 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ goto rollback;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request))
+ goto rollback;
+ int rc;
+ rc = txn_commit(txn);
+ if (rc < 0)
+ return -1;
+ fiber_gc();
+ return rc;
+rollback:
+ txn_rollback();
+ return -1;
}
/**
@@ -189,8 +204,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) != 0)
return -1;
return txn_commit_stmt(txn, request);
}
@@ -213,6 +228,22 @@ apply_row(struct xrow_header *row)
return 0;
}
+static int
+apply_final_join_row(struct xrow_header *row)
+{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
+ if (apply_row(row) != 0) {
+ txn_rollback();
+ return -1;
+ }
+ if (txn_commit(txn) != 0)
+ return -1;
+ fiber_gc();
+ return 0;
+}
+
/**
* Connect to a remote host and authenticate the client.
*/
@@ -403,7 +434,7 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ if (apply_final_join_row(&row) != 0)
diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
@@ -555,7 +586,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..a32f1ba0f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,56 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ struct tuple *tuple = NULL;
+ bool return_tuple = false;
+ struct txn *txn = in_txn();
+ bool is_autocommit = txn == NULL;
+ if (is_autocommit && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto rollback;
+ if (txn_begin_stmt(txn, space) != 0)
+ goto rollback;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
- return -1;
+ txn_rollback_stmt(txn);
+ goto rollback;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
- /*
- * Pin the tuple locally before the commit,
- * otherwise it may go away during yield in
- * when WAL is written in autocommit mode.
- */
- tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
+ if (result != NULL)
+ *result = tuple;
+
+ return_tuple = result != NULL && tuple != NULL;
+ if (return_tuple) {
+ /*
+ * Pin the tuple locally before the commit,
+ * otherwise it may go away during the yield
+ * when the WAL is written in autocommit mode.
+ */
+ tuple_ref(tuple);
+ }
+
+ if (txn_commit_stmt(txn, request))
+ goto rollback;
+
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0)
+ goto error;
+ fiber_gc();
+ }
+ if (return_tuple) {
tuple_bless(tuple);
- tuple_unref(tuple);
- return rc;
+ tuple_unref(tuple);
+ }
+ return 0;
+
+rollback:
+ if (is_autocommit)
+ txn_rollback();
+error:
+ if (return_tuple)
+ tuple_unref(tuple);
+ return -1;
}
void
@@ -1055,7 +1077,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1080,7 +1102,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index cd763e547..dae9955b2 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -269,16 +269,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 78a0059a0..77a8f7d5f 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -325,8 +325,8 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, space) != 0)
return -1;
struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
@@ -341,7 +341,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index a0350da6b..4c9a4c15b 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -862,7 +862,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return -1;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 9aa460f50..583ae6ce0 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -178,7 +178,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -191,7 +191,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -226,26 +225,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+int
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ assert(txn == in_txn());
+ assert(txn != NULL);
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
- return NULL;
+ return -1;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
- return NULL;
- }
+ if (stmt == NULL)
+ return -1;
if (space == NULL)
- return txn;
+ return 0;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -258,10 +251,10 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return 0;
fail:
- txn_rollback_stmt();
- return NULL;
+ txn_rollback_stmt(txn);
+ return -1;
}
bool
@@ -278,8 +271,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -339,14 +331,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -382,7 +369,7 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
- txn_rollback(); /* Perform our part of cascading rollback. */
+ txn_rollback(txn); /* Perform our part of cascading rollback. */
/*
* Move fiber to end of event loop to avoid
* execution of any new requests before all
@@ -461,14 +448,11 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -536,7 +520,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..33926f6f3 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+int
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 85c1659b0..ed8f7dd86 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) != 0)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 3/7] txn: get rid of fiber_gc from txn_rollback
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 1/7] txn: unref statement at txn_free Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback Georgy Kirichenko
` (4 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Refactoring: don't explicitly touch the fiber gc storage on transaction
rollback; callers now invoke fiber_gc() themselves. This relaxes the
dependency between fiber and transaction life cycles.
Prerequisites: #1254
---
src/box/applier.cc | 9 ++++++---
src/box/box.cc | 6 ++++--
src/box/call.c | 22 ++++++++++++++++------
src/box/memtx_engine.c | 3 ++-
src/box/txn.c | 23 ++++++++++++-----------
src/box/txn.h | 7 +++++--
src/box/vy_scheduler.c | 10 +++++++---
7 files changed, 52 insertions(+), 28 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index d12a835d0..6f93759a8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -190,7 +190,8 @@ apply_initial_join_row(struct xrow_header *row)
fiber_gc();
return rc;
rollback:
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
@@ -235,7 +236,8 @@ apply_final_join_row(struct xrow_header *row)
if (txn == NULL)
return -1;
if (apply_row(row) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
if (txn_commit(txn) != 0)
@@ -633,7 +635,8 @@ applier_apply_tx(struct stailq *rows)
return txn_commit(txn);
rollback:
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index a32f1ba0f..d53b0cdc5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -213,8 +213,10 @@ box_process_rw(struct request *request, struct space *space,
return 0;
rollback:
- if (is_autocommit)
- txn_rollback();
+ if (is_autocommit) {
+ txn_rollback(txn);
+ fiber_gc();
+ }
error:
if (return_tuple)
tuple_unref(tuple);
diff --git a/src/box/call.c b/src/box/call.c
index 56da53fb3..7f6fc8bba 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -208,14 +208,18 @@ box_process_call(struct call_request *request, struct port *port)
if (orig_credentials)
fiber_set_user(fiber(), orig_credentials);
+ struct txn *txn = in_txn();
if (rc != 0) {
- txn_rollback();
+ if (txn != NULL)
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
- if (in_txn()) {
+ if (txn != NULL) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
@@ -229,14 +233,20 @@ box_process_eval(struct call_request *request, struct port *port)
/* Check permissions */
if (access_check_universe(PRIV_X) != 0)
return -1;
+ struct txn *txn;
if (box_lua_eval(request, port) != 0) {
- txn_rollback();
+ txn = in_txn();
+ if (txn != NULL)
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
- if (in_txn()) {
+ txn = in_txn();
+ if (txn != NULL) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dae9955b2..f371d147f 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -274,7 +274,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 583ae6ce0..1eb4db6a3 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -345,8 +345,10 @@ txn_write_to_wal(struct txn *txn)
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region);
- if (req == NULL)
+ if (req == NULL) {
+ txn_rollback(txn);
return -1;
+ }
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
@@ -419,7 +421,7 @@ txn_commit(struct txn *txn)
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
- goto fail;
+ return -1;
}
/*
* Engine can be NULL if transaction contains IPROTO_NOP
@@ -443,7 +445,7 @@ txn_commit(struct txn *txn)
txn_free(txn);
return 0;
fail:
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
@@ -457,11 +459,9 @@ txn_rollback_stmt(struct txn *txn)
}
void
-txn_rollback()
+txn_rollback(struct txn *txn)
{
- struct txn *txn = in_txn();
- if (txn == NULL)
- return;
+ assert(txn == in_txn());
trigger_clear(&txn->fiber_on_stop);
if (txn->engine)
engine_rollback(txn->engine, txn);
@@ -473,8 +473,6 @@ txn_rollback()
panic("rollback trigger failed");
}
- /** Free volatile txn memory. */
- fiber_gc();
fiber_set_txn(fiber(), NULL);
txn_free(txn);
}
@@ -550,11 +548,14 @@ int
box_txn_rollback()
{
struct txn *txn = in_txn();
+ if (txn == NULL)
+ return 0;
if (txn && txn->in_sub_stmt) {
diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
return -1;
}
- txn_rollback(); /* doesn't throw */
+ txn_rollback(txn); /* doesn't throw */
+ fiber_gc();
return 0;
}
@@ -632,6 +633,6 @@ txn_on_stop(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
- txn_rollback(); /* doesn't yield or fail */
+ txn_rollback(in_txn()); /* doesn't yield or fail */
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 33926f6f3..f20428fad 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -221,9 +221,12 @@ txn_begin();
int
txn_commit(struct txn *txn);
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
void
-txn_rollback();
+txn_rollback(struct txn *txn);
/**
* Roll back the transaction but keep the object around.
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index ed8f7dd86..0df55818f 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,18 +889,22 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
for (int i = 0; i < batch->count; i++) {
if (vy_deferred_delete_process_one(deferred_delete_space,
pk->space_id, pk->mem_format,
- &batch->stmt[i]) != 0)
- goto fail;
+ &batch->stmt[i]) != 0) {
+ goto fail_rollback;
+ }
}
if (txn_commit(txn) != 0)
goto fail;
fiber_gc();
return;
+
+fail_rollback:
+ txn_rollback(txn);
+ fiber_gc();
fail:
batch->is_failed = true;
diag_move(diag_get(), &batch->diag);
- txn_rollback();
}
/**
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
` (2 preceding siblings ...)
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 3/7] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 5/7] txn: introduce asynchronous txn commit Georgy Kirichenko
` (3 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Finalize a transaction through a journal entry callback, so that transaction
processing doesn't rely on fiber scheduling.
Also allow a fiber which isn't the owner of a locked latch to steal its
ownership. This is required to process transaction triggers
asynchronously.
Prerequisites: #1254
---
src/box/alter.cc | 6 +++
src/box/box.cc | 6 ++-
src/box/journal.c | 9 +++-
src/box/journal.h | 29 +++++++++-
src/box/txn.c | 123 +++++++++++++++++++++++++++++--------------
src/box/vy_log.c | 3 +-
src/box/wal.c | 21 ++++++--
src/lib/core/latch.h | 10 ++++
8 files changed, 157 insertions(+), 50 deletions(-)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index a37a68ce4..1595e27af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3558,6 +3558,12 @@ unlock_after_dd(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
+ /*
+ * With a yielding journal this trigger is processed in the
+ * context of the tx_prio endpoint instead of the context of the
+ * fiber which has this latch locked. So steal the latch first.
+ */
+ latch_steal(&schema_lock);
latch_unlock(&schema_lock);
}
diff --git a/src/box/box.cc b/src/box/box.cc
index d53b0cdc5..f5bd29dd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -303,10 +303,12 @@ struct recovery_journal {
*/
static int64_t
recovery_journal_write(struct journal *base,
- struct journal_entry * /* entry */)
+ struct journal_entry *entry)
{
struct recovery_journal *journal = (struct recovery_journal *) base;
- return vclock_sum(journal->vclock);
+ entry->res = vclock_sum(journal->vclock);
+ journal_entry_complete(entry);
+ return entry->res;
}
static inline void
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..4c1997f36 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -41,7 +41,8 @@ static int64_t
dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
- (void) entry;
+ entry->res = 0;
+ journal_entry_complete(entry);
return 0;
}
@@ -53,7 +54,9 @@ static struct journal dummy_journal = {
struct journal *current_journal = &dummy_journal;
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+ journal_entry_done_cb on_done_cb,
+ void *on_done_cb_data)
{
struct journal_entry *entry;
@@ -70,6 +73,8 @@ journal_entry_new(size_t n_rows, struct region *region)
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
+ entry->on_done_cb = on_done_cb;
+ entry->on_done_cb_data = on_done_cb_data;
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..52b8a715c 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -39,6 +39,11 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct xrow_header;
+struct journal_entry;
+
+/** Journal entry finalization callback typedef. */
+typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data);
+
/**
* An entry for an abstract journal.
* Simply put, a write ahead log request.
@@ -58,6 +63,17 @@ struct journal_entry {
* The fiber issuing the request.
*/
struct fiber *fiber;
+ /**
+ * A journal entry finalization callback which is called after
+ * the entry processing has finished, on both success and
+ * failure. entry->res is set to the result value before the
+ * callback is fired.
+ */
+ journal_entry_done_cb on_done_cb;
+ /**
+ * A journal entry completion callback argument.
+ */
+ void *on_done_cb_data;
/**
* Approximate size of this request when encoded.
*/
@@ -80,7 +96,18 @@ struct region;
* @return NULL if out of memory, fiber diagnostics area is set
*/
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+ journal_entry_done_cb on_done_cb,
+ void *on_done_cb_data);
+
+/**
+ * Finalize a single entry by invoking its on_done_cb callback.
+ */
+static inline void
+journal_entry_complete(struct journal_entry *entry)
+{
+ entry->on_done_cb(entry, entry->on_done_cb_data);
+}
/**
* An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 1eb4db6a3..5825acc34 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -337,6 +337,66 @@ fail:
return -1;
}
+/*
+ * A helper function to process on_commit/on_rollback triggers.
+ */
+static inline void
+txn_process_trigger(struct rlist *trigger, struct txn *txn)
+{
+ /*
+ * Some triggers require in_txn() to be set, so restore it
+ * for the time the triggers are in progress.
+ */
+ fiber_set_txn(fiber(), txn);
+ /* Rollback triggers must not throw. */
+ if (trigger_run(trigger, txn) != 0) {
+ /*
+ * The transaction can't handle a trigger error, so
+ * there is no option other than to panic.
+ */
+ diag_log();
+ unreachable();
+ panic("rollback trigger failed");
+ }
+ fiber_set_txn(fiber(), NULL);
+}
+
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+ if (txn->signature < 0) {
+ /* Undo the transaction. */
+ if (txn->engine)
+ engine_rollback(txn->engine, txn);
+ if (txn->has_triggers)
+ txn_process_trigger(&txn->on_rollback, txn);
+
+ return;
+ } else {
+ /* Accept the transaction. */
+ /*
+ * Engine can be NULL if transaction contains IPROTO_NOP
+ * statements only.
+ */
+ if (txn->engine != NULL)
+ engine_commit(txn->engine, txn);
+ if (txn->has_triggers)
+ txn_process_trigger(&txn->on_commit, txn);
+ }
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+ struct txn *txn = (struct txn *)data;
+ txn->signature = entry->res;
+ txn_complete(txn);
+}
+
+
static int64_t
txn_write_to_wal(struct txn *txn)
{
@@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn)
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
- &txn->region);
+ &txn->region,
+ txn_entry_done_cb,
+ txn);
if (req == NULL) {
txn_rollback(txn);
return -1;
@@ -370,16 +432,14 @@ txn_write_to_wal(struct txn *txn)
ev_tstamp stop = ev_monotonic_now(loop());
if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
diag_set(ClientError, ER_WAL_IO);
diag_log();
+ /*
+ * Although the transaction is rolled back by the
+ * finalization handler, we are still responsible
+ * for freeing it.
+ */
+ txn_free(txn);
} else if (stop - start > too_long_threshold) {
int n_rows = txn->n_new_rows + txn->n_applier_rows;
say_warn_ratelimited("too long WAL write: %d rows at "
@@ -418,30 +478,23 @@ txn_commit(struct txn *txn)
}
trigger_clear(&txn->fiber_on_stop);
+ /*
+ * After this point the transaction can no longer be used,
+ * so reset the corresponding key in the fiber storage.
+ */
+ fiber_set_txn(fiber(), NULL);
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
return -1;
+ } else {
+ /*
+ * Although there is nothing to write to the WAL, finalization
+ * should still be fired.
+ */
+ txn->signature = 0;
+ txn_complete(txn);
}
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
- if (txn->engine != NULL)
- engine_commit(txn->engine, txn);
- /*
- * The transaction is in the binary log. No action below
- * may throw. In case an error has happened, there is
- * no other option but terminate.
- */
- if (txn->has_triggers &&
- trigger_run(&txn->on_commit, txn) != 0) {
- diag_log();
- unreachable();
- panic("commit trigger failed");
- }
-
- fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
fail:
@@ -463,18 +516,10 @@ txn_rollback(struct txn *txn)
{
assert(txn == in_txn());
trigger_clear(&txn->fiber_on_stop);
- if (txn->engine)
- engine_rollback(txn->engine, txn);
- /* Rollback triggers must not throw. */
- if (txn->has_triggers &&
- trigger_run(&txn->on_rollback, txn) != 0) {
- diag_log();
- unreachable();
- panic("rollback trigger failed");
- }
-
- fiber_set_txn(fiber(), NULL);
+ txn->signature = -1;
+ txn_complete(txn);
txn_free(txn);
+ fiber_set_txn(fiber(), NULL);
}
void
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 098a01419..bf50f5520 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -819,7 +819,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
tx_size++;
size_t used = region_used(&fiber()->gc);
- struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+ struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
+ NULL, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..4fa9beca0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue)
* are many ready fibers.
*/
struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo)
+ stailq_foreach_entry(req, queue, fifo) {
+ journal_entry_complete(req);
fiber_wakeup(req->fiber);
+ }
}
/**
@@ -1131,7 +1133,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
- ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
+ ERROR_INJECT(ERRINJ_WAL_IO, {
+ goto fail;
+ });
if (! stailq_empty(&writer->rollback)) {
/*
@@ -1144,7 +1148,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
say_error("Aborting transaction %llu during "
"cascading rollback",
vclock_sum(&writer->vclock));
- return -1;
+ goto fail;
}
struct wal_msg *batch;
@@ -1158,7 +1162,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
if (batch == NULL) {
diag_set(OutOfMemory, sizeof(struct wal_msg),
"region", "struct wal_msg");
- return -1;
+ goto fail;
}
wal_msg_create(batch);
/*
@@ -1182,6 +1186,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
fiber_yield(); /* Request was inserted. */
fiber_set_cancellable(cancellable);
return entry->res;
+
+fail:
+ entry->res = -1;
+ journal_entry_complete(entry);
+ return -1;
}
int64_t
@@ -1195,7 +1204,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
- return vclock_sum(&writer->vclock);
+ entry->res = vclock_sum(&writer->vclock);
+ journal_entry_complete(entry);
+ return entry->res;
}
void
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..580942564 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
return latch_lock_timeout(l, 0);
}
+/**
+ * Take a latch ownership
+ */
+static inline void
+latch_steal(struct latch *l)
+{
+ assert(l->owner != NULL);
+ l->owner = fiber();
+}
+
/**
* \copydoc box_latch_unlock
*/
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 5/7] txn: introduce asynchronous txn commit
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
` (3 preceding siblings ...)
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel Georgy Kirichenko
` (2 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
This commit implements asynchronous transaction processing using
txn_write. The function prepares a transaction and sends it to a journal
without yielding to wait for the transaction to finish. The transaction
status can be tracked via on_commit/on_rollback triggers.
To support asynchronous transactions, the journal_write method is turned
into an asynchronous one, and the transaction engine now tracks journal
status using the journal entry finalization callback.
Prerequisites: #1254
---
src/box/journal.c | 2 -
src/box/journal.h | 10 +--
src/box/txn.c | 153 ++++++++++++++++++++++++++++++----------------
src/box/txn.h | 16 +++++
src/box/wal.c | 29 ++-------
5 files changed, 125 insertions(+), 85 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 4c1997f36..d762613dd 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
*/
#include "journal.h"
#include <small/region.h>
-#include <fiber.h>
#include <diag.h>
/**
@@ -72,7 +71,6 @@ journal_entry_new(size_t n_rows, struct region *region,
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->fiber = fiber();
entry->on_done_cb = on_done_cb;
entry->on_done_cb_data = on_done_cb_data;
return entry;
diff --git a/src/box/journal.h b/src/box/journal.h
index 52b8a715c..cac82c15e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "salad/stailq.h"
+#include "fiber.h"
#if defined(__cplusplus)
extern "C" {
@@ -59,10 +60,6 @@ struct journal_entry {
* the committed transaction, on error is -1
*/
int64_t res;
- /**
- * The fiber issuing the request.
- */
- struct fiber *fiber;
/**
* A journal entry finalization callback which is going to be called
* after the entry processing was winished in both cases: succes
@@ -127,10 +124,9 @@ struct journal {
extern struct journal *current_journal;
/**
- * Record a single entry.
+ * Send a single entry to write.
*
- * @return a log sequence number (vclock signature) of the entry
- * or -1 on error.
+ * @return 0 if write was scheduled or -1 in case of an error.
*/
static inline int64_t
journal_write(struct journal_entry *entry)
diff --git a/src/box/txn.c b/src/box/txn.c
index 5825acc34..f331642f9 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,9 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
+ txn->fiber = NULL;
+ txn->done = false;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -374,7 +377,6 @@ txn_complete(struct txn *txn)
if (txn->has_triggers)
txn_process_trigger(&txn->on_rollback, txn);
- return;
} else {
/* Accept the transaction. */
/*
@@ -385,6 +387,27 @@ txn_complete(struct txn *txn)
engine_commit(txn->engine, txn);
if (txn->has_triggers)
txn_process_trigger(&txn->on_commit, txn);
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
+ }
+ }
+ /*
+ * If there is no fiber waiting for the transaction then
+ * the transaction could be safely freed. In the opposite case
+ * the fiber is in duty to free this transaction.
+ */
+ if (txn->fiber == NULL)
+ txn_free(txn);
+ else {
+ txn->done = true;
+ if (txn->fiber != fiber())
+ /* Wake a waiting fiber up. */
+ fiber_wakeup(txn->fiber);
}
}
@@ -392,29 +415,30 @@ static void
txn_entry_done_cb(struct journal_entry *entry, void *data)
{
struct txn *txn = (struct txn *)data;
+ assert(txn->entry == entry);
txn->signature = entry->res;
txn_complete(txn);
}
-
static int64_t
txn_write_to_wal(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
- struct journal_entry *req = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region,
- txn_entry_done_cb,
- txn);
- if (req == NULL) {
+ /* Prepare a journal entry. */
+ txn->entry = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb, txn);
+ if (txn->entry == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = req->rows;
- struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = txn->entry->rows;
+ struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -422,40 +446,26 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- req->approx_len += xrow_approx_len(stmt->row);
+ txn->entry->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == req->rows + txn->n_applier_rows);
+ assert(remote_row == txn->entry->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
+ /* Send entry to a journal. */
+ if (journal_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
diag_log();
- /*
- * However, the transaction is rolled back by
- * finalization handler we are still in duty to
- * free it.
- */
- txn_free(txn);
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -465,7 +475,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -473,33 +483,71 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
trigger_clear(&txn->fiber_on_stop);
+ return 0;
+}
+
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_write(struct txn *txn)
+{
+ if (txn_prepare(txn) != 0) {
+ txn_rollback(txn);
+ return -1;
+ }
/*
* After this transaction could not be used more
* so reset corresponding key in a fiber storage.
- */
+ */
fiber_set_txn(fiber(), NULL);
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
- } else {
- /*
- * However, there is nothing to write to wal a finalization
- * should be fired.
- */
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0) {
+ /* Nothing to do. */
txn->signature = 0;
txn_complete(txn);
+ return 0;
+ }
+
+ if (txn_write_to_wal(txn) != 0) {
+ /*
+ * After journal write the transaction would be finalized
+ * with its journal entry finalization callback,
+ * just return an error.
+ */
+ return -1;
}
- txn_free(txn);
return 0;
-fail:
- txn_rollback(txn);
- return -1;
+}
+
+int
+txn_commit(struct txn *txn)
+{
+ txn->fiber = fiber();
+
+ if (txn_write(txn) != 0)
+ return -1;
+ /*
+ * In case of non-yielding journal the transaction could already
+ * be done and there is nothing to wait in such cases.
+ */
+ if (!txn->done) {
+ bool cancellable = fiber_set_cancellable(false);
+ fiber_yield();
+ fiber_set_cancellable(cancellable);
+ }
+ int res = txn->signature >= 0? 0: -1;
+ if (res != 0)
+ diag_set(ClientError, ER_WAL_IO);
+ /* As the current fiber is waiting for the transaction so free it. */
+ txn_free(txn);
+ return res;
}
void
@@ -518,7 +566,6 @@ txn_rollback(struct txn *txn)
trigger_clear(&txn->fiber_on_stop);
txn->signature = -1;
txn_complete(txn);
- txn_free(txn);
fiber_set_txn(fiber(), NULL);
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f20428fad..ddcac3bb9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,16 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /** Journal entry to control txn write. */
+ struct journal_entry *entry;
+ /** Transaction completion trigger. */
+ struct trigger entry_done;
+ /** Timestampt of entry write start. */
+ ev_tstamp start_tm;
+ /* A fiber to wake up when transaction is finished. */
+ struct fiber *fiber;
+ /* True when transaction is processed. */
+ bool done;
};
/* Pointer to the current transaction (if any) */
@@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index 4fa9beca0..dce5fee6b 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,13 +245,6 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
-/**
- * Invoke fibers waiting for their journal_entry's to be
- * completed. The fibers are invoked in strict fifo order:
- * this ensures that, in case of rollback, requests are
- * rolled back in strict reverse order, producing
- * a consistent database state.
- */
static void
tx_schedule_queue(struct stailq *queue)
{
@@ -259,10 +252,9 @@ tx_schedule_queue(struct stailq *queue)
* fiber_wakeup() is faster than fiber_call() when there
* are many ready fibers.
*/
- struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo) {
+ struct journal_entry *req, *tmp;
+ stailq_foreach_entry_safe(req, tmp, queue, fifo) {
journal_entry_complete(req);
- fiber_wakeup(req->fiber);
}
}
@@ -1126,9 +1118,9 @@ wal_writer_f(va_list ap)
/**
* WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
+ * to be written to disk.
*/
-int64_t
+static int64_t
wal_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1176,16 +1168,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
- /**
- * It's not safe to spuriously wakeup this fiber
- * since in that case it will ignore a possible
- * error from WAL writer and not roll back the
- * transaction.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield(); /* Request was inserted. */
- fiber_set_cancellable(cancellable);
- return entry->res;
+ return 0;
fail:
entry->res = -1;
@@ -1193,7 +1176,7 @@ fail:
return -1;
}
-int64_t
+static int64_t
wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
` (4 preceding siblings ...)
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 5/7] txn: introduce asynchronous txn commit Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 7/7] test: fix flaky test Georgy Kirichenko
2019-06-25 16:08 ` [tarantool-patches] [PATCH v5 0/7] Parallel applier Vladimir Davydov
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Appliers use asynchronous transactions to batch journal writes. All
appliers share replicaset.applier.vclock, which is the vclock that has
been applied but not necessarily written to a journal. Appliers use a
trigger to coordinate in case of failure, i.e. when a transaction is
going to be rolled back.
Closes: #1254
---
src/box/applier.cc | 188 ++++++++++++++++++++++++++++++++---------
src/box/applier.h | 7 ++
src/box/replication.cc | 7 ++
src/box/replication.h | 11 +++
4 files changed, 172 insertions(+), 41 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6f93759a8..9465b071a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "scoped_guard.h"
STRS(applier_state, applier_STATE);
@@ -108,6 +109,26 @@ applier_log_error(struct applier *applier, struct error *e)
applier->last_logged_errcode = errcode;
}
+/*
+ * A helper function to track an applier state.
+ */
+static inline void
+applier_check_state(struct applier *applier)
+{
+ /*
+ * Stay 'orphan' until appliers catch up with
+ * the remote vclock at the time of SUBSCRIBE
+ * and the lag is less than configured.
+ */
+ if (applier->state == APPLIER_SYNC &&
+ applier->lag <= replication_sync_lag &&
+ vclock_compare(&applier->remote_vclock_at_subscribe,
+ &replicaset.vclock) <= 0) {
+ /* Applier is synced, switch to "follow". */
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+}
+
/*
* Fiber function to write vclock to replication master.
* To track connection status, replica answers master
@@ -135,6 +156,12 @@ applier_writer_f(va_list ap)
else
fiber_cond_wait_timeout(&applier->writer_cond,
replication_timeout);
+ /* A writer fiber is going to be awaken after a commit or
+ * a heartbeat message. So this is a appropriate place to
+ * update an applier status because the applier state could
+ * yield and doesn't fit into a commit trigger.
+ */
+ applier_check_state(applier);
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
@@ -574,6 +601,27 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
+/* A trigger to control a replicated transaction rollback. */
+static void
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ /* Setup shared applier diagnostic area. */
+ diag_set(ClientError, ER_WAL_IO);
+ diag_move(&fiber()->diag, &replicaset.applier.diag);
+ trigger_run(&replicaset.applier.on_rollback, event);
+ /* Rollback applier vclock to the commited one. */
+ vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static void
+applier_txn_commit_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ /* Broadcast the commit event across all appliers. */
+ trigger_run(&replicaset.applier.on_commit, event);
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -582,6 +630,19 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
static int
applier_apply_tx(struct stailq *rows)
{
+ struct xrow_header *first_row =
+ &stailq_first_entry(rows, struct applier_tx_row,
+ next)->row;
+ struct replica *replica = replica_by_id(first_row->replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ latch_lock(latch);
+ if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
+ first_row->lsn) {
+ latch_unlock(latch);
+ return 0;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
@@ -590,8 +651,10 @@ applier_apply_tx(struct stailq *rows)
*/
struct txn *txn = txn_begin();
struct applier_tx_row *item;
- if (txn == NULL)
- diag_raise();
+ if (txn == NULL) {
+ latch_unlock(latch);
+ return -1;
+ }
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
int res = apply_row(row);
@@ -632,14 +695,63 @@ applier_apply_tx(struct stailq *rows)
"Replication", "distributed transactions");
goto rollback;
}
- return txn_commit(txn);
+ /* We are ready to submit txn to wal. */
+ struct trigger *on_rollback, *on_commit;
+ on_rollback = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ on_commit = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ if (on_rollback == NULL || on_commit == NULL)
+ goto rollback;
+
+ trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
+ txn_on_commit(txn, on_commit);
+
+ if (txn_write(txn) < 0)
+ goto fail;
+ /* Transaction was sent to journal so promote vclock. */
+ vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
+ first_row->lsn);
+ latch_unlock(latch);
+
+ return 0;
rollback:
txn_rollback(txn);
+fail:
+ latch_unlock(latch);
fiber_gc();
return -1;
}
+/*
+ * A trigger to update an applier state after a replication commit.
+ */
+static void
+applier_on_commit(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ fiber_cond_signal(&applier->writer_cond);
+}
+
+/*
+ * A trigger to update an applier state after a replication rollback.
+ */
+static void
+applier_on_rollback(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ /* Setup a shared error. */
+ if (!diag_is_empty(&replicaset.applier.diag))
+ diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+ fiber_cancel(applier->reader);
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -650,7 +762,6 @@ applier_subscribe(struct applier *applier)
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
- struct vclock remote_vclock_at_subscribe;
struct tt_uuid cluster_id = uuid_nil;
struct vclock vclock;
@@ -677,10 +788,10 @@ applier_subscribe(struct applier *applier)
* the replica, and replica has to check whether
* its and master's cluster ids match.
*/
- vclock_create(&remote_vclock_at_subscribe);
+ vclock_create(&applier->remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row,
&cluster_id,
- &remote_vclock_at_subscribe);
+ &applier->remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -695,7 +806,7 @@ applier_subscribe(struct applier *applier)
say_info("subscribed");
say_info("remote vclock %s local vclock %s",
- vclock_to_string(&remote_vclock_at_subscribe),
+ vclock_to_string(&applier->remote_vclock_at_subscribe),
vclock_to_string(&vclock));
}
/*
@@ -744,6 +855,21 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
+ /* Register triggers to handle replication commits and rollbacks. */
+ struct trigger on_commit;
+ trigger_create(&on_commit, applier_on_commit, applier, NULL);
+ trigger_add(&replicaset.applier.on_commit, &on_commit);
+
+ struct trigger on_rollback;
+ trigger_create(&on_rollback, applier_on_rollback, applier, NULL);
+ trigger_add(&replicaset.applier.on_rollback, &on_rollback);
+
+ auto trigger_guard = make_scoped_guard([&] {
+ trigger_clear(&on_commit);
+ trigger_clear(&on_rollback);
+ });
+
+
/*
* Process a stream of rows from the binary log.
*/
@@ -756,47 +882,22 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Stay 'orphan' until appliers catch up with
- * the remote vclock at the time of SUBSCRIBE
- * and the lag is less than configured.
- */
- if (applier->state == APPLIER_SYNC &&
- applier->lag <= replication_sync_lag &&
- vclock_compare(&remote_vclock_at_subscribe,
- &replicaset.vclock) <= 0) {
- /* Applier is synced, switch to "follow". */
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
struct stailq rows;
applier_read_tx(applier, &rows);
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(first_row->replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
/*
- * In a full mesh topology, the same set of changes
- * may arrive via two concurrently running appliers.
- * Hence we need a latch to strictly order all changes
- * that belong to the same server id.
+ * In case of an heathbeat message wake a writer up and
+ * check aplier state.
*/
- latch_lock(latch);
- if (vclock_get(&replicaset.vclock, first_row->replica_id) <
- first_row->lsn &&
- applier_apply_tx(&rows) != 0) {
- latch_unlock(latch);
- diag_raise();
+ if (stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row.lsn == 0) {
+ fiber_cond_signal(&applier->writer_cond);
+ // applier_check_state(applier);
}
- latch_unlock(latch);
+ else if (applier_apply_tx(&rows) != 0)
+ diag_raise();
- if (applier->state == APPLIER_SYNC ||
- applier->state == APPLIER_FOLLOW)
- fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -881,6 +982,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
+ if (!diag_is_empty(&applier->diag)) {
+ diag_move(&applier->diag, &fiber()->diag);
+ applier_disconnect(applier, APPLIER_STOPPED);
+ break;
+ }
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -969,6 +1075,7 @@ applier_new(const char *uri)
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
fiber_cond_create(&applier->writer_cond);
+ diag_create(&applier->diag);
return applier;
}
@@ -981,7 +1088,6 @@ applier_delete(struct applier *applier)
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
fiber_cond_destroy(&applier->resume_cond);
- fiber_cond_destroy(&applier->writer_cond);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b9bb14198 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,15 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
+ /* Diag to raise an error. */
+ struct diag diag;
+ /* The masters vclock while subscribe. */
+ struct vclock remote_vclock_at_subscribe;
};
+void
+applier_init();
+
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..617b9538f 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);
+
+ vclock_create(&replicaset.applier.vclock);
+ vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+ rlist_create(&replicaset.applier.on_rollback);
+ rlist_create(&replicaset.applier.on_commit);
+
+ diag_create(&replicaset.applier.diag);
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..19f283c7d 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,17 @@ struct replicaset {
* struct replica object).
*/
struct latch order_latch;
+ /*
+ * A vclock of the last transaction wich was read
+ * by applier and processed by tx.
+ */
+ struct vclock vclock;
+ /* Trigger to fire when a replication request failed to apply. */
+ struct rlist on_rollback;
+ /* Trigget to fire a replication request commited to a wal. */
+ struct rlist on_commit;
+ /* Shared applier diagnostic area. */
+ struct diag diag;
} applier;
/** Map of all known replica_id's to correspponding replica's. */
struct replica **replica_by_id;
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [tarantool-patches] [PATCH v5 7/7] test: fix flaky test
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
` (5 preceding siblings ...)
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel Georgy Kirichenko
@ 2019-06-21 21:48 ` Georgy Kirichenko
2019-06-25 16:08 ` [tarantool-patches] [PATCH v5 0/7] Parallel applier Vladimir Davydov
7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2019-06-21 21:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
This test fails sporadically
---
test/replication/sync.result | 7 +++++--
test/replication/sync.test.lua | 4 ++--
test/replication/transaction.result | 16 +++++++++++++++-
test/replication/transaction.test.lua | 7 ++++++-
4 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/test/replication/sync.result b/test/replication/sync.result
index eddc7cbc8..6b5a14d3f 100644
--- a/test/replication/sync.result
+++ b/test/replication/sync.result
@@ -46,7 +46,7 @@ function fill()
box.space.test:replace{i}
end
fiber.create(function()
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+ box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
test_run:wait_cond(function()
local r = box.info.replication[2]
return r ~= nil and r.downstream ~= nil and
@@ -55,7 +55,6 @@ function fill()
for i = count + 101, count + 200 do
box.space.test:replace{i}
end
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
end)
count = count + 200
end;
@@ -250,6 +249,10 @@ test_run:cmd("switch default")
---
- true
...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
+---
+- ok
+...
box.error.injection.set('ERRINJ_WAL_DELAY', true)
---
- ok
diff --git a/test/replication/sync.test.lua b/test/replication/sync.test.lua
index 52ce88fe2..f0f530ad4 100644
--- a/test/replication/sync.test.lua
+++ b/test/replication/sync.test.lua
@@ -30,7 +30,7 @@ function fill()
box.space.test:replace{i}
end
fiber.create(function()
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+ box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
test_run:wait_cond(function()
local r = box.info.replication[2]
return r ~= nil and r.downstream ~= nil and
@@ -39,7 +39,6 @@ function fill()
for i = count + 101, count + 200 do
box.space.test:replace{i}
end
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
end)
count = count + 200
end;
@@ -136,6 +135,7 @@ box.cfg{replication_sync_lag = 1}
box.cfg{replication_sync_timeout = 10}
test_run:cmd("switch default")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
box.error.injection.set('ERRINJ_WAL_DELAY', true)
test_run:cmd("setopt delimiter ';'")
_ = fiber.create(function()
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
index 8c2ac6ee4..c54c1e8d5 100644
--- a/test/replication/transaction.result
+++ b/test/replication/transaction.result
@@ -7,12 +7,21 @@ test_run = env.new()
box.schema.user.grant('guest', 'replication')
---
...
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+engine = test_run:get_cfg('engine')
+---
+...
+s = box.schema.space.create('test', {engine = engine})
---
...
_ = s:create_index('pk')
---
...
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+---
+...
+_ = l:create_index('pk')
+---
+...
-- transaction w/o conflict
box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
---
@@ -92,6 +101,11 @@ box.cfg{replication = replication}
---
...
-- replication stopped of third transaction
+-- flush wal
+box.space.l_space:replace({1})
+---
+- [1]
+...
v1[1] + 2 == box.info.vclock[1]
---
- true
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
index f25a4737d..4e0323128 100644
--- a/test/replication/transaction.test.lua
+++ b/test/replication/transaction.test.lua
@@ -1,9 +1,12 @@
env = require('test_run')
test_run = env.new()
box.schema.user.grant('guest', 'replication')
+engine = test_run:get_cfg('engine')
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+s = box.schema.space.create('test', {engine = engine})
_ = s:create_index('pk')
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+_ = l:create_index('pk')
-- transaction w/o conflict
box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
@@ -37,6 +40,8 @@ replication = box.cfg.replication
box.cfg{replication = {}}
box.cfg{replication = replication}
-- replication stopped of third transaction
+-- flush wal
+box.space.l_space:replace({1})
v1[1] + 2 == box.info.vclock[1]
box.space.test:select()
-- check replication status
--
2.22.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* Re: [tarantool-patches] [PATCH v5 0/7] Parallel applier
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
` (6 preceding siblings ...)
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 7/7] test: fix flaky test Georgy Kirichenko
@ 2019-06-25 16:08 ` Vladimir Davydov
7 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2019-06-25 16:08 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
The patch set looks good to me. I pushed it to master with some very
minor changes primarily regarding coding style (see the diff below).
Note I didn't push the patch fixing tests, because
- replication/transaction.test.lua works even without it
- although replication/sync.test.lua fails occasionally, I don't
believe that your fix is quite correct. I'd rather disable the
test instead for now. BTW we have a ticket to fix it properly:
https://github.com/tarantool/tarantool/issues/4129
diff --git a/src/box/alter.cc b/src/box/alter.cc
index cb5f2b17..e76b9e68 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3576,7 +3576,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
(void) trigger;
(void) event;
/*
- * In case of yielding journal will this trigger be processed
+ * In case of yielding journal this trigger will be processed
* in a context of tx_prio endpoint instead of a context of
* a fiber which has this latch locked. So steal the latch first.
*/
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9465b071..cf03ea16 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -109,11 +109,12 @@ applier_log_error(struct applier *applier, struct error *e)
applier->last_logged_errcode = errcode;
}
-/*
- * A helper function to track an applier state.
+/**
+ * A helper function which switches the applier to FOLLOW state
+ * if it has synchronized with its master.
*/
static inline void
-applier_check_state(struct applier *applier)
+applier_check_sync(struct applier *applier)
{
/*
* Stay 'orphan' until appliers catch up with
@@ -156,12 +157,14 @@ applier_writer_f(va_list ap)
else
fiber_cond_wait_timeout(&applier->writer_cond,
replication_timeout);
- /* A writer fiber is going to be awaken after a commit or
- * a heartbeat message. So this is a appropriate place to
+ /*
+ * A writer fiber is going to be awaken after a commit or
+ * a heartbeat message. So this is an appropriate place to
* update an applier status because the applier state could
* yield and doesn't fit into a commit trigger.
*/
- applier_check_state(applier);
+ applier_check_sync(applier);
+
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
@@ -601,7 +604,6 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
-/* A trigger to control a replicated transaction rollback. */
static void
applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
@@ -609,8 +611,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
/* Setup shared applier diagnostic area. */
diag_set(ClientError, ER_WAL_IO);
diag_move(&fiber()->diag, &replicaset.applier.diag);
+ /* Broadcast the rollback event across all appliers. */
trigger_run(&replicaset.applier.on_rollback, event);
- /* Rollback applier vclock to the commited one. */
+ /* Rollback applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
}
@@ -630,15 +633,20 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
static int
applier_apply_tx(struct stailq *rows)
{
- struct xrow_header *first_row =
- &stailq_first_entry(rows, struct applier_tx_row,
- next)->row;
+ struct xrow_header *first_row = &stailq_first_entry(rows,
+ struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
+ /*
+ * In a full mesh topology, the same set of changes
+ * may arrive via two concurrently running appliers.
+ * Hence we need a latch to strictly order all changes
+ * that belong to the same server id.
+ */
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
latch_lock(latch);
- if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
- first_row->lsn) {
+ if (vclock_get(&replicaset.applier.vclock,
+ first_row->replica_id) >= first_row->lsn) {
latch_unlock(latch);
return 0;
}
@@ -713,11 +721,11 @@ applier_apply_tx(struct stailq *rows)
if (txn_write(txn) < 0)
goto fail;
+
/* Transaction was sent to journal so promote vclock. */
- vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
- first_row->lsn);
+ vclock_follow(&replicaset.applier.vclock,
+ first_row->replica_id, first_row->lsn);
latch_unlock(latch);
-
return 0;
rollback:
txn_rollback(txn);
@@ -747,8 +755,11 @@ applier_on_rollback(struct trigger *trigger, void *event)
(void) event;
struct applier *applier = (struct applier *)trigger->data;
/* Setup a shared error. */
- if (!diag_is_empty(&replicaset.applier.diag))
- diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+ if (!diag_is_empty(&replicaset.applier.diag)) {
+ diag_add_error(&applier->diag,
+ diag_last_error(&replicaset.applier.diag));
+ }
+ /* Stop the applier fiber. */
fiber_cancel(applier->reader);
}
@@ -789,9 +800,8 @@ applier_subscribe(struct applier *applier)
* its and master's cluster ids match.
*/
vclock_create(&applier->remote_vclock_at_subscribe);
- xrow_decode_subscribe_response_xc(&row,
- &cluster_id,
- &applier->remote_vclock_at_subscribe);
+ xrow_decode_subscribe_response_xc(&row, &cluster_id,
+ &applier->remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -869,7 +879,6 @@ applier_subscribe(struct applier *applier)
trigger_clear(&on_rollback);
});
-
/*
* Process a stream of rows from the binary log.
*/
@@ -887,14 +896,12 @@ applier_subscribe(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
/*
- * In case of an heathbeat message wake a writer up and
- * check aplier state.
+ * In case of an heartbeat message wake a writer up
+ * and check applier state.
*/
if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0) {
+ next)->row.lsn == 0)
fiber_cond_signal(&applier->writer_cond);
- // applier_check_state(applier);
- }
else if (applier_apply_tx(&rows) != 0)
diag_raise();
@@ -1087,7 +1094,7 @@ applier_delete(struct applier *applier)
ibuf_destroy(&applier->ibuf);
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
- fiber_cond_destroy(&applier->resume_cond);
+ diag_destroy(&applier->diag);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index b9bb1419..b406e6aa 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -116,13 +116,10 @@ struct applier {
struct fiber_cond resume_cond;
/* Diag to raise an error. */
struct diag diag;
- /* The masters vclock while subscribe. */
+ /* Master's vclock at the time of SUBSCRIBE. */
struct vclock remote_vclock_at_subscribe;
};
-void
-applier_init();
-
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/box.cc b/src/box/box.cc
index f5bd29dd..80249919 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,7 +308,7 @@ recovery_journal_write(struct journal *base,
struct recovery_journal *journal = (struct recovery_journal *) base;
entry->res = vclock_sum(journal->vclock);
journal_entry_complete(entry);
- return entry->res;
+ return 0;
}
static inline void
diff --git a/src/box/journal.h b/src/box/journal.h
index cac82c15..236058bb 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -62,7 +62,7 @@ struct journal_entry {
int64_t res;
/**
* A journal entry finalization callback which is going to be called
- * after the entry processing was winished in both cases: succes
+ * after the entry processing was finished in both cases: success
* or fail. Entry->res is set to a result value before the callback
* is fired.
*/
@@ -98,7 +98,7 @@ journal_entry_new(size_t n_rows, struct region *region,
void *on_done_cb_data);
/**
- * Finalize a signle entry.
+ * Finalize a single entry.
*/
static inline void
journal_entry_complete(struct journal_entry *entry)
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 617b9538..28f7aced 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -110,6 +110,7 @@ replication_free(void)
replicaset_foreach(replica)
relay_cancel(replica->relay);
+ diag_destroy(&replicaset.applier.diag);
free(replicaset.replica_by_id);
}
diff --git a/src/box/txn.c b/src/box/txn.c
index f331642f..95076773 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -192,6 +192,7 @@ txn_begin()
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
txn->has_triggers = false;
+ txn->is_done = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
txn->id = ++tsn;
@@ -199,9 +200,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
- txn->entry = NULL;
txn->fiber = NULL;
- txn->done = false;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -344,22 +343,22 @@ fail:
* A helper function to process on_commit/on_rollback triggers.
*/
static inline void
-txn_process_trigger(struct rlist *trigger, struct txn *txn)
+txn_run_triggers(struct txn *txn, struct rlist *trigger)
{
/*
- * Some of triggers require for in_txn variable is set so
- * restore it for time a trigger is in progress.
+ * Some triggers require for in_txn variable to be set so
+ * restore it for the time triggers are in progress.
*/
fiber_set_txn(fiber(), txn);
/* Rollback triggers must not throw. */
if (trigger_run(trigger, txn) != 0) {
/*
* As transaction couldn't handle a trigger error so
- * there is no option except than panic.
+ * there is no option except panic.
*/
diag_log();
unreachable();
- panic("rollback trigger failed");
+ panic("commit/rollback trigger failed");
}
fiber_set_txn(fiber(), NULL);
}
@@ -370,24 +369,24 @@ txn_process_trigger(struct rlist *trigger, struct txn *txn)
static void
txn_complete(struct txn *txn)
{
+ /*
+ * Note, engine can be NULL if transaction contains
+ * IPROTO_NOP statements only.
+ */
if (txn->signature < 0) {
/* Undo the transaction. */
if (txn->engine)
engine_rollback(txn->engine, txn);
if (txn->has_triggers)
- txn_process_trigger(&txn->on_rollback, txn);
-
+ txn_run_triggers(txn, &txn->on_rollback);
} else {
- /* Accept the transaction. */
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
+ /* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
if (txn->has_triggers)
- txn_process_trigger(&txn->on_commit, txn);
- ev_tstamp stop_tm = ev_monotonic_now(loop());
+ txn_run_triggers(txn, &txn->on_commit);
+
+ double stop_tm = ev_monotonic_now(loop());
if (stop_tm - txn->start_tm > too_long_threshold) {
int n_rows = txn->n_new_rows + txn->n_applier_rows;
say_warn_ratelimited("too long WAL write: %d rows at "
@@ -404,7 +403,7 @@ txn_complete(struct txn *txn)
if (txn->fiber == NULL)
txn_free(txn);
else {
- txn->done = true;
+ txn->is_done = true;
if (txn->fiber != fiber())
/* Wake a waiting fiber up. */
fiber_wakeup(txn->fiber);
@@ -414,8 +413,7 @@ txn_complete(struct txn *txn)
static void
txn_entry_done_cb(struct journal_entry *entry, void *data)
{
- struct txn *txn = (struct txn *)data;
- assert(txn->entry == entry);
+ struct txn *txn = data;
txn->signature = entry->res;
txn_complete(txn);
}
@@ -423,22 +421,22 @@ txn_entry_done_cb(struct journal_entry *entry, void *data)
static int64_t
txn_write_to_wal(struct txn *txn)
{
- assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
/* Prepare a journal entry. */
- txn->entry = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region,
- txn_entry_done_cb, txn);
- if (txn->entry == NULL) {
+ struct journal_entry *req = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb,
+ txn);
+ if (req == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = txn->entry->rows;
- struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = req->rows;
+ struct xrow_header **local_row = req->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -446,13 +444,13 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- txn->entry->approx_len += xrow_approx_len(stmt->row);
+ req->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == txn->entry->rows + txn->n_applier_rows);
+ assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- /* Send entry to a journal. */
- if (journal_write(txn->entry) < 0) {
+ /* Send the entry to the journal. */
+ if (journal_write(req) < 0) {
diag_set(ClientError, ER_WAL_IO);
diag_log();
return -1;
@@ -483,17 +481,13 @@ txn_prepare(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0) {
+ if (engine_prepare(txn->engine, txn) != 0)
return -1;
- }
}
trigger_clear(&txn->fiber_on_stop);
return 0;
}
-/*
- * Send a transaction to a journal.
- */
int
txn_write(struct txn *txn)
{
@@ -503,9 +497,9 @@ txn_write(struct txn *txn)
}
/*
- * After this transaction could not be used more
- * so reset corresponding key in a fiber storage.
- */
+ * After this point the transaction must not be used
+ * so reset the corresponding key in the fiber storage.
+ */
fiber_set_txn(fiber(), NULL);
txn->start_tm = ev_monotonic_now(loop());
if (txn->n_new_rows + txn->n_applier_rows == 0) {
@@ -514,16 +508,7 @@ txn_write(struct txn *txn)
txn_complete(txn);
return 0;
}
-
- if (txn_write_to_wal(txn) != 0) {
- /*
- * After journal write the transaction would be finalized
- * with its journal entry finalization callback,
- * just return an error.
- */
- return -1;
- }
- return 0;
+ return txn_write_to_wal(txn);
}
int
@@ -537,15 +522,16 @@ txn_commit(struct txn *txn)
* In case of non-yielding journal the transaction could already
* be done and there is nothing to wait in such cases.
*/
- if (!txn->done) {
+ if (!txn->is_done) {
bool cancellable = fiber_set_cancellable(false);
fiber_yield();
fiber_set_cancellable(cancellable);
}
- int res = txn->signature >= 0? 0: -1;
+ int res = txn->signature >= 0 ? 0 : -1;
if (res != 0)
diag_set(ClientError, ER_WAL_IO);
- /* As the current fiber is waiting for the transaction so free it. */
+
+ /* Synchronous transactions are freed by the calling fiber. */
txn_free(txn);
return res;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index ddcac3bb..a19becce 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,6 +162,8 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
+ /* True when transaction is processed. */
+ bool is_done;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -182,6 +184,10 @@ struct txn {
struct engine *engine;
/** Engine-specific transaction data */
void *engine_tx;
+ /* A fiber to wake up when transaction is finished. */
+ struct fiber *fiber;
+ /** Timestampt of entry write start. */
+ double start_tm;
/**
* Triggers on fiber yield to abort transaction for
* for in-memory engine.
@@ -195,16 +201,6 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
- /** Journal entry to control txn write. */
- struct journal_entry *entry;
- /** Transaction completion trigger. */
- struct trigger entry_done;
- /** Timestampt of entry write start. */
- ev_tstamp start_tm;
- /* A fiber to wake up when transaction is finished. */
- struct fiber *fiber;
- /* True when transaction is processed. */
- bool done;
};
/* Pointer to the current transaction (if any) */
@@ -238,12 +234,21 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+/**
+ * Submit a transaction to the journal.
+ * @pre txn == in_txn()
+ *
+ * On success 0 is returned, and the transaction will be freed upon
+ * journal write completion. Note, the journal write may still fail.
+ * To track transaction status, one is supposed to use on_commit and
+ * on_rollback triggers.
+ *
+ * On failure -1 is returned and the transaction is rolled back and
+ * freed.
+ */
int
txn_write(struct txn *txn);
-int
-txn_wait(struct txn *txn);
-
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index dce5fee6..6f5d0a58 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,17 +245,19 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
+/**
+ * Invoke completion callbacks of journal entries to be
+ * completed. Callbacks are invoked in strict fifo order:
+ * this ensures that, in case of rollback, requests are
+ * rolled back in strict reverse order, producing
+ * a consistent database state.
+ */
static void
tx_schedule_queue(struct stailq *queue)
{
- /*
- * fiber_wakeup() is faster than fiber_call() when there
- * are many ready fibers.
- */
struct journal_entry *req, *tmp;
- stailq_foreach_entry_safe(req, tmp, queue, fifo) {
+ stailq_foreach_entry_safe(req, tmp, queue, fifo)
journal_entry_complete(req);
- }
}
/**
@@ -1189,7 +1191,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
journal_entry_complete(entry);
- return entry->res;
+ return 0;
}
void
^ permalink raw reply [flat|nested] 9+ messages in thread