* [Tarantool-patches] [PATCH v4 01/12] vinyl: rename tx_manager -> vy_tx_manager
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 02/12] txm: add TX status Aleksandr Lyapunov
` (11 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
Unlike other vinyl objects, which are named with the "vy_" prefix,
vinyl's transaction manager (tx_manager) has no such prefix.
It should have one in order to avoid conflicts with the global
transaction manager.
Needed for #4897
---
src/box/vinyl.c | 30 ++++++++++++++---------------
src/box/vy_scheduler.h | 2 +-
src/box/vy_tx.c | 51 +++++++++++++++++++++++++-------------------------
src/box/vy_tx.h | 33 ++++++++++++++++----------------
4 files changed, 59 insertions(+), 57 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 5fa3ea3..aa6e50f 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -98,7 +98,7 @@ struct vy_env {
/** Recovery status */
enum vy_status status;
/** TX manager */
- struct tx_manager *xm;
+ struct vy_tx_manager *xm;
/** Upsert squash queue */
struct vy_squash_queue *squash_queue;
/** Memory pool for index iterator. */
@@ -267,7 +267,7 @@ vy_info_append_regulator(struct vy_env *env, struct info_handler *h)
static void
vy_info_append_tx(struct vy_env *env, struct info_handler *h)
{
- struct tx_manager *xm = env->xm;
+ struct vy_tx_manager *xm = env->xm;
info_table_begin(h, "tx");
@@ -292,7 +292,7 @@ static void
vy_info_append_memory(struct vy_env *env, struct info_handler *h)
{
info_table_begin(h, "memory");
- info_append_int(h, "tx", tx_manager_mem_used(env->xm));
+ info_append_int(h, "tx", vy_tx_manager_mem_used(env->xm));
info_append_int(h, "level0", lsregion_used(&env->mem_env.allocator));
info_append_int(h, "tuple_cache", env->cache_env.mem_used);
info_append_int(h, "page_index", env->lsm_env.page_index_size);
@@ -509,7 +509,7 @@ vinyl_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
stat->index += env->lsm_env.bloom_size;
stat->index += env->lsm_env.page_index_size;
stat->cache += env->cache_env.mem_used;
- stat->tx += tx_manager_mem_used(env->xm);
+ stat->tx += vy_tx_manager_mem_used(env->xm);
}
static void
@@ -517,7 +517,7 @@ vinyl_engine_reset_stat(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- struct tx_manager *xm = env->xm;
+ struct vy_tx_manager *xm = env->xm;
memset(&xm->stat, 0, sizeof(xm->stat));
vy_scheduler_reset_stat(&env->scheduler);
@@ -999,7 +999,7 @@ vinyl_space_invalidate(struct space *space)
* request bail out early, without dereferencing the space.
*/
bool unused;
- tx_manager_abort_writers_for_ddl(env->xm, space, &unused);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, space, &unused);
}
/** Argument passed to vy_check_format_on_replace(). */
@@ -1067,7 +1067,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
* be checked with on_replace trigger so we abort them.
*/
bool need_wal_sync;
- tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);
if (!need_wal_sync && vy_lsm_is_empty(pk))
return 0; /* space is empty, nothing to do */
@@ -2489,7 +2489,7 @@ static void
vinyl_engine_switch_to_ro(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- tx_manager_abort_writers_for_ro(env->xm);
+ vy_tx_manager_abort_writers_for_ro(env->xm);
}
/* }}} Public API of transaction control */
@@ -2576,7 +2576,7 @@ vy_env_new(const char *path, size_t memory,
goto error_path;
}
- e->xm = tx_manager_new();
+ e->xm = vy_tx_manager_new();
if (e->xm == NULL)
goto error_xm;
e->squash_queue = vy_squash_queue_new();
@@ -2612,7 +2612,7 @@ error_lsm_env:
vy_scheduler_destroy(&e->scheduler);
vy_squash_queue_delete(e->squash_queue);
error_squash_queue:
- tx_manager_delete(e->xm);
+ vy_tx_manager_delete(e->xm);
error_xm:
free(e->path);
error_path:
@@ -2626,7 +2626,7 @@ vy_env_delete(struct vy_env *e)
vy_regulator_destroy(&e->regulator);
vy_scheduler_destroy(&e->scheduler);
vy_squash_queue_delete(e->squash_queue);
- tx_manager_delete(e->xm);
+ vy_tx_manager_delete(e->xm);
free(e->path);
mempool_destroy(&e->iterator_pool);
vy_run_env_destroy(&e->run_env);
@@ -2885,7 +2885,7 @@ vinyl_engine_end_recovery(struct engine *engine)
/*
* During recovery we skip statements that have
* been dumped to disk - see vy_is_committed() -
- * so it may turn out that tx_manager::lsn stays
+ * so it may turn out that vy_tx_manager::lsn stays
* behind the instance vclock while we need it
* to be up-to-date once recovery is complete,
* because we use it while building an index to
@@ -3709,7 +3709,7 @@ vinyl_snapshot_iterator_free(struct snapshot_iterator *base)
struct vy_lsm *lsm = it->iterator.lsm;
struct vy_env *env = vy_env(lsm->base.engine);
vy_read_iterator_close(&it->iterator);
- tx_manager_destroy_read_view(env->xm, it->rv);
+ vy_tx_manager_destroy_read_view(env->xm, it->rv);
vy_lsm_unref(lsm);
free(it);
}
@@ -3729,7 +3729,7 @@ vinyl_index_create_snapshot_iterator(struct index *base)
it->base.next = vinyl_snapshot_iterator_next;
it->base.free = vinyl_snapshot_iterator_free;
- it->rv = tx_manager_read_view(env->xm);
+ it->rv = vy_tx_manager_read_view(env->xm);
if (it->rv == NULL) {
free(it);
return NULL;
@@ -4152,7 +4152,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
* be checked with on_replace trigger so we abort them.
*/
bool need_wal_sync;
- tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);
if (!need_wal_sync && vy_lsm_is_empty(pk))
return 0; /* space is empty, nothing to do */
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 42e7b2f..f487b42 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -148,7 +148,7 @@ struct vy_scheduler {
* by the dump.
*/
vy_scheduler_dump_complete_f dump_complete_cb;
- /** List of read views, see tx_manager::read_views. */
+ /** List of read views, see vy_tx_manager::read_views. */
struct rlist *read_views;
/** Context needed for writing runs. */
struct vy_run_env *run_env;
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 846c632..ff63cd7 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -98,13 +98,13 @@ vy_global_read_view_create(struct vy_read_view *rv, int64_t lsn)
rv->refs = 0;
}
-struct tx_manager *
-tx_manager_new(void)
+struct vy_tx_manager *
+vy_tx_manager_new(void)
{
- struct tx_manager *xm = calloc(1, sizeof(*xm));
+ struct vy_tx_manager *xm = calloc(1, sizeof(*xm));
if (xm == NULL) {
diag_set(OutOfMemory, sizeof(*xm),
- "malloc", "struct tx_manager");
+ "malloc", "struct vy_tx_manager");
return NULL;
}
@@ -128,7 +128,7 @@ tx_manager_new(void)
}
void
-tx_manager_delete(struct tx_manager *xm)
+vy_tx_manager_delete(struct vy_tx_manager *xm)
{
mempool_destroy(&xm->read_view_mempool);
mempool_destroy(&xm->read_interval_mempool);
@@ -138,7 +138,7 @@ tx_manager_delete(struct tx_manager *xm)
}
size_t
-tx_manager_mem_used(struct tx_manager *xm)
+vy_tx_manager_mem_used(struct vy_tx_manager *xm)
{
struct mempool_stats mstats;
size_t ret = 0;
@@ -157,7 +157,7 @@ tx_manager_mem_used(struct tx_manager *xm)
}
struct vy_read_view *
-tx_manager_read_view(struct tx_manager *xm)
+vy_tx_manager_read_view(struct vy_tx_manager *xm)
{
struct vy_read_view *rv;
/*
@@ -195,7 +195,8 @@ tx_manager_read_view(struct tx_manager *xm)
}
void
-tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
+vy_tx_manager_destroy_read_view(struct vy_tx_manager *xm,
+ struct vy_read_view *rv)
{
if (rv == xm->p_global_read_view)
return;
@@ -209,7 +210,7 @@ tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
static struct txv *
txv_new(struct vy_tx *tx, struct vy_lsm *lsm, struct vy_entry entry)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
struct txv *v = mempool_alloc(&xm->txv_mempool);
if (v == NULL) {
diag_set(OutOfMemory, sizeof(*v), "mempool", "struct txv");
@@ -234,7 +235,7 @@ txv_new(struct vy_tx *tx, struct vy_lsm *lsm, struct vy_entry entry)
static void
txv_delete(struct txv *v)
{
- struct tx_manager *xm = v->tx->xm;
+ struct vy_tx_manager *xm = v->tx->xm;
xm->write_set_size -= tuple_size(v->entry.stmt);
vy_stmt_counter_unacct_tuple(&v->lsm->stat.txw.count, v->entry.stmt);
tuple_unref(v->entry.stmt);
@@ -248,7 +249,7 @@ txv_delete(struct txv *v)
static void
vy_read_interval_acct(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
xm->read_set_size += tuple_size(interval->left.stmt);
if (interval->left.stmt != interval->right.stmt)
xm->read_set_size += tuple_size(interval->right.stmt);
@@ -260,7 +261,7 @@ vy_read_interval_acct(struct vy_read_interval *interval)
static void
vy_read_interval_unacct(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
xm->read_set_size -= tuple_size(interval->left.stmt);
if (interval->left.stmt != interval->right.stmt)
xm->read_set_size -= tuple_size(interval->right.stmt);
@@ -271,7 +272,7 @@ vy_read_interval_new(struct vy_tx *tx, struct vy_lsm *lsm,
struct vy_entry left, bool left_belongs,
struct vy_entry right, bool right_belongs)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
struct vy_read_interval *interval;
interval = mempool_alloc(&xm->read_interval_mempool);
if (interval == NULL) {
@@ -296,7 +297,7 @@ vy_read_interval_new(struct vy_tx *tx, struct vy_lsm *lsm,
static void
vy_read_interval_delete(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
vy_read_interval_unacct(interval);
vy_lsm_unref(interval->lsm);
tuple_unref(interval->left.stmt);
@@ -316,7 +317,7 @@ vy_tx_read_set_free_cb(vy_tx_read_set_t *read_set,
}
void
-vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
+vy_tx_create(struct vy_tx_manager *xm, struct vy_tx *tx)
{
tx->last_stmt_space = NULL;
stailq_create(&tx->log);
@@ -339,7 +340,7 @@ vy_tx_destroy(struct vy_tx *tx)
trigger_run(&tx->on_destroy, NULL);
trigger_destroy(&tx->on_destroy);
- tx_manager_destroy_read_view(tx->xm, tx->read_view);
+ vy_tx_manager_destroy_read_view(tx->xm, tx->read_view);
struct txv *v, *tmp;
stailq_foreach_entry_safe(v, tmp, &tx->log, next_in_log)
@@ -392,7 +393,7 @@ vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
/* already in (earlier) read view */
if (vy_tx_is_in_read_view(abort))
continue;
- struct vy_read_view *rv = tx_manager_read_view(tx->xm);
+ struct vy_read_view *rv = vy_tx_manager_read_view(tx->xm);
if (rv == NULL)
return -1;
abort->read_view = rv;
@@ -422,7 +423,7 @@ vy_tx_abort_readers(struct vy_tx *tx, struct txv *v)
}
struct vy_tx *
-vy_tx_begin(struct tx_manager *xm)
+vy_tx_begin(struct vy_tx_manager *xm)
{
struct vy_tx *tx = mempool_alloc(&xm->tx_mempool);
if (unlikely(tx == NULL)) {
@@ -662,7 +663,7 @@ vy_tx_handle_deferred_delete(struct vy_tx *tx, struct txv *v)
int
vy_tx_prepare(struct vy_tx *tx)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
if (tx->state == VINYL_TX_ABORT) {
/* Conflict is already accounted - see vy_tx_abort(). */
@@ -793,7 +794,7 @@ void
vy_tx_commit(struct vy_tx *tx, int64_t lsn)
{
assert(tx->state == VINYL_TX_COMMIT);
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
xm->stat.commit++;
@@ -833,7 +834,7 @@ vy_tx_rollback_after_prepare(struct vy_tx *tx)
{
assert(tx->state == VINYL_TX_COMMIT);
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
/*
* There are two reasons of rollback_after_prepare:
@@ -878,7 +879,7 @@ vy_tx_rollback_after_prepare(struct vy_tx *tx)
void
vy_tx_rollback(struct vy_tx *tx)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
xm->stat.rollback++;
@@ -1140,8 +1141,8 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
}
void
-tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
- bool *need_wal_sync)
+vy_tx_manager_abort_writers_for_ddl(struct vy_tx_manager *xm,
+ struct space *space, bool *need_wal_sync)
{
*need_wal_sync = false;
if (space->index_count == 0)
@@ -1166,7 +1167,7 @@ tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
}
void
-tx_manager_abort_writers_for_ro(struct tx_manager *xm)
+vy_tx_manager_abort_writers_for_ro(struct vy_tx_manager *xm)
{
struct vy_tx *tx;
rlist_foreach_entry(tx, &xm->writers, in_writers) {
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 3144c92..4fac5f6 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -54,7 +54,7 @@ extern "C" {
struct space;
struct tuple;
-struct tx_manager;
+struct vy_tx_manager;
struct vy_mem;
struct vy_tx;
struct vy_history;
@@ -140,16 +140,16 @@ write_set_search_key(write_set_t *tree, struct vy_lsm *lsm,
/** Transaction object. */
struct vy_tx {
- /** Link in tx_manager::writers. */
+ /** Link in vy_tx_manager::writers. */
struct rlist in_writers;
/** Transaction manager. */
- struct tx_manager *xm;
+ struct vy_tx_manager *xm;
/**
* Pointer to the space affected by the last prepared statement.
* We need it so that we can abort a transaction on DDL even
* if it hasn't inserted anything into the write set yet (e.g.
* yielded on unique check) and therefore would otherwise be
- * ignored by tx_manager_abort_writers_for_ddl().
+ * ignored by vy_tx_manager_abort_writers_for_ddl().
*/
struct space *last_stmt_space;
/**
@@ -209,7 +209,7 @@ vy_tx_read_view(struct vy_tx *tx)
}
/** Transaction manager object. */
-struct tx_manager {
+struct vy_tx_manager {
/**
* The last committed log sequence number known to
* vinyl. Updated in vy_commit().
@@ -278,24 +278,25 @@ struct tx_manager {
};
/** Allocate a tx manager object. */
-struct tx_manager *
-tx_manager_new(void);
+struct vy_tx_manager *
+vy_tx_manager_new(void);
/** Delete a tx manager object. */
void
-tx_manager_delete(struct tx_manager *xm);
+vy_tx_manager_delete(struct vy_tx_manager *xm);
/** Return total amount of memory used by active transactions. */
size_t
-tx_manager_mem_used(struct tx_manager *xm);
+vy_tx_manager_mem_used(struct vy_tx_manager *xm);
/** Create or reuse an instance of a read view. */
struct vy_read_view *
-tx_manager_read_view(struct tx_manager *xm);
+vy_tx_manager_read_view(struct vy_tx_manager *xm);
/** Dereference and possibly destroy a read view. */
void
-tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
+vy_tx_manager_destroy_read_view(struct vy_tx_manager *xm,
+ struct vy_read_view *rv);
/**
* Abort all rw transactions that affect the given space
@@ -307,19 +308,19 @@ tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
* to call wal_sync() to flush them.
*/
void
-tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
- bool *need_wal_sync);
+vy_tx_manager_abort_writers_for_ddl(struct vy_tx_manager *xm,
+ struct space *space, bool *need_wal_sync);
/**
* Abort all local rw transactions that haven't reached WAL yet.
* Called before switching to read-only mode.
*/
void
-tx_manager_abort_writers_for_ro(struct tx_manager *xm);
+vy_tx_manager_abort_writers_for_ro(struct vy_tx_manager *xm);
/** Initialize a tx object. */
void
-vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);
+vy_tx_create(struct vy_tx_manager *xm, struct vy_tx *tx);
/** Destroy a tx object. */
void
@@ -327,7 +328,7 @@ vy_tx_destroy(struct vy_tx *tx);
/** Begin a new transaction. */
struct vy_tx *
-vy_tx_begin(struct tx_manager *xm);
+vy_tx_begin(struct vy_tx_manager *xm);
/** Prepare a transaction to be committed. */
int
--
2.7.4
* [Tarantool-patches] [PATCH v4 02/12] txm: add TX status
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 01/12] vinyl: rename tx_manager -> vy_tx_manager Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 03/12] txm: save does_require_old_tuple flag in txn_stmt Aleksandr Lyapunov
` (10 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
The transaction engine (see further commits) needs to distinguish and
manipulate transactions by their status. The status describes the
point in the lifetime of a transaction (in progress, prepared,
committed) and its abilities (conflicted, read view).
Part of #4897
Part of #5108
---
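For illustration, a minimal sketch of how code built on top of this enum
can report the new status; the helper below is hypothetical and not part
of the patch (the diff itself only assigns the status in txn_begin(),
txn_prepare(), txn_complete() and txn_rollback()):

/* Hypothetical helper, for illustration only: human-readable names
 * for the statuses, e.g. for logging or diagnostics. */
static inline const char *
txn_status_str(enum txn_status status)
{
	switch (status) {
	case TXN_INPROGRESS:   return "inprogress";
	case TXN_PREPARED:     return "prepared";
	case TXN_CONFLICTED:   return "conflicted";
	case TXN_IN_READ_VIEW: return "read-view";
	case TXN_COMMITTED:    return "committed";
	case TXN_ABORTED:      return "aborted";
	default:               return "unknown";
	}
}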
src/box/txn.c | 5 +++++
src/box/txn.h | 36 ++++++++++++++++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index b2d3423..093808a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -224,6 +224,7 @@ txn_begin(void)
txn->flags = 0;
txn->in_sub_stmt = 0;
txn->id = ++tsn;
+ txn->status = TXN_INPROGRESS;
txn->signature = TXN_SIGNATURE_ROLLBACK;
txn->engine = NULL;
txn->engine_tx = NULL;
@@ -449,6 +450,7 @@ txn_complete(struct txn *txn)
* IPROTO_NOP or IPROTO_CONFIRM statements.
*/
if (txn->signature < 0) {
+ txn->status = TXN_ABORTED;
/* Undo the transaction. */
struct txn_stmt *stmt;
stailq_reverse(&txn->stmts);
@@ -459,6 +461,7 @@ txn_complete(struct txn *txn)
if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
txn_run_rollback_triggers(txn, &txn->on_rollback);
} else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
+ txn->status = TXN_COMMITTED;
/* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
@@ -656,6 +659,7 @@ txn_prepare(struct txn *txn)
trigger_clear(&txn->fiber_on_yield);
txn->start_tm = ev_monotonic_now(loop());
+ txn->status = TXN_PREPARED;
return 0;
}
@@ -872,6 +876,7 @@ void
txn_rollback(struct txn *txn)
{
assert(txn == in_txn());
+ txn->status = TXN_ABORTED;
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
diff --git a/src/box/txn.h b/src/box/txn.h
index 36b1a03..35e1ab9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -121,6 +121,40 @@ enum {
};
/**
+ * Status of a transaction.
+ */
+enum txn_status {
+ /**
+ * Initial state of TX. The only state of a TX that is allowed to do
+ * read or write actions.
+ */
+ TXN_INPROGRESS,
+ /**
+ * The TX has passed conflict checks and is ready to be committed.
+ */
+ TXN_PREPARED,
+ /**
+ * The TX was aborted when another TX was committed, due to a conflict.
+ */
+ TXN_CONFLICTED,
+ /**
+ * The TX was read-only, got a conflict and was sent to a read view.
+ * It stays read-only and does not participate in conflict resolution any more.
+ * This transaction can only see a state of the database at some fixed
+ * point in the past.
+ */
+ TXN_IN_READ_VIEW,
+ /**
+ * The TX was committed.
+ */
+ TXN_COMMITTED,
+ /**
+ * The TX was aborted.
+ */
+ TXN_ABORTED,
+};
+
+/**
* A single statement of a multi-statement
* transaction: undo and redo info.
*/
@@ -217,6 +251,8 @@ struct txn {
* Valid IDs start from 1.
*/
int64_t id;
+ /** Status of the TX */
+ enum txn_status status;
/** List of statements in a transaction. */
struct stailq stmts;
/** Number of new rows without an assigned LSN. */
--
2.7.4
* [Tarantool-patches] [PATCH v4 03/12] txm: save does_require_old_tuple flag in txn_stmt
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 01/12] vinyl: rename tx_manager -> vy_tx_manager Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 02/12] txm: add TX status Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 04/12] txm: introduce prepare sequence number Aleksandr Lyapunov
` (9 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
This flag is needed for the transactional conflict manager: if any
other transaction commits a replacement of old_tuple before the
current one and the flag is set, the current transaction will
be aborted.
For example, REPLACE just replaces a key, no matter what tuple
lies in the index, and thus does_require_old_tuple = false.
In contrast, UPDATE makes a new tuple using old_tuple and thus
the statement requires old_tuple (does_require_old_tuple = true).
INSERT also sets does_require_old_tuple = true because it requires
old_tuple to be NULL.
Part of #4897
---
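To make the intent concrete, here is a hypothetical predicate (not part
of the patch) showing how a conflict manager could consult the flag once
some other transaction has committed a replacement of a tuple:

/* Hypothetical, for illustration only: whether a committed replacement
 * of @a overwritten must abort the transaction that executed @a stmt. */
static bool
txn_stmt_is_broken_by(const struct txn_stmt *stmt,
		      const struct tuple *overwritten)
{
	/*
	 * REPLACE leaves does_require_old_tuple false - it does not
	 * care which tuple was in the index. UPDATE, DELETE and INSERT
	 * set it, because their outcome depends on old_tuple (for
	 * INSERT - on old_tuple being NULL).
	 */
	return stmt->does_require_old_tuple &&
	       stmt->old_tuple == overwritten;
}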
src/box/memtx_space.c | 17 +++++++++++++++++
src/box/txn.c | 53 ++++++++++++++++++++++++++++++++++-----------------
src/box/txn.h | 13 +++++++++++++
3 files changed, 65 insertions(+), 18 deletions(-)
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index d30ce44..7624130 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -316,6 +316,10 @@ memtx_space_execute_replace(struct space *space, struct txn *txn,
if (stmt->new_tuple == NULL)
return -1;
tuple_ref(stmt->new_tuple);
+
+ if (mode == DUP_INSERT)
+ stmt->does_require_old_tuple = true;
+
if (memtx_space->replace(space, NULL, stmt->new_tuple,
mode, &stmt->old_tuple) != 0)
return -1;
@@ -342,6 +346,13 @@ memtx_space_execute_delete(struct space *space, struct txn *txn,
struct tuple *old_tuple;
if (index_get(pk, key, part_count, &old_tuple) != 0)
return -1;
+
+ /*
+ * We have to delete exactly old_tuple just because we return it as
+ * a result.
+ */
+ stmt->does_require_old_tuple = true;
+
if (old_tuple != NULL &&
memtx_space->replace(space, old_tuple, NULL,
DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
@@ -390,6 +401,9 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
if (stmt->new_tuple == NULL)
return -1;
tuple_ref(stmt->new_tuple);
+
+ stmt->does_require_old_tuple = true;
+
if (memtx_space->replace(space, old_tuple, stmt->new_tuple,
DUP_REPLACE, &stmt->old_tuple) != 0)
return -1;
@@ -496,6 +510,9 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
stmt->new_tuple = NULL;
}
}
+
+ stmt->does_require_old_tuple = true;
+
/*
* It's OK to use DUP_REPLACE_OR_INSERT: we don't risk
* inserting a new tuple if the old one exists, since
diff --git a/src/box/txn.c b/src/box/txn.c
index 093808a..d9fbb51 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -109,6 +109,7 @@ txn_stmt_new(struct region *region)
stmt->engine_savepoint = NULL;
stmt->row = NULL;
stmt->has_triggers = false;
+ stmt->does_require_old_tuple = false;
return stmt;
}
@@ -351,27 +352,43 @@ txn_commit_stmt(struct txn *txn, struct request *request)
* - perhaps we should run triggers even for deletes which
* doesn't find any rows
*/
- if (stmt->space != NULL && !rlist_empty(&stmt->space->on_replace) &&
- stmt->space->run_triggers && (stmt->old_tuple || stmt->new_tuple)) {
- int rc = 0;
- if(!space_is_temporary(stmt->space)) {
- rc = trigger_run(&stmt->space->on_replace, txn);
- } else {
+ if (stmt->space != NULL && stmt->space->run_triggers &&
+ (stmt->old_tuple || stmt->new_tuple)) {
+ if (!rlist_empty(&stmt->space->before_replace)) {
/*
- * There is no row attached to txn_stmt for
- * temporary spaces, since DML operations on them
- * are not written to WAL. Fake a row to pass operation
- * type to lua on_replace triggers.
+ * Triggers see old_tuple and that tuple
+ * must remain the same
*/
- assert(stmt->row == NULL);
- struct xrow_header temp_header;
- temp_header.type = request->type;
- stmt->row = &temp_header;
- rc = trigger_run(&stmt->space->on_replace, txn);
- stmt->row = NULL;
+ stmt->does_require_old_tuple = true;
+ }
+ if (!rlist_empty(&stmt->space->on_replace)) {
+ /*
+ * Triggers see old_tuple and that tuple
+ * must remain the same
+ */
+ stmt->does_require_old_tuple = true;
+
+ int rc = 0;
+ if(!space_is_temporary(stmt->space)) {
+ rc = trigger_run(&stmt->space->on_replace, txn);
+ } else {
+ /*
+ * There is no row attached to txn_stmt for
+ * temporary spaces, since DML operations on
+ * them are not written to WAL.
+ * Fake a row to pass operation type to lua
+ * on_replace triggers.
+ */
+ assert(stmt->row == NULL);
+ struct xrow_header temp_header;
+ temp_header.type = request->type;
+ stmt->row = &temp_header;
+ rc = trigger_run(&stmt->space->on_replace, txn);
+ stmt->row = NULL;
+ }
+ if (rc != 0)
+ goto fail;
}
- if (rc != 0)
- goto fail;
}
--txn->in_sub_stmt;
return 0;
diff --git a/src/box/txn.h b/src/box/txn.h
index 35e1ab9..007284f 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -175,6 +175,19 @@ struct txn_stmt {
struct xrow_header *row;
/** on_commit and/or on_rollback list is not empty. */
bool has_triggers;
+ /**
+ * Whether the stmt requires to replace exactly old_tuple (member).
+ * That flag is needed for transactional conflict manager - if any
+ * other transaction commits a replacement of old_tuple before current
+ * one and the flag is set - the current transaction will be aborted.
+ * For example REPLACE just replaces a key, no matter what tuple
+ * lays in the index and thus does_require_old_tuple = false.
+ * In contrast, UPDATE makes new tuple using old_tuple and thus
+ * the statement will require old_tuple (does_require_old_tuple = true).
+ * INSERT also does_require_old_tuple = true because it requires
+ * old_tuple to be NULL.
+ */
+ bool does_require_old_tuple;
/** Commit/rollback triggers associated with this statement. */
struct rlist on_commit;
struct rlist on_rollback;
--
2.7.4
* [Tarantool-patches] [PATCH v4 04/12] txm: introduce prepare sequence number
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (2 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 03/12] txm: save does_require_old_tuple flag in txn_stmt Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 05/12] txm: introduce memtx tx manager Aleksandr Lyapunov
` (8 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
The prepare sequence number (PSN) is a monotonically increasing ID
that is assigned to every prepared transaction. This ID is suitable
for resolving the serialization order: the bigger the ID, the later
the transaction appears in the serialization order.
Note that transaction ids can have quite a different order when
transactions are allowed to yield: a younger (bigger id) transaction
can prepare/commit first (lower psn) while an older tx sleeps in vain.
It should also be mentioned that LSN has the same order as PSN,
but there are two general differences:
1. The LSN sequence has no holes, i.e. it is a natural number
sequence. This property is useless for the transaction engine.
2. The LSN sequence is provided by the WAL writer, so an LSN is not
available for a TX that was prepared but has not been committed yet.
This makes PSN a more suitable sequence for transactions, as it
allows ordering prepared but not yet committed transactions and,
for example, creating a read view between prepared transactions.
Part of #4897
---
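A self-contained toy illustration (hypothetical types, not part of the
patch) of why the id order and the psn order diverge once transactions
may yield:

#include <assert.h>
#include <stdint.h>

/* Toy model of the two counters, for illustration only. */
struct toy_txn {
	int64_t id;	/* assigned at begin */
	int64_t psn;	/* assigned at prepare */
};

int
main(void)
{
	int64_t last_psn = 0;
	struct toy_txn a = { .id = 1, .psn = 0 };	/* begins first, yields */
	struct toy_txn b = { .id = 2, .psn = 0 };	/* begins second */
	b.psn = ++last_psn;	/* b prepares first  -> psn == 1 */
	a.psn = ++last_psn;	/* a prepares second -> psn == 2 */
	/* The serialization order follows psn (b before a), not id. */
	assert(a.id < b.id && b.psn < a.psn);
	return 0;
}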
src/box/txn.c | 6 ++++++
src/box/txn.h | 8 ++++++++
2 files changed, 14 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index d9fbb51..1dfe59f 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -40,6 +40,9 @@
double too_long_threshold;
+/** Last prepare-sequence-number that was assigned to prepared TX. */
+int64_t txn_last_psn = 0;
+
/* Txn cache. */
static struct stailq txn_cache = {NULL, &txn_cache.first};
@@ -225,6 +228,7 @@ txn_begin(void)
txn->flags = 0;
txn->in_sub_stmt = 0;
txn->id = ++tsn;
+ txn->psn = 0;
txn->status = TXN_INPROGRESS;
txn->signature = TXN_SIGNATURE_ROLLBACK;
txn->engine = NULL;
@@ -648,6 +652,8 @@ txn_journal_entry_new(struct txn *txn)
static int
txn_prepare(struct txn *txn)
{
+ txn->psn = ++txn_last_psn;
+
if (txn_has_flag(txn, TXN_IS_ABORTED_BY_YIELD)) {
assert(!txn_has_flag(txn, TXN_CAN_YIELD));
diag_set(ClientError, ER_TRANSACTION_YIELD);
diff --git a/src/box/txn.h b/src/box/txn.h
index 007284f..d7e77e5 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -44,6 +44,9 @@ extern "C" {
/** box statistics */
extern struct rmean *rmean_box;
+/** Last prepare-sequence-number that was assigned to prepared TX. */
+extern int64_t txn_last_psn;
+
struct journal_entry;
struct engine;
struct space;
@@ -264,6 +267,11 @@ struct txn {
* Valid IDs start from 1.
*/
int64_t id;
+ /**
+ * A sequential ID that is assigned when the TX becomes prepared.
+ * Transactions are committed in that order.
+ */
+ int64_t psn;
/** Status of the TX */
enum txn_status status;
/** List of statements in a transaction. */
--
2.7.4
* [Tarantool-patches] [PATCH v4 05/12] txm: introduce memtx tx manager
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (3 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 04/12] txm: introduce prepare sequence number Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker Aleksandr Lyapunov
` (7 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
Define the memtx TX manager. It will store data for MVCC and the
conflict manager. Also define the 'memtx_use_mvcc_engine' config
option that enables the MVCC engine.
Part of #4897
---
src/box/CMakeLists.txt | 1 +
src/box/lua/load_cfg.lua | 2 ++
src/box/memtx_tx.c | 52 +++++++++++++++++++++++++++++++++++
src/box/memtx_tx.h | 61 +++++++++++++++++++++++++++++++++++++++++
src/main.cc | 5 ++++
test/app-tap/init_script.result | 1 +
test/box/admin.result | 2 ++
test/box/cfg.result | 4 +++
8 files changed, 128 insertions(+)
create mode 100644 src/box/memtx_tx.c
create mode 100644 src/box/memtx_tx.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689..2ed7270 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -128,6 +128,7 @@ add_library(box STATIC
memtx_tree.c
memtx_rtree.c
memtx_bitset.c
+ memtx_tx.c
engine.c
memtx_engine.c
memtx_space.c
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 53f5728..92347a9 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -82,6 +82,7 @@ local default_cfg = {
coredump = false,
read_only = false,
hot_standby = false,
+ memtx_use_mvcc_engine = false,
checkpoint_interval = 3600,
checkpoint_wal_threshold = 1e18,
checkpoint_count = 2,
@@ -162,6 +163,7 @@ local template_cfg = {
checkpoint_count = 'number',
read_only = 'boolean',
hot_standby = 'boolean',
+ memtx_use_mvcc_engine = 'boolean',
worker_pool_threads = 'number',
replication_timeout = 'number',
replication_sync_lag = 'number',
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
new file mode 100644
index 0000000..479aa48
--- /dev/null
+++ b/src/box/memtx_tx.c
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "memtx_tx.h"
+
+struct tx_manager
+{
+};
+
+/** That's a definition, see declaration for description. */
+bool memtx_tx_manager_use_mvcc_engine = false;
+
+/** The one and only instance of tx_manager. */
+static struct tx_manager txm;
+
+void
+memtx_tx_manager_init()
+{
+ (void)txm;
+}
+
+void
+memtx_tx_manager_free()
+{
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
new file mode 100644
index 0000000..fb2cb4d
--- /dev/null
+++ b/src/box/memtx_tx.h
@@ -0,0 +1,61 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Global flag that enables mvcc engine.
+ * If set, memtx starts to apply statements through txm history mechanism
+ * and the tx manager itself tracks transaction reads in order to detect conflicts.
+ */
+extern bool memtx_tx_manager_use_mvcc_engine;
+
+/**
+ * Initialize memtx transaction manager.
+ */
+void
+memtx_tx_manager_init();
+
+/**
+ * Free resources of memtx transaction manager.
+ */
+void
+memtx_tx_manager_free();
+
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
diff --git a/src/main.cc b/src/main.cc
index 65b1606..2f48f47 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -75,6 +75,7 @@
#include <libutil.h>
#include "box/lua/init.h" /* box_lua_init() */
#include "box/session.h"
+#include "box/memtx_tx.h"
#include "systemd.h"
#include "crypto/crypto.h"
#include "core/popen.h"
@@ -569,6 +570,8 @@ load_cfg(void)
log_format,
background);
+ memtx_tx_manager_use_mvcc_engine = cfg_getb("memtx_use_mvcc_engine");
+
if (background)
daemonize();
@@ -667,6 +670,7 @@ tarantool_free(void)
random_free();
#endif
crypto_free();
+ memtx_tx_manager_free();
coll_free();
systemd_free();
say_logger_free();
@@ -830,6 +834,7 @@ main(int argc, char **argv)
signal_init();
cbus_init();
coll_init();
+ memtx_tx_manager_init();
crypto_init();
systemd_init();
tarantool_lua_init(tarantool_bin, main_argc, main_argv);
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 857f0c9..c8974d7 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,6 +21,7 @@ memtx_dir:.
memtx_max_tuple_size:1048576
memtx_memory:107374182
memtx_min_tuple_size:16
+memtx_use_mvcc_engine:false
net_msg_max:768
pid_file:box.pid
read_only:false
diff --git a/test/box/admin.result b/test/box/admin.result
index ab3e80a..d1540a7 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -63,6 +63,8 @@ cfg_filter(box.cfg)
- 107374182
- - memtx_min_tuple_size
- <hidden>
+ - - memtx_use_mvcc_engine
+ - false
- - net_msg_max
- 768
- - pid_file
diff --git a/test/box/cfg.result b/test/box/cfg.result
index bdd210b..fcfc64b 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -51,6 +51,8 @@ cfg_filter(box.cfg)
| - 107374182
| - - memtx_min_tuple_size
| - <hidden>
+ | - - memtx_use_mvcc_engine
+ | - false
| - - net_msg_max
| - 768
| - - pid_file
@@ -158,6 +160,8 @@ cfg_filter(box.cfg)
| - 107374182
| - - memtx_min_tuple_size
| - <hidden>
+ | - - memtx_use_mvcc_engine
+ | - false
| - - net_msg_max
| - 768
| - - pid_file
--
2.7.4
* [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (4 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 05/12] txm: introduce memtx tx manager Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-14 16:36 ` Nikita Pettik
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story Aleksandr Lyapunov
` (6 subsequent siblings)
12 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
There are situations when we have to track that if some TX is
committed then some others must be aborted due to a conflict.
The common case is that one r/w TX has read some value while a
second one is about to overwrite it; if the second is committed,
the first must be aborted.
Thus we have to store many-to-many relations between breaker
TXs and victim TXs.
This patch implements that.
Part of #4897
---
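A short usage sketch of the new API; the call site below is hypothetical,
only memtx_tx_cause_conflict() itself comes from this patch:

/* Hypothetical call site: @a reader has read a value that @a writer is
 * about to overwrite, so remember that committing the writer must abort
 * the reader (or send it to a read view, if it made no changes). */
static int
remember_read_of_overwritten_value(struct txn *writer, struct txn *reader)
{
	if (memtx_tx_cause_conflict(writer, reader) != 0)
		return -1;	/* region allocation failed, diag is set */
	return 0;
}

When the breaker later reaches txn_prepare(), the recorded trackers are
walked and memtx_tx_handle_conflict() either sends a statement-less
victim to a read view or marks it TXN_CONFLICTED, as the txn.c hunk
below shows.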
src/box/memtx_tx.c | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/box/memtx_tx.h | 38 +++++++++++++++++++++++
src/box/txn.c | 63 ++++++++++++++++++++++++++++++++++++++
src/box/txn.h | 21 +++++++++++++
4 files changed, 209 insertions(+), 1 deletion(-)
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 479aa48..eb2ab51 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -30,8 +30,20 @@
*/
#include "memtx_tx.h"
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "txn.h"
+
struct tx_manager
{
+ /**
+ * List of all transactions that are in a read view.
+ * New transactions are added to the tail of this list,
+ * so the list is ordered by rv_psn.
+ */
+ struct rlist read_view_txs;
};
/** That's a definition, see declaration for description. */
@@ -43,10 +55,84 @@ static struct tx_manager txm;
void
memtx_tx_manager_init()
{
- (void)txm;
+ rlist_create(&txm.read_view_txs);
}
void
memtx_tx_manager_free()
{
}
+
+int
+memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
+{
+ struct tx_conflict_tracker *tracker = NULL;
+ struct rlist *r1 = breaker->conflict_list.next;
+ struct rlist *r2 = victim->conflicted_by_list.next;
+ while (r1 != &breaker->conflict_list &&
+ r2 != &victim->conflicted_by_list) {
+ tracker = rlist_entry(r1, struct tx_conflict_tracker,
+ in_conflict_list);
+ assert(tracker->breaker == breaker);
+ if (tracker->victim == victim)
+ break;
+ tracker = rlist_entry(r2, struct tx_conflict_tracker,
+ in_conflicted_by_list);
+ assert(tracker->victim == victim);
+ if (tracker->breaker == breaker)
+ break;
+ tracker = NULL;
+ r1 = r1->next;
+ r2 = r2->next;
+ }
+ if (tracker != NULL) {
+ /*
+ * Move to the beginning of a list
+ * for a case of subsequent lookups.
+ */
+ rlist_del(&tracker->in_conflict_list);
+ rlist_del(&tracker->in_conflicted_by_list);
+ } else {
+ size_t size;
+ tracker = region_alloc_object(&victim->region,
+ struct tx_conflict_tracker,
+ &size);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, size, "tx region",
+ "conflict_tracker");
+ return -1;
+ }
+ tracker->breaker = breaker;
+ tracker->victim = victim;
+ }
+ rlist_add(&breaker->conflict_list, &tracker->in_conflict_list);
+ rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
+ return 0;
+}
+
+/**
+ * Handle conflict when @breaker transaction is prepared.
+ * The conflict is happened if @victim have read something that @breaker
+ * overwrites.
+ * If @victim is read-only or haven't made any changes, it should be send
+ * to read view, in which is will not see @breaker.
+ * Otherwise @vistim must be marked as conflicted.
+ */
+void
+memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
+{
+ assert(breaker->psn != 0);
+ if (victim->status != TXN_INPROGRESS) {
+ /* Was conflicted by somebody else. */
+ return;
+ }
+ if (stailq_empty(&victim->stmts)) {
+ /* Send to read view. */
+ victim->status = TXN_IN_READ_VIEW;
+ victim->rv_psn = breaker->psn;
+ rlist_add_tail(&txm.read_view_txs, &victim->in_read_view_txs);
+ } else {
+ /* Mark as conflicted. */
+ victim->status = TXN_CONFLICTED;
+ }
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index fb2cb4d..6143a22 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -32,6 +32,8 @@
#include <stdbool.h>
+#include "small/rlist.h"
+
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
@@ -44,6 +46,21 @@ extern "C" {
extern bool memtx_tx_manager_use_mvcc_engine;
/**
+ * Record that links two transactions, breaker and victim.
+ * See memtx_tx_cause_conflict for details.
+ */
+struct tx_conflict_tracker {
+ /** TX that aborts victim on commit. */
+ struct txn *breaker;
+ /** TX that will be aborted on breaker's commit. */
+ struct txn *victim;
+ /** Link in breaker->conflict_list. */
+ struct rlist in_conflict_list;
+ /** Link in victim->conflicted_by_list. */
+ struct rlist in_conflicted_by_list;
+};
+
+/**
* Initialize memtx transaction manager.
*/
void
@@ -55,6 +72,27 @@ memtx_tx_manager_init();
void
memtx_tx_manager_free();
+/**
+ * Notify TX manager that if transaction @breaker is committed then the
+ * transaction @victim must be aborted due to conflict.
+ * For example: there's two rw transaction in progress, one have read
+ * some value while the second is about to overwrite it. If the second
+ * is committed first, the first must be aborted.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim);
+
+/**
+ * Handle conflict when @breaker transaction is prepared.
+ * The conflict is happened if @victim have read something that @breaker
+ * overwrites.
+ * If @victim is read-only or haven't made any changes, it should be sent
+ * to read view, in which is will not see @breaker.
+ * Otherwise @victim must be marked as conflicted.
+ */
+void
+memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim);
#if defined(__cplusplus)
} /* extern "C" */
diff --git a/src/box/txn.c b/src/box/txn.c
index 1dfe59f..976e17c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "txn.h"
+#include "memtx_tx.h"
#include "txn_limbo.h"
#include "engine.h"
#include "tuple.h"
@@ -193,6 +194,9 @@ txn_new(void)
}
assert(region_used(&region) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->conflict_list);
+ rlist_create(&txn->conflicted_by_list);
+ rlist_create(&txn->in_read_view_txs);
return txn;
}
@@ -202,6 +206,22 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_conflict_tracker *entry, *next;
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+ in_conflicted_by_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
+
+ rlist_del(&txn->in_read_view_txs);
+
struct txn_stmt *stmt;
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_destroy(stmt);
@@ -219,6 +239,8 @@ txn_begin(void)
struct txn *txn = txn_new();
if (txn == NULL)
return NULL;
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
@@ -229,6 +251,7 @@ txn_begin(void)
txn->in_sub_stmt = 0;
txn->id = ++tsn;
txn->psn = 0;
+ txn->rv_psn = 0;
txn->status = TXN_INPROGRESS;
txn->signature = TXN_SIGNATURE_ROLLBACK;
txn->engine = NULL;
@@ -277,6 +300,15 @@ txn_begin_stmt(struct txn *txn, struct space *space)
diag_set(ClientError, ER_SUB_STMT_MAX);
return -1;
}
+
+ /*
+ * A conflict has happened; there is no reason to continue the TX.
+ */
+ if (txn->status == TXN_CONFLICTED) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
struct txn_stmt *stmt = txn_stmt_new(&txn->region);
if (stmt == NULL)
return -1;
@@ -669,6 +701,17 @@ txn_prepare(struct txn *txn)
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
return -1;
}
+
+ /*
+ * Somebody else has written some value that we have read.
+ * The RW transaction is not possible.
+ */
+ if (txn->status == TXN_CONFLICTED ||
+ (txn->status == TXN_IN_READ_VIEW && !stailq_empty(&txn->stmts))) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
/*
* Perform transaction conflict resolution. Engine == NULL when
* we have a bunch of IPROTO_NOP statements.
@@ -677,6 +720,26 @@ txn_prepare(struct txn *txn)
if (engine_prepare(txn->engine, txn) != 0)
return -1;
}
+
+ struct tx_conflict_tracker *entry, *next;
+ /* Handle conflicts. */
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ assert(entry->breaker == txn);
+ memtx_tx_handle_conflict(txn, entry->victim);
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ /* Just free conflict list - we don't need it anymore. */
+ rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+ in_conflicted_by_list, next) {
+ assert(entry->victim == txn);
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
+
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
diff --git a/src/box/txn.h b/src/box/txn.h
index d7e77e5..f957d1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -272,6 +272,11 @@ struct txn {
* Transactions are committed in that order.
*/
int64_t psn;
+ /**
+ * Read view of that TX. The TX can see only changes with psn < rv_psn.
+ * Is nonzero if and only if status = TXN_IN_READ_VIEW.
+ */
+ int64_t rv_psn;
/** Status of the TX */
enum txn_status status;
/** List of statements in a transaction. */
@@ -333,6 +338,22 @@ struct txn {
uint32_t fk_deferred_count;
/** List of savepoints to find savepoint by name. */
struct rlist savepoints;
+ /**
+ * List of tx_conflict_tracker records where .breaker is the current
+ * transaction and .victim is the transactions that must be aborted
+ * if the current transaction is committed.
+ */
+ struct rlist conflict_list;
+ /**
+ * List of tx_conflict_tracker records where .victim is the current
+ * transaction and .breaker is the transactions that, if committed,
+ * will abort the current transaction.
+ */
+ struct rlist conflicted_by_list;
+ /**
+ * Link in tx_manager::read_view_txs.
+ */
+ struct rlist in_read_view_txs;
};
static inline bool
--
2.7.4
* Re: [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker Aleksandr Lyapunov
@ 2020-09-14 16:36 ` Nikita Pettik
0 siblings, 0 replies; 26+ messages in thread
From: Nikita Pettik @ 2020-09-14 16:36 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> +/**
> + * Handle conflict when @breaker transaction is prepared.
> + * The conflict is happened if @victim have read something that @breaker
> + * overwrites.
> + * If @victim is read-only or haven't made any changes, it should be send
> + * to read view, in which is will not see @breaker.
> + * Otherwise @vistim must be marked as conflicted.
> + */
Duplicated comment: it is already placed in memtx_tx.h
> +void
> +memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
> +{
> + assert(breaker->psn != 0);
> + if (victim->status != TXN_INPROGRESS) {
> + /* Was conflicted by somebody else. */
> + return;
> + }
> +}
> diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
> index fb2cb4d..6143a22 100644
> --- a/src/box/memtx_tx.h
> +++ b/src/box/memtx_tx.h
>
> +/**
> + * Notify TX manager that if transaction @breaker is committed then the
> + * transaction @victim must be aborted due to conflict.
> + * For example: there's two rw transaction in progress, one have read
> + * some value while the second is about to overwrite it. If the second
> + * is committed first, the first must be aborted.
> + * @return 0 on success, -1 on memory error.
> + */
I'd expand the comment a bit:
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 2ebca715d..8307e1ee7 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -193,7 +193,10 @@ memtx_tx_manager_free();
/**
* Notify TX manager that if transaction @breaker is committed then the
- * transaction @victim must be aborted due to conflict.
+ * transaction @victim must be aborted due to conflict. It is achieved
+ * by adding corresponding entry (of tx_conflict_tracker type) to @a breaker
+ * conflict list. In case there's already such entry, then move it to the head
+ * of the list in order to optimize next invocations of this function.
* For example: there's two rw transaction in progress, one have read
* some value while the second is about to overwrite it. If the second
* is committed first, the first must be aborted.
Also, as Vlad always notes, by doxygen convention variable references
start with @a: @a breaker, @a victim, etc.
The rest of this and the previous patches LGTM.
* [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (5 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-15 14:33 ` Nikita Pettik
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 08/12] txm: introduce snapshot cleaner Aleksandr Lyapunov
` (5 subsequent siblings)
12 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
A memtx story is a part of the history of a value in a space.
It's a story about a tuple, from the point when it was added to the
space to the point when it was deleted from the space.
All stories of the same key are linked into a list, one per index.
Part of #4897
---
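A condensed sketch of how a reader fixed at some read-view psn could walk
such a chain in a single index; the helper is hypothetical and, for
brevity, it ignores the reader's own uncommitted changes and unprepared
writers:

/* Hypothetical, for illustration only: find the tuple version visible
 * to a read view taken at @a rv_psn, walking from the newest story in
 * index @a idx towards older versions. */
static struct tuple *
story_visible_tuple(struct memtx_story *story, uint32_t idx, int64_t rv_psn)
{
	while (story != NULL) {
		if (story->add_psn != 0 && story->add_psn < rv_psn) {
			/*
			 * The newest version that predates the read
			 * view decides: the key is absent if that
			 * version was also deleted before the read
			 * view was taken.
			 */
			if (story->del_psn != 0 && story->del_psn < rv_psn)
				return NULL;
			return story->tuple;
		}
		/* Too new for this read view - step to an older version. */
		if (!story->link[idx].older.is_story)
			return story->link[idx].older.tuple;
		story = story->link[idx].older.story;
	}
	return NULL;
}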
src/box/memtx_tx.c | 969 +++++++++++++++++++++++++++++++++++++++++++++++++++++
src/box/memtx_tx.h | 217 ++++++++++++
src/box/space.c | 2 +
src/box/space.h | 4 +
src/box/txn.c | 12 +
src/box/txn.h | 24 ++
6 files changed, 1228 insertions(+)
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index eb2ab51..dfad6f7 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -35,6 +35,29 @@
#include <stdint.h>
#include "txn.h"
+#include "schema_def.h"
+#include "small/mempool.h"
+
+static uint32_t
+memtx_tx_story_key_hash(const struct tuple *a)
+{
+ uintptr_t u = (uintptr_t)a;
+ if (sizeof(uintptr_t) <= sizeof(uint32_t))
+ return u;
+ else
+ return u ^ (u >> 32);
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct memtx_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (memtx_tx_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (memtx_tx_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
struct tx_manager
{
@@ -44,6 +67,23 @@ struct tx_manager
* so the list is ordered by rv_psn.
*/
struct rlist read_view_txs;
+ /** Mempools for tx_story objects with different index count. */
+ struct mempool memtx_tx_story_pool[BOX_INDEX_MAX];
+ /** Hash table tuple -> memtx_story of that tuple. */
+ struct mh_history_t *history;
+ /** List of all memtx_story objects. */
+ struct rlist all_stories;
+ /** Iterator that sequentially traverses all memtx_story objects. */
+ struct rlist *traverse_all_stories;
+};
+
+enum {
+ /**
+ * Number of iterations that is allowed for TX manager to do for
+ * searching and deleting no more used memtx_tx_stories per creation of
+ * a new story.
+ */
+ TX_MANAGER_GC_STEPS_SIZE = 2,
};
/** That's a definition, see declaration for description. */
@@ -56,6 +96,15 @@ void
memtx_tx_manager_init()
{
rlist_create(&txm.read_view_txs);
+ for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+ size_t item_size = sizeof(struct memtx_story) +
+ i * sizeof(struct memtx_story_link);
+ mempool_create(&txm.memtx_tx_story_pool[i],
+ cord_slab_cache(), item_size);
+ }
+ txm.history = mh_history_new();
+ rlist_create(&txm.all_stories);
+ txm.traverse_all_stories = &txm.all_stories;
}
void
@@ -136,3 +185,923 @@ memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
victim->status = TXN_CONFLICTED;
}
}
+
+/** See definition for details */
+static void
+memtx_tx_story_gc_step();
+
+/**
+ * Create a new story and link it with the @tuple.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new(struct space *space, struct tuple *tuple)
+{
+ /* Free some memory. */
+ for (size_t i = 0; i < TX_MANAGER_GC_STEPS_SIZE; i++)
+ memtx_tx_story_gc_step();
+ assert(!tuple->is_dirty);
+ uint32_t index_count = space->index_count;
+ assert(index_count < BOX_INDEX_MAX);
+ struct mempool *pool = &txm.memtx_tx_story_pool[index_count];
+ struct memtx_story *story = (struct memtx_story *) mempool_alloc(pool);
+ if (story == NULL) {
+ size_t item_size = sizeof(struct memtx_story) +
+ index_count *
+ sizeof(struct memtx_story_link);
+ diag_set(OutOfMemory, item_size, "mempool_alloc", "story");
+ return NULL;
+ }
+ story->tuple = tuple;
+
+ const struct memtx_story **put_story =
+ (const struct memtx_story **) &story;
+ struct memtx_story **empty = NULL;
+ mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+ if (pos == mh_end(txm.history)) {
+ mempool_free(pool, story);
+ diag_set(OutOfMemory, pos + 1, "mh_history_put",
+ "mh_history_node");
+ return NULL;
+ }
+ tuple->is_dirty = true;
+ tuple_ref(tuple);
+
+ story->space = space;
+ story->index_count = index_count;
+ story->add_stmt = NULL;
+ story->add_psn = 0;
+ story->del_stmt = NULL;
+ story->del_psn = 0;
+ rlist_create(&story->reader_list);
+ rlist_add_tail(&txm.all_stories, &story->in_all_stories);
+ rlist_add(&space->memtx_stories, &story->in_space_stories);
+ memset(story->link, 0, sizeof(story->link[0]) * index_count);
+ return story;
+}
+
+static void
+memtx_tx_story_delete(struct memtx_story *story);
+
+/**
+ * Create a new story of a @tuple that was added by @stmt.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+ struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple);
+ if (res == NULL)
+ return NULL;
+ res->add_stmt = stmt;
+ assert(stmt->add_story == NULL);
+ stmt->add_story = res;
+ return res;
+}
+
+/**
+ * Create a new story of a @tuple that was deleted by @stmt.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+ struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple);
+ if (res == NULL)
+ return NULL;
+ res->del_stmt = stmt;
+ assert(stmt->del_story == NULL);
+ stmt->del_story = res;
+ return res;
+}
+
+/**
+ * Undo memtx_tx_story_new_add_stmt.
+ */
+static void
+memtx_tx_story_delete_add_stmt(struct memtx_story *story)
+{
+ story->add_stmt->add_story = NULL;
+ story->add_stmt = NULL;
+ memtx_tx_story_delete(story);
+}
+
+/**
+ * Undo memtx_tx_story_new_del_stmt.
+ */
+static void
+memtx_tx_story_delete_del_stmt(struct memtx_story *story)
+{
+ story->del_stmt->del_story = NULL;
+ story->del_stmt = NULL;
+ memtx_tx_story_delete(story);
+}
+
+
+/**
+ * Find the story of a @tuple. The story is expected to be present (asserted).
+ */
+static struct memtx_story *
+memtx_tx_story_get(struct tuple *tuple)
+{
+ assert(tuple->is_dirty);
+
+ mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+ assert(pos != mh_end(txm.history));
+ return *mh_history_node(txm.history, pos);
+}
+
+/**
+ * Get the older tuple, extracting it from older story if necessary.
+ */
+static struct tuple *
+memtx_tx_story_older_tuple(struct memtx_story_link *link)
+{
+ return link->older.is_story ? link->older.story->tuple
+ : link->older.tuple;
+}
+
+/**
+ * Link a @story with @older_story in @index (in both directions).
+ */
+static void
+memtx_tx_story_link_story(struct memtx_story *story,
+ struct memtx_story *older_story,
+ uint32_t index)
+{
+ assert(older_story != NULL);
+ struct memtx_story_link *link = &story->link[index];
+ /* Must be unlinked. */
+ assert(!link->older.is_story);
+ assert(link->older.tuple == NULL);
+ link->older.is_story = true;
+ link->older.story = older_story;
+ older_story->link[index].newer_story = story;
+}
+
+/**
+ * Link a @story with an older @tuple in @index. If the tuple is dirty,
+ * find its story and link with that story instead.
+ */
+static void
+memtx_tx_story_link_tuple(struct memtx_story *story,
+ struct tuple *older_tuple,
+ uint32_t index)
+{
+ struct memtx_story_link *link = &story->link[index];
+ /* Must be unlinked. */
+ assert(!link->older.is_story);
+ assert(link->older.tuple == NULL);
+ if (older_tuple == NULL)
+ return;
+ if (older_tuple->is_dirty) {
+ memtx_tx_story_link_story(story,
+ memtx_tx_story_get(older_tuple),
+ index);
+ return;
+ }
+ link->older.tuple = older_tuple;
+ tuple_ref(link->older.tuple);
+}
+
+/**
+ * Unlink a @story from the older story/tuple in @index.
+ */
+static void
+memtx_tx_story_unlink(struct memtx_story *story, uint32_t index)
+{
+ struct memtx_story_link *link = &story->link[index];
+ if (link->older.is_story) {
+ link->older.story->link[index].newer_story = NULL;
+ } else if (link->older.tuple != NULL) {
+ tuple_unref(link->older.tuple);
+ link->older.tuple = NULL;
+ }
+ link->older.is_story = false;
+ link->older.tuple = NULL;
+}
+
+/**
+ * Run one step of a crawler that traverses all stories and removes the
+ * stories that are no longer used.
+ */
+static void
+memtx_tx_story_gc_step()
+{
+ if (txm.traverse_all_stories == &txm.all_stories) {
+ /* We came to the head of the list. */
+ txm.traverse_all_stories = txm.traverse_all_stories->next;
+ return;
+ }
+
+ /* Lowest read view PSN. */
+ int64_t lowest_rv_psn = txn_last_psn;
+ if (!rlist_empty(&txm.read_view_txs)) {
+ struct txn *txn =
+ rlist_first_entry(&txm.read_view_txs, struct txn,
+ in_read_view_txs);
+ assert(txn->rv_psn != 0);
+ lowest_rv_psn = txn->rv_psn;
+ }
+
+ struct memtx_story *story =
+ rlist_entry(txm.traverse_all_stories, struct memtx_story,
+ in_all_stories);
+ txm.traverse_all_stories = txm.traverse_all_stories->next;
+
+ if (story->add_stmt != NULL || story->del_stmt != NULL ||
+ !rlist_empty(&story->reader_list)) {
+ /* The story is used directly by some transactions. */
+ return;
+ }
+ if (story->add_psn >= lowest_rv_psn ||
+ story->del_psn >= lowest_rv_psn) {
+ /* The story can be used by a read view. */
+ return;
+ }
+
+ /* Unlink and delete the story */
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ struct memtx_story_link *link = &story->link[i];
+ if (link->newer_story == NULL) {
+ /*
+ * We are at the top of the chain. That means
+ * that story->tuple is in the index. If the story
+ * actually deletes the tuple, it must be removed from
+ * the index.
+ */
+ if (story->del_psn > 0) {
+ struct index *index = story->space->index[i];
+ struct tuple *unused;
+ if (index_replace(index, story->tuple, NULL,
+ DUP_INSERT, &unused) != 0) {
+ diag_log();
+ unreachable();
+ panic("failed to rollback change");
+ }
+ assert(story->tuple == unused);
+ }
+ memtx_tx_story_unlink(story, i);
+ } else {
+ link->newer_story->link[i].older = link->older;
+ link->older.is_story = false;
+ link->older.story = NULL;
+ link->newer_story = NULL;
+ }
+ }
+
+ memtx_tx_story_delete(story);
+}
+
+/**
+ * Check if a @story is visible to transaction @txn. The visible tuple is
+ * returned via @visible_tuple (can be set to NULL).
+ * @param is_prepared_ok - whether a prepared (not yet committed) change is
+ * acceptable.
+ * @param own_change - set to true if the change was made by @txn itself.
+ * @return true if the story is visible, false otherwise.
+ */
+static bool
+memtx_tx_story_is_visible(struct memtx_story *story, struct txn *txn,
+ struct tuple **visible_tuple, bool is_prepared_ok,
+ bool *own_change)
+{
+ *own_change = false;
+ *visible_tuple = NULL;
+
+ int64_t rv_psn = INT64_MAX;
+ if (txn != NULL && txn->rv_psn != 0)
+ rv_psn = txn->rv_psn;
+
+ struct txn_stmt *dels = story->del_stmt;
+ while (dels != NULL) {
+ if (dels->txn == txn) {
+ /* Tuple is deleted by us (@txn). */
+ *own_change = true;
+ return true;
+ }
+ dels = dels->next_in_del_list;
+ }
+ if (is_prepared_ok && story->del_psn != 0 && story->del_psn < rv_psn) {
+ /* Tuple is deleted by prepared TX. */
+ return true;
+ }
+ if (story->del_psn != 0 && story->del_stmt == NULL &&
+ story->del_psn < rv_psn) {
+ /* Tuple is deleted by committed TX. */
+ return true;
+ }
+
+ if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+ /* Tuple is added by us (@txn). */
+ *visible_tuple = story->tuple;
+ *own_change = true;
+ return true;
+ }
+ if (is_prepared_ok && story->add_psn != 0 && story->add_psn < rv_psn) {
+ /* Tuple is added by another prepared TX. */
+ *visible_tuple = story->tuple;
+ return true;
+ }
+ if (story->add_psn != 0 && story->add_stmt == NULL &&
+ story->add_psn < rv_psn) {
+ /* Tuple is added by committed TX. */
+ *visible_tuple = story->tuple;
+ return true;
+ }
+ if (story->add_psn == 0 && story->add_stmt == NULL) {
+ /* added long time ago. */
+ *visible_tuple = story->tuple;
+ return true;
+ }
+ return false;
+}
+
+/**
+ * Temporary (allocated on region) struct that stores a conflicting TX.
+ */
+struct memtx_tx_conflict
+{
+ /* The transaction that will conflict us upon commit. */
+ struct txn *breaker;
+ /* Link in single-linked list. */
+ struct memtx_tx_conflict *next;
+};
+
+/**
+ * Save @breaker in the list with head @conflicts_head. A new list node is
+ * allocated on @region.
+ * @return 0 on success, -1 on memory error.
+ */
+static int
+memtx_tx_save_conflict(struct txn *breaker,
+ struct memtx_tx_conflict **conflicts_head,
+ struct region *region)
+{
+ size_t err_size;
+ struct memtx_tx_conflict *next_conflict;
+ next_conflict = region_alloc_object(region, struct memtx_tx_conflict,
+ &err_size);
+ if (next_conflict == NULL) {
+ diag_set(OutOfMemory, err_size, "txn_region", "txn conflict");
+ return -1;
+ }
+ next_conflict->breaker = breaker;
+ next_conflict->next = *conflicts_head;
+ *conflicts_head = next_conflict;
+ return 0;
+}
+
+/**
+ * Scan the history starting from @stmt statement in @index for a visible
+ * tuple (prepared tuples are acceptable), returned via @visible_replaced.
+ * Collect a list of transactions that will abort the current transaction if
+ * they are committed.
+ *
+ * @return 0 on success, -1 on memory error.
+ */
+static int
+memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt,
+ uint32_t index, struct tuple **visible_replaced,
+ struct memtx_tx_conflict **collected_conflicts,
+ struct region *region)
+{
+ while (true) {
+ if (!story->link[index].older.is_story) {
+ /* The tuple is so old that we don't know its story. */
+ *visible_replaced = story->link[index].older.tuple;
+ assert(*visible_replaced == NULL ||
+ !(*visible_replaced)->is_dirty);
+ break;
+ }
+ story = story->link[index].older.story;
+ bool unused;
+ if (memtx_tx_story_is_visible(story, stmt->txn,
+ visible_replaced, true, &unused))
+ break;
+
+ /*
+ * We skip the story as invisible, but if the corresponding
+ * TX is committed, our TX can become conflicted.
+ * The conflict will be unavoidable if this statement
+ * relies on old_tuple. If not (it's a replace),
+ * the conflict will take place only for a secondary
+ * index and only if the story is not overwritten in the
+ * primary index.
+ */
+ bool cross_conflict = false;
+ if (stmt->does_require_old_tuple) {
+ cross_conflict = true;
+ } else if (index != 0) {
+ struct memtx_story *look_up = story;
+ cross_conflict = true;
+ while (look_up->link[0].newer_story != NULL) {
+ struct memtx_story *over;
+ over = look_up->link[0].newer_story;
+ if (over->add_stmt->txn == stmt->txn) {
+ cross_conflict = false;
+ break;
+ }
+ look_up = over;
+ }
+ }
+ if (cross_conflict) {
+ if (memtx_tx_save_conflict(story->add_stmt->txn,
+ collected_conflicts,
+ region) != 0)
+ return -1;
+
+ }
+ }
+ return 0;
+}
+
+int
+memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+ struct tuple *new_tuple, enum dup_replace_mode mode,
+ struct tuple **result)
+{
+ assert(new_tuple != NULL || old_tuple != NULL);
+ struct space *space = stmt->space;
+ struct memtx_story *add_story = NULL;
+ uint32_t add_story_linked = 0;
+ struct memtx_story *del_story = NULL;
+ bool del_story_created = false;
+ struct region *region = &stmt->txn->region;
+ size_t region_svp = region_used(region);
+
+ /*
+ * List of transactions that will conflict us once one of them
+ * becomes committed.
+ */
+ struct memtx_tx_conflict *collected_conflicts = NULL;
+
+ /* Create add_story if necessary. */
+ if (new_tuple != NULL) {
+ add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt);
+ if (add_story == NULL)
+ goto fail;
+
+ for (uint32_t i = 0; i < space->index_count; i++) {
+ struct tuple *replaced;
+ struct index *index = space->index[i];
+ if (index_replace(index, NULL, new_tuple,
+ DUP_REPLACE_OR_INSERT,
+ &replaced) != 0)
+ goto fail;
+ memtx_tx_story_link_tuple(add_story, replaced, i);
+ add_story_linked++;
+
+ struct tuple *visible_replaced = NULL;
+ if (memtx_tx_story_find_visible(add_story, stmt, i,
+ &visible_replaced,
+ &collected_conflicts,
+ region) != 0)
+ goto fail;
+
+ uint32_t errcode;
+ errcode = replace_check_dup(old_tuple, visible_replaced,
+ i == 0 ? mode : DUP_INSERT);
+ if (errcode != 0) {
+ if (space != NULL)
+ diag_set(ClientError, errcode,
+ index->def->name,
+ space_name(space));
+ goto fail;
+ }
+
+ if (i == 0)
+ old_tuple = visible_replaced;
+ }
+ }
+
+ /* Create del_story if necessary. */
+ struct tuple *del_tuple = NULL;
+ if (new_tuple != NULL) {
+ struct memtx_story_link *link = &add_story->link[0];
+ if (link->older.is_story) {
+ del_story = link->older.story;
+ del_tuple = del_story->tuple;
+ } else {
+ del_tuple = link->older.tuple;
+ }
+ } else {
+ del_tuple = old_tuple;
+ }
+ if (del_tuple != NULL && del_story == NULL) {
+ if (del_tuple->is_dirty) {
+ del_story = memtx_tx_story_get(del_tuple);
+ } else {
+ del_story = memtx_tx_story_new_del_stmt(del_tuple,
+ stmt);
+ if (del_story == NULL)
+ goto fail;
+ del_story_created = true;
+ }
+ }
+ if (new_tuple != NULL && del_story_created) {
+ for (uint32_t i = 0; i < add_story->index_count; i++) {
+ struct memtx_story_link *link = &add_story->link[i];
+ if (link->older.is_story)
+ continue;
+ if (link->older.tuple == del_tuple) {
+ memtx_tx_story_unlink(add_story, i);
+ memtx_tx_story_link_story(add_story, del_story,
+ i);
+ }
+ }
+ }
+ if (del_story != NULL && !del_story_created) {
+ stmt->next_in_del_list = del_story->del_stmt;
+ del_story->del_stmt = stmt;
+ stmt->del_story = del_story;
+ }
+
+ /* Purge found conflicts. */
+ while (collected_conflicts != NULL) {
+ if (memtx_tx_cause_conflict(collected_conflicts->breaker,
+ stmt->txn) != 0)
+ goto fail;
+ collected_conflicts = collected_conflicts->next;
+ }
+
+ /*
+ * We now reference both new and old tuple because the stmt holds
+ * pointers to them.
+ */
+ if (stmt->new_tuple != NULL)
+ tuple_ref(stmt->new_tuple);
+ *result = old_tuple;
+ if (*result != NULL)
+ tuple_ref(*result);
+ return 0;
+
+ fail:
+ if (add_story != NULL) {
+ while (add_story_linked > 0) {
+ --add_story_linked;
+ uint32_t i = add_story_linked;
+
+ struct index *index = space->index[i];
+ struct memtx_story_link *link = &add_story->link[i];
+ struct tuple *was = memtx_tx_story_older_tuple(link);
+ struct tuple *unused;
+ if (index_replace(index, new_tuple, was,
+ DUP_INSERT,
+ &unused) != 0) {
+ diag_log();
+ unreachable();
+ panic("failed to rollback change");
+ }
+
+ memtx_tx_story_unlink(stmt->add_story, i);
+
+ }
+ memtx_tx_story_delete_add_stmt(stmt->add_story);
+ }
+
+ if (del_story != NULL && del_story->del_stmt == stmt) {
+ del_story->del_stmt = stmt->next_in_del_list;
+ stmt->next_in_del_list = NULL;
+ }
+
+ if (del_story_created)
+ memtx_tx_story_delete_del_stmt(stmt->del_story);
+ else
+ stmt->del_story = NULL;
+
+ region_truncate(region, region_svp);
+ return -1;
+}
+
+void
+memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
+{
+ if (stmt->add_story != NULL) {
+ assert(stmt->add_story->tuple == stmt->new_tuple);
+ struct memtx_story *story = stmt->add_story;
+
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ struct memtx_story_link *link = &story->link[i];
+ if (link->newer_story == NULL) {
+ struct tuple *unused;
+ struct index *index = stmt->space->index[i];
+ struct tuple *was = memtx_tx_story_older_tuple(link);
+ if (index_replace(index, story->tuple, was,
+ DUP_INSERT, &unused) != 0) {
+ diag_log();
+ unreachable();
+ panic("failed to rollback change");
+ }
+ } else {
+ struct memtx_story *newer = link->newer_story;
+ assert(newer->link[i].older.is_story);
+ assert(newer->link[i].older.story == story);
+ memtx_tx_story_unlink(newer, i);
+ if (link->older.is_story) {
+ struct memtx_story *to = link->older.story;
+ memtx_tx_story_link_story(newer, to, i);
+ } else {
+ struct tuple *to = link->older.tuple;
+ memtx_tx_story_link_tuple(newer, to, i);
+ }
+ }
+ memtx_tx_story_unlink(story, i);
+ }
+ stmt->add_story->add_stmt = NULL;
+ memtx_tx_story_delete(stmt->add_story);
+ stmt->add_story = NULL;
+ tuple_unref(stmt->new_tuple);
+ }
+
+ if (stmt->del_story != NULL) {
+ struct memtx_story *story = stmt->del_story;
+
+ struct txn_stmt **prev = &story->del_stmt;
+ while (*prev != stmt) {
+ prev = &(*prev)->next_in_del_list;
+ assert(*prev != NULL);
+ }
+ *prev = stmt->next_in_del_list;
+ stmt->next_in_del_list = NULL;
+
+ stmt->del_story->del_stmt = NULL;
+ stmt->del_story = NULL;
+ }
+}
+
+void
+memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
+{
+ assert(stmt->txn->psn != 0);
+
+ /* Move story to the past to prepared stories. */
+
+ struct memtx_story *story = stmt->add_story;
+ uint32_t index_count = story == NULL ? 0 : story->index_count;
+ /*
+ * Note that if stmt->add_story == NULL, the index_count is set to 0,
+ * and we will not enter the loop.
+ */
+ for (uint32_t i = 0; i < index_count; ) {
+ if (!story->link[i].older.is_story) {
+ /* tuple is old. */
+ i++;
+ continue;
+ }
+ bool old_story_is_prepared = false;
+ struct memtx_story *old_story = story->link[i].older.story;
+ if (old_story->del_psn != 0) {
+ /* if psn is set, the change is prepared. */
+ old_story_is_prepared = true;
+ } else if (old_story->add_psn != 0) {
+ /* if psn is set, the change is prepared. */
+ old_story_is_prepared = true;
+ } else if (old_story->add_stmt == NULL) {
+ /* ancient. */
+ old_story_is_prepared = true;
+ } else if (old_story->add_stmt->txn == stmt->txn) {
+ /* added by us. */
+ }
+
+ if (old_story_is_prepared) {
+ struct tx_read_tracker *tracker;
+ rlist_foreach_entry(tracker, &old_story->reader_list,
+ in_reader_list) {
+ if (tracker->reader == stmt->txn)
+ continue;
+ if (tracker->reader->status != TXN_INPROGRESS)
+ continue;
+ memtx_tx_handle_conflict(stmt->txn,
+ tracker->reader);
+ }
+ i++;
+ continue;
+ }
+
+ if (old_story->add_stmt->does_require_old_tuple || i != 0)
+ old_story->add_stmt->txn->status = TXN_CONFLICTED;
+
+ /* Swap story and old story. */
+ struct memtx_story_link *link = &story->link[i];
+ if (link->newer_story == NULL) {
+ /* we have to replace the tuple in index. */
+ struct tuple *unused;
+ struct index *index = stmt->space->index[i];
+ if (index_replace(index, story->tuple, old_story->tuple,
+ DUP_INSERT, &unused) != 0) {
+ diag_log();
+ panic("failed to rollback change");
+ }
+ } else {
+ struct memtx_story *newer = link->newer_story;
+ assert(newer->link[i].older.is_story);
+ assert(newer->link[i].older.story == story);
+ memtx_tx_story_unlink(newer, i);
+ memtx_tx_story_link_story(newer, old_story, i);
+ }
+
+ memtx_tx_story_unlink(story, i);
+ if (old_story->link[i].older.is_story) {
+ struct memtx_story *to =
+ old_story->link[i].older.story;
+ memtx_tx_story_unlink(old_story, i);
+ memtx_tx_story_link_story(story, to, i);
+ } else {
+ struct tuple *to =
+ old_story->link[i].older.tuple;
+ memtx_tx_story_unlink(old_story, i);
+ memtx_tx_story_link_tuple(story, to, i);
+ }
+
+ memtx_tx_story_link_story(old_story, story, i);
+
+ if (i == 0) {
+ assert(stmt->del_story == old_story);
+ assert(story->link[0].older.is_story ||
+ story->link[0].older.tuple == NULL);
+
+ struct txn_stmt *dels = old_story->del_stmt;
+ assert(dels != NULL);
+ do {
+ if (dels->txn != stmt->txn)
+ dels->txn->status = TXN_CONFLICTED;
+ dels->del_story = NULL;
+ struct txn_stmt *next = dels->next_in_del_list;
+ dels->next_in_del_list = NULL;
+ dels = next;
+ } while (dels != NULL);
+ old_story->del_stmt = NULL;
+
+ if (story->link[0].older.is_story) {
+ struct memtx_story *oldest_story =
+ story->link[0].older.story;
+ dels = oldest_story->del_stmt;
+ while (dels != NULL) {
+ assert(dels->txn != stmt->txn);
+ dels->del_story = NULL;
+ struct txn_stmt *next =
+ dels->next_in_del_list;
+ dels->next_in_del_list = NULL;
+ dels = next;
+ }
+ oldest_story->del_stmt = stmt;
+ stmt->del_story = oldest_story;
+ }
+ }
+ }
+ if (stmt->add_story != NULL)
+ stmt->add_story->add_psn = stmt->txn->psn;
+
+ if (stmt->del_story != NULL)
+ stmt->del_story->del_psn = stmt->txn->psn;
+}
+
+ssize_t
+memtx_tx_history_commit_stmt(struct txn_stmt *stmt)
+{
+ size_t res = 0;
+ if (stmt->add_story != NULL) {
+ assert(stmt->add_story->add_stmt == stmt);
+ res += stmt->add_story->tuple->bsize;
+ stmt->add_story->add_stmt = NULL;
+ stmt->add_story = NULL;
+ }
+ if (stmt->del_story != NULL) {
+ assert(stmt->del_story->del_stmt == stmt);
+ assert(stmt->next_in_del_list == NULL);
+ res -= stmt->del_story->tuple->bsize;
+ tuple_unref(stmt->del_story->tuple);
+ stmt->del_story->del_stmt = NULL;
+ stmt->del_story = NULL;
+ }
+ return res;
+}
+
+struct tuple *
+memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
+ struct tuple *tuple, uint32_t index,
+ uint32_t mk_index, bool is_prepared_ok)
+{
+ assert(tuple->is_dirty);
+ struct memtx_story *story = memtx_tx_story_get(tuple);
+ bool own_change = false;
+ struct tuple *result = NULL;
+
+ while (true) {
+ if (memtx_tx_story_is_visible(story, txn, &result,
+ is_prepared_ok, &own_change)) {
+ break;
+ }
+ if (story->link[index].older.is_story) {
+ story = story->link[index].older.story;
+ } else {
+ result = story->link[index].older.tuple;
+ break;
+ }
+ }
+ if (!own_change)
+ memtx_tx_track_read(txn, space, tuple);
+ (void)mk_index; /* TODO: multiindex */
+ return result;
+}
+
+static void
+memtx_tx_story_delete(struct memtx_story *story)
+{
+ assert(story->add_stmt == NULL);
+ assert(story->del_stmt == NULL);
+
+ if (txm.traverse_all_stories == &story->in_all_stories)
+ txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+ rlist_del(&story->in_all_stories);
+ rlist_del(&story->in_space_stories);
+
+ mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
+ assert(pos != mh_end(txm.history));
+ mh_history_del(txm.history, pos, 0);
+
+ story->tuple->is_dirty = false;
+ tuple_unref(story->tuple);
+
+#ifndef NDEBUG
+ /* Expecting to delete fully unlinked story. */
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ assert(story->link[i].newer_story == NULL);
+ assert(story->link[i].older.is_story == false);
+ assert(story->link[i].older.tuple == NULL);
+ }
+#endif
+
+ struct mempool *pool = &txm.memtx_tx_story_pool[story->index_count];
+ mempool_free(pool, story);
+}
+
+int
+memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
+{
+ if (tuple == NULL)
+ return 0;
+ if (txn == NULL)
+ return 0;
+ if (space == NULL)
+ return 0;
+
+ struct memtx_story *story;
+ struct tx_read_tracker *tracker = NULL;
+
+ if (!tuple->is_dirty) {
+ story = memtx_tx_story_new(space, tuple);
+ if (story == NULL)
+ return -1;
+ size_t sz;
+ tracker = region_alloc_object(&txn->region,
+ struct tx_read_tracker, &sz);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+ memtx_tx_story_delete(story);
+ return -1;
+ }
+ tracker->reader = txn;
+ tracker->story = story;
+ rlist_add(&story->reader_list, &tracker->in_reader_list);
+ rlist_add(&txn->read_set, &tracker->in_read_set);
+ return 0;
+ }
+ story = memtx_tx_story_get(tuple);
+
+ struct rlist *r1 = story->reader_list.next;
+ struct rlist *r2 = txn->read_set.next;
+ while (r1 != &story->reader_list && r2 != &txn->read_set) {
+ tracker = rlist_entry(r1, struct tx_read_tracker,
+ in_reader_list);
+ assert(tracker->story == story);
+ if (tracker->reader == txn)
+ break;
+ tracker = rlist_entry(r2, struct tx_read_tracker,
+ in_read_set);
+ assert(tracker->reader == txn);
+ if (tracker->story == story)
+ break;
+ tracker = NULL;
+ r1 = r1->next;
+ r2 = r2->next;
+ }
+ if (tracker != NULL) {
+ /* Move to the beginning of a list for faster further lookups.*/
+ rlist_del(&tracker->in_reader_list);
+ rlist_del(&tracker->in_read_set);
+ } else {
+ size_t sz;
+ tracker = region_alloc_object(&txn->region,
+ struct tx_read_tracker, &sz);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+ return -1;
+ }
+ tracker->reader = txn;
+ tracker->story = story;
+ }
+ rlist_add(&story->reader_list, &tracker->in_reader_list);
+ rlist_add(&txn->read_set, &tracker->in_read_set);
+ return 0;
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 6143a22..4670ebe 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -31,6 +31,12 @@
*/
#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "small/rlist.h"
+#include "index.h"
+#include "tuple.h"
#include "small/rlist.h"
@@ -61,6 +67,119 @@ struct tx_conflict_tracker {
};
/**
+ * Record that links a transaction and a story that the transaction has read.
+ */
+struct tx_read_tracker {
+ /** The TX that read story. */
+ struct txn *reader;
+ /** The story that was read by reader. */
+ struct memtx_story *story;
+ /** Link in story->reader_list. */
+ struct rlist in_reader_list;
+ /** Link in reader->read_set. */
+ struct rlist in_read_set;
+};
+
+/**
+ * Pointer to tuple or story.
+ */
+struct memtx_story_or_tuple {
+ /** Flag whether it's a story. */
+ bool is_story;
+ union {
+ /** Pointer to the story; it must be reverse linked. */
+ struct memtx_story *story;
+ /** Smart pointer to tuple: the tuple is referenced if set. */
+ struct tuple *tuple;
+ };
+};
+
+/**
+ * Link that connects a memtx_story with older and newer stories of the same
+ * key in index.
+ */
+struct memtx_story_link {
+ /** Story that happened after this story ended. */
+ struct memtx_story *newer_story;
+ /**
+ * Older story or an ancient tuple (so old that its story was lost).
+ * In the tuple case it can also be NULL.
+ */
+ struct memtx_story_or_tuple older;
+};
+
+/**
+ * A part of a history of a value in space.
+ * It's a story about a tuple, from the point it was added to space to the
+ * point when it was deleted from a space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct memtx_story {
+ /** The story is about this tuple. The tuple is referenced. */
+ struct tuple *tuple;
+ /**
+ * Statement that told this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if we don't know who
+ * introduced that story, the tuple was added by a transaction that
+ * was completed and destroyed some time ago.
+ */
+ struct txn_stmt *add_stmt;
+ /**
+ * Prepare sequence number of add_stmt's transaction. Is set when
+ * the transaction is prepared. Can be 0 if the transaction is
+ * in progress or we don't know who introduced that story.
+ */
+ int64_t add_psn;
+ /**
+ * Statement that ended this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if the tuple has not
+ * been deleted yet.
+ */
+ struct txn_stmt *del_stmt;
+ /**
+ * Prepare sequence number of del_stmt's transaction. Is set when
+ * the transaction is prepared. Can be 0 if the transaction is
+ * in progress or if nobody has deleted the tuple.
+ */
+ int64_t del_psn;
+ /**
+ * List of trackers - transactions that have read this tuple.
+ */
+ struct rlist reader_list;
+ /**
+ * Link in tx_manager::all_stories
+ */
+ struct rlist in_all_stories;
+ /**
+ * Link in space::memtx_tx_stories.
+ */
+ struct rlist in_space_stories;
+ /**
+ * The space where the tuple is supposed to be.
+ */
+ struct space *space;
+ /**
+ * Number of indexes in this space - and the count of link[].
+ */
+ uint32_t index_count;
+ /**
+ * Link with older and newer stories (and just tuples) for each
+ * index respectively.
+ */
+ struct memtx_story_link link[];
+};
+
+/**
+ * Snapshot cleaner is a short part of history that is used to clarify
+ * tuples in an index snapshot. It is designed to be used from another
+ * thread, where the common clarify would probably crash.
+ */
+struct memtx_tx_snapshot_cleaner {
+ struct mh_snapshot_cleaner_t *ht;
+};
+
+/**
* Initialize memtx transaction manager.
*/
void
@@ -94,6 +213,104 @@ memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim);
void
memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim);
+/**
+ * @brief Add a statement to the transaction manager's history.
+ * Until the statement is unlinked or released, the space can internally
+ * contain wrong tuples that must be cleaned via a memtx_tx_tuple_clarify call.
+ * After that clarification the statement will be visible to the current
+ * transaction, but invisible to all others.
+ * Follows the signature of @sa memtx_space_replace_all_keys.
+ *
+ * @param stmt current statement.
+ * @param old_tuple the tuple that should be removed (can be NULL).
+ * @param new_tuple the tuple that should be inserted (can be NULL).
+ * @param mode dup_replace_mode, used only if new_tuple is not
+ * NULL and old_tuple is NULL, and only for the
+ * primary key.
+ * @param result - old or replaced tuple.
+ * @return 0 on success, -1 on error (diag is set).
+ */
+int
+memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+ struct tuple *new_tuple, enum dup_replace_mode mode,
+ struct tuple **result);
+
+/**
+ * @brief Rollback (undo) a statement from the transaction manager's history.
+ * It just makes the statement invisible to all.
+ * Prepared statements can also be removed, but for consistency all later
+ * prepared statements must be rolled back as well.
+ *
+ * @param stmt current statement.
+ */
+void
+memtx_tx_history_rollback_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Prepare statement in history for further commit.
+ * Prepared statements are still invisible for read-only transactions
+ * but are visible to all read-write transactions.
+ * Prepared and in-progress transactions use the same links for creating
+ * chains of stories in history. The difference is that the order of
+ * prepared transactions is fixed while in-progress transactions are
+ * added to the end of list in any order. Thus to switch to prepared
+ * we have to reorder story in such a way that current story will be
+ * between earlier prepared stories and in-progress stories. That's what
+ * this function does.
+ *
+ * @param stmt current statement.
+ */
+void
+memtx_tx_history_prepare_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Commit statement in history.
+ * Make the statement's changes permanent. It becomes visible to all.
+ *
+ * @param stmt current statement.
+ * @return the change in space bsize.
+ */
+ssize_t
+memtx_tx_history_commit_stmt(struct txn_stmt *stmt);
+
+/** Helper of memtx_tx_tuple_clarify */
+struct tuple *
+memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
+ struct tuple *tuple, uint32_t index,
+ uint32_t mk_index, bool is_prepared_ok);
+
+/**
+ * Record in the TX manager that transaction @txn has read a @tuple in @space.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple);
+
+/**
+ * Clean a tuple if it is dirty - find a visible tuple in history.
+ * @param txn - current transaction.
+ * @param space - space in which the tuple was found.
+ * @param tuple - tuple to clean.
+ * @param index - index number.
+ * @param mk_index - multikey index (if the index is multikey).
+ * @param is_prepared_ok - allow returning prepared tuples.
+ * @return clean tuple (can be NULL).
+ */
+static inline struct tuple *
+memtx_tx_tuple_clarify(struct txn *txn, struct space *space,
+ struct tuple *tuple, uint32_t index,
+ uint32_t mk_index, bool is_prepared_ok)
+{
+ if (!memtx_tx_manager_use_mvcc_engine)
+ return tuple;
+ if (!tuple->is_dirty) {
+ memtx_tx_track_read(txn, space, tuple);
+ return tuple;
+ }
+ return memtx_tx_tuple_clarify_slow(txn, space, tuple, index, mk_index,
+ is_prepared_ok);
+}
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/space.c b/src/box/space.c
index 1d375cc..1243932 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine,
"constraint_ids");
goto fail;
}
+ rlist_create(&space->memtx_stories);
return 0;
fail_free_indexes:
@@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list)
void
space_delete(struct space *space)
{
+ rlist_del(&space->memtx_stories);
assert(space->ck_constraint_trigger == NULL);
for (uint32_t j = 0; j <= space->index_id_max; j++) {
struct index *index = space->index_map[j];
diff --git a/src/box/space.h b/src/box/space.h
index bbdd3ef..7cfba65 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -239,6 +239,10 @@ struct space {
* Hash table with constraint identifiers hashed by name.
*/
struct mh_strnptr_t *constraint_ids;
+ /**
+ * List of all tx stories in the space.
+ */
+ struct rlist memtx_stories;
};
/** Initialize a base space instance. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 976e17c..023da91 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -110,6 +110,9 @@ txn_stmt_new(struct region *region)
stmt->space = NULL;
stmt->old_tuple = NULL;
stmt->new_tuple = NULL;
+ stmt->add_story = NULL;
+ stmt->del_story = NULL;
+ stmt->next_in_del_list = NULL;
stmt->engine_savepoint = NULL;
stmt->row = NULL;
stmt->has_triggers = false;
@@ -194,6 +197,7 @@ txn_new(void)
}
assert(region_used(&region) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->read_set);
rlist_create(&txn->conflict_list);
rlist_create(&txn->conflicted_by_list);
rlist_create(&txn->in_read_view_txs);
@@ -206,6 +210,14 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_read_tracker *tracker, *tmp;
+ rlist_foreach_entry_safe(tracker, &txn->read_set,
+ in_read_set, tmp) {
+ rlist_del(&tracker->in_reader_list);
+ rlist_del(&tracker->in_read_set);
+ }
+ assert(rlist_empty(&txn->read_set));
+
struct tx_conflict_tracker *entry, *next;
rlist_foreach_entry_safe(entry, &txn->conflict_list,
in_conflict_list, next) {
diff --git a/src/box/txn.h b/src/box/txn.h
index f957d1e..ba818d0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
#include "trigger.h"
#include "fiber.h"
#include "space.h"
+#include "tuple.h"
#if defined(__cplusplus)
extern "C" {
@@ -172,6 +173,27 @@ struct txn_stmt {
struct space *space;
struct tuple *old_tuple;
struct tuple *new_tuple;
+ /**
+ * If new_tuple != NULL and this transaction was not prepared,
+ * this member holds the story of the added new_tuple.
+ */
+ struct memtx_story *add_story;
+ /**
+ * If new_tuple == NULL and this transaction was not prepared,
+ * this member holds the story of the deleted old_tuple.
+ */
+ struct memtx_story *del_story;
+ /**
+ * Link in memtx_story::del_stmt linked list.
+ * Only one prepared TX can delete a tuple and a story. But
+ * when there are several in-progress transactions and they delete
+ * the same tuple we have to store several delete statements in one
+ * story. It's implemented in that way: story has a pointer to the first
+ * deleting statement, that statement has a pointer to the next etc,
+ * with NULL in the end.
+ * That member is that the pointer to next deleting statement.
+ */
+ struct txn_stmt *next_in_del_list;
/** Engine savepoint for the start of this statement. */
void *engine_savepoint;
/** Redo info: the binary log row */
@@ -354,6 +376,8 @@ struct txn {
* Link in tx_manager::read_view_txs.
*/
struct rlist in_read_view_txs;
+ /** List of tx_read_trackers with stories that the TX has read. */
+ struct rlist read_set;
};
static inline bool
--
2.7.4
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story Aleksandr Lyapunov
@ 2020-09-15 14:33 ` Nikita Pettik
2020-09-22 17:51 ` Aleksandr Lyapunov
0 siblings, 1 reply; 26+ messages in thread
From: Nikita Pettik @ 2020-09-15 14:33 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> Memtx story is a part of a history of a value in space.
> It's a story about a tuple, from the point it was added to space
> to the point when it was deleted from the space.
> All stories are linked into a list of stories of the same key of
> each index.
>
> Part of #4897
> ---
> +/**
> + * Temporary (allocated on region) struct that stores a conflicting TX.
> + */
> +struct memtx_tx_conflict
> +{
> + /* The thansaction that will conflict us upon commit. */
-> transaction
> +}
> +
> +/**
> + * Scan a history starting by @stmt statement in @index for a visible tuple
> + * (prepared suits), returned via @visible_replaced.
> + * Collect a list of transactions that will abort current transaction if they
> + * are committed.
> + *
> + * @return 0 on success, -1 on memory error.
> + */
> +static int
> +memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt,
-> ..find_visible_tuple?
> + uint32_t index, struct tuple **visible_replaced,
> + struct memtx_tx_conflict **collected_conflicts,
> + struct region *region)
> +{
> + while (true) {
> + if (!story->link[index].older.is_story) {
> + /* The tuple is so old that we don't know its story. */
> + *visible_replaced = story->link[index].older.tuple;
> + assert(*visible_replaced == NULL ||
> + !(*visible_replaced)->is_dirty);
> + break;
> + }
> + story = story->link[index].older.story;
> + bool unused;
> + if (memtx_tx_story_is_visible(story, stmt->txn,
> + visible_replaced, true, &unused))
> + break;
> +
> +int
> +memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
> + struct tuple *new_tuple, enum dup_replace_mode mode,
> + struct tuple **result)
> +{
> + assert(new_tuple != NULL || old_tuple != NULL);
> + struct space *space = stmt->space;
> + struct memtx_story *add_story = NULL;
> + uint32_t add_story_linked = 0;
> + struct memtx_story *del_story = NULL;
> + bool del_story_created = false;
> + struct region *region = &stmt->txn->region;
> + size_t region_svp = region_used(region);
> +
> + /*
> + * List of transactions that will conflict us once one of them
> + * become committed.
> + */
> + struct memtx_tx_conflict *collected_conflicts = NULL;
> +
> + /* Create add_story if necessary. */
> + if (new_tuple != NULL) {
> + add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt);
> + if (add_story == NULL)
> + goto fail;
> +
> + for (uint32_t i = 0; i < space->index_count; i++) {
> + struct tuple *replaced;
> + struct index *index = space->index[i];
> + if (index_replace(index, NULL, new_tuple,
> + DUP_REPLACE_OR_INSERT,
> + &replaced) != 0)
> + goto fail;
> + memtx_tx_story_link_tuple(add_story, replaced, i);
> + add_story_linked++;
> +
> + struct tuple *visible_replaced = NULL;
> + if (memtx_tx_story_find_visible(add_story, stmt, i,
> + &visible_replaced,
> + &collected_conflicts,
> + region) != 0)
> + goto fail;
> +
> + uint32_t errcode;
> + errcode = replace_check_dup(old_tuple, visible_replaced,
> + i == 0 ? mode : DUP_INSERT);
> + if (errcode != 0) {
> + if (space != NULL)
Why space can be null, if we anyway dereference it to get index_count?
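(The dereference I mean is a few lines above, at the start of the loop:

	for (uint32_t i = 0; i < space->index_count; i++) {

so by this point the NULL check looks dead.)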
> + diag_set(ClientError, errcode,
> + index->def->name,
> + space_name(space));
> + goto fail;
> + }
> +
> + if (i == 0)
> + old_tuple = visible_replaced;
> + }
> +void
> +memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
> +{
> + if (stmt->add_story != NULL) {
> + assert(stmt->add_story->tuple == stmt->new_tuple);
> + struct memtx_story *story = stmt->add_story;
> +
> + for (uint32_t i = 0; i < story->index_count; i++) {
> + struct memtx_story_link *link = &story->link[i];
> + if (link->newer_story == NULL) {
> + struct tuple *unused;
> + struct index *index = stmt->space->index[i];
> + struct tuple *was = memtx_tx_story_older_tuple(link);
> + if (index_replace(index, story->tuple, was,
> + DUP_INSERT, &unused) != 0) {
> + diag_log();
> + unreachable();
> + panic("failed to rollback change");
> + }
> + } else {
> + struct memtx_story *newer = link->newer_story;
> + assert(newer->link[i].older.is_story);
> + assert(newer->link[i].older.story == story);
> + memtx_tx_story_unlink(newer, i);
> + if (link->older.is_story) {
> + struct memtx_story *to = link->older.story;
> + memtx_tx_story_link_story(newer, to, i);
> + } else {
> + struct tuple *to = link->older.tuple;
> + memtx_tx_story_link_tuple(newer, to, i);
> + }
> + }
> + memtx_tx_story_unlink(story, i);
> + }
> + stmt->add_story->add_stmt = NULL;
> + memtx_tx_story_delete(stmt->add_story);
> + stmt->add_story = NULL;
> + tuple_unref(stmt->new_tuple);
> + }
> +
> + if (stmt->del_story != NULL) {
> + struct memtx_story *story = stmt->del_story;
> +
> + struct txn_stmt **prev = &story->del_stmt;
> + while (*prev != stmt) {
> + prev = &(*prev)->next_in_del_list;
> + assert(*prev != NULL);
> + }
Some parts of this function are not covered by tests. For instance:
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 1b546378a..e2c612a95 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -793,6 +793,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
panic("failed to rollback change");
}
} else {
+ assert(0);
struct memtx_story *newer = link->newer_story;
assert(newer->link[i].older.is_story);
assert(newer->link[i].older.story == story);
@@ -818,6 +819,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
struct txn_stmt **prev = &story->del_stmt;
while (*prev != stmt) {
+ assert(0);
prev = &(*prev)->next_in_del_list;
assert(*prev != NULL);
}
And tests are passing.
> + *prev = stmt->next_in_del_list;
> + stmt->next_in_del_list = NULL;
> +
> + stmt->del_story->del_stmt = NULL;
> + stmt->del_story = NULL;
> + }
> +}
> +
> +void
> +memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
> +{
> + assert(stmt->txn->psn != 0);
> +
> + /* Move story to the past to prepared stories. */
> +
> + struct memtx_story *story = stmt->add_story;
> + uint32_t index_count = story == NULL ? 0 : story->index_count;
> + /*
> + * Note that if stmt->add_story == NULL, the index_count is set to 0,
> + * and we will not enter the loop.
> + */
> + for (uint32_t i = 0; i < index_count; ) {
> + if (!story->link[i].older.is_story) {
> + /* tuple is old. */
> + i++;
> + continue;
> + }
> + bool old_story_is_prepared = false;
> + struct memtx_story *old_story = story->link[i].older.story;
> + if (old_story->del_psn != 0) {
> + /* if psn is set, the change is prepared. */
> + old_story_is_prepared = true;
> + } else if (old_story->add_psn != 0) {
> + /* if psn is set, the change is prepared. */
> + old_story_is_prepared = true;
> + } else if (old_story->add_stmt == NULL) {
> + /* ancient. */
> + old_story_is_prepared = true;
> + } else if (old_story->add_stmt->txn == stmt->txn) {
> + /* added by us. */
> + }
> +
> + if (old_story_is_prepared) {
> + struct tx_read_tracker *tracker;
> + rlist_foreach_entry(tracker, &old_story->reader_list,
> + in_reader_list) {
> + if (tracker->reader == stmt->txn)
> + continue;
> + if (tracker->reader->status != TXN_INPROGRESS)
> + continue;
> + memtx_tx_handle_conflict(stmt->txn,
> + tracker->reader);
> + }
> + i++;
> + continue;
> + }
> +
> + if (old_story->add_stmt->does_require_old_tuple || i != 0)
> + old_story->add_stmt->txn->status = TXN_CONFLICTED;
This function still seems a bit hard to understand. Mb it is worth adding
more comments?
> + /* Swap story and old story. */
> + struct memtx_story_link *link = &story->link[i];
> + if (link->newer_story == NULL) {
> + /* we have to replace the tuple in index. */
> + struct tuple *unused;
> + struct index *index = stmt->space->index[i];
> + if (index_replace(index, story->tuple, old_story->tuple,
> + DUP_INSERT, &unused) != 0) {
> + diag_log();
> + panic("failed to rollback change");
> + }
> + } else {
> + struct memtx_story *newer = link->newer_story;
> + assert(newer->link[i].older.is_story);
> + assert(newer->link[i].older.story == story);
> + memtx_tx_story_unlink(newer, i);
> + memtx_tx_story_link_story(newer, old_story, i);
> + }
> +struct tuple *
> +memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
> + struct tuple *tuple, uint32_t index,
> + uint32_t mk_index, bool is_prepared_ok)
> +{
> + assert(tuple->is_dirty);
> + struct memtx_story *story = memtx_tx_story_get(tuple);
> + bool own_change = false;
> + struct tuple *result = NULL;
> +
> + while (true) {
> + if (memtx_tx_story_is_visible(story, txn, &result,
> + is_prepared_ok, &own_change)) {
> + break;
> + }
> + if (story->link[index].older.is_story) {
> + story = story->link[index].older.story;
> + } else {
> + result = story->link[index].older.tuple;
> + break;
> + }
> + }
> + if (!own_change)
> + memtx_tx_track_read(txn, space, tuple);
> + (void)mk_index; /* TODO: multiindex */
Panic/assert/diag in case of multikey indexes?
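Just a sketch of what I mean (untested; proper multikey support is of course
a separate task):

	if (mk_index != 0)
		panic("memtx TX manager does not support multikey indexes yet");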
> + return result;
> +}
> +
> +
> +int
> +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
> +{
> + if (tuple == NULL)
> + return 0;
> + if (txn == NULL)
> + return 0;
> + if (space == NULL)
> + return 0;
Space seems to be always != NULL. If so, let's replace it with an assertion.
> + struct memtx_story *story;
> + struct tx_read_tracker *tracker = NULL;
> +
> + if (!tuple->is_dirty) {
> + story = memtx_tx_story_new(space, tuple);
> + if (story == NULL)
> + return -1;
> + size_t sz;
> + tracker = region_alloc_object(&txn->region,
> + struct tx_read_tracker, &sz);
> + if (tracker == NULL) {
> + diag_set(OutOfMemory, sz, "tx region", "read_tracker");
> + memtx_tx_story_delete(story);
> + return -1;
> + }
> + tracker->reader = txn;
> + tracker->story = story;
> + rlist_add(&story->reader_list, &tracker->in_reader_list);
> + rlist_add(&txn->read_set, &tracker->in_read_set);
> + return 0;
> + }
> + story = memtx_tx_story_get(tuple);
> +
> + struct rlist *r1 = story->reader_list.next;
> + struct rlist *r2 = txn->read_set.next;
> + while (r1 != &story->reader_list && r2 != &txn->read_set) {
> + tracker = rlist_entry(r1, struct tx_read_tracker,
> + in_reader_list);
> + assert(tracker->story == story);
> + if (tracker->reader == txn)
> + break;
> + tracker = rlist_entry(r2, struct tx_read_tracker,
> + in_read_set);
> + assert(tracker->reader == txn);
> + if (tracker->story == story)
> + break;
> + tracker = NULL;
> + r1 = r1->next;
> + r2 = r2->next;
> + }
> + if (tracker != NULL) {
> + /* Move to the beginning of a list for faster further lookups.*/
> + rlist_del(&tracker->in_reader_list);
> + rlist_del(&tracker->in_read_set);
> + } else {
> + size_t sz;
> + tracker = region_alloc_object(&txn->region,
> + struct tx_read_tracker, &sz);
> + if (tracker == NULL) {
> + diag_set(OutOfMemory, sz, "tx region", "read_tracker");
> + return -1;
> + }
> + tracker->reader = txn;
> + tracker->story = story;
> + }
> + rlist_add(&story->reader_list, &tracker->in_reader_list);
> + rlist_add(&txn->read_set, &tracker->in_read_set);
This part is very similar to the one in memtx_tx_cause_conflict()
Mb it is worth to generalize this chunk (moving to a separate func or sort of)..?
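Roughly what I have in mind (just a sketch, the helper name is mine):

	/**
	 * Find an existing tracker that links @txn and @story, or allocate
	 * a new one on the txn's region. A found tracker is unlinked from
	 * both lists so that the caller can re-add it to the list heads.
	 * Returns NULL on memory error (diag is set).
	 */
	static struct tx_read_tracker *
	tx_read_tracker_get(struct txn *txn, struct memtx_story *story)
	{
		struct tx_read_tracker *tracker;
		struct rlist *r1 = story->reader_list.next;
		struct rlist *r2 = txn->read_set.next;
		while (r1 != &story->reader_list && r2 != &txn->read_set) {
			tracker = rlist_entry(r1, struct tx_read_tracker,
					      in_reader_list);
			if (tracker->reader == txn) {
				rlist_del(&tracker->in_reader_list);
				rlist_del(&tracker->in_read_set);
				return tracker;
			}
			tracker = rlist_entry(r2, struct tx_read_tracker,
					      in_read_set);
			if (tracker->story == story) {
				rlist_del(&tracker->in_reader_list);
				rlist_del(&tracker->in_read_set);
				return tracker;
			}
			r1 = r1->next;
			r2 = r2->next;
		}
		size_t sz;
		tracker = region_alloc_object(&txn->region,
					      struct tx_read_tracker, &sz);
		if (tracker == NULL) {
			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
			return NULL;
		}
		tracker->reader = txn;
		tracker->story = story;
		return tracker;
	}

(The conflict-tracker lookup uses a different struct and lists, so it would
need its own small variant - maybe just two helpers.)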
> + return 0;
> +}
> diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
> index 6143a22..4670ebe 100644
> --- a/src/box/memtx_tx.h
> +++ b/src/box/memtx_tx.h
> @@ -31,6 +31,12 @@
> */
>
> #include <stdbool.h>
> +#include <stddef.h>
> +#include <stdint.h>
Redundant includes (?)..
> +#include "small/rlist.h"
> +#include "index.h"
> +#include "tuple.h"
>
> #include "small/rlist.h"
>
> diff --git a/src/box/txn.h b/src/box/txn.h
> index f957d1e..ba818d0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -36,6 +36,7 @@
> #include "trigger.h"
> #include "fiber.h"
> #include "space.h"
> +#include "tuple.h"
Redundant include x2
> #if defined(__cplusplus)
> extern "C" {
> @@ -172,6 +173,27 @@ struct txn_stmt {
> struct space *space;
> struct tuple *old_tuple;
> struct tuple *new_tuple;
> + /**
> + * If new_tuple != NULL and this transaction was not prepared,
> + * this member holds added story of the new_tuple.
> + */
> + struct memtx_story *add_story;
> + /**
> + * If new_tuple == NULL and this transaction was not prepared,
> + * this member holds added story of the old_tuple.
> + */
> + struct memtx_story *del_story;
> + /**
>
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story
2020-09-15 14:33 ` Nikita Pettik
@ 2020-09-22 17:51 ` Aleksandr Lyapunov
2020-09-23 10:25 ` Nikita Pettik
0 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-22 17:51 UTC (permalink / raw)
To: Nikita Pettik; +Cc: tarantool-patches
Thanks for the review.
Most of it is fixed, see my replies below.
On 15.09.2020 17:33, Nikita Pettik wrote:
>> +int
>> +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
>> +{
>> + if (tuple == NULL)
>> + return 0;
>> + if (txn == NULL)
>> + return 0;
>> + if (space == NULL)
>> + return 0;
> Space seems to be always != NULL. If so, let's replace it with an assertion.
I found out that in iterators the space can be NULL. I'm not sure
why, but it happens without my patch set. I just accepted it.
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story
2020-09-22 17:51 ` Aleksandr Lyapunov
@ 2020-09-23 10:25 ` Nikita Pettik
2020-09-23 11:09 ` Aleksandr Lyapunov
0 siblings, 1 reply; 26+ messages in thread
From: Nikita Pettik @ 2020-09-23 10:25 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 22 Sep 20:51, Aleksandr Lyapunov wrote:
> Thank for the review.
> Most is fixed, see my replies below.
>
> On 15.09.2020 17:33, Nikita Pettik wrote:
>
> > > +int
> > > +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
> > > +{
> > > + if (tuple == NULL)
> > > + return 0;
> > > + if (txn == NULL)
> > > + return 0;
> > > + if (space == NULL)
> > > + return 0;
> > Space seems to be always != NULL. If so, let's replace it with an assertion.
> I found out that in iterators the space can be NULL. I'm not sure
> why, but I happens without my patch set. I just accepted it.
>
Hm, sounds strange. I've replaced this check with an assertion and all tests
seem to pass..
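For the record, the experiment was roughly this, nothing more:

	-	if (space == NULL)
	-		return 0;
	+	assert(space != NULL);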
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story
2020-09-23 10:25 ` Nikita Pettik
@ 2020-09-23 11:09 ` Aleksandr Lyapunov
0 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-23 11:09 UTC (permalink / raw)
To: Nikita Pettik; +Cc: tarantool-patches
You should enable the transaction manager by default for that.
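A quick local hack for the experiment (not a real patch) is to force the flag
that memtx_tx_tuple_clarify() checks, wherever it is defined, e.g.:

	bool memtx_tx_manager_use_mvcc_engine = true;

With the manager disabled that read-tracking path is barely reached, so the
assertion is unlikely to fire in the default test run.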
On 23.09.2020 13:25, Nikita Pettik wrote:
> On 22 Sep 20:51, Aleksandr Lyapunov wrote:
>> Thank for the review.
>> Most is fixed, see my replies below.
>>
>> On 15.09.2020 17:33, Nikita Pettik wrote:
>>
>>>> +int
>>>> +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
>>>> +{
>>>> + if (tuple == NULL)
>>>> + return 0;
>>>> + if (txn == NULL)
>>>> + return 0;
>>>> + if (space == NULL)
>>>> + return 0;
>>> Space seems to be always != NULL. If so, let's replace it with an assertion.
>> I found out that in iterators the space can be NULL. I'm not sure
>> why, but I happens without my patch set. I just accepted it.
>>
> Hm, sounds strange. I've replaced this check with an assertion and all tests
> seem to pass..
>
^ permalink raw reply [flat|nested] 26+ messages in thread
* [Tarantool-patches] [PATCH v4 08/12] txm: introduce snapshot cleaner
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (6 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 09/12] txm: clarify all fetched tuples Aleksandr Lyapunov
` (4 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
When a memtx snapshot iterator is created, it can contain some
dirty tuples that should be clarified before being written
to the WAL file.
Implement a special snapshot cleaner for this purpose.
Part of #4897
---
src/box/memtx_tx.c | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/box/memtx_tx.h | 38 ++++++++++++++++++++++
2 files changed, 133 insertions(+)
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index dfad6f7..1b54637 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -1105,3 +1105,98 @@ memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
rlist_add(&txn->read_set, &tracker->in_read_set);
return 0;
}
+
+static uint32_t
+memtx_tx_snapshot_cleaner_hash(const struct tuple *a)
+{
+ uintptr_t u = (uintptr_t)a;
+ if (sizeof(uintptr_t) <= sizeof(uint32_t))
+ return u;
+ else
+ return u ^ (u >> 32);
+}
+
+struct memtx_tx_snapshot_cleaner_entry
+{
+ struct tuple *from;
+ struct tuple *to;
+};
+
+#define mh_name _snapshot_cleaner
+#define mh_key_t struct tuple *
+#define mh_node_t struct memtx_tx_snapshot_cleaner_entry
+#define mh_arg_t int
+#define mh_hash(a, arg) (memtx_tx_snapshot_cleaner_hash((a)->from))
+#define mh_hash_key(a, arg) (memtx_tx_snapshot_cleaner_hash(a))
+#define mh_cmp(a, b, arg) (((a)->from) != ((b)->from))
+#define mh_cmp_key(a, b, arg) ((a) != ((b)->from))
+#define MH_SOURCE
+#include "salad/mhash.h"
+
+int
+memtx_tx_snapshot_cleaner_create(struct memtx_tx_snapshot_cleaner *cleaner,
+ struct space *space, const char *index_name)
+{
+ cleaner->ht = NULL;
+ if (space == NULL || rlist_empty(&space->memtx_stories))
+ return 0;
+ struct mh_snapshot_cleaner_t *ht = mh_snapshot_cleaner_new();
+ if (ht == NULL) {
+ diag_set(OutOfMemory, sizeof(*ht),
+ index_name, "snapshot cleaner");
+ free(ht);
+ return -1;
+ }
+
+ struct memtx_story *story;
+ rlist_foreach_entry(story, &space->memtx_stories, in_space_stories) {
+ struct tuple *tuple = story->tuple;
+ struct tuple *clean =
+ memtx_tx_tuple_clarify_slow(NULL, space, tuple, 0, 0,
+ true);
+ if (clean == tuple)
+ continue;
+
+ struct memtx_tx_snapshot_cleaner_entry entry;
+ entry.from = tuple;
+ entry.to = clean;
+ mh_int_t res = mh_snapshot_cleaner_put(ht, &entry, NULL, 0);
+ if (res == mh_end(ht)) {
+ diag_set(OutOfMemory, sizeof(entry),
+ index_name, "snapshot rollback entry");
+ mh_snapshot_cleaner_delete(ht);
+ return -1;
+ }
+ }
+
+ cleaner->ht = ht;
+ return 0;
+}
+
+struct tuple *
+memtx_tx_snapshot_clarify_slow(struct memtx_tx_snapshot_cleaner *cleaner,
+ struct tuple *tuple)
+{
+ assert(cleaner->ht != NULL);
+
+ struct mh_snapshot_cleaner_t *ht = cleaner->ht;
+ while (true) {
+ mh_int_t pos = mh_snapshot_cleaner_find(ht, tuple, 0);
+ if (pos == mh_end(ht))
+ break;
+ struct memtx_tx_snapshot_cleaner_entry *entry =
+ mh_snapshot_cleaner_node(ht, pos);
+ assert(entry->from == tuple);
+ tuple = entry->to;
+ }
+
+ return tuple;
+}
+
+
+void
+memtx_tx_snapshot_cleaner_destroy(struct memtx_tx_snapshot_cleaner *cleaner)
+{
+ if (cleaner->ht != NULL)
+ mh_snapshot_cleaner_delete(cleaner->ht);
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 4670ebe..2ebca71 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -311,6 +311,44 @@ memtx_tx_tuple_clarify(struct txn *txn, struct space *space,
is_prepared_ok);
}
+/**
+ * Create a snapshot cleaner.
+ * @param cleaner - cleaner to create.
+ * @param space - space for which the cleaner must be created.
+ * @param index_name - name of the index, for diag in case of memory error.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+memtx_tx_snapshot_cleaner_create(struct memtx_tx_snapshot_cleaner *cleaner,
+ struct space *space, const char *index_name);
+
+/** Helper of memtx_tx_snapshot_clarify. */
+struct tuple *
+memtx_tx_snapshot_clarify_slow(struct memtx_tx_snapshot_cleaner *cleaner,
+ struct tuple *tuple);
+
+/**
+ * Like the common clarify, this function returns the proper tuple if the
+ * original tuple in the index is dirty.
+ * @param cleaner - pre-created snapshot cleaner.
+ * @param tuple - tuple to clean.
+ * @return cleaned tuple, can be NULL.
+ */
+static inline struct tuple *
+memtx_tx_snapshot_clarify(struct memtx_tx_snapshot_cleaner *cleaner,
+ struct tuple *tuple)
+{
+ if (cleaner->ht == NULL)
+ return tuple;
+ return memtx_tx_snapshot_clarify_slow(cleaner, tuple);
+}
+
+/**
+ * Free resources in the snapshot @cleaner.
+ */
+void
+memtx_tx_snapshot_cleaner_destroy(struct memtx_tx_snapshot_cleaner *cleaner);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
^ permalink raw reply [flat|nested] 26+ messages in thread
* [Tarantool-patches] [PATCH v4 09/12] txm: clarify all fetched tuples
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (7 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 08/12] txm: introduce snapshot cleaner Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx Aleksandr Lyapunov
` (3 subsequent siblings)
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
If a tuple fetched from an index is dirty, it must be clarified.
Let's fix all tuples fetched from indexes in that way.
Also fix the snapshot iterator - it must save a part of the history
along with creating a read view, in order to clean tuples during
iteration from another thread.
Part of #4897
---
src/box/memtx_bitset.c | 31 ++++++++-----
src/box/memtx_hash.c | 82 +++++++++++++++++++++++++++------
src/box/memtx_rtree.c | 32 +++++++++++--
src/box/memtx_tree.c | 120 ++++++++++++++++++++++++++++++++++++++++++-------
4 files changed, 224 insertions(+), 41 deletions(-)
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 67eaf6f..2283a47 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -39,7 +39,10 @@
#include "bitset/index.h"
#include "fiber.h"
#include "index.h"
+#include "schema.h"
#include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
#include "memtx_engine.h"
struct memtx_bitset_index {
@@ -198,19 +201,27 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
assert(iterator->free == bitset_index_iterator_free);
struct bitset_index_iterator *it = bitset_index_iterator(iterator);
- size_t value = tt_bitset_iterator_next(&it->bitset_it);
- if (value == SIZE_MAX) {
- *ret = NULL;
- return 0;
- }
-
+ do {
+ size_t value = tt_bitset_iterator_next(&it->bitset_it);
+ if (value == SIZE_MAX) {
+ *ret = NULL;
+ return 0;
+ }
#ifndef OLD_GOOD_BITSET
- struct memtx_bitset_index *index =
- (struct memtx_bitset_index *)iterator->index;
- *ret = memtx_bitset_index_value_to_tuple(index, value);
+ struct memtx_bitset_index *index =
+ (struct memtx_bitset_index *)iterator->index;
+ struct tuple *tuple =
+ memtx_bitset_index_value_to_tuple(index, value);
#else /* #ifndef OLD_GOOD_BITSET */
- *ret = value_to_tuple(value);
+ struct tuple *tuple = value_to_tuple(value);
#endif /* #ifndef OLD_GOOD_BITSET */
+ uint32_t iid = iterator->index->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(iterator->space_id);
+ bool is_rw = txn != NULL;
+ *ret = memtx_tx_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+ } while (*ret == NULL);
+
return 0;
}
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index cdd531c..ed4dba9 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -33,9 +33,11 @@
#include "fiber.h"
#include "index.h"
#include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
#include "memtx_engine.h"
#include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
#include "errinj.h"
#include <small/mempool.h>
@@ -101,7 +103,7 @@ hash_iterator_free(struct iterator *iterator)
}
static int
-hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
+hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
struct hash_iterator *it = (struct hash_iterator *) ptr;
@@ -113,10 +115,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
}
static int
-hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
+hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
- ptr->next = hash_iterator_ge;
+ ptr->next = hash_iterator_ge_base;
struct hash_iterator *it = (struct hash_iterator *) ptr;
struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
@@ -128,6 +130,32 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
return 0;
}
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+ struct txn *txn = in_txn(); \
+ struct space *space = space_by_id(iterator->space_id); \
+ bool is_rw = txn != NULL; \
+ uint32_t iid = iterator->index->def->iid; \
+ bool is_first = true; \
+ do { \
+ int rc = is_first ? name##_base(iterator, ret) \
+ : hash_iterator_ge_base(iterator, ret); \
+ if (rc != 0 || *ret == NULL) \
+ return rc; \
+ is_first = false; \
+ *ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, 0, is_rw); \
+ } while (*ret == NULL); \
+ return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(hash_iterator_ge);
+WRAP_ITERATOR_METHOD(hash_iterator_gt);
+
+#undef WRAP_ITERATOR_METHOD
+
static int
hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
{
@@ -139,7 +167,15 @@ static int
hash_iterator_eq(struct iterator *it, struct tuple **ret)
{
it->next = hash_iterator_eq_next;
- return hash_iterator_ge(it, ret);
+ hash_iterator_ge_base(it, ret); /* always returns zero. */
+ if (*ret == NULL)
+ return 0;
+ struct txn *txn = in_txn();
+ struct space *sp = space_by_id(it->space_id);
+ bool is_rw = txn != NULL;
+ *ret = memtx_tx_tuple_clarify(txn, sp, *ret, it->index->def->iid,
+ 0, is_rw);
+ return 0;
}
/* }}} */
@@ -279,11 +315,18 @@ memtx_hash_index_get(struct index *base, const char *key,
part_count == base->def->key_def->part_count);
(void) part_count;
+ struct space *space = space_by_id(base->def->space_id);
*result = NULL;
uint32_t h = key_hash(key, base->def->key_def);
uint32_t k = light_index_find_key(&index->hash_table, h, key);
- if (k != light_index_end)
- *result = light_index_get(&index->hash_table, k);
+ if (k != light_index_end) {
+ struct tuple *tuple = light_index_get(&index->hash_table, k);
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *result = memtx_tx_tuple_clarify(txn, space, tuple, iid,
+ 0, is_rw);
+ }
return 0;
}
@@ -401,6 +444,7 @@ struct hash_snapshot_iterator {
struct snapshot_iterator base;
struct memtx_hash_index *index;
struct light_index_iterator iterator;
+ struct memtx_tx_snapshot_cleaner cleaner;
};
/**
@@ -418,6 +462,7 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
it->index->base.engine);
light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
index_unref(&it->index->base);
+ memtx_tx_snapshot_cleaner_destroy(&it->cleaner);
free(iterator);
}
@@ -434,13 +479,24 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator,
struct hash_snapshot_iterator *it =
(struct hash_snapshot_iterator *) iterator;
struct light_index_core *hash_table = &it->index->hash_table;
- struct tuple **res = light_index_iterator_get_and_next(hash_table,
- &it->iterator);
- if (res == NULL) {
- *data = NULL;
- return 0;
+
+ while (true) {
+ struct tuple **res =
+ light_index_iterator_get_and_next(hash_table,
+ &it->iterator);
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
+
+ struct tuple *tuple = *res;
+ tuple = memtx_tx_snapshot_clarify(&it->cleaner, tuple);
+
+ if (tuple != NULL) {
+ *data = tuple_data_range(*res, size);
+ return 0;
+ }
}
- *data = tuple_data_range(*res, size);
return 0;
}
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 612fcb2..0bd683d 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -40,7 +40,10 @@
#include "trivia/util.h"
#include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
#include "space.h"
+#include "schema.h"
#include "memtx_engine.h"
struct memtx_rtree_index {
@@ -148,7 +151,16 @@ static int
index_rtree_iterator_next(struct iterator *i, struct tuple **ret)
{
struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i;
- *ret = (struct tuple *)rtree_iterator_next(&itr->impl);
+ do {
+ *ret = (struct tuple *) rtree_iterator_next(&itr->impl);
+ if (*ret == NULL)
+ break;
+ uint32_t iid = i->index->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(i->space_id);
+ bool is_rw = txn != NULL;
+ *ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, 0, is_rw);
+ } while (*ret == NULL);
return 0;
}
@@ -213,8 +225,22 @@ memtx_rtree_index_get(struct index *base, const char *key,
unreachable();
*result = NULL;
- if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator))
- *result = (struct tuple *)rtree_iterator_next(&iterator);
+ if (!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) {
+ rtree_iterator_destroy(&iterator);
+ return 0;
+ }
+ do {
+ struct tuple *tuple = (struct tuple *)
+ rtree_iterator_next(&iterator);
+ if (tuple == NULL)
+ break;
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(base->def->space_id);
+ bool is_rw = txn != NULL;
+ *result = memtx_tx_tuple_clarify(txn, space, tuple, iid,
+ 0, is_rw);
+ } while (*result == NULL);
rtree_iterator_destroy(&iterator);
return 0;
}
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 76ff3fc..5af482f 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -31,12 +31,14 @@
#include "memtx_tree.h"
#include "memtx_engine.h"
#include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
#include "errinj.h"
#include "memory.h"
#include "fiber.h"
#include "key_list.h"
#include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
#include <third_party/qsort_arg.h>
#include <small/mempool.h>
@@ -175,7 +177,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -205,7 +207,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -234,7 +236,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -270,7 +272,7 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -304,6 +306,47 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
return 0;
}
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+ struct memtx_tree *tree = \
+ &((struct memtx_tree_index *)iterator->index)->tree; \
+ struct tree_iterator *it = tree_iterator(iterator); \
+ struct memtx_tree_iterator *ti = &it->tree_iterator; \
+ uint32_t iid = iterator->index->def->iid; \
+ bool is_multikey = iterator->index->def->key_def->is_multikey; \
+ struct txn *txn = in_txn(); \
+ struct space *space = space_by_id(iterator->space_id); \
+ bool is_rw = txn != NULL; \
+ do { \
+ int rc = name##_base(iterator, ret); \
+ if (rc != 0 || *ret == NULL) \
+ return rc; \
+ uint32_t mk_index = 0; \
+ if (is_multikey) { \
+ struct memtx_tree_data *check = \
+ memtx_tree_iterator_get_elem(tree, ti); \
+ assert(check != NULL); \
+ mk_index = check->hint; \
+ } \
+ *ret = memtx_tx_tuple_clarify(txn, space, *ret, \
+ iid, mk_index, is_rw); \
+ } while (*ret == NULL); \
+ tuple_unref(it->current.tuple); \
+ it->current.tuple = *ret; \
+ tuple_ref(it->current.tuple); \
+ return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(tree_iterator_next);
+WRAP_ITERATOR_METHOD(tree_iterator_prev);
+WRAP_ITERATOR_METHOD(tree_iterator_next_equal);
+WRAP_ITERATOR_METHOD(tree_iterator_prev_equal);
+
+#undef WRAP_ITERATOR_METHOD
+
static void
tree_iterator_set_next_method(struct tree_iterator *it)
{
@@ -388,6 +431,22 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
tuple_ref(*ret);
it->current = *res;
tree_iterator_set_next_method(it);
+
+ uint32_t iid = iterator->index->def->iid;
+ bool is_multikey = iterator->index->def->key_def->is_multikey;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(iterator->space_id);
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = is_multikey ? res->hint : 0;
+ *ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, mk_index, is_rw);
+ if (*ret == NULL) {
+ return iterator->next(iterator, ret);
+ } else {
+ tuple_unref(it->current.tuple);
+ it->current.tuple = *ret;
+ tuple_ref(it->current.tuple);
+ }
+
return 0;
}
@@ -539,7 +598,16 @@ memtx_tree_index_get(struct index *base, const char *key,
key_data.part_count = part_count;
key_data.hint = key_hint(key, part_count, cmp_def);
struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
- *result = res != NULL ? res->tuple : NULL;
+ if (res == NULL) {
+ *result = NULL;
+ return 0;
+ }
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(base->def->space_id);
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = base->def->key_def->is_multikey ? res->hint : 0;
+ *result = memtx_tx_tuple_clarify(txn, space, res->tuple, base->def->iid,
+ mk_index, is_rw);
return 0;
}
@@ -1208,6 +1276,7 @@ struct tree_snapshot_iterator {
struct snapshot_iterator base;
struct memtx_tree_index *index;
struct memtx_tree_iterator tree_iterator;
+ struct memtx_tx_snapshot_cleaner cleaner;
};
static void
@@ -1220,6 +1289,7 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
it->index->base.engine);
memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
index_unref(&it->index->base);
+ memtx_tx_snapshot_cleaner_destroy(&it->cleaner);
free(iterator);
}
@@ -1231,14 +1301,27 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator,
struct tree_snapshot_iterator *it =
(struct tree_snapshot_iterator *)iterator;
struct memtx_tree *tree = &it->index->tree;
- struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
- &it->tree_iterator);
- if (res == NULL) {
- *data = NULL;
- return 0;
+
+ while (true) {
+ struct memtx_tree_data *res =
+ memtx_tree_iterator_get_elem(tree, &it->tree_iterator);
+
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
+
+ memtx_tree_iterator_next(tree, &it->tree_iterator);
+
+ struct tuple *tuple = res->tuple;
+ tuple = memtx_tx_snapshot_clarify(&it->cleaner, tuple);
+
+ if (tuple != NULL) {
+ *data = tuple_data_range(tuple, size);
+ return 0;
+ }
}
- memtx_tree_iterator_next(tree, &it->tree_iterator);
- *data = tuple_data_range(res->tuple, size);
+
return 0;
}
@@ -1251,14 +1334,21 @@ static struct snapshot_iterator *
memtx_tree_index_create_snapshot_iterator(struct index *base)
{
struct memtx_tree_index *index = (struct memtx_tree_index *)base;
- struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *)
- calloc(1, sizeof(*it));
+ struct tree_snapshot_iterator *it =
+ (struct tree_snapshot_iterator *) calloc(1, sizeof(*it));
if (it == NULL) {
diag_set(OutOfMemory, sizeof(struct tree_snapshot_iterator),
"memtx_tree_index", "create_snapshot_iterator");
return NULL;
}
+ struct space *space = space_cache_find(base->def->space_id);
+ if (memtx_tx_snapshot_cleaner_create(&it->cleaner, space,
+ "memtx_tree_index") != 0) {
+ free(it);
+ return NULL;
+ }
+
it->base.free = tree_snapshot_iterator_free;
it->base.next = tree_snapshot_iterator_next;
it->index = index;
--
2.7.4
* [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (8 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 09/12] txm: clarify all fetched tuples Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-15 17:59 ` Nikita Pettik
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 11/12] test: move txn_proxy.lua to box/lua Aleksandr Lyapunov
` (2 subsequent siblings)
12 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
Use the MVCC transaction engine in memtx if the engine is enabled.
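In a nutshell (a condensed illustration taken from the memtx_space hunk
below, not a complete listing): replaces are routed through the MVCC
history when the engine is enabled, while engine prepare/commit walk
txn->stmts and call memtx_tx_history_prepare_stmt() /
memtx_tx_history_commit_stmt() for statements that have stories:

	if (memtx_tx_manager_use_mvcc_engine) {
		struct txn *txn = in_txn();
		struct txn_stmt *stmt =
			txn == NULL ? NULL : txn_current_stmt(txn);
		if (stmt != NULL)
			return memtx_tx_history_add_stmt(stmt, old_tuple,
							 new_tuple, mode,
							 result);
		/* No statement here means an ephemeral space. */
	}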
Closes #4897
---
src/box/memtx_engine.c | 38 +++++++++++++++++++++++++++++++++++---
src/box/memtx_space.c | 23 +++++++++++++++++++----
src/box/txn.c | 12 +++++++-----
src/box/txn.h | 14 +++++++++-----
src/box/vinyl.c | 8 ++++----
5 files changed, 74 insertions(+), 21 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 9f079a6..9a9ae26 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -40,6 +40,7 @@
#include "coio_file.h"
#include "tuple.h"
#include "txn.h"
+#include "memtx_tx.h"
#include "memtx_tree.h"
#include "iproto_constants.h"
#include "xrow.h"
@@ -335,11 +336,39 @@ static int
memtx_engine_begin(struct engine *engine, struct txn *txn)
{
(void)engine;
- txn_can_yield(txn, false);
+ txn_can_yield(txn, memtx_tx_manager_use_mvcc_engine);
+ return 0;
+}
+
+static int
+memtx_engine_prepare(struct engine *engine, struct txn *txn)
+{
+ (void)engine;
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next) {
+ if (stmt->add_story != NULL || stmt->del_story != NULL)
+ memtx_tx_history_prepare_stmt(stmt);
+ }
return 0;
}
static void
+memtx_engine_commit(struct engine *engine, struct txn *txn)
+{
+ (void)engine;
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next) {
+ if (stmt->add_story != NULL || stmt->del_story != NULL) {
+ ssize_t bsize = memtx_tx_history_commit_stmt(stmt);
+ assert(stmt->space->engine == engine);
+ struct memtx_space *mspace =
+ (struct memtx_space *)stmt->space;
+ mspace->bsize += bsize;
+ }
+ }
+}
+
+static void
memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
struct txn_stmt *stmt)
{
@@ -355,6 +384,9 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
if (stmt->engine_savepoint == NULL)
return;
+ if (stmt->add_story != NULL || stmt->del_story != NULL)
+ return memtx_tx_history_rollback_stmt(stmt);
+
if (memtx_space->replace == memtx_space_replace_all_keys)
index_count = space->index_count;
else if (memtx_space->replace == memtx_space_replace_primary_key)
@@ -914,8 +946,8 @@ static const struct engine_vtab memtx_engine_vtab = {
/* .complete_join = */ memtx_engine_complete_join,
/* .begin = */ memtx_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
- /* .prepare = */ generic_engine_prepare,
- /* .commit = */ generic_engine_commit,
+ /* .prepare = */ memtx_engine_prepare,
+ /* .commit = */ memtx_engine_commit,
/* .rollback_statement = */ memtx_engine_rollback_statement,
/* .rollback = */ generic_engine_rollback,
/* .switch_to_ro = */ generic_engine_switch_to_ro,
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 7624130..d4b18d9 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -32,6 +32,7 @@
#include "space.h"
#include "iproto_constants.h"
#include "txn.h"
+#include "memtx_tx.h"
#include "tuple.h"
#include "xrow_update.h"
#include "xrow.h"
@@ -260,6 +261,20 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple,
if (pk == NULL)
return -1;
assert(pk->def->opts.is_unique);
+
+ if (memtx_tx_manager_use_mvcc_engine) {
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt =
+ txn == NULL ? NULL : txn_current_stmt(txn);
+ if (stmt != NULL) {
+ return memtx_tx_history_add_stmt(stmt, old_tuple, new_tuple,
+ mode, result);
+ } else {
+ /** Ephemeral space */
+ assert(space->def->id == 0);
+ }
+ }
+
/*
* If old_tuple is not NULL, the index has to
* find and delete it, or return an error.
@@ -888,7 +903,7 @@ memtx_space_check_format(struct space *space, struct tuple_format *format)
if (txn_check_singlestatement(txn, "space format check") != 0)
return -1;
- txn_can_yield(txn, true);
+ bool could_yield = txn_can_yield(txn, true);
struct memtx_engine *memtx = (struct memtx_engine *)space->engine;
struct memtx_ddl_state state;
@@ -932,7 +947,7 @@ memtx_space_check_format(struct space *space, struct tuple_format *format)
iterator_delete(it);
diag_destroy(&state.diag);
trigger_clear(&on_replace);
- txn_can_yield(txn, false);
+ txn_can_yield(txn, could_yield);
return rc;
}
@@ -1046,7 +1061,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
if (txn_check_singlestatement(txn, "index build") != 0)
return -1;
- txn_can_yield(txn, true);
+ bool could_yield = txn_can_yield(txn, true);
struct memtx_engine *memtx = (struct memtx_engine *)src_space->engine;
struct memtx_ddl_state state;
@@ -1124,7 +1139,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
iterator_delete(it);
diag_destroy(&state.diag);
trigger_clear(&on_replace);
- txn_can_yield(txn, false);
+ txn_can_yield(txn, could_yield);
return rc;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 023da91..4f5484e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -123,6 +123,8 @@ txn_stmt_new(struct region *region)
static inline void
txn_stmt_destroy(struct txn_stmt *stmt)
{
+ assert(stmt->add_story == NULL && stmt->del_story == NULL);
+
if (stmt->old_tuple != NULL)
tuple_unref(stmt->old_tuple);
if (stmt->new_tuple != NULL)
@@ -993,20 +995,20 @@ txn_check_singlestatement(struct txn *txn, const char *where)
return 0;
}
-void
+bool
txn_can_yield(struct txn *txn, bool set)
{
assert(txn == in_txn());
- if (set) {
- assert(!txn_has_flag(txn, TXN_CAN_YIELD));
+ bool could = txn_has_flag(txn, TXN_CAN_YIELD);
+ if (set && !could) {
txn_set_flag(txn, TXN_CAN_YIELD);
trigger_clear(&txn->fiber_on_yield);
- } else {
- assert(txn_has_flag(txn, TXN_CAN_YIELD));
+ } else if (!set && could) {
txn_clear_flag(txn, TXN_CAN_YIELD);
trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL);
trigger_add(&fiber()->on_yield, &txn->fiber_on_yield);
}
+ return could;
}
int64_t
diff --git a/src/box/txn.h b/src/box/txn.h
index ba818d0..ed2ce82 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -620,12 +620,16 @@ txn_check_singlestatement(struct txn *txn, const char *where);
* as aborted, which results in rolling back the transaction on
* commit.
*
- * This function is used by the memtx engine, because it doesn't
- * support yields inside transactions. It is also used to temporarily
- * enable yields for long DDL operations such as building an index
- * or checking a space format.
+ *
+ * This function is used by the memtx engine, because there are cases
+ * when if doesn't support yields inside transactions. It is also
+ * used to temporarily enable yields for long DDL operations such as
+ * building an index or checking a space format.
+ *
+ * Return previous state of the flag: true - yields were enabled,
+ * false - yields were disabled before the function call.
*/
-void
+bool
txn_can_yield(struct txn *txn, bool set);
/**
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index aa6e50f..cee39c5 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1076,7 +1076,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
return -1;
/* See the comment in vinyl_space_build_index(). */
- txn_can_yield(txn, true);
+ bool could_yield = txn_can_yield(txn, true);
struct trigger on_replace;
struct vy_check_format_ctx ctx;
@@ -1128,7 +1128,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
out:
diag_destroy(&ctx.diag);
trigger_clear(&on_replace);
- txn_can_yield(txn, false);
+ txn_can_yield(txn, could_yield);
return rc;
}
@@ -4175,7 +4175,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
* change the data dictionary, so there is no dirty state
* that can be observed.
*/
- txn_can_yield(txn, true);
+ bool could_yield = txn_can_yield(txn, true);
/*
* Iterate over all tuples stored in the space and insert
@@ -4276,7 +4276,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
out:
diag_destroy(&ctx.diag);
trigger_clear(&on_replace);
- txn_can_yield(txn, false);
+ txn_can_yield(txn, could_yield);
return rc;
}
--
2.7.4
* Re: [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx Aleksandr Lyapunov
@ 2020-09-15 17:59 ` Nikita Pettik
2020-09-22 17:53 ` Aleksandr Lyapunov
0 siblings, 1 reply; 26+ messages in thread
From: Nikita Pettik @ 2020-09-15 17:59 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> index ba818d0..ed2ce82 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -620,12 +620,16 @@ txn_check_singlestatement(struct txn *txn, const char *where);
> * as aborted, which results in rolling back the transaction on
> * commit.
> *
> - * This function is used by the memtx engine, because it doesn't
> - * support yields inside transactions. It is also used to temporarily
> - * enable yields for long DDL operations such as building an index
> - * or checking a space format.
> + *
> + * This function is used by the memtx engine, because there are cases
> + * when if doesn't support yields inside transactions. It is also
..if -> it?
> + * used to temporarily enable yields for long DDL operations such as
> + * building an index or checking a space format.
> + *
> + * Return previous state of the flag: true - yields were enabled,
> + * false - yields were disabled before the function call.
> */
> -void
> +bool
> txn_can_yield(struct txn *txn, bool set);
>
> /**
> diff --git a/src/box/vinyl.c b/src/box/vinyl.c
> index aa6e50f..cee39c5 100644
> --- a/src/box/vinyl.c
> +++ b/src/box/vinyl.c
> @@ -1076,7 +1076,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
> return -1;
>
> /* See the comment in vinyl_space_build_index(). */
> - txn_can_yield(txn, true);
> + bool could_yield = txn_can_yield(txn, true);
Don't get why vinyl related functions should be involved in this change..
The rest is OK.
> struct trigger on_replace;
> struct vy_check_format_ctx ctx;
> @@ -1128,7 +1128,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
> out:
> diag_destroy(&ctx.diag);
> trigger_clear(&on_replace);
> - txn_can_yield(txn, false);
> + txn_can_yield(txn, could_yield);
> return rc;
> }
>
> @@ -4175,7 +4175,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
> * change the data dictionary, so there is no dirty state
> * that can be observed.
> */
> - txn_can_yield(txn, true);
> + bool could_yield = txn_can_yield(txn, true);
>
> /*
> * Iterate over all tuples stored in the space and insert
> @@ -4276,7 +4276,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
> out:
> diag_destroy(&ctx.diag);
> trigger_clear(&on_replace);
> - txn_can_yield(txn, false);
> + txn_can_yield(txn, could_yield);
> return rc;
> }
>
> --
> 2.7.4
>
* Re: [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx
2020-09-15 17:59 ` Nikita Pettik
@ 2020-09-22 17:53 ` Aleksandr Lyapunov
2020-09-23 10:26 ` Nikita Pettik
0 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-22 17:53 UTC (permalink / raw)
To: Nikita Pettik; +Cc: tarantool-patches
Hi, thanks for the review.
Almost all is fixed, see my comment below:
On 15.09.2020 20:59, Nikita Pettik wrote:
>
>> diff --git a/src/box/vinyl.c b/src/box/vinyl.c
>> index aa6e50f..cee39c5 100644
>> --- a/src/box/vinyl.c
>> +++ b/src/box/vinyl.c
>> @@ -1076,7 +1076,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
>> return -1;
>>
>> /* See the comment in vinyl_space_build_index(). */
>> - txn_can_yield(txn, true);
>> + bool could_yield = txn_can_yield(txn, true);
> Don't get why vinyl related functions should be involved in this change..
> The rest is OK.
These vinyl functions are called in a memtx transaction that performs
an alter of a vinyl space.
In this code vinyl changes the behavior of the memtx transaction.
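In other words, those DDL paths now save and restore the previous yield
state instead of assuming it; roughly (an illustrative sketch of the
shape of the hunks above, not a complete listing):

	bool could_yield = txn_can_yield(txn, true);
	/* ... long-running DDL work that may yield ... */
	txn_can_yield(txn, could_yield); /* restore whatever the engine set up */

The restored value matters because with the MVCC engine memtx no longer
unconditionally disables yields at transaction start.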
* Re: [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx
2020-09-22 17:53 ` Aleksandr Lyapunov
@ 2020-09-23 10:26 ` Nikita Pettik
0 siblings, 0 replies; 26+ messages in thread
From: Nikita Pettik @ 2020-09-23 10:26 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 22 Sep 20:53, Aleksandr Lyapunov wrote:
> Hi, thanks for the review.
> Almost all is fixed, see my comment below:
>
> On 15.09.2020 20:59, Nikita Pettik wrote:
> >
> > > diff --git a/src/box/vinyl.c b/src/box/vinyl.c
> > > index aa6e50f..cee39c5 100644
> > > --- a/src/box/vinyl.c
> > > +++ b/src/box/vinyl.c
> > > @@ -1076,7 +1076,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
> > > return -1;
> > > /* See the comment in vinyl_space_build_index(). */
> > > - txn_can_yield(txn, true);
> > > + bool could_yield = txn_can_yield(txn, true);
> > Don't get why vinyl related functions should be involved in this change..
> > The rest is OK.
> These vinyl functions are called in a memtx transaction that performs
> an alter of a vinyl space.
> In this code vinyl changes the behavior of the memtx transaction.
Ok, get it, thx.
* [Tarantool-patches] [PATCH v4 11/12] test: move txn_proxy.lua to box/lua
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (9 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 10/12] txm: use new tx manager in memtx Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 12/12] txm: add a test Aleksandr Lyapunov
2020-09-23 12:18 ` [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Kirill Yukhin
12 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
txn_proxy is a special utility for transaction tests.
Formerly it was used only for vinyl tests and thus was placed in
the vinyl folder.
Now the time has come to test memtx transactions, so the utility
must be placed among the other utils - in box/lua.
Needed for #4897
---
test/box/lua/txn_proxy.lua | 54 ++++++++++++++++++++++++++++++++++++++++++++++
test/vinyl/suite.ini | 2 +-
test/vinyl/txn_proxy.lua | 54 ----------------------------------------------
3 files changed, 55 insertions(+), 55 deletions(-)
create mode 100644 test/box/lua/txn_proxy.lua
delete mode 100644 test/vinyl/txn_proxy.lua
diff --git a/test/box/lua/txn_proxy.lua b/test/box/lua/txn_proxy.lua
new file mode 100644
index 0000000..7a4d0b8
--- /dev/null
+++ b/test/box/lua/txn_proxy.lua
@@ -0,0 +1,54 @@
+-- A fiber can't use multiple transactions simultaneously;
+-- i.e. [fiber] --? [transaction] in UML parlor.
+--
+-- This module provides a simple transaction proxy facility
+-- to control multiple transactions at once. A proxy executes
+-- statements in a worker fiber in order to overcome
+-- "one transaction per fiber" limitation.
+--
+-- Ex:
+-- proxy = require('txn_proxy').new()
+-- proxy:begin()
+-- proxy('box.space.test:replace{1, 42}')
+-- proxy:commit() -- or proxy:rollback()
+
+local ffi = require('ffi')
+local yaml = require('yaml')
+local fiber = require('fiber')
+local console = require('console')
+
+local array_mt = { __serialize = 'array' }
+
+local mt = {
+ __call = function(self, code_str)
+ self.c1:put(code_str)
+ local res = yaml.decode(self.c2:get())
+ return type(res) == 'table' and setmetatable(res, array_mt) or res
+ end,
+ __index = {
+ begin = function(self) return self('box.begin()') end,
+ commit = function(self) return self('box.commit()') end,
+ rollback = function(self) return self('box.rollback()') end,
+ close = function(self) self.c1:close(); self.c2:close() end
+ }
+}
+
+local function fiber_main(c1, c2)
+ local code_str = c1:get()
+ if code_str then
+ c2:put(console.eval(code_str))
+ return fiber_main(c1, c2) -- tail call
+ end
+end
+
+local function new_txn_proxy()
+ local c1, c2 = fiber.channel(), fiber.channel()
+ local function on_gc() c1:close(); c2:close() end
+ fiber.create(fiber_main, c1, c2)
+ return setmetatable({
+ c1 = c1, c2 = c2,
+ __gc = ffi.gc(ffi.new('char[1]'), on_gc)
+ }, mt)
+end
+
+return { new = new_txn_proxy }
diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini
index 3e11b08..c6cb89a 100644
--- a/test/vinyl/suite.ini
+++ b/test/vinyl/suite.ini
@@ -4,7 +4,7 @@ description = vinyl integration tests
script = vinyl.lua
release_disabled = errinj.test.lua errinj_ddl.test.lua errinj_gc.test.lua errinj_stat.test.lua errinj_tx.test.lua errinj_vylog.test.lua partial_dump.test.lua quota_timeout.test.lua recovery_quota.test.lua replica_rejoin.test.lua gh-4864-stmt-alloc-fail-compact.test.lua gh-4805-open-run-err-recovery.test.lua gh-4821-ddl-during-throttled-dump.test.lua gh-3395-read-prepared-uncommitted.test.lua
config = suite.cfg
-lua_libs = suite.lua stress.lua large.lua txn_proxy.lua ../box/lua/utils.lua
+lua_libs = suite.lua stress.lua large.lua ../box/lua/txn_proxy.lua ../box/lua/utils.lua
use_unix_sockets = True
use_unix_sockets_iproto = True
long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua throttle.test.lua
diff --git a/test/vinyl/txn_proxy.lua b/test/vinyl/txn_proxy.lua
deleted file mode 100644
index 7a4d0b8..0000000
--- a/test/vinyl/txn_proxy.lua
+++ /dev/null
@@ -1,54 +0,0 @@
--- A fiber can't use multiple transactions simultaneously;
--- i.e. [fiber] --? [transaction] in UML parlor.
---
--- This module provides a simple transaction proxy facility
--- to control multiple transactions at once. A proxy executes
--- statements in a worker fiber in order to overcome
--- "one transaction per fiber" limitation.
---
--- Ex:
--- proxy = require('txn_proxy').new()
--- proxy:begin()
--- proxy('box.space.test:replace{1, 42}')
--- proxy:commit() -- or proxy:rollback()
-
-local ffi = require('ffi')
-local yaml = require('yaml')
-local fiber = require('fiber')
-local console = require('console')
-
-local array_mt = { __serialize = 'array' }
-
-local mt = {
- __call = function(self, code_str)
- self.c1:put(code_str)
- local res = yaml.decode(self.c2:get())
- return type(res) == 'table' and setmetatable(res, array_mt) or res
- end,
- __index = {
- begin = function(self) return self('box.begin()') end,
- commit = function(self) return self('box.commit()') end,
- rollback = function(self) return self('box.rollback()') end,
- close = function(self) self.c1:close(); self.c2:close() end
- }
-}
-
-local function fiber_main(c1, c2)
- local code_str = c1:get()
- if code_str then
- c2:put(console.eval(code_str))
- return fiber_main(c1, c2) -- tail call
- end
-end
-
-local function new_txn_proxy()
- local c1, c2 = fiber.channel(), fiber.channel()
- local function on_gc() c1:close(); c2:close() end
- fiber.create(fiber_main, c1, c2)
- return setmetatable({
- c1 = c1, c2 = c2,
- __gc = ffi.gc(ffi.new('char[1]'), on_gc)
- }, mt)
-end
-
-return { new = new_txn_proxy }
--
2.7.4
* [Tarantool-patches] [PATCH v4 12/12] txm: add a test
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (10 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 11/12] test: move txn_proxy.lua to box/lua Aleksandr Lyapunov
@ 2020-09-08 10:22 ` Aleksandr Lyapunov
2020-09-15 18:05 ` Nikita Pettik
2020-09-23 12:18 ` [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Kirill Yukhin
12 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-08 10:22 UTC (permalink / raw)
To: tarantool-patches
Part of #4897
---
test/box/suite.ini | 2 +-
test/box/tx_man.lua | 10 ++
test/box/tx_man.result | 406 +++++++++++++++++++++++++++++++++++++++++++++++
test/box/tx_man.test.lua | 122 ++++++++++++++
4 files changed, 539 insertions(+), 1 deletion(-)
create mode 100644 test/box/tx_man.lua
create mode 100644 test/box/tx_man.result
create mode 100644 test/box/tx_man.test.lua
diff --git a/test/box/suite.ini b/test/box/suite.ini
index a9ed671..952a726 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -6,7 +6,7 @@ disabled = rtree_errinj.test.lua tuple_bench.test.lua
long_run = huge_field_map_long.test.lua
config = engine.cfg
release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua
-lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua
+lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
use_unix_sockets = True
use_unix_sockets_iproto = True
is_parallel = True
diff --git a/test/box/tx_man.lua b/test/box/tx_man.lua
new file mode 100644
index 0000000..addcd11
--- /dev/null
+++ b/test/box/tx_man.lua
@@ -0,0 +1,10 @@
+#!/usr/bin/env tarantool
+
+box.cfg{
+ listen = os.getenv("LISTEN"),
+ memtx_memory = 107374182,
+ pid_file = "tarantool.pid",
+ memtx_use_mvcc_engine = true,
+}
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/box/tx_man.result b/test/box/tx_man.result
new file mode 100644
index 0000000..bd74ecf
--- /dev/null
+++ b/test/box/tx_man.result
@@ -0,0 +1,406 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+test_run:cmd("create server tx_man with script='box/tx_man.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd("start server tx_man")
+ | ---
+ | - true
+ | ...
+test_run:cmd("switch tx_man")
+ | ---
+ | - true
+ | ...
+
+txn_proxy = require('txn_proxy')
+ | ---
+ | ...
+
+s = box.schema.space.create('test')
+ | ---
+ | ...
+i = s:create_index('pk', {parts={{1, 'uint'}}})
+ | ---
+ | ...
+i = s:create_index('sec', {parts={{2, 'uint'}}})
+ | ---
+ | ...
+
+tx1 = txn_proxy.new()
+ | ---
+ | ...
+tx2 = txn_proxy.new()
+ | ---
+ | ...
+
+-- Simple read/write conflicts.
+s:replace{1, 0}
+ | ---
+ | - [1, 0]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1('s:select{1}')
+ | ---
+ | - - [[1, 0]]
+ | ...
+tx2('s:select{1}')
+ | ---
+ | - - [[1, 0]]
+ | ...
+tx1('s:replace{1, 1}')
+ | ---
+ | - - [1, 1]
+ | ...
+tx2('s:replace{1, 2}')
+ | ---
+ | - - [1, 2]
+ | ...
+tx1:commit()
+ | ---
+ | -
+ | ...
+tx2:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | ...
+
+-- Simple read/write conflicts, different order.
+s:replace{1, 0}
+ | ---
+ | - [1, 0]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1('s:select{1}')
+ | ---
+ | - - [[1, 0]]
+ | ...
+tx2('s:select{1}')
+ | ---
+ | - - [[1, 0]]
+ | ...
+tx1('s:replace{1, 1}')
+ | ---
+ | - - [1, 1]
+ | ...
+tx2('s:replace{1, 2}')
+ | ---
+ | - - [1, 2]
+ | ...
+tx2:commit() -- note that tx2 commits first.
+ | ---
+ | -
+ | ...
+tx1:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [1, 2]
+ | ...
+
+-- Implicit read/write conflicts.
+s:replace{1, 0}
+ | ---
+ | - [1, 0]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1("s:update({1}, {{'+', 2, 3}})")
+ | ---
+ | - - [1, 3]
+ | ...
+tx2("s:update({1}, {{'+', 2, 5}})")
+ | ---
+ | - - [1, 5]
+ | ...
+tx1:commit()
+ | ---
+ | -
+ | ...
+tx2:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [1, 3]
+ | ...
+
+-- Implicit read/write conflicts, different order.
+s:replace{1, 0}
+ | ---
+ | - [1, 0]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1("s:update({1}, {{'+', 2, 3}})")
+ | ---
+ | - - [1, 3]
+ | ...
+tx2("s:update({1}, {{'+', 2, 5}})")
+ | ---
+ | - - [1, 5]
+ | ...
+tx2:commit() -- note that tx2 commits first.
+ | ---
+ | -
+ | ...
+tx1:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [1, 5]
+ | ...
+s:delete{1}
+ | ---
+ | - [1, 5]
+ | ...
+
+-- Conflict in secondary index.
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1("s:replace{1, 1}")
+ | ---
+ | - - [1, 1]
+ | ...
+tx2("s:replace{2, 1}")
+ | ---
+ | - - [2, 1]
+ | ...
+tx1:commit()
+ | ---
+ | -
+ | ...
+tx2:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | ...
+s:delete{1}
+ | ---
+ | - [1, 1]
+ | ...
+
+-- Conflict in secondary index, different order.
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+tx1("s:replace{1, 2}")
+ | ---
+ | - - [1, 2]
+ | ...
+tx2("s:replace{2, 2}")
+ | ---
+ | - - [2, 2]
+ | ...
+tx2:commit() -- note that tx2 commits first.
+ | ---
+ | -
+ | ...
+tx1:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+s:select{}
+ | ---
+ | - - [2, 2]
+ | ...
+s:delete{2}
+ | ---
+ | - [2, 2]
+ | ...
+
+-- TXN is sent to read view.
+s:replace{1, 1}
+ | ---
+ | - [1, 1]
+ | ...
+s:replace{2, 2}
+ | ---
+ | - [2, 2]
+ | ...
+s:replace{3, 3}
+ | ---
+ | - [3, 3]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+
+tx1("s:select{}")
+ | ---
+ | - - [[1, 1], [2, 2], [3, 3]]
+ | ...
+tx2("s:replace{1, 11}")
+ | ---
+ | - - [1, 11]
+ | ...
+tx2("s:replace{2, 12}")
+ | ---
+ | - - [2, 12]
+ | ...
+tx2:commit()
+ | ---
+ | -
+ | ...
+tx1("s:select{}")
+ | ---
+ | - - [[1, 1], [2, 2], [3, 3]]
+ | ...
+tx1:commit()
+ | ---
+ | -
+ | ...
+
+s:delete{1}
+ | ---
+ | - [1, 11]
+ | ...
+s:delete{2}
+ | ---
+ | - [2, 12]
+ | ...
+s:delete{3}
+ | ---
+ | - [3, 3]
+ | ...
+
+-- TXN is sent to read view but tries to replace and becomes conflicted.
+s:replace{1, 1}
+ | ---
+ | - [1, 1]
+ | ...
+s:replace{2, 2}
+ | ---
+ | - [2, 2]
+ | ...
+s:replace{3, 3}
+ | ---
+ | - [3, 3]
+ | ...
+tx1:begin()
+ | ---
+ | -
+ | ...
+tx2:begin()
+ | ---
+ | -
+ | ...
+
+tx1("s:select{}")
+ | ---
+ | - - [[1, 1], [2, 2], [3, 3]]
+ | ...
+tx2("s:replace{1, 11}")
+ | ---
+ | - - [1, 11]
+ | ...
+tx2("s:replace{2, 12}")
+ | ---
+ | - - [2, 12]
+ | ...
+tx2:commit()
+ | ---
+ | -
+ | ...
+tx1("s:select{}")
+ | ---
+ | - - [[1, 1], [2, 2], [3, 3]]
+ | ...
+tx1("s:replace{3, 13}")
+ | ---
+ | - - [3, 13]
+ | ...
+tx1("s:select{}")
+ | ---
+ | - - [[1, 1], [2, 2], [3, 13]]
+ | ...
+tx1:commit()
+ | ---
+ | - - {'error': 'Transaction has been aborted by conflict'}
+ | ...
+
+s:delete{1}
+ | ---
+ | - [1, 11]
+ | ...
+s:delete{2}
+ | ---
+ | - [2, 12]
+ | ...
+s:delete{3}
+ | ---
+ | - [3, 3]
+ | ...
+
+test_run:cmd("switch default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server tx_man")
+ | ---
+ | - true
+ | ...
+test_run:cmd("cleanup server tx_man")
+ | ---
+ | - true
+ | ...
+
diff --git a/test/box/tx_man.test.lua b/test/box/tx_man.test.lua
new file mode 100644
index 0000000..de5a734
--- /dev/null
+++ b/test/box/tx_man.test.lua
@@ -0,0 +1,122 @@
+env = require('test_run')
+test_run = env.new()
+test_run:cmd("create server tx_man with script='box/tx_man.lua'")
+test_run:cmd("start server tx_man")
+test_run:cmd("switch tx_man")
+
+txn_proxy = require('txn_proxy')
+
+s = box.schema.space.create('test')
+i = s:create_index('pk', {parts={{1, 'uint'}}})
+i = s:create_index('sec', {parts={{2, 'uint'}}})
+
+tx1 = txn_proxy.new()
+tx2 = txn_proxy.new()
+
+-- Simple read/write conflicts.
+s:replace{1, 0}
+tx1:begin()
+tx2:begin()
+tx1('s:select{1}')
+tx2('s:select{1}')
+tx1('s:replace{1, 1}')
+tx2('s:replace{1, 2}')
+tx1:commit()
+tx2:commit()
+s:select{}
+
+-- Simple read/write conflicts, different order.
+s:replace{1, 0}
+tx1:begin()
+tx2:begin()
+tx1('s:select{1}')
+tx2('s:select{1}')
+tx1('s:replace{1, 1}')
+tx2('s:replace{1, 2}')
+tx2:commit() -- note that tx2 commits first.
+tx1:commit()
+s:select{}
+
+-- Implicit read/write conflicts.
+s:replace{1, 0}
+tx1:begin()
+tx2:begin()
+tx1("s:update({1}, {{'+', 2, 3}})")
+tx2("s:update({1}, {{'+', 2, 5}})")
+tx1:commit()
+tx2:commit()
+s:select{}
+
+-- Implicit read/write conflicts, different order.
+s:replace{1, 0}
+tx1:begin()
+tx2:begin()
+tx1("s:update({1}, {{'+', 2, 3}})")
+tx2("s:update({1}, {{'+', 2, 5}})")
+tx2:commit() -- note that tx2 commits first.
+tx1:commit()
+s:select{}
+s:delete{1}
+
+-- Conflict in secondary index.
+tx1:begin()
+tx2:begin()
+tx1("s:replace{1, 1}")
+tx2("s:replace{2, 1}")
+tx1:commit()
+tx2:commit()
+s:select{}
+s:delete{1}
+
+-- Conflict in secondary index, different order.
+tx1:begin()
+tx2:begin()
+tx1("s:replace{1, 2}")
+tx2("s:replace{2, 2}")
+tx2:commit() -- note that tx2 commits first.
+tx1:commit()
+s:select{}
+s:delete{2}
+
+-- TXN is sent to read view.
+s:replace{1, 1}
+s:replace{2, 2}
+s:replace{3, 3}
+tx1:begin()
+tx2:begin()
+
+tx1("s:select{}")
+tx2("s:replace{1, 11}")
+tx2("s:replace{2, 12}")
+tx2:commit()
+tx1("s:select{}")
+tx1:commit()
+
+s:delete{1}
+s:delete{2}
+s:delete{3}
+
+-- TXN is sent to read view but tries to replace and becomes conflicted.
+s:replace{1, 1}
+s:replace{2, 2}
+s:replace{3, 3}
+tx1:begin()
+tx2:begin()
+
+tx1("s:select{}")
+tx2("s:replace{1, 11}")
+tx2("s:replace{2, 12}")
+tx2:commit()
+tx1("s:select{}")
+tx1("s:replace{3, 13}")
+tx1("s:select{}")
+tx1:commit()
+
+s:delete{1}
+s:delete{2}
+s:delete{3}
+
+test_run:cmd("switch default")
+test_run:cmd("stop server tx_man")
+test_run:cmd("cleanup server tx_man")
+
--
2.7.4
* Re: [Tarantool-patches] [PATCH v4 12/12] txm: add a test
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 12/12] txm: add a test Aleksandr Lyapunov
@ 2020-09-15 18:05 ` Nikita Pettik
2020-09-22 17:58 ` Aleksandr Lyapunov
0 siblings, 1 reply; 26+ messages in thread
From: Nikita Pettik @ 2020-09-15 18:05 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> Part of #4897
> ---
> diff --git a/test/box/tx_man.test.lua b/test/box/tx_man.test.lua
> new file mode 100644
> index 0000000..de5a734
> --- /dev/null
> +++ b/test/box/tx_man.test.lua
> @@ -0,0 +1,122 @@
> +env = require('test_run')
> +test_run = env.new()
> +test_run:cmd("create server tx_man with script='box/tx_man.lua'")
> +test_run:cmd("start server tx_man")
> +test_run:cmd("switch tx_man")
> +
> +txn_proxy = require('txn_proxy')
> +
> +s = box.schema.space.create('test')
> +i = s:create_index('pk', {parts={{1, 'uint'}}})
> +i = s:create_index('sec', {parts={{2, 'uint'}}})
> +
Uncovered test scenarios: more than two indexes; more than one space;
unique secondary indexes; recovery; triggers; rollbacks...
* Re: [Tarantool-patches] [PATCH v4 12/12] txm: add a test
2020-09-15 18:05 ` Nikita Pettik
@ 2020-09-22 17:58 ` Aleksandr Lyapunov
2020-09-23 11:07 ` Nikita Pettik
0 siblings, 1 reply; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-22 17:58 UTC (permalink / raw)
To: Nikita Pettik; +Cc: tarantool-patches
Hi, thanks for the review.
I added the tests (triggers and recovery I'll do a bit later).
Found some bugs; here are the fixes I made to make the tests pass.
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index d72fa92..5aacd22 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -422,7 +422,7 @@ memtx_tx_story_gc_step()
* actually delete the tuple, it must be deleted from
* index.
*/
- if (story->del_psn > 0) {
+ if (story->del_psn > 0 && story->space != NULL) {
struct index *index = story->space->index[i];
struct tuple *unused;
if (index_replace(index, story->tuple, NULL,
@@ -801,7 +801,6 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
memtx_tx_story_unlink(story, i);
}
stmt->add_story->add_stmt = NULL;
- memtx_tx_story_delete(stmt->add_story);
stmt->add_story = NULL;
tuple_unref(stmt->new_tuple);
}
@@ -945,8 +944,22 @@ memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
if (stmt->add_story != NULL)
stmt->add_story->add_psn = stmt->txn->psn;
- if (stmt->del_story != NULL)
+ if (stmt->del_story != NULL) {
stmt->del_story->del_psn = stmt->txn->psn;
+ // Let's conflict all other deleting stories.
+ struct txn_stmt *dels = stmt->del_story->del_stmt;
+ while (dels != NULL) {
+ struct txn_stmt *next = dels->next_in_del_list;
+ if (dels != stmt) {
+ dels->del_story = NULL;
+ dels->next_in_del_list = NULL;
+ }
+ dels = next;
+ }
+ // Set the only deleting statement for that story.
+ stmt->del_story->del_stmt = stmt;
+ stmt->next_in_del_list = NULL;
+ }
}
ssize_t
@@ -1001,6 +1014,20 @@ memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
return result;
}
+void
+memtx_tx_on_space_delete(struct space *space)
+{
+ /* Just clear pointer to space, it will be handled in GC. */
+ while (!rlist_empty(&space->memtx_stories)) {
+ struct memtx_story *story
+ = rlist_first_entry(&space->memtx_stories,
+ struct memtx_story,
+ in_space_stories);
+ story->space = NULL;
+ rlist_del(&story->in_space_stories);
+ }
+}
+
static void
memtx_tx_story_delete(struct memtx_story *story)
{
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 6197d1b..aa204ac 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -310,6 +310,15 @@ memtx_tx_tuple_clarify(struct txn *txn, struct space *space,
is_prepared_ok);
}
+/**
+ * Notify manager the a space is deleted.
+ * It's necessary because there is a chance that garbage collector hasn't
+ * deleted all stories of that space and in that case some actions of
+ * story's destructor are not applicable.
+ */
+void
+memtx_tx_on_space_delete(struct space *space);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/space.c b/src/box/space.c
index 1243932..6d1d771 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -38,6 +38,7 @@
#include "user.h"
#include "session.h"
#include "txn.h"
+#include "memtx_tx.h"
#include "tuple.h"
#include "xrow_update.h"
#include "request.h"
@@ -253,7 +254,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list)
void
space_delete(struct space *space)
{
- rlist_del(&space->memtx_stories);
+ memtx_tx_on_space_delete(space);
assert(space->ck_constraint_trigger == NULL);
for (uint32_t j = 0; j <= space->index_id_max; j++) {
struct index *index = space->index_map[j];
On 15.09.2020 21:05, Nikita Pettik wrote:
> Uncovered test scenarious: more than two indexes; more than one space;
> unique secondary indexes; recovery; triggers; rollbacks...
>
* Re: [Tarantool-patches] [PATCH v4 12/12] txm: add a test
2020-09-22 17:58 ` Aleksandr Lyapunov
@ 2020-09-23 11:07 ` Nikita Pettik
2020-09-23 11:12 ` Aleksandr Lyapunov
0 siblings, 1 reply; 26+ messages in thread
From: Nikita Pettik @ 2020-09-23 11:07 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
On 22 Sep 20:58, Aleksandr Lyapunov wrote:
> Hi, thanks for the review.
> I added the tests (triggers and recovery I'll do a bit later).
> Found some bugs; here are the fixes I made to make the tests pass.
A few non-functional fixes:
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 55748ad64..b10440ae3 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -418,8 +418,8 @@ memtx_tx_story_gc_step()
if (link->newer_story == NULL) {
/*
* We are at the top of the chain. That means
- * that story->tuple is in index. If the story is
- * actually delete the tuple, it must be deleted from
+ * that story->tuple is in index. If the story
+ * actually deletes the tuple, it must be deleted from
* index.
*/
if (story->del_psn > 0 && story->space != NULL) {
@@ -946,7 +946,7 @@ memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
if (stmt->del_story != NULL) {
stmt->del_story->del_psn = stmt->txn->psn;
- // Let's conflict all other deleting stories.
+ /* Let's conflict all other deleting stories. */
struct txn_stmt *dels = stmt->del_story->del_stmt;
while (dels != NULL) {
struct txn_stmt *next = dels->next_in_del_list;
@@ -956,7 +956,7 @@ memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
}
dels = next;
}
- // Set the only deleting statement for that story.
+ /* Set the only deleting statement for that story. */
stmt->del_story->del_stmt = stmt;
stmt->next_in_del_list = NULL;
}
@@ -1008,8 +1008,8 @@ memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
if (!own_change)
memtx_tx_track_read(txn, space, tuple);
if (mk_index != 0) {
- assert(false); /* TODO: multiindex */
- panic("multikey indexes are not supported int TX manager");
+ unreachable(); /* TODO: multiindex */
+ panic("multikey indexes are not supported in TX manager");
}
return result;
}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 25a203880..6aadad1f0 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -311,7 +311,7 @@ memtx_tx_tuple_clarify(struct txn *txn, struct space *space,
}
/**
- * Notify manager the a space is deleted.
+ * Notify manager that the space is deleted.
* It's necessary because there is a chance that garbage collector hasn't
* deleted all stories of that space and in that case some actions of
* story's destructor are not applicable.
Otherwise LGTM
> diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
> index d72fa92..5aacd22 100644
> --- a/src/box/memtx_tx.c
> +++ b/src/box/memtx_tx.c
> @@ -422,7 +422,7 @@ memtx_tx_story_gc_step()
> * actually delete the tuple, it must be deleted from
> * index.
> */
> - if (story->del_psn > 0) {
> + if (story->del_psn > 0 && story->space != NULL) {
> struct index *index = story->space->index[i];
> struct tuple *unused;
> if (index_replace(index, story->tuple, NULL,
> @@ -801,7 +801,6 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
> memtx_tx_story_unlink(story, i);
> }
> stmt->add_story->add_stmt = NULL;
> - memtx_tx_story_delete(stmt->add_story);
> stmt->add_story = NULL;
> tuple_unref(stmt->new_tuple);
> }
> @@ -945,8 +944,22 @@ memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
> if (stmt->add_story != NULL)
> stmt->add_story->add_psn = stmt->txn->psn;
>
> - if (stmt->del_story != NULL)
> + if (stmt->del_story != NULL) {
> stmt->del_story->del_psn = stmt->txn->psn;
> + // Let's conflict all other deleting stories.
> + struct txn_stmt *dels = stmt->del_story->del_stmt;
> + while (dels != NULL) {
> + struct txn_stmt *next = dels->next_in_del_list;
> + if (dels != stmt) {
> + dels->del_story = NULL;
> + dels->next_in_del_list = NULL;
> + }
> + dels = next;
> + }
> + // Set the only deleting statement for that story.
> + stmt->del_story->del_stmt = stmt;
> + stmt->next_in_del_list = NULL;
> + }
> }
* Re: [Tarantool-patches] [PATCH v4 12/12] txm: add a test
2020-09-23 11:07 ` Nikita Pettik
@ 2020-09-23 11:12 ` Aleksandr Lyapunov
0 siblings, 0 replies; 26+ messages in thread
From: Aleksandr Lyapunov @ 2020-09-23 11:12 UTC (permalink / raw)
To: Nikita Pettik; +Cc: tarantool-patches
Thanks!
Meanwhile I added more tests and comments and fixed a tiny bug:
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 55748ad..62ddfbe 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -808,6 +808,11 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
if (stmt->del_story != NULL) {
struct memtx_story *story = stmt->del_story;
+ /**
+ * Clear del_story's pointer to the statement.
+ * Since the story holds a list of in-progress delete statements,
+ * find the stmt in the list and unlink.
+ */
struct txn_stmt **prev = &story->del_stmt;
while (*prev != stmt) {
prev = &(*prev)->next_in_del_list;
@@ -816,7 +821,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
*prev = stmt->next_in_del_list;
stmt->next_in_del_list = NULL;
- stmt->del_story->del_stmt = NULL;
+ /* And vice versa: clear the statement's pointer to story. */
stmt->del_story = NULL;
}
}
On 23.09.2020 14:07, Nikita Pettik wrote:
>
> Othewise LGTM
>
>
* Re: [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine
2020-09-08 10:22 [Tarantool-patches] [PATCH v4 00/12] Transaction engine for memtx engine Aleksandr Lyapunov
` (11 preceding siblings ...)
2020-09-08 10:22 ` [Tarantool-patches] [PATCH v4 12/12] txm: add a test Aleksandr Lyapunov
@ 2020-09-23 12:18 ` Kirill Yukhin
12 siblings, 0 replies; 26+ messages in thread
From: Kirill Yukhin @ 2020-09-23 12:18 UTC (permalink / raw)
To: Aleksandr Lyapunov; +Cc: tarantool-patches
Hello,
On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> GH issue: https://github.com/tarantool/tarantool/issues/4897
> GH branch: https://github.com/tarantool/tarantool/tree/alyapunov/gh-4897-memtx-tx-engine
>
> Changes in V4:
> - fixed almost all suggestion from the last review
> - fixed some problems I found by myself
I've checked your patch set into master.
Could you please update Release Notes and check that doc
tickets are properly created?
--
Regards, Kirill Yukhin