* [tarantool-patches] [PATCH v2 1/8] Encode a dml statement to a transaction memory region
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-28 1:21 ` [tarantool-patches] " Kirill Yukhin
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure Georgy Kirichenko
` (6 subsequent siblings)
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Encode all statements to be written out to the WAL onto the transaction
memory region. This relaxes the relation between transaction and fiber
state and is required for the autonomous transaction feature.
Prerequisites: #1254
---
src/box/request.c | 2 +-
src/box/txn.c | 27 ++++++++++++++-------------
src/box/vy_log.c | 2 +-
src/box/vy_stmt.c | 4 ++--
src/box/xrow.c | 14 ++++++++------
src/box/xrow.h | 12 +++++++++---
6 files changed, 35 insertions(+), 26 deletions(-)
diff --git a/src/box/request.c b/src/box/request.c
index 041a8d548..e2a98fdf9 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -61,7 +61,7 @@ request_update_header(struct request *request, struct xrow_header *row)
if (row == NULL)
return 0;
row->type = request->type;
- row->bodycnt = xrow_encode_dml(request, row->body);
+ row->bodycnt = xrow_encode_dml(request, &fiber()->gc, row->body);
if (row->bodycnt < 0)
return -1;
request->header = row;
diff --git a/src/box/txn.c b/src/box/txn.c
index 4182d5791..da749d7cc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -49,11 +49,7 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
static int
txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
{
- stmt->row = request->header;
- if (request->header != NULL)
- return 0;
-
- /* Create a redo log row for Lua requests */
+ /* Create a redo log row. */
struct xrow_header *row;
row = region_alloc_object(&txn->region, struct xrow_header);
if (row == NULL) {
@@ -61,14 +57,19 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
"region", "struct xrow_header");
return -1;
}
- /* Initialize members explicitly to save time on memset() */
- row->type = request->type;
- row->replica_id = 0;
- row->group_id = stmt->space != NULL ? space_group_id(stmt->space) : 0;
- row->lsn = 0;
- row->sync = 0;
- row->tm = 0;
- row->bodycnt = xrow_encode_dml(request, row->body);
+ if (request->header != NULL) {
+ *row = *request->header;
+ } else {
+ /* Initialize members explicitly to save time on memset() */
+ row->type = request->type;
+ row->replica_id = 0;
+ row->group_id = stmt->space != NULL ?
+ space_group_id(stmt->space) : 0;
+ row->lsn = 0;
+ row->sync = 0;
+ row->tm = 0;
+ }
+ row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
if (row->bodycnt < 0)
return -1;
stmt->row = row;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index edd61b33a..bdc1cfa31 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -520,7 +520,7 @@ vy_log_record_encode(const struct vy_log_record *record,
req.tuple_end = pos;
memset(row, 0, sizeof(*row));
row->type = req.type;
- row->bodycnt = xrow_encode_dml(&req, row->body);
+ row->bodycnt = xrow_encode_dml(&req, &fiber()->gc, row->body);
return 0;
}
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 9e20b6bfb..f936cd61f 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -653,7 +653,7 @@ vy_stmt_encode_primary(struct tuple *value, struct key_def *key_def,
}
if (vy_stmt_meta_encode(value, &request, true) != 0)
return -1;
- xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+ xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, xrow->body);
if (xrow->bodycnt < 0)
return -1;
return 0;
@@ -688,7 +688,7 @@ vy_stmt_encode_secondary(struct tuple *value, struct key_def *cmp_def,
}
if (vy_stmt_meta_encode(value, &request, false) != 0)
return -1;
- xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+ xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, xrow->body);
if (xrow->bodycnt < 0)
return -1;
else
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 886fb57fe..0ae5271c1 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -752,15 +752,18 @@ request_str(const struct request *request)
}
int
-xrow_encode_dml(const struct request *request, struct iovec *iov)
+xrow_encode_dml(const struct request *request, struct region *region,
+ struct iovec *iov)
{
int iovcnt = 1;
const int MAP_LEN_MAX = 40;
uint32_t key_len = request->key_end - request->key;
uint32_t ops_len = request->ops_end - request->ops;
uint32_t tuple_meta_len = request->tuple_meta_end - request->tuple_meta;
- uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len;
- char *begin = (char *) region_alloc(&fiber()->gc, len);
+ uint32_t tuple_len = request->tuple_end - request->tuple;
+ uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len +
+ tuple_len;
+ char *begin = (char *) region_alloc(region, len);
if (begin == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "begin");
return -1;
@@ -802,9 +805,8 @@ xrow_encode_dml(const struct request *request, struct iovec *iov)
}
if (request->tuple) {
pos = mp_encode_uint(pos, IPROTO_TUPLE);
- iov[iovcnt].iov_base = (void *) request->tuple;
- iov[iovcnt].iov_len = (request->tuple_end - request->tuple);
- iovcnt++;
+ memcpy(pos, request->tuple, tuple_len);
+ pos += tuple_len;
map_size++;
}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3a8cba191..35ec06dc0 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -54,6 +54,8 @@ enum {
IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
};
+struct region;
+
struct xrow_header {
/* (!) Please update txn_add_redo() after changing members */
@@ -195,12 +197,15 @@ xrow_decode_dml(struct xrow_header *xrow, struct request *request,
/**
* Encode the request fields to iovec using region_alloc().
* @param request request to encode
+ * @param region region to allocate the encoded request on; the tuple
+ * body is always copied into it
* @param iov[out] iovec to fill
* @retval -1 on error, see diag
* @retval > 0 the number of iovecs used
*/
int
-xrow_encode_dml(const struct request *request, struct iovec *iov);
+xrow_encode_dml(const struct request *request, struct region *region,
+ struct iovec *iov);
/**
* CALL/EVAL request.
@@ -713,9 +718,10 @@ xrow_decode_dml_xc(struct xrow_header *row, struct request *request,
/** @copydoc xrow_encode_dml. */
static inline int
-xrow_encode_dml_xc(const struct request *request, struct iovec *iov)
+xrow_encode_dml_xc(const struct request *request, struct region *region,
+ struct iovec *iov)
{
- int iovcnt = xrow_encode_dml(request, iov);
+ int iovcnt = xrow_encode_dml(request, region, iov);
if (iovcnt < 0)
diag_raise();
return iovcnt;
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 1/8] Encode a dml statement to a transaction memory region Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-27 20:51 ` [tarantool-patches] " Konstantin Osipov
2019-05-31 19:21 ` Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 3/8] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
` (5 subsequent siblings)
7 siblings, 2 replies; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Move transaction auto start and auto commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger
transaction refactoring in order to implement detachable transactions
and a parallel applier.
Prerequisites: #1254
---
src/box/applier.cc | 31 +++++++++++---
src/box/box.cc | 91 ++++++++++++++++++++++++++++++++----------
src/box/index.cc | 10 ++---
src/box/journal.h | 1 +
src/box/memtx_engine.c | 10 ++++-
src/box/memtx_space.c | 8 ++--
src/box/sql.c | 2 +-
src/box/txn.c | 47 ++++++++--------------
src/box/txn.h | 16 +++-----
src/box/vinyl.c | 12 ++----
src/box/vy_scheduler.c | 6 +--
src/box/wal.c | 1 -
12 files changed, 142 insertions(+), 93 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..9d2989094 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,22 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request)) {
+ txn_rollback();
+ return -1;
+ }
+ int rc = txn_commit(txn);
+ fiber_gc();
+ return rc;
}
/**
@@ -189,8 +200,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
return -1;
return txn_commit_stmt(txn, request);
}
@@ -403,8 +414,16 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
+ if (apply_row(&row) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..2a738fa36 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,65 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ struct tuple *tuple = NULL;
+ struct txn *txn = in_txn();
+ bool is_autocommit = txn == NULL;
+ if (is_autocommit && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto fail;
+ if (txn_begin_stmt(txn, space) == NULL)
+ goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
- return -1;
+ txn_rollback_stmt(txn);
+ goto fail;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
/*
* Pin the tuple locally before the commit,
* otherwise it may go away during yield in
* when WAL is written in autocommit mode.
*/
+ if (result != NULL)
+ *result = tuple;
+
+ if (result == NULL || tuple == NULL) {
+ if (!is_autocommit)
+ return txn_commit_stmt(txn, request);
+ /* Autocommit mode. */
+ if (txn_commit_stmt(txn, request) != 0) {
+ txn_rollback();
+ return -1;
+ }
+ if (txn_commit(txn) != 0)
+ return -1;
+ fiber_gc();
+ return 0;
+ }
tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
- tuple_bless(tuple);
+
+ if (txn_commit_stmt(txn, request)) {
+ /* Unref tuple and rollback if autocommit. */
+ tuple_unref(tuple);
+ goto fail;
+ }
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0) {
+ /* Unref tuple and exit. */
+ tuple_unref(tuple);
+ return -1;
+ }
+ fiber_gc();
+ }
+ tuple_bless(tuple);
tuple_unref(tuple);
- return rc;
+ return 0;
+
+fail:
+ if (is_autocommit)
+ txn_rollback();
+ return -1;
}
void
@@ -299,8 +330,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
+ struct txn *txn = txn_begin();
+ if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
+ txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
+ if (txn != NULL)
+ txn_rollback();
diag_raise();
}
}
@@ -1055,7 +1090,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1080,7 +1115,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -1313,8 +1348,15 @@ box_sequence_reset(uint32_t seq_id)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
- (unsigned) id, tt_uuid_str(uuid)) != 0)
+ (unsigned) id, tt_uuid_str(uuid)) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1636,9 +1678,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
uu = *replicaset_uuid;
else
tt_uuid_create(&uu);
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
- tt_uuid_str(&uu)))
+ tt_uuid_str(&uu))) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
}
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..77d3073c8 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "trigger.h"
#include "salad/stailq.h"
#if defined(__cplusplus)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..149215b87 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5ddb4f7ee..829ab7f4c 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+ if (stmt == NULL)
return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
request->tuple_end);
if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index fbfa59992..1bdbdb815 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -915,7 +915,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return SQL_TARANTOOL_ITERATOR_FAIL;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index da749d7cc..87fd8b3bc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -174,7 +174,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -187,7 +187,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -220,26 +219,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ assert(txn == in_txn());
+ assert(txn != NULL);
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
+ if (stmt == NULL)
return NULL;
- }
if (space == NULL)
- return txn;
+ return stmt;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -252,9 +245,9 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return stmt;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
@@ -272,8 +265,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -316,14 +308,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -359,7 +346,7 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
- txn_rollback(); /* Perform our part of cascading rollback. */
+ txn_rollback(txn); /* Perform our part of cascading rollback. */
/*
* Move fiber to end of event loop to avoid
* execution of any new requests before all
@@ -441,14 +428,11 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -465,6 +449,7 @@ txn_rollback()
unreachable();
panic("rollback trigger failed");
}
+
if (txn->engine)
engine_rollback(txn->engine, txn);
@@ -519,7 +504,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..5a66f8e53 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 9372e5f7f..2abfab366 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2415,10 +2415,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
txn->engine_tx = vy_tx_begin(env->xm);
if (txn->engine_tx == NULL)
return -1;
- if (!txn->is_autocommit) {
- trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
- trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
- }
+ trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+ trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
return 0;
}
@@ -2488,8 +2486,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
vy_regulator_check_dump_watermark(&env->regulator);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static void
@@ -2503,8 +2500,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
vy_tx_rollback(tx);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static int
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fabb4bb48..9d5f18e32 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..628e7529a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -303,7 +303,6 @@ tx_schedule_rollback(struct cmsg *msg)
* in-memory database state.
*/
stailq_reverse(&writer->rollback);
- /* Must not yield. */
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
if (msg != &writer->in_rollback)
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 2/8] Get rid of autocommit from a txn structure
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-05-27 20:51 ` Konstantin Osipov
2019-05-31 19:21 ` Konstantin Osipov
1 sibling, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-27 20:51 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 12:14]:
> Move transaction auto start and auto commit behavior to the box level.
> >From now a transaction won't start and commit automatically without
> txn_begin/txn_commit invocations. This is a part of a bigger transaction
> refactoring in order to implement detachable transactions and a parallel
> applier.
I started looking at this patch and I don't fully understand it.
- Why do you sometimes call fiber_gc() after txn_commit() and
sometimes not?
It used to be called before your change, and it is not called in
some cases after it.
Here's the diff (it doesn't pass the tests, which baffles me; could
you please explain?):
---
src/box/applier.cc | 4 +++-
src/box/box.cc | 52 ++++++++++++++++++++++++++--------------------
src/box/journal.h | 1 -
3 files changed, 32 insertions(+), 25 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9d2989094..75698fea7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -178,8 +178,10 @@ apply_initial_join_row(struct xrow_header *row)
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
struct space *space = space_cache_find(request.space_id);
- if (space == NULL)
+ if (space == NULL) {
+ txn_rollback();
return -1;
+ }
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request)) {
txn_rollback();
diff --git a/src/box/box.cc b/src/box/box.cc
index 2a738fa36..a371af813 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -184,30 +184,27 @@ box_process_rw(struct request *request, struct space *space,
txn_rollback_stmt(txn);
goto fail;
}
- /*
- * Pin the tuple locally before the commit,
- * otherwise it may go away during yield in
- * when WAL is written in autocommit mode.
- */
if (result != NULL)
*result = tuple;
if (result == NULL || tuple == NULL) {
- if (!is_autocommit)
- return txn_commit_stmt(txn, request);
- /* Autocommit mode. */
- if (txn_commit_stmt(txn, request) != 0) {
- txn_rollback();
- return -1;
+ if (txn_commit_stmt(txn, request) != 0)
+ goto fail;
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0)
+ return -1;
+ fiber_gc();
}
- if (txn_commit(txn) != 0)
- return -1;
- fiber_gc();
return 0;
}
+ /*
+ * Pin the tuple locally before the commit,
+ * otherwise it may go away during yield in
+ * when WAL is written in autocommit mode.
+ */
tuple_ref(tuple);
- if (txn_commit_stmt(txn, request)) {
+ if (txn_commit_stmt(txn, request) != 0) {
/* Unref tuple and rollback if autocommit. */
tuple_unref(tuple);
goto fail;
@@ -328,25 +325,32 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+ struct wal_stream *xstream =
+ container_of(stream, struct wal_stream, base);
+
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
struct txn *txn = txn_begin();
- if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
- txn_commit(txn) != 0) {
- say_error("error applying row: %s", request_str(&request));
- if (txn != NULL)
- txn_rollback();
- diag_raise();
+ if (txn == NULL)
+ goto error;
+ if (box_process_rw(&request, space, NULL) != 0) {
+ txn_rollback();
+ goto error;
}
+ if (txn_commit(txn) != 0)
+ goto error;
+ fiber_gc();
}
- struct wal_stream *xstream =
- container_of(stream, struct wal_stream, base);
/**
* Yield once in a while, but not too often,
* mostly to allow signal handling to take place.
*/
if (++xstream->rows % xstream->yield == 0)
fiber_sleep(0);
+error:
+ say_error("error applying row: %s", request_str(&request));
+ diag_raise();
+
}
static void
@@ -1358,6 +1362,7 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
}
if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1689,6 +1694,7 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
}
if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
}
void
diff --git a/src/box/journal.h b/src/box/journal.h
index 77d3073c8..8ac32ee5e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,7 +32,6 @@
*/
#include <stdint.h>
#include <stdbool.h>
-#include "trigger.h"
#include "salad/stailq.h"
#if defined(__cplusplus)
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 2/8] Get rid of autocommit from a txn structure
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure Georgy Kirichenko
2019-05-27 20:51 ` [tarantool-patches] " Konstantin Osipov
@ 2019-05-31 19:21 ` Konstantin Osipov
1 sibling, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-31 19:21 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 12:14]:
apply_initial_join_row(struct xrow_header *row)
> {
> + struct txn *txn = txn_begin();
> + if (txn == NULL)
> + return -1;
> struct request request;
> xrow_decode_dml(row, &request, dml_request_key_map(row->type));
> - struct space *space = space_cache_find_xc(request.space_id);
> + struct space *space = space_cache_find(request.space_id);
> + if (space == NULL)
> + return -1;
Missing txn_rollback().
> /* no access checks here - applier always works with admin privs */
> - return space_apply_initial_join_row(space, &request);
> + if (space_apply_initial_join_row(space, &request)) {
> + txn_rollback();
> + return -1;
> + }
> + int rc = txn_commit(txn);
> + fiber_gc();
> + return rc;
> }
>
> /**
> @@ -189,8 +200,8 @@ static int
> box_process_rw(struct request *request, struct space *space,
> struct tuple **result)
> {
> + struct tuple *tuple = NULL;
> + struct txn *txn = in_txn();
> + bool is_autocommit = txn == NULL;
> + if (is_autocommit && (txn = txn_begin()) == NULL)
> + return -1;
> assert(iproto_type_is_dml(request->type));
> rmean_collect(rmean_box, request->type, 1);
> if (access_check_space(space, PRIV_W) != 0)
> - return -1;
> - struct txn *txn = txn_begin_stmt(space);
> - if (txn == NULL)
> - return -1;
> - struct tuple *tuple;
> + goto fail;
> + if (txn_begin_stmt(txn, space) == NULL)
> + goto fail;
> if (space_execute_dml(space, txn, request, &tuple) != 0) {
> - txn_rollback_stmt();
> - return -1;
> + txn_rollback_stmt(txn);
> + goto fail;
> }
> - if (result == NULL)
> - return txn_commit_stmt(txn, request);
> - *result = tuple;
> - if (tuple == NULL)
> - return txn_commit_stmt(txn, request);
> /*
> * Pin the tuple locally before the commit,
> * otherwise it may go away during yield in
> * when WAL is written in autocommit mode.
> */
Please move the comment to its place, where we actually pin the
tuple.
> + if (result != NULL)
> + *result = tuple;
> +
> + if (result == NULL || tuple == NULL) {
> + if (!is_autocommit)
> + return txn_commit_stmt(txn, request);
> + /* Autocommit mode. */
> + if (txn_commit_stmt(txn, request) != 0) {
> + txn_rollback();
> + return -1;
> + }
> + if (txn_commit(txn) != 0)
> + return -1;
> + fiber_gc();
> + return 0;
> + }
This code flow can be a bit more straightforward, I sent
a diff to the list in a separate email.
> tuple_ref(tuple);
> - int rc = txn_commit_stmt(txn, request);
> - if (rc == 0)
> - tuple_bless(tuple);
> +
> + if (txn_commit_stmt(txn, request)) {
> + /* Unref tuple and rollback if autocommit. */
> + tuple_unref(tuple);
> + goto fail;
> + }
> + if (is_autocommit) {
> + if (txn_commit(txn) != 0) {
> + /* Unref tuple and exit. */
> + tuple_unref(tuple);
> + return -1;
> + }
> + fiber_gc();
> + }
> + tuple_bless(tuple);
> tuple_unref(tuple);
> - return rc;
> + return 0;
> +
> +fail:
> + if (is_autocommit)
> + txn_rollback();
> + return -1;
> }
>
> void
> @@ -299,8 +330,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
> xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> if (request.type != IPROTO_NOP) {
> struct space *space = space_cache_find_xc(request.space_id);
> - if (box_process_rw(&request, space, NULL) != 0) {
> + struct txn *txn = txn_begin();
> + if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
> + txn_commit(txn) != 0) {
> say_error("error applying row: %s", request_str(&request));
> + if (txn != NULL)
> + txn_rollback();
Missing fiber_gc() after txn_commit()
> diag_raise();
> }
> }
> txn_commit_ro_stmt(txn);
> @@ -1313,8 +1348,15 @@ box_sequence_reset(uint32_t seq_id)
> static inline void
> box_register_replica(uint32_t id, const struct tt_uuid *uuid)
> {
> + struct txn *txn = txn_begin();
> + if (txn == NULL)
> + diag_raise();
> if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
> - (unsigned) id, tt_uuid_str(uuid)) != 0)
> + (unsigned) id, tt_uuid_str(uuid)) != 0) {
> + txn_rollback();
> + diag_raise();
> + }
> + if (txn_commit(txn) != 0)
> diag_raise();
Missing fiber_gc() after txn_commit().
> assert(replica_by_uuid(uuid)->id == id);
> }
> @@ -1636,9 +1678,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
> uu = *replicaset_uuid;
> else
> tt_uuid_create(&uu);
> + struct txn *txn = txn_begin();
> + if (txn == NULL)
> + diag_raise();
> /* Save replica set UUID in _schema */
> if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
> - tt_uuid_str(&uu)))
> + tt_uuid_str(&uu))) {
> + txn_rollback();
> + diag_raise();
> + }
> + if (txn_commit(txn) != 0)
> diag_raise();
Missing fiber_gc().
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -32,6 +32,7 @@
> */
> #include <stdint.h>
> #include <stdbool.h>
> +#include "trigger.h"
> #include "salad/stailq.h"
Stray change, please remove.
>
> - if (!txn->is_autocommit) {
> - trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> - trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
> - }
> + trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> + trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
Come to think of it, I see no reason why txn->is_autocommit flag
is removed. We can pass is_autocommit property to txn, even though
it doesn't behave in a special way on autocommit. Or we could
rename the option to set_on_yield_on_stop.
Now the triggers are set and cleared even for trivial statements
using memtx. This could be a performance issue. Did you benchmark
your patch? It may introduce yet another regression. Please ask
@avitikhon to help with benchmarks: if there is a slowdown from
setting the triggers all the time, we can fix the patch to not do
it; otherwise it's OK to simply always set the triggers.
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -303,7 +303,6 @@ tx_schedule_rollback(struct cmsg *msg)
> * in-memory database state.
> */
> stailq_reverse(&writer->rollback);
> - /* Must not yield. */
> tx_schedule_queue(&writer->rollback);
> stailq_create(&writer->rollback);
> if (msg != &writer->in_rollback)
Please explain in the changeset comment why you removed this
comment. Or is this a stray change?
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 3/8] Get rid of fiber_gc from txn_rollback
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 1/8] Encode a dml statement to a transaction memory region Georgy Kirichenko
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:27 ` [tarantool-patches] " Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 4/8] Remove fiber from a journal_entry structure Georgy Kirichenko
` (4 subsequent siblings)
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Don't touch a fiber gc storage on a transaction rollback. This relaxes
dependencies between fiber and transaction life cycles.
Prerequisites: #1254
---
src/box/applier.cc | 9 ++++++---
src/box/box.cc | 20 +++++++++++++-------
src/box/call.c | 16 ++++++++++++----
src/box/memtx_engine.c | 3 ++-
src/box/txn.c | 20 ++++++++++----------
src/box/txn.h | 8 ++++++--
src/box/vy_scheduler.c | 10 +++++++---
7 files changed, 56 insertions(+), 30 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9d2989094..43b1a84bc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -182,7 +182,8 @@ apply_initial_join_row(struct xrow_header *row)
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request)) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
@@ -418,7 +419,8 @@ applier_join(struct applier *applier)
if (txn == NULL)
diag_raise();
if (apply_row(&row) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -621,7 +623,8 @@ applier_apply_tx(struct stailq *rows)
return txn_commit(txn);
rollback:
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 2a738fa36..4d6515dd4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -197,7 +197,7 @@ box_process_rw(struct request *request, struct space *space,
return txn_commit_stmt(txn, request);
/* Autocommit mode. */
if (txn_commit_stmt(txn, request) != 0) {
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
if (txn_commit(txn) != 0)
@@ -225,8 +225,10 @@ box_process_rw(struct request *request, struct space *space,
return 0;
fail:
- if (is_autocommit)
- txn_rollback();
+ if (is_autocommit) {
+ txn_rollback(txn);
+ fiber_gc();
+ }
return -1;
}
@@ -334,8 +336,10 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
- if (txn != NULL)
- txn_rollback();
+ if (txn != NULL) {
+ txn_rollback(txn);
+ fiber_gc();
+ }
diag_raise();
}
}
@@ -1353,7 +1357,8 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
(unsigned) id, tt_uuid_str(uuid)) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -1684,7 +1689,8 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
tt_uuid_str(&uu))) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
diff --git a/src/box/call.c b/src/box/call.c
index 56da53fb3..e8779562a 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -209,13 +209,17 @@ box_process_call(struct call_request *request, struct port *port)
fiber_set_user(fiber(), orig_credentials);
if (rc != 0) {
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
if (in_txn()) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
@@ -230,13 +234,17 @@ box_process_eval(struct call_request *request, struct port *port)
if (access_check_universe(PRIV_X) != 0)
return -1;
if (box_lua_eval(request, port) != 0) {
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
if (in_txn()) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 149215b87..918885318 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -277,7 +277,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 87fd8b3bc..f677e1d33 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -395,7 +395,7 @@ txn_commit(struct txn *txn)
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
- goto fail;
+ return -1;
}
/*
* The transaction is in the binary log. No action below
@@ -423,7 +423,7 @@ txn_commit(struct txn *txn)
txn_free(txn);
return 0;
fail:
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
@@ -437,11 +437,9 @@ txn_rollback_stmt(struct txn *txn)
}
void
-txn_rollback()
+txn_rollback(struct txn *txn)
{
- struct txn *txn = in_txn();
- if (txn == NULL)
- return;
+ assert(txn == in_txn());
/* Rollback triggers must not throw. */
if (txn->has_triggers &&
trigger_run(&txn->on_rollback, txn) != 0) {
@@ -457,8 +455,6 @@ txn_rollback()
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- /** Free volatile txn memory. */
- fiber_gc();
fiber_set_txn(fiber(), NULL);
txn_free(txn);
}
@@ -534,11 +530,14 @@ int
box_txn_rollback()
{
struct txn *txn = in_txn();
+ if (txn == NULL)
+ return 0;
if (txn && txn->in_sub_stmt) {
diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
return -1;
}
- txn_rollback(); /* doesn't throw */
+ txn_rollback(txn); /* doesn't throw */
+ fiber_gc();
return 0;
}
@@ -616,6 +615,7 @@ txn_on_stop(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
- txn_rollback(); /* doesn't yield or fail */
+ txn_rollback(in_txn()); /* doesn't yield or fail */
+
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 5a66f8e53..569978ce9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -221,9 +221,12 @@ txn_begin();
int
txn_commit(struct txn *txn);
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
void
-txn_rollback();
+txn_rollback(struct txn *txn);
/**
* Roll back the transaction but keep the object around.
@@ -267,6 +270,7 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
/**
* Start a new statement.
+ * Return a new statement or NULL in case of error.
*/
struct txn_stmt *
txn_begin_stmt(struct txn *txn, struct space *space);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 9d5f18e32..7e34ed8fc 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,18 +889,22 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
for (int i = 0; i < batch->count; i++) {
if (vy_deferred_delete_process_one(deferred_delete_space,
pk->space_id, pk->mem_format,
- &batch->stmt[i]) != 0)
- goto fail;
+ &batch->stmt[i]) != 0) {
+ goto fail_rollback;
+ }
}
if (txn_commit(txn) != 0)
goto fail;
fiber_gc();
return;
+
+fail_rollback:
+ txn_rollback(txn);
fail:
batch->is_failed = true;
diag_move(diag_get(), &batch->diag);
- txn_rollback();
+ fiber_gc();
}
/**
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 3/8] Get rid of fiber_gc from txn_rollback
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 3/8] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-05-31 19:27 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-31 19:27 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 12:14]:
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 2a738fa36..4d6515dd4 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -197,7 +197,7 @@ box_process_rw(struct request *request, struct space *space,
> return txn_commit_stmt(txn, request);
> /* Autocommit mode. */
> if (txn_commit_stmt(txn, request) != 0) {
> - txn_rollback();
> + txn_rollback(txn);
Please add fiber_gc() here.
> return -1;
> }
> if (txn_commit(txn) != 0)
> --- a/src/box/call.c
> +++ b/src/box/call.c
> @@ -209,13 +209,17 @@ box_process_call(struct call_request *request, struct port *port)
> fiber_set_user(fiber(), orig_credentials);
>
> if (rc != 0) {
> - txn_rollback();
> + if (in_txn() != NULL)
> + txn_rollback(in_txn());
> + fiber_gc();
> return -1;
> }
>
> if (in_txn()) {
> diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> - txn_rollback();
> + if (in_txn() != NULL)
> + txn_rollback(in_txn());
in_txn() is expensive and cannot be optimized out; please use a
local variable.
> + fiber_gc();
> return -1;
> }
>
> @@ -230,13 +234,17 @@ box_process_eval(struct call_request *request, struct port *port)
> if (access_check_universe(PRIV_X) != 0)
> return -1;
> if (box_lua_eval(request, port) != 0) {
> - txn_rollback();
> + if (in_txn() != NULL)
> + txn_rollback(in_txn());
> + fiber_gc();
Same here and in other places.
> return -1;
> }
>
> index 87fd8b3bc..f677e1d33 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -395,7 +395,7 @@ txn_commit(struct txn *txn)
> if (txn->n_new_rows + txn->n_applier_rows > 0) {
> txn->signature = txn_write_to_wal(txn);
> if (txn->signature < 0)
> - goto fail;
> + return -1;
Why this change?
> }
> /*
> * The transaction is in the binary log. No action below
> @@ -423,7 +423,7 @@ txn_commit(struct txn *txn)
> txn_free(txn);
> return 0;
> fail:
> - txn_rollback();
> + txn_rollback(txn);
> return -1;
> }
>
> @@ -437,11 +437,9 @@ txn_rollback_stmt(struct txn *txn)
> }
>
> @@ -534,11 +530,14 @@ int
> box_txn_rollback()
> {
> struct txn *txn = in_txn();
> + if (txn == NULL)
> + return 0;
> if (txn && txn->in_sub_stmt) {
> diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
> return -1;
> }
> - txn_rollback(); /* doesn't throw */
> + txn_rollback(txn); /* doesn't throw */
> + fiber_gc();
box_txn_rollback() is used in a bunch of places in SQL. Please fix
it to use txn_rollback() and an explicit fiber_gc() as well, in a
follow-up patch.
Thanks!
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 4/8] Remove fiber from a journal_entry structure
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
` (2 preceding siblings ...)
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 3/8] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:29 ` [tarantool-patches] " Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 5/8] Commit engine before all triggers Georgy Kirichenko
` (3 subsequent siblings)
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Use a trigger to handle the journal entry write-done event. This loosens
the coupling between fiber and transaction life cycles.
Prerequisites: #1254
---
src/box/box.cc | 4 +++-
src/box/journal.c | 7 ++++---
src/box/journal.h | 16 ++++++++++++++--
src/box/wal.c | 36 +++++++++++++++++++++++++-----------
4 files changed, 46 insertions(+), 17 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 4d6515dd4..e10b73277 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -312,9 +312,11 @@ struct recovery_journal {
*/
static int64_t
recovery_journal_write(struct journal *base,
- struct journal_entry * /* entry */)
+ struct journal_entry *entry)
{
struct recovery_journal *journal = (struct recovery_journal *) base;
+ entry->done = true;
+ trigger_run(&entry->done_trigger, NULL);
return vclock_sum(journal->vclock);
}
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..8d213d57e 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
*/
#include "journal.h"
#include <small/region.h>
-#include <fiber.h>
#include <diag.h>
/**
@@ -41,7 +40,8 @@ static int64_t
dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
- (void) entry;
+ entry->done = true;
+ trigger_run(&entry->done_trigger, NULL);
return 0;
}
@@ -66,10 +66,11 @@ journal_entry_new(size_t n_rows, struct region *region)
diag_set(OutOfMemory, size, "region", "struct journal_entry");
return NULL;
}
+ rlist_create(&entry->done_trigger);
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->fiber = fiber();
+ entry->done = false;
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 77d3073c8..5e1323464 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
#include <stdbool.h>
#include "trigger.h"
#include "salad/stailq.h"
+#include "trigger.h"
#if defined(__cplusplus)
extern "C" {
@@ -56,9 +57,14 @@ struct journal_entry {
*/
int64_t res;
/**
- * The fiber issuing the request.
+ * Turns to true when entry is processed by wal.
+ */
+ bool done;
+ /**
+ * Triggers fired when journal entry processing is done
+ * regardless of its success.
*/
- struct fiber *fiber;
+ struct rlist done_trigger;
/**
* Approximate size of this request when encoded.
*/
@@ -94,6 +100,12 @@ struct journal {
void (*destroy)(struct journal *journal);
};
+static inline void
+journal_entry_on_done(struct journal_entry *entry, struct trigger *trigger)
+{
+ trigger_add(&entry->done_trigger, trigger);
+}
+
/**
* Depending on the step of recovery and instance configuration
* points at a concrete implementation of the journal.
diff --git a/src/box/wal.c b/src/box/wal.c
index 628e7529a..4b0a7c802 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue)
* are many ready fibers.
*/
struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo)
- fiber_wakeup(req->fiber);
+ stailq_foreach_entry(req, queue, fifo) {
+ req->done = true;
+ trigger_run(&req->done_trigger, NULL);
+ }
}
/**
@@ -1121,6 +1123,14 @@ wal_writer_f(va_list ap)
return 0;
}
+static void
+on_wal_write_done(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct fiber_cond *cond = (struct fiber_cond *)trigger->data;
+ fiber_cond_signal(cond);
+}
+
/**
* WAL writer main entry point: queue a single request
* to be written to disk and wait until this task is completed.
@@ -1171,15 +1181,19 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
- /**
- * It's not safe to spuriously wakeup this fiber
- * since in that case it will ignore a possible
- * error from WAL writer and not roll back the
- * transaction.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield(); /* Request was inserted. */
- fiber_set_cancellable(cancellable);
+
+ struct fiber_cond done_cond;
+ fiber_cond_create(&done_cond);
+
+ struct trigger done_trigger;
+ trigger_create(&done_trigger, on_wal_write_done, &done_cond, NULL);
+ journal_entry_on_done(entry, &done_trigger);
+ while (!entry->done)
+ fiber_cond_wait(&done_cond);
+
+ fiber_cond_destroy(&done_cond);
+ trigger_clear(&done_trigger);
+
return entry->res;
}
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 5/8] Commit engine before all triggers
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
` (3 preceding siblings ...)
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 4/8] Remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:32 ` [tarantool-patches] " Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 6/8] Offload tx_prio processing to a fiber Georgy Kirichenko
` (2 subsequent siblings)
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
---
src/box/txn.c | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index f677e1d33..273964d51 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -397,6 +397,13 @@ txn_commit(struct txn *txn)
if (txn->signature < 0)
return -1;
}
+ /*
+ * Engine can be NULL if transaction contains IPROTO_NOP
+ * statements only.
+ */
+ if (txn->engine != NULL)
+ engine_commit(txn->engine, txn);
+
/*
* The transaction is in the binary log. No action below
* may throw. In case an error has happened, there is
@@ -408,13 +415,6 @@ txn_commit(struct txn *txn)
unreachable();
panic("commit trigger failed");
}
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
- if (txn->engine != NULL)
- engine_commit(txn->engine, txn);
-
struct txn_stmt *stmt;
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 5/8] Commit engine before all triggers
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 5/8] Commit engine before all triggers Georgy Kirichenko
@ 2019-05-31 19:32 ` Konstantin Osipov
2019-06-03 8:07 ` Георгий Кириченко
0 siblings, 1 reply; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-31 19:32 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 11:47]:
> ---
> src/box/txn.c | 14 +++++++-------
> 1 file changed, 7 insertions(+), 7 deletions(-)
Why do you need this change?
Since on_commit() triggers are not allowed to fail, this is
harmless.
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 5/8] Commit engine before all triggers
2019-05-31 19:32 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-03 8:07 ` Георгий Кириченко
0 siblings, 0 replies; 21+ messages in thread
From: Георгий Кириченко @ 2019-06-03 8:07 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
[-- Attachment #1: Type: text/plain, Size: 515 bytes --]
On Friday, May 31, 2019 10:32:12 PM MSK Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/05/23 11:47]:
> > ---
> >
> > src/box/txn.c | 14 +++++++-------
> > 1 file changed, 7 insertions(+), 7 deletions(-)
>
> Why do you need this change?
>
> Since on_commit() triggers are not allowed to fail, this is
> harmless.
An on_commit trigger could yield and break the engine->commit order. However, it
doesn't affect us right now (at least I don't have proof), and this patch
could be eliminated.
[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 6/8] Offload tx_prio processing to a fiber
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
` (4 preceding siblings ...)
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 5/8] Commit engine before all triggers Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:36 ` [tarantool-patches] " Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 7/8] Enable asyncronous wal writes Georgy Kirichenko
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 8/8] Introduce asynchronous txn commit Georgy Kirichenko
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
---
src/box/box.cc | 38 ++++++++++++++++++++++++++++++++++++--
1 file changed, 36 insertions(+), 2 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e10b73277..b8ef4b9ed 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2061,13 +2061,41 @@ local_recovery(const struct tt_uuid *instance_uuid,
}
}
+/* A structure containing tx_prio endpoint fiber context. */
+static struct tx_prio_ctx {
+ /* The fiber processing the tx_prio endpoint. */
+ struct fiber *fiber;
+ /* True if there are more messages to process. */
+ bool has_message;
+ /* Condition to signal when a new message arrived. */
+ struct fiber_cond message_cond;
+} tx_prio_ctx;
+
static void
tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
{
(void) loop;
(void) events;
- struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
- cbus_process(endpoint);
+ (void) watcher;
+ tx_prio_ctx.has_message = true;
+ fiber_cond_signal(&tx_prio_ctx.message_cond);
+}
+
+/*
+ * Tx prio endpoint fiber function.
+ */
+static int
+tx_prio_process_f(va_list ap)
+{
+ (void) ap;
+ while (!fiber_is_cancelled()) {
+ while (tx_prio_ctx.has_message) {
+ tx_prio_ctx.has_message = false;
+ cbus_process(&tx_prio_endpoint);
+ }
+ fiber_cond_wait(&tx_prio_ctx.message_cond);
+ }
+ return 0;
}
static void
@@ -2119,6 +2147,12 @@ box_cfg_xc(void)
IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
FIBER_POOL_IDLE_TIMEOUT);
/* Add an extra endpoint for WAL wake up/rollback messages. */
+ memset(&tx_prio_ctx, 0, sizeof(struct tx_prio_ctx));
+ fiber_cond_create(&tx_prio_ctx.message_cond);
+ tx_prio_ctx.fiber = fiber_new("tx_prio", tx_prio_process_f);
+ if (tx_prio_ctx.fiber == NULL)
+ panic("Could not create tx_prio fiber");
+ fiber_start(tx_prio_ctx.fiber, NULL);
cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 7/8] Enable asyncronous wal writes
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
` (5 preceding siblings ...)
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 6/8] Offload tx_prio processing to a fiber Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:41 ` [tarantool-patches] " Konstantin Osipov
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 8/8] Introduce asynchronous txn commit Georgy Kirichenko
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow sending a journal entry to the WAL without waiting until the
write has finished. Two methods are introduced:
* async_write submits an entry to be written and returns 0 if the
entry was successfully scheduled;
* async_wait waits until the write has finished and returns the
result of the journal write.
Prerequisites: #1254
---
src/box/box.cc | 21 ++++++++++++++++++++-
src/box/journal.c | 18 ++++++++++++++++++
src/box/journal.h | 30 ++++++++++++++++++++++++++++++
src/box/wal.c | 42 +++++++++++++++++++++++++++++++++++-------
4 files changed, 103 insertions(+), 8 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index b8ef4b9ed..95c406ea7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -320,10 +320,29 @@ recovery_journal_write(struct journal *base,
return vclock_sum(journal->vclock);
}
+static int64_t
+recovery_journal_async_write(struct journal *base,
+ struct journal_entry * /* entry */)
+{
+ (void) base;
+ return 0;
+}
+
+static int64_t
+recovery_journal_async_wait(struct journal *base,
+ struct journal_entry * /* entry */)
+{
+ struct recovery_journal *journal = (struct recovery_journal *) base;
+ return vclock_sum(journal->vclock);
+}
+
static inline void
recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
{
- journal_create(&journal->base, recovery_journal_write, NULL);
+ journal_create(&journal->base, recovery_journal_write,
+ recovery_journal_async_write,
+ recovery_journal_async_wait,
+ NULL);
journal->vclock = v;
}
diff --git a/src/box/journal.c b/src/box/journal.c
index 8d213d57e..4a1d38dd5 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -45,8 +45,26 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
return 0;
}
+static int64_t
+dummy_async_write(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ (void) entry;
+ return 0;
+}
+
+static int64_t
+dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ (void) entry;
+ return 0;
+}
+
static struct journal dummy_journal = {
dummy_journal_write,
+ dummy_async_write,
+ dummy_async_wait,
NULL,
};
diff --git a/src/box/journal.h b/src/box/journal.h
index 5e1323464..978942a83 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -97,6 +97,10 @@ journal_entry_new(size_t n_rows, struct region *region);
struct journal {
int64_t (*write)(struct journal *journal,
struct journal_entry *req);
+ int64_t (*async_write)(struct journal *journal,
+ struct journal_entry *req);
+ int64_t (*async_wait)(struct journal *journal,
+ struct journal_entry *req);
void (*destroy)(struct journal *journal);
};
@@ -124,6 +128,28 @@ journal_write(struct journal_entry *entry)
return current_journal->write(current_journal, entry);
}
+/**
+ * Send a single entry to write.
+ *
+ * @return 0 if write was scheduled or -1 on error.
+ */
+static inline int64_t
+journal_async_write(struct journal_entry *entry)
+{
+ return current_journal->async_write(current_journal, entry);
+}
+
+/**
+ * Wait until entry processing is finished.
+ * @return a log sequence number (vclock signature) of the entry
+ * or -1 on error.
+ */
+static inline int64_t
+journal_async_wait(struct journal_entry *entry)
+{
+ return current_journal->async_wait(current_journal, entry);
+}
+
/**
* Change the current implementation of the journaling API.
* Happens during life cycle of an instance:
@@ -156,9 +182,13 @@ journal_set(struct journal *new_journal)
static inline void
journal_create(struct journal *journal,
int64_t (*write)(struct journal *, struct journal_entry *),
+ int64_t (*async_write)(struct journal *, struct journal_entry *),
+ int64_t (*async_wait)(struct journal *, struct journal_entry *),
void (*destroy)(struct journal *))
{
journal->write = write;
+ journal->async_write = async_write,
+ journal->async_wait = async_wait,
journal->destroy = destroy;
}
diff --git a/src/box/wal.c b/src/box/wal.c
index 4b0a7c802..d27dbff59 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -63,6 +63,12 @@ int wal_dir_lock = -1;
static int64_t
wal_write(struct journal *, struct journal_entry *);
+static int64_t
+wal_async_write(struct journal *, struct journal_entry *);
+
+static int64_t
+wal_async_wait(struct journal *, struct journal_entry *);
+
static int64_t
wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
@@ -358,7 +364,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->wal_max_rows = wal_max_rows;
writer->wal_max_size = wal_max_size;
journal_create(&writer->base, wal_mode == WAL_NONE ?
- wal_write_in_wal_mode_none : wal_write, NULL);
+ wal_write_in_wal_mode_none : wal_write,
+ wal_mode == WAL_NONE ?
+ wal_write_in_wal_mode_none: wal_async_write,
+ wal_async_wait, NULL);
struct xlog_opts opts = xlog_opts_default;
opts.sync_is_async = true;
@@ -1131,12 +1140,8 @@ on_wal_write_done(struct trigger *trigger, void *event)
fiber_cond_signal(cond);
}
-/**
- * WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
- */
int64_t
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_async_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1181,6 +1186,15 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
+ return 0;
+}
+
+int64_t
+wal_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ if (entry->done)
+ return entry->res;
struct fiber_cond done_cond;
fiber_cond_create(&done_cond);
@@ -1197,6 +1211,18 @@ wal_write(struct journal *journal, struct journal_entry *entry)
return entry->res;
}
+/**
+ * WAL writer main entry point: queue a single request
+ * to be written to disk and wait until this task is completed.
+ */
+int64_t
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+ if (wal_async_write(journal, entry) != 0)
+ return -1;
+ return wal_async_wait(journal, entry);
+}
+
int64_t
wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
@@ -1208,7 +1234,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
- return vclock_sum(&writer->vclock);
+ entry->done = true;
+ entry->res = vclock_sum(&writer->vclock);
+ return entry->res;
}
void
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 7/8] Enable asyncronous wal writes
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 7/8] Enable asyncronous wal writes Georgy Kirichenko
@ 2019-05-31 19:41 ` Konstantin Osipov
2019-06-03 8:09 ` Георгий Кириченко
0 siblings, 1 reply; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-31 19:41 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 11:48]:
> Allow to send a journal entry to wal without wait until the writing
> was finished. Two methods were introduced:
> * async_write method emits an entry to be written, returns 0 if the
> entry was successfully scheduled;
> * async_wait method waits until writing was finished and returns a
> result of journal write.
It seems that async write and async wait are now the default way to
use the journal. Can we swap the names around, and use
journal_write/journal_wait for the async methods, and
journal_write_sync for the few places that use the sync one?
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH v2 8/8] Introduce asynchronous txn commit
2019-05-23 8:19 [tarantool-patches] [PATCH v2 0/8] Make transaction autonomous from a fiber internals Georgy Kirichenko
` (6 preceding siblings ...)
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 7/8] Enable asyncronous wal writes Georgy Kirichenko
@ 2019-05-23 8:19 ` Georgy Kirichenko
2019-05-31 19:43 ` [tarantool-patches] " Konstantin Osipov
7 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow asynchronous transaction commit. This adds two functions:
* txn_async_commit that sends a transaction to a journal
* txn_async_wait that waits until the transaction processing was done
Prerequisites: #1254
---
src/box/txn.c | 117 +++++++++++++++++++++++++++++++++++---------------
src/box/txn.h | 6 +++
2 files changed, 89 insertions(+), 34 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 273964d51..ae18db168 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -195,6 +195,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
return txn;
@@ -315,10 +316,12 @@ fail:
}
static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_async_write(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
+ /* Prepare a journal entry. */
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region);
@@ -340,37 +343,34 @@ txn_write_to_wal(struct txn *txn)
assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
+ txn->entry = req;
+ /* Send entry to a journal. */
+ if (journal_async_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
- diag_log();
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until journal processing is finished.
+ */
+static int64_t
+txn_journal_async_wait(struct txn *txn)
+{
+ assert(txn->entry != NULL);
+ txn->signature = journal_async_wait(txn->entry);
+ if (txn->signature < 0)
+ diag_set(ClientError, ER_WAL_IO);
+ return txn->signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -380,7 +380,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -388,15 +388,54 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
+ return 0;
+}
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_async_commit(struct txn *txn)
+{
+ assert(txn == in_txn());
+ if (txn_prepare(txn) != 0)
+ goto fail;
+
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0)
+ return 0;
+
+ if (txn_journal_async_write(txn) != 0)
+ goto fail;
+
+ return 0;
+fail:
+ txn_rollback(txn);
+ return -1;
+}
+
+/*
+ * Wait until transaction processing is finished.
+ */
+int
+txn_async_wait(struct txn *txn)
+{
+ if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+ txn_journal_async_wait(txn) < 0)
+ goto fail;
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
}
+
/*
* Engine can be NULL if transaction contains IPROTO_NOP
* statements only.
@@ -422,11 +461,21 @@ txn_commit(struct txn *txn)
fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
+
fail:
txn_rollback(txn);
return -1;
}
+int
+txn_commit(struct txn *txn)
+{
+ if (txn_async_commit(txn) != 0 ||
+ txn_async_wait(txn) < 0)
+ return -1;
+ return 0;
+}
+
void
txn_rollback_stmt(struct txn *txn)
{
diff --git a/src/box/txn.h b/src/box/txn.h
index 569978ce9..0b04b5488 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,12 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /**
+ * Journal entry to control txn write.
+ */
+ struct journal_entry *entry;
+ /** Timestamp of entry write start. */
+ ev_tstamp start_tm;
};
/* Pointer to the current transaction (if any) */
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH v2 8/8] Introduce asynchronous txn commit
2019-05-23 8:19 ` [tarantool-patches] [PATCH v2 8/8] Introduce asynchronous txn commit Georgy Kirichenko
@ 2019-05-31 19:43 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-05-31 19:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/05/23 11:48]:
> Allow asynchronous transaction commit. This adds two functions:
> * txn_async_commit that sends a transaction to a journal
> * txn_async_wait that waits until the transaction processing was done
>
The same comment applies here as for the journal API: the async API
becomes the standard one, so can we use txn_write/txn_wait for the
names, with txn_commit doing a pair of write/wait? We don't have to
stress in the name that the write is async, since there is no
sync write at all.
In other words:
txn_async_commit -> txn_write()
txn_journal_async_write -> txn_write_to_wal() (same as before).
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 21+ messages in thread