* [tarantool-patches] [PATCH v4 1/9] txn: handle fiber stop event at transaction level
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:28 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 11:39 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine Georgy Kirichenko
` (7 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Get rid of the fiber-on-stop logic duplicated between the memtx and
vinyl engines.
Prerequisites: #1254
---
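For reference, a minimal sketch of the lifecycle after this patch (call
shapes as in the diff below): the stop trigger is now owned by the
transaction itself rather than registered by each engine.

	struct txn *txn = txn_begin(false);
	/*
	 * txn_begin() has registered txn->fiber_on_stop on
	 * fiber()->on_stop, so a fiber stop from here on rolls
	 * the transaction back, whatever the engine is.
	 */
	if (txn_commit(txn) != 0)	/* both commit and rollback */
		return -1;		/* clear the trigger first */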
src/box/memtx_engine.c | 5 -----
src/box/txn.c | 4 ++++
src/box/vinyl.c | 8 --------
3 files changed, 4 insertions(+), 13 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..cd763e547 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -94,15 +94,12 @@ memtx_init_txn(struct txn *txn)
trigger_create(&txn->fiber_on_yield, txn_on_yield,
NULL, NULL);
- trigger_create(&txn->fiber_on_stop, txn_on_stop,
- NULL, NULL);
/*
* Memtx doesn't allow yields between statements of
* a transaction. Set a trigger which would roll
* back the transaction if there is a yield.
*/
trigger_add(&fiber->on_yield, &txn->fiber_on_yield);
- trigger_add(&fiber->on_stop, &txn->fiber_on_stop);
/*
* This serves as a marker that the triggers are
* initialized.
@@ -379,7 +376,6 @@ memtx_engine_prepare(struct engine *engine, struct txn *txn)
* on calls to trigger_create/trigger_clear.
*/
trigger_clear(&txn->fiber_on_yield);
- trigger_clear(&txn->fiber_on_stop);
if (txn->is_aborted) {
diag_set(ClientError, ER_TRANSACTION_YIELD);
diag_log();
@@ -458,7 +454,6 @@ memtx_engine_rollback(struct engine *engine, struct txn *txn)
{
if (txn->engine_tx != NULL) {
trigger_clear(&txn->fiber_on_yield);
- trigger_clear(&txn->fiber_on_stop);
}
struct txn_stmt *stmt;
stailq_reverse(&txn->stmts);
diff --git a/src/box/txn.c b/src/box/txn.c
index d4627b554..7a2c8cdaf 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -198,6 +198,8 @@ txn_begin(bool is_autocommit)
txn->psql_txn = NULL;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
+ trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+ trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
return txn;
}
@@ -421,6 +423,7 @@ txn_commit(struct txn *txn)
if (engine_prepare(txn->engine, txn) != 0)
goto fail;
}
+ trigger_clear(&txn->fiber_on_stop);
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
@@ -475,6 +478,7 @@ txn_rollback()
struct txn *txn = in_txn();
if (txn == NULL)
return;
+ trigger_clear(&txn->fiber_on_stop);
if (txn->engine)
engine_rollback(txn->engine, txn);
/* Rollback triggers must not throw. */
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index ecf197523..814325da5 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2420,10 +2420,6 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
txn->engine_tx = vy_tx_begin(env->xm);
if (txn->engine_tx == NULL)
return -1;
- if (!txn->is_autocommit) {
- trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
- trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
- }
return 0;
}
@@ -2493,8 +2489,6 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
vy_regulator_check_dump_watermark(&env->regulator);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
}
static void
@@ -2508,8 +2502,6 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
vy_tx_rollback(tx);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
}
static int
--
2.22.0
* [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 1/9] txn: handle fiber stop event at transaction level Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:28 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 11:53 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure Georgy Kirichenko
` (6 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow a fiber which isn't the owner of a locked latch to steal its
ownership. This is required to process transaction triggers
asynchronously.
Prerequisites: #1254
---
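For context, this is the use the series makes of the routine (the
alter.cc hunk of patch 6 below shows exactly this): a commit trigger
may be run by the WAL scheduler fiber rather than by the fiber that
locked schema_lock, so it has to take over the ownership before
unlocking.

	static void
	unlock_after_dd(struct trigger *trigger, void *event)
	{
		(void) trigger;
		(void) event;
		/*
		 * The trigger could be processed by the wal scheduler
		 * fiber, so steal the latch first.
		 */
		latch_steal(&schema_lock);
		latch_unlock(&schema_lock);
	}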
src/lib/core/latch.h | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..580942564 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
return latch_lock_timeout(l, 0);
}
+/**
+ * Take over the latch ownership.
+ */
+static inline void
+latch_steal(struct latch *l)
+{
+ assert(l->owner != NULL);
+ l->owner = fiber();
+}
+
/**
* \copydoc box_latch_unlock
*/
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 2/9] core: latch_steal routine
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine Georgy Kirichenko
@ 2019-06-20 7:28 ` Konstantin Osipov
2019-06-20 11:53 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 7:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> Allow a fiber which isn't the owner of a locked latch to steal its
> ownership. This is required to process transaction triggers
> asynchronously.
>
LGTM, but since you discussed this idea with Vova, please solicit
his review as well.
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine Georgy Kirichenko
2019-06-20 7:28 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 11:53 ` Vladimir Davydov
2019-06-20 20:34 ` Георгий Кириченко
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 11:53 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:09AM +0300, Georgy Kirichenko wrote:
> Allow a fiber which isn't the owner of a locked latch to steal its
> ownership. This is required to process transaction triggers
> asynchronously.
>
> Prerequisites: #1254
> ---
> src/lib/core/latch.h | 10 ++++++++++
> 1 file changed, 10 insertions(+)
>
> diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
> index 49c59cf63..580942564 100644
> --- a/src/lib/core/latch.h
> +++ b/src/lib/core/latch.h
> @@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
> return latch_lock_timeout(l, 0);
> }
>
> +/**
> + * Take over the latch ownership.
> + */
> +static inline void
> +latch_steal(struct latch *l)
> +{
> + assert(l->owner != NULL);
> + l->owner = fiber();
> +}
> +
Please try not to introduce dead code. This patch should be squashed
into the one that actually uses latch_steal IMO.
* Re: [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine
2019-06-20 11:53 ` [tarantool-patches] " Vladimir Davydov
@ 2019-06-20 20:34 ` Георгий Кириченко
0 siblings, 0 replies; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:34 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Thursday, June 20, 2019 2:53:11 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:09AM +0300, Georgy Kirichenko wrote:
> > Allow a fiber which isn't the owner of a locked latch to steal its
> > ownership. This is required to process transaction triggers
> > asynchronously.
> >
> > Prerequisites: #1254
> > ---
> >
> > src/lib/core/latch.h | 10 ++++++++++
> > 1 file changed, 10 insertions(+)
> >
> > diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
> > index 49c59cf63..580942564 100644
> > --- a/src/lib/core/latch.h
> > +++ b/src/lib/core/latch.h
> > @@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
> >
> > return latch_lock_timeout(l, 0);
> >
> > }
> >
> > +/**
> > + * Take over the latch ownership.
> > + */
> > +static inline void
> > +latch_steal(struct latch *l)
> > +{
> > + assert(l->owner != NULL);
> > + l->owner = fiber();
> > +}
> > +
>
> Please try not to introduce dead code. This patch should be squashed
> into the one that actually uses latch_steal IMO.
Ack
* [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 1/9] txn: handle fiber stop event at transaction level Georgy Kirichenko
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 2/9] core: latch_steal routine Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:32 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 11:52 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
` (5 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Move transaction auto-start and auto-commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger
transaction refactoring aimed at implementing detachable transactions
and a parallel applier.
Prerequisites: #1254
---
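The resulting calling convention, as a sketch; do_statements() is a
hypothetical placeholder for any statement-level work, while the
txn_begin/txn_rollback/txn_commit/fiber_gc shape matches the applier.cc
and box.cc hunks below.

	struct txn *txn = txn_begin();
	if (txn == NULL)
		return -1;
	if (do_statements() != 0) {
		txn_rollback();
		return -1;
	}
	if (txn_commit(txn) != 0)
		return -1;
	/* Release fiber-local garbage accumulated by the txn. */
	fiber_gc();
	return 0;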
src/box/applier.cc | 35 +++++++++++++---
src/box/box.cc | 94 +++++++++++++++++++++++++++++++++---------
src/box/index.cc | 10 ++---
src/box/memtx_engine.c | 10 ++++-
src/box/memtx_space.c | 8 ++--
src/box/sql.c | 2 +-
src/box/txn.c | 46 +++++++--------------
src/box/txn.h | 16 +++----
src/box/vy_scheduler.c | 6 +--
9 files changed, 144 insertions(+), 83 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..e3203a4c8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,26 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ goto rollback;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request))
+ goto rollback;
+ int rc;
+ rc = txn_commit(txn);
+ if (rc < 0)
+ return -1;
+ fiber_gc();
+ return rc;
+rollback:
+ txn_rollback();
+ return -1;
}
/**
@@ -189,8 +204,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
return -1;
return txn_commit_stmt(txn, request);
}
@@ -403,8 +418,16 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
+ if (apply_row(&row) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -555,7 +578,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..7f23716e5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,62 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ struct tuple *tuple = NULL;
+ struct txn *txn = in_txn();
+ bool is_autocommit = txn == NULL;
+ if (is_autocommit && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto fail;
+ if (txn_begin_stmt(txn, space) == NULL)
+ goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
- return -1;
+ txn_rollback_stmt(txn);
+ goto fail;
+ }
+ if (result != NULL)
+ *result = tuple;
+
+ if (result == NULL || tuple == NULL) {
+ if (txn_commit_stmt(txn, request) != 0)
+ goto fail;
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0)
+ return -1;
+ fiber_gc();
+ }
+ return 0;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
/*
* Pin the tuple locally before the commit,
* otherwise it may go away during yield in
* when WAL is written in autocommit mode.
*/
tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
- tuple_bless(tuple);
+
+ if (txn_commit_stmt(txn, request)) {
+ /* Unref tuple and rollback if autocommit. */
+ tuple_unref(tuple);
+ goto fail;
+ }
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0) {
+ /* Unref tuple and exit. */
+ tuple_unref(tuple);
+ return -1;
+ }
+ fiber_gc();
+ }
+ tuple_bless(tuple);
tuple_unref(tuple);
- return rc;
+ return 0;
+
+fail:
+ if (is_autocommit)
+ txn_rollback();
+ return -1;
}
void
@@ -299,10 +327,20 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
+ struct txn *txn = txn_begin();
+ if (txn == NULL) {
+ say_error("error applying row: %s", request_str(&request));
+ diag_raise();
+ }
if (box_process_rw(&request, space, NULL) != 0) {
say_error("error applying row: %s", request_str(&request));
+ txn_rollback();
diag_raise();
}
+ if (txn_commit(txn) != 0) {
+ diag_raise();
+ }
+ fiber_gc();
}
struct wal_stream *xstream =
container_of(stream, struct wal_stream, base);
@@ -1055,7 +1093,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1080,7 +1118,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
- (unsigned) id, tt_uuid_str(uuid)) != 0)
+ (unsigned) id, tt_uuid_str(uuid)) != 0) {
+ txn_rollback();
diag_raise();
+ }
+ if (txn_commit(txn) != 0)
+ diag_raise();
+ fiber_gc();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1636,10 +1682,18 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
uu = *replicaset_uuid;
else
tt_uuid_create(&uu);
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
- tt_uuid_str(&uu)))
+ tt_uuid_str(&uu))) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
}
void
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index cd763e547..dae9955b2 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -269,16 +269,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 78a0059a0..b8eb04e84 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -325,10 +325,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+ if (stmt == NULL)
return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
request->tuple_end);
if (stmt->new_tuple == NULL)
@@ -341,7 +341,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index f2ef5b302..fe7ea629b 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -882,7 +882,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return SQL_TARANTOOL_ERROR;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 7a2c8cdaf..39b1ed773 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -174,7 +174,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -187,7 +187,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -222,26 +221,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ assert(txn == in_txn());
+ assert(txn != NULL);
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
+ if (stmt == NULL)
return NULL;
- }
if (space == NULL)
- return txn;
+ return stmt;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -254,9 +247,9 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return stmt;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
@@ -274,8 +267,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -335,14 +327,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -378,7 +365,7 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
- txn_rollback(); /* Perform our part of cascading rollback. */
+ txn_rollback(txn); /* Perform our part of cascading rollback. */
/*
* Move fiber to end of event loop to avoid
* execution of any new requests before all
@@ -461,14 +448,11 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -540,7 +524,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..5a66f8e53 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 0180331e3..74ac9a66c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 3/9] txn: get rid of autocommit from a txn structure
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-06-20 7:32 ` Konstantin Osipov
2019-06-20 11:52 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 7:32 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> Move transaction auto-start and auto-commit behavior to the box level.
> From now on, a transaction won't start or commit automatically without
> explicit txn_begin/txn_commit invocations. This is part of a bigger
> transaction refactoring aimed at implementing detachable transactions
> and a parallel applier.
Leaving to Vova for a final review.
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure Georgy Kirichenko
2019-06-20 7:32 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 11:52 ` Vladimir Davydov
2019-06-20 20:16 ` Георгий Кириченко
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 11:52 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:10AM +0300, Georgy Kirichenko wrote:
> @@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
> static inline void
> box_register_replica(uint32_t id, const struct tt_uuid *uuid)
> {
> + struct txn *txn = txn_begin();
> + if (txn == NULL)
> + diag_raise();
> if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
> - (unsigned) id, tt_uuid_str(uuid)) != 0)
> + (unsigned) id, tt_uuid_str(uuid)) != 0) {
> + txn_rollback();
> diag_raise();
> + }
> + if (txn_commit(txn) != 0)
> + diag_raise();
> + fiber_gc();
> assert(replica_by_uuid(uuid)->id == id);
> }
>
It looks like you didn't address any of my comments to v3,
nor did you reply to the email:
https://www.freelists.org/post/tarantool-patches/PATCH-v3-0514-txn-get-rid-of-autocommit-from-a-txn-structure,1
* Re: [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure
2019-06-20 11:52 ` [tarantool-patches] " Vladimir Davydov
@ 2019-06-20 20:16 ` Георгий Кириченко
0 siblings, 0 replies; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:16 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Thursday, June 20, 2019 2:52:21 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:10AM +0300, Georgy Kirichenko wrote:
> > @@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
> >
> > static inline void
> > box_register_replica(uint32_t id, const struct tt_uuid *uuid)
> > {
> >
> > + struct txn *txn = txn_begin();
> > + if (txn == NULL)
> > + diag_raise();
> >
> > if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
> >
> > - (unsigned) id, tt_uuid_str(uuid)) != 0)
> > + (unsigned) id, tt_uuid_str(uuid)) != 0) {
> > + txn_rollback();
> >
> > diag_raise();
> >
> > + }
> > + if (txn_commit(txn) != 0)
> > + diag_raise();
> > + fiber_gc();
> >
> > assert(replica_by_uuid(uuid)->id == id);
> >
> > }
>
> It looks like you didn't address any of my comments to v3,
> nor did you reply to the email:
Oh, sorry, I forgot this one
>
>
> https://www.freelists.org/post/tarantool-patches/PATCH-v3-0514-txn-get-rid-of-autocommit-from-a-txn-structure,1
* [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (2 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 3/9] txn: get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:43 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 13:03 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber Georgy Kirichenko
` (4 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Don't touch the fiber gc storage explicitly on transaction rollback.
This relaxes the dependencies between the fiber and transaction life
cycles.
Prerequisites: #1254
---
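The rollback convention after this patch, as a sketch mirroring the
box/call.c hunks below: txn_rollback() no longer calls fiber_gc(), so
the caller releases the fiber region explicitly.

	struct txn *txn = in_txn();
	if (txn != NULL) {
		txn_rollback(txn);	/* no longer frees fiber storage */
		fiber_gc();		/* now the caller's responsibility */
	}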
src/box/applier.cc | 8 +++++---
src/box/box.cc | 14 +++++++++-----
src/box/call.c | 22 ++++++++++++++++------
src/box/memtx_engine.c | 3 ++-
src/box/txn.c | 35 +++++++++++++++++------------------
src/box/txn.h | 8 ++++++--
src/box/vy_scheduler.c | 10 +++++++---
7 files changed, 62 insertions(+), 38 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index e3203a4c8..5a92f6109 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
fiber_gc();
return rc;
rollback:
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
@@ -422,7 +422,8 @@ applier_join(struct applier *applier)
if (txn == NULL)
diag_raise();
if (apply_row(&row) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -625,7 +626,8 @@ applier_apply_tx(struct stailq *rows)
return txn_commit(txn);
rollback:
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 7f23716e5..5e5cd2b08 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -222,8 +222,10 @@ box_process_rw(struct request *request, struct space *space,
return 0;
fail:
- if (is_autocommit)
- txn_rollback();
+ if (is_autocommit) {
+ txn_rollback(txn);
+ fiber_gc();
+ }
return -1;
}
@@ -334,7 +336,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
}
if (box_process_rw(&request, space, NULL) != 0) {
say_error("error applying row: %s", request_str(&request));
- txn_rollback();
+ txn_rollback(txn);
diag_raise();
}
if (txn_commit(txn) != 0) {
@@ -1356,7 +1358,8 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
(unsigned) id, tt_uuid_str(uuid)) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -1688,7 +1691,8 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
tt_uuid_str(&uu))) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
diff --git a/src/box/call.c b/src/box/call.c
index 56da53fb3..7f6fc8bba 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -208,14 +208,18 @@ box_process_call(struct call_request *request, struct port *port)
if (orig_credentials)
fiber_set_user(fiber(), orig_credentials);
+ struct txn *txn = in_txn();
if (rc != 0) {
- txn_rollback();
+ if (txn != NULL)
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
- if (in_txn()) {
+ if (txn != NULL) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
@@ -229,14 +233,20 @@ box_process_eval(struct call_request *request, struct port *port)
/* Check permissions */
if (access_check_universe(PRIV_X) != 0)
return -1;
+ struct txn *txn;
if (box_lua_eval(request, port) != 0) {
- txn_rollback();
+ txn = in_txn();
+ if (txn != NULL)
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
- if (in_txn()) {
+ txn = in_txn();
+ if (txn != NULL) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dae9955b2..f371d147f 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -274,7 +274,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 39b1ed773..21f7e98b4 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -168,6 +168,10 @@ txn_new()
inline static void
txn_free(struct txn *txn)
{
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next)
+ txn_stmt_unref_tuples(stmt);
+
/* Truncate region up to struct txn size. */
region_truncate(&txn->region, sizeof(struct txn));
stailq_add(&txn_cache, &txn->in_txn_cache);
@@ -341,8 +345,10 @@ txn_write_to_wal(struct txn *txn)
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region);
- if (req == NULL)
+ if (req == NULL) {
+ txn_rollback(txn);
return -1;
+ }
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
@@ -415,7 +421,7 @@ txn_commit(struct txn *txn)
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
- goto fail;
+ return -1;
}
/*
* Engine can be NULL if transaction contains IPROTO_NOP
@@ -435,15 +441,12 @@ txn_commit(struct txn *txn)
panic("commit trigger failed");
}
- struct txn_stmt *stmt;
- stailq_foreach_entry(stmt, &txn->stmts, next)
- txn_stmt_unref_tuples(stmt);
fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
fail:
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
@@ -457,11 +460,9 @@ txn_rollback_stmt(struct txn *txn)
}
void
-txn_rollback()
+txn_rollback(struct txn *txn)
{
- struct txn *txn = in_txn();
- if (txn == NULL)
- return;
+ assert(txn == in_txn());
trigger_clear(&txn->fiber_on_stop);
if (txn->engine)
engine_rollback(txn->engine, txn);
@@ -473,12 +474,6 @@ txn_rollback()
panic("rollback trigger failed");
}
- struct txn_stmt *stmt;
- stailq_foreach_entry(stmt, &txn->stmts, next)
- txn_stmt_unref_tuples(stmt);
-
- /** Free volatile txn memory. */
- fiber_gc();
fiber_set_txn(fiber(), NULL);
txn_free(txn);
}
@@ -554,11 +549,14 @@ int
box_txn_rollback()
{
struct txn *txn = in_txn();
+ if (txn == NULL)
+ return 0;
if (txn && txn->in_sub_stmt) {
diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
return -1;
}
- txn_rollback(); /* doesn't throw */
+ txn_rollback(txn); /* doesn't throw */
+ fiber_gc();
return 0;
}
@@ -636,6 +634,7 @@ txn_on_stop(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
- txn_rollback(); /* doesn't yield or fail */
+ txn_rollback(in_txn()); /* doesn't yield or fail */
+
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 5a66f8e53..569978ce9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -221,9 +221,12 @@ txn_begin();
int
txn_commit(struct txn *txn);
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
void
-txn_rollback();
+txn_rollback(struct txn *txn);
/**
* Roll back the transaction but keep the object around.
@@ -267,6 +270,7 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
/**
* Start a new statement.
+ * Return a new statement or NULL in case of error.
*/
struct txn_stmt *
txn_begin_stmt(struct txn *txn, struct space *space);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 74ac9a66c..5981f475e 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,18 +889,22 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
for (int i = 0; i < batch->count; i++) {
if (vy_deferred_delete_process_one(deferred_delete_space,
pk->space_id, pk->mem_format,
- &batch->stmt[i]) != 0)
- goto fail;
+ &batch->stmt[i]) != 0) {
+ goto fail_rollback;
+ }
}
if (txn_commit(txn) != 0)
goto fail;
fiber_gc();
return;
+
+fail_rollback:
+ txn_rollback(txn);
fail:
batch->is_failed = true;
diag_move(diag_get(), &batch->diag);
- txn_rollback();
+ fiber_gc();
}
/**
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-06-20 7:43 ` Konstantin Osipov
2019-06-20 20:35 ` Георгий Кириченко
2019-06-20 13:03 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 1 reply; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 7:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> Don't touch the fiber gc storage explicitly on transaction rollback.
> This relaxes the dependencies between the fiber and transaction life
> cycles.
As discussed verbally, LGTM.
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
> fiber_gc();
> return rc;
> rollback:
> - txn_rollback();
> + txn_rollback(txn);
> return -1;
> }
>
I weighed your argument that an error leads to diag_raise(), which
ends the running fiber, so fiber_gc() is unnecessary. This is
true, but don't you think it is a change in the protocol between
the caller and the callee? What should this protocol be generally,
now that we moved most of the memory allocation into txn? What
is fiber gc useful for, and should the user always use it with
savepoints, perhaps?
While we're thinking about it and inspecting all of the code to
use txn gc if at all possible, please add a comment that
fiber_gc() will be called by the caller, since we expect the error
to abort it. Actually, whichever you prefer: a comment or a
fiber_gc(). I prefer fiber_gc(), until we replace it with an
assert that fiber gc is not used in this function.
> @@ -334,7 +336,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
> }
> if (box_process_rw(&request, space, NULL) != 0) {
> say_error("error applying row: %s", request_str(&request));
> - txn_rollback();
> + txn_rollback(txn);
> diag_raise();
The same argument applies here. Please either add a comment, or don't
change the protocol between the caller and the callee and leave
fiber_gc() in place. Tomorrow the error may be suppressed for
whatever reason, and we may start accumulating garbage on the fiber
gc pool. Honestly, until we add fiber_gc() to every fiber yield or
use another system-wide mechanism to ensure it never grows out of
control, I don't think it's safe to remove it from such places.
Fusing fiber memory and txn memory had one advantage: we had at
least some certainty that a fiber which executes a transaction will
free its memory on a regular basis. Now there is no certainty at
all.
With these two minor comments the patch is LGTM.
--
Konstantin Osipov, Moscow, Russia
* [tarantool-patches] Re: [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback
2019-06-20 7:43 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 20:35 ` Георгий Кириченко
0 siblings, 0 replies; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:35 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Thursday, June 20, 2019 10:43:18 AM MSK Konstantin Osipov wrote:
Accepted, I will restore all fiber_gc
> * Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> > Don't touch the fiber gc storage explicitly on transaction rollback.
> > This relaxes the dependencies between the fiber and transaction life
> > cycles.
>
> As discussed verbally, LGTM.
>
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
> >
> > fiber_gc();
> > return rc;
> >
> > rollback:
> > - txn_rollback();
> > + txn_rollback(txn);
> >
> > return -1;
> >
> > }
>
> I weighed your argument that an error leads to diag_raise(), which
> ends the running fiber, so fiber_gc() is unnecessary. This is
> true, but don't you think it is a change in the protocol between
> the caller and the callee? What should be this protocol generally
> now that we moved most of memory allocation into txn? What should
> is fiber gc useful for, and should the user always use it with
> savepoints perhaps?
>
> While we're thinking about it and inspecting all of the code to
> use txn gc if at all possible, please add a comment that
> fiber_gc() will be called by the caller since we expect the error
> to abort it. Actually whichever you prefer -a comment or a
> fiber_gc(). I prefer fiber_gc(), until we replace it with an
> assert that fiber gc is not used in this function.
>
> > @@ -334,7 +336,7 @@ apply_wal_row(struct xstream *stream, struct
> > xrow_header *row)>
> > }
> > if (box_process_rw(&request, space, NULL) != 0) {
> >
> > say_error("error applying row: %s",
request_str(&request));
> >
> > - txn_rollback();
> > + txn_rollback(txn);
> >
> > diag_raise();
>
> the same argument is here. please either add a comment or not
> change the protocol between the caller and the callee and leave
> fiber_gc() in place. Tomorrow the error may be suppressed for
> whatever reason, and we may start having growing garbage on fiber
> gc pool. Honestly, until we add fiber_gc() to every fiber yield or
> use another system-wide mechanism to ensure it never grows out of
> control, I don't think it's safe to remove it from such places.
>
> Fusing fiber memory and txn memory had one advantage: we had at
> leasat some certainty that a fiber which executes transaction will
> free its memory on a regular basis. Now there is no certainty at
> all.
>
> With these two minor comments the patch is LGTM.
* Re: [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-06-20 7:43 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 13:03 ` Vladimir Davydov
2019-06-20 20:16 ` Георгий Кириченко
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 13:03 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:11AM +0300, Georgy Kirichenko wrote:
> Don't touch the fiber gc storage explicitly on transaction rollback.
> This relaxes the dependencies between the fiber and transaction life
> cycles.
>
> Prerequisites: #1254
> ---
> src/box/applier.cc | 8 +++++---
> src/box/box.cc | 14 +++++++++-----
> src/box/call.c | 22 ++++++++++++++++------
> src/box/memtx_engine.c | 3 ++-
> src/box/txn.c | 35 +++++++++++++++++------------------
> src/box/txn.h | 8 ++++++--
> src/box/vy_scheduler.c | 10 +++++++---
> 7 files changed, 62 insertions(+), 38 deletions(-)
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 39b1ed773..21f7e98b4 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -168,6 +168,10 @@ txn_new()
> inline static void
> txn_free(struct txn *txn)
> {
> + struct txn_stmt *stmt;
> + stailq_foreach_entry(stmt, &txn->stmts, next)
> + txn_stmt_unref_tuples(stmt);
> +
Clearly, this doesn't belong here. Should be a part of the previous
patch?
> /* Truncate region up to struct txn size. */
> region_truncate(&txn->region, sizeof(struct txn));
> stailq_add(&txn_cache, &txn->in_txn_cache);
> @@ -636,6 +634,7 @@ txn_on_stop(struct trigger *trigger, void *event)
> {
> (void) trigger;
> (void) event;
> - txn_rollback(); /* doesn't yield or fail */
> + txn_rollback(in_txn()); /* doesn't yield or fail */
> +
Extra new line.
> }
>
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 5a66f8e53..569978ce9 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -267,6 +270,7 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
>
> /**
> * Start a new statement.
> + * Return a new statement or NULL in case of error.
Again, looks like a leftover from the previous patch.
Other than that, the patch looks fine to me. I would rather remove
fiber_gc() altogether, but as Kostja pointed out, it's too risky,
so fine, let it be.
> */
> struct txn_stmt *
> txn_begin_stmt(struct txn *txn, struct space *space);
* Re: [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback
2019-06-20 13:03 ` [tarantool-patches] " Vladimir Davydov
@ 2019-06-20 20:16 ` Георгий Кириченко
0 siblings, 0 replies; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:16 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Thursday, June 20, 2019 4:03:52 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:11AM +0300, Georgy Kirichenko wrote:
> > Don't touch the fiber gc storage explicitly on transaction rollback.
> > This relaxes the dependencies between the fiber and transaction life
> > cycles.
> >
> > Prerequisites: #1254
> > ---
> >
> > src/box/applier.cc | 8 +++++---
> > src/box/box.cc | 14 +++++++++-----
> > src/box/call.c | 22 ++++++++++++++++------
> > src/box/memtx_engine.c | 3 ++-
> > src/box/txn.c | 35 +++++++++++++++++------------------
> > src/box/txn.h | 8 ++++++--
> > src/box/vy_scheduler.c | 10 +++++++---
> > 7 files changed, 62 insertions(+), 38 deletions(-)
> >
> > diff --git a/src/box/txn.c b/src/box/txn.c
> > index 39b1ed773..21f7e98b4 100644
> > --- a/src/box/txn.c
> > +++ b/src/box/txn.c
> > @@ -168,6 +168,10 @@ txn_new()
> >
> > inline static void
> > txn_free(struct txn *txn)
> > {
> >
> > + struct txn_stmt *stmt;
> > + stailq_foreach_entry(stmt, &txn->stmts, next)
> > + txn_stmt_unref_tuples(stmt);
> > +
>
> Clearly, this doesn't belong here. Should be a part of the previous
> patch?
I think it would be better if I moved this to a dedicated refactoring patch.
>
> > /* Truncate region up to struct txn size. */
> > region_truncate(&txn->region, sizeof(struct txn));
> > stailq_add(&txn_cache, &txn->in_txn_cache);
> >
> > @@ -636,6 +634,7 @@ txn_on_stop(struct trigger *trigger, void *event)
> >
> > {
> >
> > (void) trigger;
> > (void) event;
> >
> > - txn_rollback(); /* doesn't yield or fail */
> > + txn_rollback(in_txn()); /* doesn't yield or fail */
> > +
>
> Extra new line.
>
> > }
> >
> > diff --git a/src/box/txn.h b/src/box/txn.h
> > index 5a66f8e53..569978ce9 100644
> > --- a/src/box/txn.h
> > +++ b/src/box/txn.h
> > @@ -267,6 +270,7 @@ txn_on_rollback(struct txn *txn, struct trigger
> > *trigger)>
> > /**
> >
> > * Start a new statement.
> >
> > + * Return a new statement or NULL in case of error.
>
> Again, looks like a leftover from the prevoius patch.
>
> Other than that the patch looks fine to me. I would rather remove
> fiber_gc() altogether, but as Kostja pointed out, it's too risky
> so fine - let it be.
Accepted
>
> > */
> >
> > struct txn_stmt *
> > txn_begin_stmt(struct txn *txn, struct space *space);
* [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (3 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 4/9] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:53 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 13:05 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback Georgy Kirichenko
` (3 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
As we intend to implement asynchronous transactions, we should be able
to process transaction finalization regardless of whether the
transaction has a waiting fiber or not. So introduce a wal->tx
scheduler fiber to process transaction finalization. As we don't have
asynchronous transactions right now, this fiber only wakes up the
waiting fiber.
Note: this induces one context switch for each batch, which doesn't
look like an unaffordable luxury.
Prerequisites: #1254
---
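The handoff pattern, reduced to its essence (it mirrors
tx_schedule_queue() and tx_schedule_f() in the diff below): completion
code hands entries over to a queue and signals a condition, and the
dedicated fiber drains the queue, so waking the waiters no longer
depends on which fiber completed the batch.

	/* Producer side, runs in a wal->tx completion message: */
	stailq_concat(&writer->schedule_queue, queue);
	fiber_cond_signal(&writer->schedule_cond);

	/* Consumer side, the tx_schedule fiber: */
	while (!fiber_is_cancelled()) {
		while (!stailq_empty(&writer->schedule_queue)) {
			struct journal_entry *req =
				stailq_shift_entry(&writer->schedule_queue,
						   struct journal_entry, fifo);
			fiber_wakeup(req->fiber);
		}
		fiber_cond_wait(&writer->schedule_cond);
	}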
src/box/wal.c | 53 ++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 40 insertions(+), 13 deletions(-)
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..71f6dbb5c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -91,6 +91,12 @@ struct wal_writer
struct cpipe wal_pipe;
/** A memory pool for messages. */
struct mempool msg_pool;
+ /** A queue to schedule journal entry completions. */
+ struct stailq schedule_queue;
+ /** True if writer is in rollback state. */
+ bool is_in_rollback;
+ /** A condition to signal about new schedule queue entries. */
+ struct fiber_cond schedule_cond;
/* ----------------- wal ------------------- */
/** A setting from instance configuration - rows_per_wal */
int64_t wal_max_rows;
@@ -245,23 +251,35 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
+/*
+ * Tx schedule fiber function.
+ */
+static int
+tx_schedule_f(va_list ap)
+{
+ struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+ while (!fiber_is_cancelled()) {
+ while (!stailq_empty(&writer->schedule_queue)) {
+ struct journal_entry *req =
+ stailq_shift_entry(&writer->schedule_queue,
+ struct journal_entry, fifo);
+ fiber_wakeup(req->fiber);
+ }
+ writer->is_in_rollback = false;
+ fiber_cond_wait(&writer->schedule_cond);
+ }
+ return 0;
+}
+
/**
- * Invoke fibers waiting for their journal_entry's to be
- * completed. The fibers are invoked in strict fifo order:
- * this ensures that, in case of rollback, requests are
- * rolled back in strict reverse order, producing
- * a consistent database state.
+ * Attach requests to a scheduling queue.
*/
static void
tx_schedule_queue(struct stailq *queue)
{
- /*
- * fiber_wakeup() is faster than fiber_call() when there
- * are many ready fibers.
- */
- struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo)
- fiber_wakeup(req->fiber);
+ struct wal_writer *writer = &wal_writer_singleton;
+ stailq_concat(&writer->schedule_queue, queue);
+ fiber_cond_signal(&writer->schedule_cond);
}
/**
@@ -306,6 +324,8 @@ tx_schedule_rollback(struct cmsg *msg)
/* Must not yield. */
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
+ writer->is_in_rollback = true;
+
if (msg != &writer->in_rollback)
mempool_free(&writer->msg_pool,
container_of(msg, struct wal_msg, base));
@@ -356,6 +376,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->wal_mode = wal_mode;
writer->wal_max_rows = wal_max_rows;
writer->wal_max_size = wal_max_size;
+ writer->is_in_rollback = false;
journal_create(&writer->base, wal_mode == WAL_NONE ?
wal_write_in_wal_mode_none : wal_write, NULL);
@@ -368,6 +389,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
stailq_create(&writer->rollback);
cmsg_init(&writer->in_rollback, NULL);
+ stailq_create(&writer->schedule_queue);
+ fiber_cond_create(&writer->schedule_cond);
+ struct fiber *schedule_fiber = fiber_new("tx_schedule", tx_schedule_f);
+ if (schedule_fiber == NULL)
+ panic("Could not create schedule fiber");
+ fiber_start(schedule_fiber, writer);
writer->checkpoint_wal_size = 0;
writer->checkpoint_threshold = INT64_MAX;
@@ -1133,7 +1160,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
- if (! stailq_empty(&writer->rollback)) {
+ if (writer->is_in_rollback) {
/*
* The writer rollback queue is not empty,
* roll back this transaction immediately.
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 5/9] wal: a dedicated wal scheduling fiber
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber Georgy Kirichenko
@ 2019-06-20 7:53 ` Konstantin Osipov
2019-06-20 13:05 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 7:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
First of all, I think this patch is small enough to be collapsed
into the subsequent patch which uses the machinery.
Vova will remove yields from vinyl pretty soon, so we might not
need this patch after all.
> + /** A queue to schedule journal entry completions. */
> + struct stailq schedule_queue;
> + /** True if writer is in rollback state. */
> + bool is_in_rollback;
Why do you need a separate boolean variable? There is plenty of
state which could be used to check that we're in rollback already.
> + /** A condition to signal about new schedule queue entries. */
> + struct fiber_cond schedule_cond;
> /* ----------------- wal ------------------- */
> /** A setting from instance configuration - rows_per_wal */
> int64_t wal_max_rows;
> @@ -245,23 +251,35 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
> return xlog_tx_commit(l);
> }
>
> +/*
> + * Tx schedule fiber function.
> + */
> +static int
> +tx_schedule_f(va_list ap)
> +{
> + struct wal_writer *writer = va_arg(ap, struct wal_writer *);
> + while (!fiber_is_cancelled()) {
> + while (!stailq_empty(&writer->schedule_queue)) {
> + struct journal_entry *req =
> + stailq_shift_entry(&writer->schedule_queue,
> + struct journal_entry, fifo);
> + fiber_wakeup(req->fiber);
> + }
> + writer->is_in_rollback = false;
> + fiber_cond_wait(&writer->schedule_cond);
> + }
> + return 0;
> +}
> +
> /**
> - * Invoke fibers waiting for their journal_entry's to be
> - * completed. The fibers are invoked in strict fifo order:
> - * this ensures that, in case of rollback, requests are
> - * rolled back in strict reverse order, producing
> - * a consistent database state.
> + * Attach requests to a scheduling queue.
> */
> static void
> tx_schedule_queue(struct stailq *queue)
> {
> - /*
> - * fiber_wakeup() is faster than fiber_call() when there
> - * are many ready fibers.
> - */
> - struct journal_entry *req;
> - stailq_foreach_entry(req, queue, fifo)
> - fiber_wakeup(req->fiber);
> + struct wal_writer *writer = &wal_writer_singleton;
> + stailq_concat(&writer->schedule_queue, queue);
> + fiber_cond_signal(&writer->schedule_cond);
> }
Sorry for a nit, but you use fiber_wakeup() for the tx fibers, yet
fiber_cond_signal(), which incurs an extra stailq manipulation, for
this fiber.
Why do you need another queue? Shouldn't you replace the existing
queues with a single one instead?
Having commit, rollback and schedule queues is confusing.
I like it that you effectively finalize all tx in one go, and
then yield to the user fibers. Effectively this is a double sweep
over the list of tx, but it is fewer context switches overall,
since we don't switch in and out of the scheduler fiber in this
case, but switch from one ready fiber to another. Could you please
confirm?
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber Georgy Kirichenko
2019-06-20 7:53 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 13:05 ` Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 13:05 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:12AM +0300, Georgy Kirichenko wrote:
> As we intend to implement asynchronous transactions, we should be able
> to process transaction finalization regardless of whether a transaction
> has a waiting fiber or not. So introduce a wal->tx scheduler fiber to
> process transaction finalization. As we don't have asynchronous
> transactions right now, the fiber only wakes up the waiting fiber.
>
> Note: this induces one context switch for each batch, which doesn't look
> expensive enough to be an unaffordable luxury.
I pushed the patch removing yields from on_commit/rollback triggers in
Vinyl so I guess we can safely drop this one.
* [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (4 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 5/9] wal: a dedicated wal scheduling fiber Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:56 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 14:08 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit Georgy Kirichenko
` (2 subsequent siblings)
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Finalize a transaction through a journal entry callback, so that
transaction processing doesn't rely on the fiber scheduler. This also
enforces transaction finalization order, as triggers might fail.
Prerequisites: #1254
---
src/box/alter.cc | 5 +++
src/box/box.cc | 7 ++-
src/box/journal.c | 10 ++++-
src/box/journal.h | 12 ++++-
src/box/txn.c | 112 ++++++++++++++++++++++++++++++----------------
src/box/vy_log.c | 3 +-
src/box/wal.c | 20 ++++++++-
7 files changed, 122 insertions(+), 47 deletions(-)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index a37a68ce4..aa6a79264 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
+ /*
+ * A trigger could be processed by the wal scheduler fiber
+ * so steal the latch first.
+ */
+ latch_steal(&schema_lock);
latch_unlock(&schema_lock);
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 5e5cd2b08..2994363ab 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -309,10 +309,13 @@ struct recovery_journal {
*/
static int64_t
recovery_journal_write(struct journal *base,
- struct journal_entry * /* entry */)
+ struct journal_entry *entry)
{
struct recovery_journal *journal = (struct recovery_journal *) base;
- return vclock_sum(journal->vclock);
+ entry->res = vclock_sum(journal->vclock);
+ if (entry->on_done_cb)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
+ return entry->res;
}
static inline void
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..eb0db9af2 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -41,7 +41,9 @@ static int64_t
dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
- (void) entry;
+ entry->res = 0;
+ if (entry->on_done_cb)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
return 0;
}
@@ -53,7 +55,9 @@ static struct journal dummy_journal = {
struct journal *current_journal = &dummy_journal;
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+ void (*on_done_cb)(struct journal_entry *entry, void *data),
+ void *on_done_cb_data)
{
struct journal_entry *entry;
@@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
+ entry->on_done_cb = on_done_cb;
+ entry->on_done_cb_data = on_done_cb_data;
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..b704b5c67 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -58,6 +58,14 @@ struct journal_entry {
* The fiber issuing the request.
*/
struct fiber *fiber;
+ /**
+ * A journal entry completion callback.
+ */
+ void (*on_done_cb)(struct journal_entry *entry, void *data);
+ /**
+ * A journal entry completion callback argument.
+ */
+ void *on_done_cb_data;
/**
* Approximate size of this request when encoded.
*/
@@ -80,7 +88,9 @@ struct region;
* @return NULL if out of memory, fiber diagnostics area is set
*/
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+ void (*on_done_cb)(struct journal_entry *entry, void *data),
+ void *on_done_cb_data);
/**
* An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 21f7e98b4..52e16f3e6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -337,6 +337,66 @@ fail:
return -1;
}
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+ if (txn->signature < 0) {
+ if (txn->engine)
+ engine_rollback(txn->engine, txn);
+ /*
+ * Some of triggers require for in_txn variable is set so
+ * restore it for time a trigger is in progress.
+ */
+ fiber_set_txn(fiber(), txn);
+ /* Rollback triggers must not throw. */
+ if (txn->has_triggers &&
+ trigger_run(&txn->on_rollback, txn) != 0) {
+ diag_log();
+ unreachable();
+ panic("rollback trigger failed");
+ }
+ fiber_set_txn(fiber(), NULL);
+
+ return;
+ }
+ /*
+ * Engine can be NULL if transaction contains IPROTO_NOP
+ * statements only.
+ */
+ if (txn->engine != NULL)
+ engine_commit(txn->engine, txn);
+ /*
+ * Some of triggers require for in_txn variable is set so
+ * restore it for time a trigger is in progress.
+ */
+ fiber_set_txn(fiber(), txn);
+ /*
+ * The transaction is in the binary log. No action below
+ * may throw. In case an error has happened, there is
+ * no other option but terminate.
+ */
+ if (txn->has_triggers &&
+ trigger_run(&txn->on_commit, txn) != 0) {
+ diag_log();
+ unreachable();
+ panic("commit trigger failed");
+ }
+
+ fiber_set_txn(fiber(), NULL);
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+ struct txn *txn = (struct txn *)data;
+ txn->signature = entry->res;
+ txn_complete(txn);
+}
+
+
static int64_t
txn_write_to_wal(struct txn *txn)
{
@@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn)
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
- &txn->region);
+ &txn->region,
+ txn_entry_done_cb,
+ txn);
if (req == NULL) {
txn_rollback(txn);
return -1;
@@ -370,14 +432,6 @@ txn_write_to_wal(struct txn *txn)
ev_tstamp stop = ev_monotonic_now(loop());
if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
diag_set(ClientError, ER_WAL_IO);
diag_log();
} else if (stop - start > too_long_threshold) {
@@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
}
trigger_clear(&txn->fiber_on_stop);
+ fiber_set_txn(fiber(), NULL);
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
return -1;
+ } else {
+ /*
+ * However there is noting to write to wal a completion
+ * should be fired.
+ */
+ txn->signature = 0;
+ txn_complete(txn);
}
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
- if (txn->engine != NULL)
- engine_commit(txn->engine, txn);
- /*
- * The transaction is in the binary log. No action below
- * may throw. In case an error has happened, there is
- * no other option but terminate.
- */
- if (txn->has_triggers &&
- trigger_run(&txn->on_commit, txn) != 0) {
- diag_log();
- unreachable();
- panic("commit trigger failed");
- }
-
- fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
fail:
@@ -464,17 +507,8 @@ txn_rollback(struct txn *txn)
{
assert(txn == in_txn());
trigger_clear(&txn->fiber_on_stop);
- if (txn->engine)
- engine_rollback(txn->engine, txn);
- /* Rollback triggers must not throw. */
- if (txn->has_triggers &&
- trigger_run(&txn->on_rollback, txn) != 0) {
- diag_log();
- unreachable();
- panic("rollback trigger failed");
- }
-
- fiber_set_txn(fiber(), NULL);
+ txn->signature = -1;
+ txn_complete(txn);
txn_free(txn);
}
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa31..7cf5ff3e9 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,8 @@ vy_log_flush(void)
tx_size++;
size_t used = region_used(&fiber()->gc);
- struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+ struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
+ NULL, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index 71f6dbb5c..62b6391fd 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
struct journal_entry *req =
stailq_shift_entry(&writer->schedule_queue,
struct journal_entry, fifo);
+ if (req->on_done_cb != NULL)
+ req->on_done_cb(req, req->on_done_cb_data);
fiber_wakeup(req->fiber);
}
writer->is_in_rollback = false;
@@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
- ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
+ ERROR_INJECT(ERRINJ_WAL_IO, {
+ entry->res = -1;
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
+ return -1;
+ });
if (writer->is_in_rollback) {
/*
@@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
say_error("Aborting transaction %llu during "
"cascading rollback",
vclock_sum(&writer->vclock));
+ entry->res = -1;
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
return -1;
}
@@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
if (batch == NULL) {
diag_set(OutOfMemory, sizeof(struct wal_msg),
"region", "struct wal_msg");
+ entry->res = -1;
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
return -1;
}
wal_msg_create(batch);
@@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
- return vclock_sum(&writer->vclock);
+ entry->res = vclock_sum(&writer->vclock);
+ if (entry->on_done_cb)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
+ return entry->res;
}
void
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 6/9] wal: introduce a journal entry finalization callback
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback Georgy Kirichenko
@ 2019-06-20 7:56 ` Konstantin Osipov
2019-06-20 14:08 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 7:56 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
A single callback is enough for today perhaps, but generally this
looks exactly like a single trigger to me.
Could we perhaps allocate a journal entry right in txn_new(), and
move txn commit/rollback trigger objects to the journal entry
object?
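Roughly (a sketch; the on_done trigger list is my invention, not
part of the patch):

	/* Run all completion triggers of an entry. */
	static void
	journal_entry_complete(struct journal_entry *entry)
	{
		if (trigger_run(&entry->on_done, entry) != 0) {
			diag_log();
			panic("journal completion trigger failed");
		}
	}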
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback Georgy Kirichenko
2019-06-20 7:56 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 14:08 ` Vladimir Davydov
2019-06-20 20:22 ` Георгий Кириченко
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 14:08 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> Finalize a transaction through a journal entry callback, so that
> transaction processing doesn't rely on the fiber scheduler. This also
> enforces transaction finalization order, as triggers might fail.
>
> Prerequisites: #1254
> ---
> src/box/alter.cc | 5 +++
> src/box/box.cc | 7 ++-
> src/box/journal.c | 10 ++++-
> src/box/journal.h | 12 ++++-
> src/box/txn.c | 112 ++++++++++++++++++++++++++++++----------------
> src/box/vy_log.c | 3 +-
> src/box/wal.c | 20 ++++++++-
> 7 files changed, 122 insertions(+), 47 deletions(-)
In general, looks fine to me. A few minor comments below.
>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index a37a68ce4..aa6a79264 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
> {
> (void) trigger;
> (void) event;
> + /*
> + * A trigger could be processed by the wal scheduler fiber
> + * so steal the latch first.
> + */
Not "could be", but "is processed", I guess.
> + latch_steal(&schema_lock);
> latch_unlock(&schema_lock);
> }
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e5cd2b08..2994363ab 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,10 +309,13 @@ struct recovery_journal {
> */
> static int64_t
> recovery_journal_write(struct journal *base,
> - struct journal_entry * /* entry */)
> + struct journal_entry *entry)
> {
> struct recovery_journal *journal = (struct recovery_journal *) base;
> - return vclock_sum(journal->vclock);
> + entry->res = vclock_sum(journal->vclock);
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
This can't be - on_done_cb must always be set.
Note, although vy_log uses journal_entry, they don't go through this
path.
> + return entry->res;
Shouldn't it return 0? BTW it looks like you need to update the comment
to journal_write().
> }
>
> static inline void
> diff --git a/src/box/journal.c b/src/box/journal.c
> index fe13fb6ee..eb0db9af2 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -41,7 +41,9 @@ static int64_t
> dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> {
> (void) journal;
> - (void) entry;
> + entry->res = 0;
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> return 0;
> }
>
> @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
> struct journal *current_journal = &dummy_journal;
>
> struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region)
> +journal_entry_new(size_t n_rows, struct region *region,
> + void (*on_done_cb)(struct journal_entry *entry, void *data),
> + void *on_done_cb_data)
> {
> struct journal_entry *entry;
>
> @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
> entry->n_rows = n_rows;
> entry->res = -1;
> entry->fiber = fiber();
> + entry->on_done_cb = on_done_cb;
> + entry->on_done_cb_data = on_done_cb_data;
> return entry;
> }
>
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8ac32ee5e..b704b5c67 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -58,6 +58,14 @@ struct journal_entry {
> * The fiber issuing the request.
> */
> struct fiber *fiber;
> + /**
> + * A journal entry completion callback.
> + */
I think we should elaborate the comment. When is this callback called?
Is it called on failure? Is it called from the same fiber or some other?
> + void (*on_done_cb)(struct journal_entry *entry, void *data);
Let's please add a typedef for on_done_cb - you use its signature pretty
often.
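E.g. (the name is only a suggestion):

	typedef void
	(*journal_entry_done_cb)(struct journal_entry *entry, void *data);

and then use it both for the struct member and for the
journal_entry_new() arguments.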
> + /**
> + * A journal entry completion callback argument.
> + */
> + void *on_done_cb_data;
> /**
> * Approximate size of this request when encoded.
> */
> @@ -80,7 +88,9 @@ struct region;
> * @return NULL if out of memory, fiber diagnostics area is set
> */
> struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region);
> +journal_entry_new(size_t n_rows, struct region *region,
> + void (*on_done_cb)(struct journal_entry *entry, void *data),
> + void *on_done_cb_data);
>
> /**
> * An API for an abstract journal for all transactions of this
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 21f7e98b4..52e16f3e6 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -337,6 +337,66 @@ fail:
> return -1;
> }
>
> +/**
> + * Complete transaction processing.
> + */
> +static void
> +txn_complete(struct txn *txn)
> +{
> + if (txn->signature < 0) {
> + if (txn->engine)
> + engine_rollback(txn->engine, txn);
> + /*
> + * Some of triggers require for in_txn variable is set so
> + * restore it for time a trigger is in progress.
> + */
> + fiber_set_txn(fiber(), txn);
> + /* Rollback triggers must not throw. */
> + if (txn->has_triggers &&
> + trigger_run(&txn->on_rollback, txn) != 0) {
> + diag_log();
> + unreachable();
> + panic("rollback trigger failed");
> + }
> + fiber_set_txn(fiber(), NULL);
> +
> + return;
> + }
> + /*
> + * Engine can be NULL if transaction contains IPROTO_NOP
> + * statements only.
> + */
> + if (txn->engine != NULL)
> + engine_commit(txn->engine, txn);
> + /*
> + * Some of triggers require for in_txn variable is set so
> + * restore it for time a trigger is in progress.
> + */
> + fiber_set_txn(fiber(), txn);
copy-and-paste... Maybe add a helper function for running triggers?
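Something like (an untested sketch):

	/* Run commit or rollback triggers with in_txn() set. */
	static void
	txn_run_triggers(struct txn *txn, struct rlist *trigs)
	{
		/* Some triggers require in_txn() to be set. */
		fiber_set_txn(fiber(), txn);
		if (txn->has_triggers && trigger_run(trigs, txn) != 0) {
			diag_log();
			unreachable();
			panic("commit/rollback trigger failed");
		}
		fiber_set_txn(fiber(), NULL);
	}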
> + /*
> + * The transaction is in the binary log. No action below
> + * may throw. In case an error has happened, there is
> + * no other option but terminate.
> + */
> + if (txn->has_triggers &&
> + trigger_run(&txn->on_commit, txn) != 0) {
> + diag_log();
> + unreachable();
> + panic("commit trigger failed");
> + }
> +
> + fiber_set_txn(fiber(), NULL);
> +}
> +
> +static void
> +txn_entry_done_cb(struct journal_entry *entry, void *data)
> +{
> + struct txn *txn = (struct txn *)data;
> + txn->signature = entry->res;
> + txn_complete(txn);
> +}
> +
> +
> static int64_t
> txn_write_to_wal(struct txn *txn)
> {
> @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
> }
> trigger_clear(&txn->fiber_on_stop);
>
> + fiber_set_txn(fiber(), NULL);
> if (txn->n_new_rows + txn->n_applier_rows > 0) {
> txn->signature = txn_write_to_wal(txn);
> if (txn->signature < 0)
> return -1;
> + } else {
> + /*
> + * However there is noting to write to wal a completion
s/noting/nothing
punctuation marks missing
> + * should be fired.
> + */
> + txn->signature = 0;
> + txn_complete(txn);
> }
> - /*
> - * Engine can be NULL if transaction contains IPROTO_NOP
> - * statements only.
> - */
> - if (txn->engine != NULL)
> - engine_commit(txn->engine, txn);
> - /*
> - * The transaction is in the binary log. No action below
> - * may throw. In case an error has happened, there is
> - * no other option but terminate.
> - */
> - if (txn->has_triggers &&
> - trigger_run(&txn->on_commit, txn) != 0) {
> - diag_log();
> - unreachable();
> - panic("commit trigger failed");
> - }
> -
>
> - fiber_set_txn(fiber(), NULL);
> txn_free(txn);
> return 0;
> fail:
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 71f6dbb5c..62b6391fd 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
> struct journal_entry *req =
> stailq_shift_entry(&writer->schedule_queue,
> struct journal_entry, fifo);
> + if (req->on_done_cb != NULL)
> + req->on_done_cb(req, req->on_done_cb_data);
> fiber_wakeup(req->fiber);
> }
> writer->is_in_rollback = false;
> @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> {
> struct wal_writer *writer = (struct wal_writer *) journal;
>
> - ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> + ERROR_INJECT(ERRINJ_WAL_IO, {
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> + return -1;
> + });
>
> if (writer->is_in_rollback) {
> /*
> @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> say_error("Aborting transaction %llu during "
> "cascading rollback",
> vclock_sum(&writer->vclock));
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
Could you please add 'goto fail' so as not to duplicate code.
I would also add a helper function journal_entry_complete() that would
call the callback - would look neater that way IMO.
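I.e. (a sketch):

	static inline void
	journal_entry_complete(struct journal_entry *entry, int64_t res)
	{
		entry->res = res;
		if (entry->on_done_cb != NULL)
			entry->on_done_cb(entry, entry->on_done_cb_data);
	}

so that every error path in wal_write() collapses into

	journal_entry_complete(entry, -1);
	return -1;

behind a single fail label.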
> return -1;
> }
>
> @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> if (batch == NULL) {
> diag_set(OutOfMemory, sizeof(struct wal_msg),
> "region", "struct wal_msg");
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> return -1;
> }
> wal_msg_create(batch);
> @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
> entry->rows + entry->n_rows);
> vclock_merge(&writer->vclock, &vclock_diff);
> vclock_copy(&replicaset.vclock, &writer->vclock);
> - return vclock_sum(&writer->vclock);
> + entry->res = vclock_sum(&writer->vclock);
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> + return entry->res;
> }
* Re: [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
2019-06-20 14:08 ` [tarantool-patches] " Vladimir Davydov
@ 2019-06-20 20:22 ` Георгий Кириченко
2019-06-21 7:26 ` [tarantool-patches] " Konstantin Osipov
0 siblings, 1 reply; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:22 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Thursday, June 20, 2019 5:08:48 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> > Finalize a transaction through a journal entry callback, so that
> > transaction processing doesn't rely on the fiber scheduler. This also
> > enforces transaction finalization order, as triggers might fail.
> >
> > Prerequisites: #1254
> > ---
> >
> > src/box/alter.cc | 5 +++
> > src/box/box.cc | 7 ++-
> > src/box/journal.c | 10 ++++-
> > src/box/journal.h | 12 ++++-
> > src/box/txn.c | 112 ++++++++++++++++++++++++++++++----------------
> > src/box/vy_log.c | 3 +-
> > src/box/wal.c | 20 ++++++++-
> > 7 files changed, 122 insertions(+), 47 deletions(-)
>
> In general, looks fine to me. A few minor comments below.
>
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index a37a68ce4..aa6a79264 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void
> > *event)>
> > {
> >
> > (void) trigger;
> > (void) event;
> >
> > + /*
> > + * A trigger could be processed by the wal scheduler fiber
> > + * so steal the latch first.
> > + */
>
> Not "could be", but "is processed", I guess.
There are two cases: for a non-yielding journal (wal_mode = none or
recovery) it would be the same fiber, but for a yielding one this would
be a tx_prio callback.
>
> > + latch_steal(&schema_lock);
> >
> > latch_unlock(&schema_lock);
> >
> > }
> >
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 5e5cd2b08..2994363ab 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -309,10 +309,13 @@ struct recovery_journal {
> >
> > */
> >
> > static int64_t
> > recovery_journal_write(struct journal *base,
> >
> > - struct journal_entry * /* entry */)
> > + struct journal_entry *entry)
> >
> > {
> >
> > struct recovery_journal *journal = (struct recovery_journal *) base;
> >
> > - return vclock_sum(journal->vclock);
> > + entry->res = vclock_sum(journal->vclock);
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
>
> This can't be - on_done_cb must always be set.
>
> Note, although vy_log uses journal_entry, they don't go through this
> path.
Ok, so I will add an assert instead of the condition.
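I.e. (a sketch):

	assert(entry->on_done_cb != NULL);
	entry->on_done_cb(entry, entry->on_done_cb_data);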
>
> > + return entry->res;
>
> Shouldn't it return 0? BTW it looks like you need to update the comment
> to journal_write().
Accepted
>
> > }
> >
> > static inline void
> >
> > diff --git a/src/box/journal.c b/src/box/journal.c
> > index fe13fb6ee..eb0db9af2 100644
> > --- a/src/box/journal.c
> > +++ b/src/box/journal.c
> > @@ -41,7 +41,9 @@ static int64_t
> >
> > dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> > {
> >
> > (void) journal;
> >
> > - (void) entry;
> > + entry->res = 0;
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> >
> > return 0;
> >
> > }
> >
> > @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
> >
> > struct journal *current_journal = &dummy_journal;
> >
> > struct journal_entry *
> >
> > -journal_entry_new(size_t n_rows, struct region *region)
> > +journal_entry_new(size_t n_rows, struct region *region,
> > + void (*on_done_cb)(struct journal_entry *entry, void *data),
> > + void *on_done_cb_data)
> >
> > {
> >
> > struct journal_entry *entry;
> >
> > @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
> >
> > entry->n_rows = n_rows;
> > entry->res = -1;
> > entry->fiber = fiber();
> >
> > + entry->on_done_cb = on_done_cb;
> > + entry->on_done_cb_data = on_done_cb_data;
> >
> > return entry;
> >
> > }
> >
> > diff --git a/src/box/journal.h b/src/box/journal.h
> > index 8ac32ee5e..b704b5c67 100644
> > --- a/src/box/journal.h
> > +++ b/src/box/journal.h
> > @@ -58,6 +58,14 @@ struct journal_entry {
> >
> > * The fiber issuing the request.
> > */
> >
> > struct fiber *fiber;
> >
> > + /**
> > + * A journal entry completion callback.
> > + */
>
> I think we should elaborate the comment. When is this callback called?
> Is it called on failure? Is it called from the same fiber or some other?
Ok
>
> > + void (*on_done_cb)(struct journal_entry *entry, void *data);
>
> Let's please add a typedef for on_done_cb - you use its signature pretty
> often.
Ok
>
> > + /**
> > + * A journal entry completion callback argument.
> > + */
> > + void *on_done_cb_data;
> >
> > /**
> >
> > * Approximate size of this request when encoded.
> > */
> >
> > @@ -80,7 +88,9 @@ struct region;
> >
> > * @return NULL if out of memory, fiber diagnostics area is set
> > */
> >
> > struct journal_entry *
> >
> > -journal_entry_new(size_t n_rows, struct region *region);
> > +journal_entry_new(size_t n_rows, struct region *region,
> > + void (*on_done_cb)(struct journal_entry *entry, void *data),
> > + void *on_done_cb_data);
> >
> > /**
> >
> > * An API for an abstract journal for all transactions of this
> >
> > diff --git a/src/box/txn.c b/src/box/txn.c
> > index 21f7e98b4..52e16f3e6 100644
> > --- a/src/box/txn.c
> > +++ b/src/box/txn.c
> >
> > @@ -337,6 +337,66 @@ fail:
> > return -1;
> >
> > }
> >
> > +/**
> > + * Complete transaction processing.
> > + */
> > +static void
> > +txn_complete(struct txn *txn)
> > +{
> > + if (txn->signature < 0) {
> > + if (txn->engine)
> > + engine_rollback(txn->engine, txn);
> > + /*
> > + * Some of triggers require for in_txn variable is set so
> > + * restore it for time a trigger is in progress.
> > + */
> > + fiber_set_txn(fiber(), txn);
> > + /* Rollback triggers must not throw. */
> > + if (txn->has_triggers &&
> > + trigger_run(&txn->on_rollback, txn) != 0) {
> > + diag_log();
> > + unreachable();
> > + panic("rollback trigger failed");
> > + }
> > + fiber_set_txn(fiber(), NULL);
> > +
> > + return;
> > + }
> > + /*
> > + * Engine can be NULL if transaction contains IPROTO_NOP
> > + * statements only.
> > + */
> > + if (txn->engine != NULL)
> > + engine_commit(txn->engine, txn);
> > + /*
> > + * Some of triggers require for in_txn variable is set so
> > + * restore it for time a trigger is in progress.
> > + */
> > + fiber_set_txn(fiber(), txn);
>
> copy-and-paste... May be, add a helper function for running triggers?
Accepted
>
> > + /*
> > + * The transaction is in the binary log. No action below
> > + * may throw. In case an error has happened, there is
> > + * no other option but terminate.
> > + */
> > + if (txn->has_triggers &&
> > + trigger_run(&txn->on_commit, txn) != 0) {
> > + diag_log();
> > + unreachable();
> > + panic("commit trigger failed");
> > + }
> > +
> > + fiber_set_txn(fiber(), NULL);
> > +}
> > +
> > +static void
> > +txn_entry_done_cb(struct journal_entry *entry, void *data)
> > +{
> > + struct txn *txn = (struct txn *)data;
> > + txn->signature = entry->res;
> > + txn_complete(txn);
> > +}
> > +
> > +
> >
> > static int64_t
> > txn_write_to_wal(struct txn *txn)
> > {
> >
> > @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
> >
> > }
> > trigger_clear(&txn->fiber_on_stop);
> >
> > + fiber_set_txn(fiber(), NULL);
> >
> > if (txn->n_new_rows + txn->n_applier_rows > 0) {
> >
> > txn->signature = txn_write_to_wal(txn);
> > if (txn->signature < 0)
> >
> > return -1;
> >
> > + } else {
> > + /*
> > + * However there is noting to write to wal a completion
>
> s/noting/nothing
>
> punctuation marks missing
>
> > + * should be fired.
> > + */
> > + txn->signature = 0;
> > + txn_complete(txn);
> >
> > }
> >
> > - /*
> > - * Engine can be NULL if transaction contains IPROTO_NOP
> > - * statements only.
> > - */
> > - if (txn->engine != NULL)
> > - engine_commit(txn->engine, txn);
> > - /*
> > - * The transaction is in the binary log. No action below
> > - * may throw. In case an error has happened, there is
> > - * no other option but terminate.
> > - */
> > - if (txn->has_triggers &&
> > - trigger_run(&txn->on_commit, txn) != 0) {
> > - diag_log();
> > - unreachable();
> > - panic("commit trigger failed");
> > - }
> > -
> >
> > - fiber_set_txn(fiber(), NULL);
> >
> > txn_free(txn);
> > return 0;
> >
> > fail:
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 71f6dbb5c..62b6391fd 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
> >
> > struct journal_entry *req =
> >
> > stailq_shift_entry(&writer->schedule_queue,
> >
> > struct journal_entry, fifo);
> >
> > + if (req->on_done_cb != NULL)
> > + req->on_done_cb(req, req->on_done_cb_data);
> >
> > fiber_wakeup(req->fiber);
> >
> > }
> > writer->is_in_rollback = false;
> >
> > @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct
> > journal_entry *entry)>
> > {
> >
> > struct wal_writer *writer = (struct wal_writer *) journal;
> >
> > - ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> > + ERROR_INJECT(ERRINJ_WAL_IO, {
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> > + return -1;
> > + });
> >
> > if (writer->is_in_rollback) {
> >
> > /*
> >
> > @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct
> > journal_entry *entry)>
> > say_error("Aborting transaction %llu during "
> >
> > "cascading rollback",
> > vclock_sum(&writer->vclock));
> >
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
>
> Could you please add 'goto fail' so as not to duplicate code.
>
> I would also add a helper function journal_entry_complete() that would
> call the callback - would look neater that way IMO.
Accepted
>
> > return -1;
> >
> > }
> >
> > @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct
> > journal_entry *entry)>
> > if (batch == NULL) {
> >
> > diag_set(OutOfMemory, sizeof(struct wal_msg),
> >
> > "region", "struct wal_msg");
> >
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> >
> > return -1;
> >
> > }
> > wal_msg_create(batch);
> >
> > @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
> >
> > entry->rows + entry->n_rows);
> >
> > vclock_merge(&writer->vclock, &vclock_diff);
> > vclock_copy(&replicaset.vclock, &writer->vclock);
> >
> > - return vclock_sum(&writer->vclock);
> > + entry->res = vclock_sum(&writer->vclock);
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> > + return entry->res;
> >
> > }
* Re: [tarantool-patches] Re: [PATCH v4 6/9] wal: introduce a journal entry finalization callback
2019-06-20 20:22 ` Георгий Кириченко
@ 2019-06-21 7:26 ` Konstantin Osipov
0 siblings, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-21 7:26 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov
* Георгий Кириченко <georgy@tarantool.org> [19/06/20 23:23]:
> > > + /*
> > > + * A trigger could be processed by the wal scheduler fiber
> > > + * so steal the latch first.
> > > + */
> >
> > Not "could be", but "is processed", I guess.
> There are two cases: for non-yielding journal (wal none or recovery) it would
> be the same fiber but for yielding one - this would be a tx_prio callback.
I think it's OK to mention both in the comment.
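E.g. (a draft wording):

	/*
	 * The trigger is run either by the fiber which submitted
	 * the journal entry (wal_mode = none, recovery) or from
	 * the tx_prio callback, so the latch may be owned by
	 * another fiber - steal it before unlocking.
	 */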
--
Konstantin Osipov, Moscow, Russia
* [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (5 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 8:01 ` [tarantool-patches] " Konstantin Osipov
2019-06-20 15:00 ` [tarantool-patches] " Vladimir Davydov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel Georgy Kirichenko
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 9/9] test: fix flaky test Georgy Kirichenko
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
This commit implements asynchronous transaction processing using
txn_write. The method prepares a transaction and sends it to the journal
without yielding to wait until the transaction is finished. The
transaction status can be tracked via on_commit/on_rollback triggers.
In order to support asynchronous transactions, the journal_write method
was turned into an asynchronous one, and now the transaction engine
controls journal status using the journal entry finalization callback.
Prerequisites: #1254
---
src/box/journal.c | 2 -
src/box/journal.h | 10 +---
src/box/txn.c | 137 ++++++++++++++++++++++++++++++----------------
src/box/txn.h | 16 ++++++
src/box/wal.c | 23 +++-----
5 files changed, 116 insertions(+), 72 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index eb0db9af2..b4f3515f0 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
*/
#include "journal.h"
#include <small/region.h>
-#include <fiber.h>
#include <diag.h>
/**
@@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->fiber = fiber();
entry->on_done_cb = on_done_cb;
entry->on_done_cb_data = on_done_cb_data;
return entry;
diff --git a/src/box/journal.h b/src/box/journal.h
index b704b5c67..e85ff2c9e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "salad/stailq.h"
+#include "fiber.h"
#if defined(__cplusplus)
extern "C" {
@@ -54,10 +55,6 @@ struct journal_entry {
* the committed transaction, on error is -1
*/
int64_t res;
- /**
- * The fiber issuing the request.
- */
- struct fiber *fiber;
/**
* A journal entry completion callback.
*/
@@ -110,10 +107,9 @@ struct journal {
extern struct journal *current_journal;
/**
- * Record a single entry.
+ * Send a single entry to write.
*
- * @return a log sequence number (vclock signature) of the entry
- * or -1 on error.
+ * @return 0 if write was scheduled or -1 in case of an error.
*/
static inline int64_t
journal_write(struct journal_entry *entry)
diff --git a/src/box/txn.c b/src/box/txn.c
index 52e16f3e6..493bc2e3c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,9 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
+ txn->fiber = NULL;
+ txn->done = false;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
panic("rollback trigger failed");
}
fiber_set_txn(fiber(), NULL);
-
+ txn->done = true;
+ if (txn->fiber == NULL)
+ txn_free(txn);
+ else if (txn->fiber != fiber())
+ fiber_wakeup(txn->fiber);
return;
}
/*
@@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
*/
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
+
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
+ }
/*
* Some of triggers require for in_txn variable is set so
* restore it for time a trigger is in progress.
@@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
* may throw. In case an error has happened, there is
* no other option but terminate.
*/
+ fiber_set_txn(fiber(), txn);
if (txn->has_triggers &&
trigger_run(&txn->on_commit, txn) != 0) {
diag_log();
@@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
}
fiber_set_txn(fiber(), NULL);
+ txn->done = true;
+ if (txn->fiber == NULL)
+ txn_free(txn);
+ else if (txn->fiber != fiber())
+ fiber_wakeup(txn->fiber);
}
static void
txn_entry_done_cb(struct journal_entry *entry, void *data)
{
struct txn *txn = (struct txn *)data;
+ assert(txn->entry == entry);
txn->signature = entry->res;
txn_complete(txn);
}
-
static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_write(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
- struct journal_entry *req = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region,
- txn_entry_done_cb,
- txn);
- if (req == NULL) {
+ /* Prepare a journal entry. */
+ txn->entry = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb, txn);
+ if (txn->entry == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = req->rows;
- struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = txn->entry->rows;
+ struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -422,34 +445,25 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- req->approx_len += xrow_approx_len(stmt->row);
+ txn->entry->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == req->rows + txn->n_applier_rows);
+ assert(remote_row == txn->entry->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
+ /* Send entry to a journal. */
+ if (journal_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
- diag_log();
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -459,7 +473,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -467,32 +481,63 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
trigger_clear(&txn->fiber_on_stop);
+ return 0;
+}
- fiber_set_txn(fiber(), NULL);
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
- } else {
- /*
- * However there is noting to write to wal a completion
- * should be fired.
- */
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_write(struct txn *txn)
+{
+ if (txn_prepare(txn) != 0)
+ goto fail;
+
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0) {
+ /* Nothing to do. */
txn->signature = 0;
txn_complete(txn);
+ return 0;
}
- txn_free(txn);
+ if (txn_journal_write(txn) != 0)
+ return -1;
+ fiber_set_txn(fiber(), NULL);
return 0;
fail:
txn_rollback(txn);
return -1;
}
+int
+txn_commit(struct txn *txn)
+{
+ txn->fiber = fiber();
+
+ if (txn_write(txn) != 0)
+ return -1;
+ /*
+ * In case of non-yielding journal the transaction could already
+ * be done and there is nothing to wait in such cases.
+ */
+ if (!txn->done) {
+ bool cancellable = fiber_set_cancellable(false);
+ fiber_yield();
+ fiber_set_cancellable(cancellable);
+ }
+ int res = txn->signature >= 0? 0: -1;
+ if (res != 0)
+ diag_set(ClientError, ER_WAL_IO);
+ txn_free(txn);
+ return res;
+}
+
void
txn_rollback_stmt(struct txn *txn)
{
@@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
void
txn_rollback(struct txn *txn)
{
- assert(txn == in_txn());
trigger_clear(&txn->fiber_on_stop);
txn->signature = -1;
txn_complete(txn);
- txn_free(txn);
}
void
diff --git a/src/box/txn.h b/src/box/txn.h
index 569978ce9..bd6b695a9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,16 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /** Journal entry to control txn write. */
+ struct journal_entry *entry;
+ /** Transaction completion trigger. */
+ struct trigger entry_done;
+ /** Timestampt of entry write start. */
+ ev_tstamp start_tm;
+ /* A fiber to wake up when transaction is finished. */
+ struct fiber *fiber;
+ /* True when transaction is processed. */
+ bool done;
};
/* Pointer to the current transaction (if any) */
@@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index 62b6391fd..582ae4598 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
struct journal_entry, fifo);
if (req->on_done_cb != NULL)
req->on_done_cb(req, req->on_done_cb_data);
- fiber_wakeup(req->fiber);
}
writer->is_in_rollback = false;
fiber_cond_wait(&writer->schedule_cond);
@@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
}
/**
- * Attach requests to a scheduling queue.
+ * Signal done condition.
*/
static void
tx_schedule_queue(struct stailq *queue)
@@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->wal_max_size = wal_max_size;
writer->is_in_rollback = false;
journal_create(&writer->base, wal_mode == WAL_NONE ?
- wal_write_in_wal_mode_none : wal_write, NULL);
+ wal_write_in_wal_mode_none : wal_write,
+ NULL);
struct xlog_opts opts = xlog_opts_default;
opts.sync_is_async = true;
@@ -1153,9 +1153,9 @@ wal_writer_f(va_list ap)
/**
* WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
+ * to be written to disk.
*/
-int64_t
+static int64_t
wal_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1212,19 +1212,10 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
- /**
- * It's not safe to spuriously wakeup this fiber
- * since in that case it will ignore a possible
- * error from WAL writer and not roll back the
- * transaction.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield(); /* Request was inserted. */
- fiber_set_cancellable(cancellable);
- return entry->res;
+ return 0;
}
-int64_t
+static int64_t
wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 7/9] txn: introduce asynchronous txn commit
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit Georgy Kirichenko
@ 2019-06-20 8:01 ` Konstantin Osipov
2019-06-20 15:00 ` [tarantool-patches] " Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 8:01 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> + txn->fiber = NULL;
> + txn->done = false;
> /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
> fiber_set_txn(fiber(), txn);
> trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> @@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
> panic("rollback trigger failed");
> }
> fiber_set_txn(fiber(), NULL);
> -
> + txn->done = true;
> + if (txn->fiber == NULL)
> + txn_free(txn);
> + else if (txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
I think if a journal entry allows multiple triggers, txn->fiber
could be moved to a trigger as well, so you don't need to extend
the fiber.
I like it that you only have a single trigger list - fewer lists
to initialize. Unfortunately there are already two lists - commit
and rollback - in the txn.
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit Georgy Kirichenko
2019-06-20 8:01 ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-20 15:00 ` Vladimir Davydov
2019-06-21 7:28 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 15:00 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Thu, Jun 20, 2019 at 12:23:14AM +0300, Georgy Kirichenko wrote:
> This commit implements asynchronous transaction processing using
> txn_write. The method prepares a transaction and sends it to the journal
> without yielding to wait until the transaction is finished. The
> transaction status can be tracked via on_commit/on_rollback triggers.
> In order to support asynchronous transactions, the journal_write method
> was turned into an asynchronous one, and now the transaction engine
> controls journal status using the journal entry finalization callback.
>
> Prerequisites: #1254
> ---
> src/box/journal.c | 2 -
> src/box/journal.h | 10 +---
> src/box/txn.c | 137 ++++++++++++++++++++++++++++++----------------
> src/box/txn.h | 16 ++++++
> src/box/wal.c | 23 +++-----
> 5 files changed, 116 insertions(+), 72 deletions(-)
>
> diff --git a/src/box/journal.c b/src/box/journal.c
> index eb0db9af2..b4f3515f0 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -30,7 +30,6 @@
> */
> #include "journal.h"
> #include <small/region.h>
> -#include <fiber.h>
> #include <diag.h>
>
> /**
> @@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
> entry->approx_len = 0;
> entry->n_rows = n_rows;
> entry->res = -1;
> - entry->fiber = fiber();
> entry->on_done_cb = on_done_cb;
> entry->on_done_cb_data = on_done_cb_data;
> return entry;
> diff --git a/src/box/journal.h b/src/box/journal.h
> index b704b5c67..e85ff2c9e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -33,6 +33,7 @@
> #include <stdint.h>
> #include <stdbool.h>
> #include "salad/stailq.h"
> +#include "fiber.h"
>
> #if defined(__cplusplus)
> extern "C" {
> @@ -54,10 +55,6 @@ struct journal_entry {
> * the committed transaction, on error is -1
> */
> int64_t res;
> - /**
> - * The fiber issuing the request.
> - */
> - struct fiber *fiber;
I'd move fiber_wakeup to the callback in the same patch that introduces
the callback, but I guess it doesn't really matter.
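I.e. the previous patch could have introduced (a sketch; the name is
mine):

	static void
	journal_entry_wakeup_cb(struct journal_entry *entry, void *data)
	{
		(void) entry;
		fiber_wakeup((struct fiber *) data);
	}

with fiber() passed as the callback data at journal_entry_new() time.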
> /**
> * A journal entry completion callback.
> */
> @@ -110,10 +107,9 @@ struct journal {
> extern struct journal *current_journal;
>
> /**
> - * Record a single entry.
> + * Send a single entry to write.
> *
> - * @return a log sequence number (vclock signature) of the entry
> - * or -1 on error.
> + * @return 0 if write was scheduled or -1 in case of an error.
> */
> static inline int64_t
> journal_write(struct journal_entry *entry)
This belongs to the previous patch.
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 52e16f3e6..493bc2e3c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -199,6 +199,9 @@ txn_begin()
> txn->engine = NULL;
> txn->engine_tx = NULL;
> txn->psql_txn = NULL;
> + txn->entry = NULL;
> + txn->fiber = NULL;
> + txn->done = false;
> /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
> fiber_set_txn(fiber(), txn);
> trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> @@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
> panic("rollback trigger failed");
> }
> fiber_set_txn(fiber(), NULL);
> -
> + txn->done = true;
> + if (txn->fiber == NULL)
> + txn_free(txn);
> + else if (txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
Some comments would be really appreciated, something like:
if (txn->fiber != NULL) {
/*
* Wake up the initiating fiber - it will free
* the transaction memory. Note, this function
* may be called by the initiating fiber itself,
* e.g. for wal_mode=none mode, in which case
* we don't need to call fiber_wakeup().
*/
if (txn->fiber != fiber())
fiber_wakeup(txn->fiber);
} else {
/*
* This is either rollback, in which case we
* must free memory, or an async transaction
* completion. In the latter case the fiber that
* initiated the transaction is long gone so
* it's our responsibility to clean up.
*/
txn_free(txn);
}
> return;
> }
> /*
> @@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
> */
> if (txn->engine != NULL)
> engine_commit(txn->engine, txn);
> +
> + ev_tstamp stop_tm = ev_monotonic_now(loop());
> + if (stop_tm - txn->start_tm > too_long_threshold) {
> + int n_rows = txn->n_new_rows + txn->n_applier_rows;
> + say_warn_ratelimited("too long WAL write: %d rows at "
> + "LSN %lld: %.3f sec", n_rows,
> + txn->signature - n_rows + 1,
> + stop_tm - txn->start_tm);
> + }
> /*
> * Some of triggers require for in_txn variable is set so
> * restore it for time a trigger is in progress.
> @@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
> * may throw. In case an error has happened, there is
> * no other option but terminate.
> */
> + fiber_set_txn(fiber(), txn);
It's already set, just a few lines above. Added by the previous patch
AFAIR.
> if (txn->has_triggers &&
> trigger_run(&txn->on_commit, txn) != 0) {
> diag_log();
> @@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
> }
>
> fiber_set_txn(fiber(), NULL);
> + txn->done = true;
> + if (txn->fiber == NULL)
> + txn_free(txn);
> + else if (txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
Please re-factor your code to avoid copy-and-paste.
I assume you could move it to txn_entry_done_cb.
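E.g. factor the common tail out (a sketch):

	/* Common tail of both txn_complete() branches. */
	static void
	txn_done(struct txn *txn)
	{
		txn->done = true;
		if (txn->fiber == NULL)
			txn_free(txn);
		else if (txn->fiber != fiber())
			fiber_wakeup(txn->fiber);
	}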
> }
>
> static void
> txn_entry_done_cb(struct journal_entry *entry, void *data)
> {
> struct txn *txn = (struct txn *)data;
> + assert(txn->entry == entry);
> txn->signature = entry->res;
> txn_complete(txn);
> }
> @@ -467,32 +481,63 @@ txn_commit(struct txn *txn)
> * we have a bunch of IPROTO_NOP statements.
> */
> if (txn->engine != NULL) {
> - if (engine_prepare(txn->engine, txn) != 0)
> - goto fail;
> + if (engine_prepare(txn->engine, txn) != 0) {
> + return -1;
> + }
> }
> trigger_clear(&txn->fiber_on_stop);
> + return 0;
> +}
>
> - fiber_set_txn(fiber(), NULL);
> - if (txn->n_new_rows + txn->n_applier_rows > 0) {
> - txn->signature = txn_write_to_wal(txn);
> - if (txn->signature < 0)
> - return -1;
> - } else {
> - /*
> - * However there is noting to write to wal a completion
> - * should be fired.
> - */
> +/*
> + * Send a transaction to a journal.
> + */
> +int
> +txn_write(struct txn *txn)
I'd call it txn_commit_async, because it's just like txn_commit, but
asynchronous.
> +{
> + if (txn_prepare(txn) != 0)
> + goto fail;
> +
> + txn->start_tm = ev_monotonic_now(loop());
> + if (txn->n_new_rows + txn->n_applier_rows == 0) {
> + /* Nothing to do. */
> txn->signature = 0;
> txn_complete(txn);
> + return 0;
> }
>
> - txn_free(txn);
> + if (txn_journal_write(txn) != 0)
> + return -1;
> + fiber_set_txn(fiber(), NULL);
> return 0;
> fail:
> txn_rollback(txn);
> return -1;
> }
>
> +int
> +txn_commit(struct txn *txn)
> +{
> + txn->fiber = fiber();
> +
> + if (txn_write(txn) != 0)
> + return -1;
> + /*
> + * In case of non-yielding journal the transaction could already
> + * be done and there is nothing to wait in such cases.
> + */
> + if (!txn->done) {
> + bool cancellable = fiber_set_cancellable(false);
> + fiber_yield();
> + fiber_set_cancellable(cancellable);
> + }
> + int res = txn->signature >= 0? 0: -1;
> + if (res != 0)
> + diag_set(ClientError, ER_WAL_IO);
> + txn_free(txn);
> + return res;
> +}
> +
> void
> txn_rollback_stmt(struct txn *txn)
> {
> @@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
> void
> txn_rollback(struct txn *txn)
> {
> - assert(txn == in_txn());
> trigger_clear(&txn->fiber_on_stop);
> txn->signature = -1;
> txn_complete(txn);
> - txn_free(txn);
> }
>
> void
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 569978ce9..bd6b695a9 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -195,6 +195,16 @@ struct txn {
> /** Commit and rollback triggers */
> struct rlist on_commit, on_rollback;
> struct sql_txn *psql_txn;
> + /** Journal entry to control txn write. */
> + struct journal_entry *entry;
Why do you need to add txn->entry? Why not allocate it on txn->region?
You just need it to stay alive until the transaction is freed. You don't
need to store a pointer to it in txn AFAICS.
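I.e. keep it local (a sketch):

	/*
	 * The entry is allocated on txn->region, so it stays
	 * alive until txn_free() anyway - no need for a txn
	 * member.
	 */
	struct journal_entry *req =
		journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
				  &txn->region, txn_entry_done_cb, txn);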
> + /** Transaction completion trigger. */
> + struct trigger entry_done;
This trigger is never used.
> + /** Timestampt of entry write start. */
> + ev_tstamp start_tm;
> + /* A fiber to wake up when transaction is finished. */
> + struct fiber *fiber;
It can be NULL. Please explain in what cases.
> + /* True when transaction is processed. */
> + bool done;
> };
>
> /* Pointer to the current transaction (if any) */
> @@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
> void
> txn_rollback(struct txn *txn);
>
> +int
> +txn_write(struct txn *txn);
> +
> +int
> +txn_wait(struct txn *txn);
> +
txn_wait isn't defined
> /**
> * Roll back the transaction but keep the object around.
> * A special case for memtx transaction abort on yield. In this
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 62b6391fd..582ae4598 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
> struct journal_entry, fifo);
> if (req->on_done_cb != NULL)
> req->on_done_cb(req, req->on_done_cb_data);
> - fiber_wakeup(req->fiber);
> }
> writer->is_in_rollback = false;
> fiber_cond_wait(&writer->schedule_cond);
> @@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
> }
>
> /**
> - * Attach requests to a scheduling queue.
> + * Signal done condition.
> */
> static void
> tx_schedule_queue(struct stailq *queue)
> @@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> writer->wal_max_size = wal_max_size;
> writer->is_in_rollback = false;
> journal_create(&writer->base, wal_mode == WAL_NONE ?
> - wal_write_in_wal_mode_none : wal_write, NULL);
> + wal_write_in_wal_mode_none : wal_write,
> + NULL);
Why change this?
^ permalink raw reply [flat|nested] 37+ messages in thread
* [tarantool-patches] Re: [PATCH v4 7/9] txn: introduce asynchronous txn commit
2019-06-20 15:00 ` [tarantool-patches] " Vladimir Davydov
@ 2019-06-21 7:28 ` Konstantin Osipov
0 siblings, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-21 7:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/06/20 18:01]:
> > +/*
> > + * Send a transaction to a journal.
> > + */
> > +int
> > +txn_write(struct txn *txn)
>
> I'd call it txn_commit_async, becase it's just like txn_commit, but
> asynchronous.
I suggested the name to match the WAL API:
wal_write, wal_wait and txn_write, txn_wait;
txn_commit is txn_write + txn_wait.
If you decide to change the name, please follow through with the
WAL.
--
Konstantin Osipov, Moscow, Russia
* [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (6 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
2019-06-20 7:41 ` [tarantool-patches] " Георгий Кириченко
2019-06-20 8:06 ` Konstantin Osipov
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 9/9] test: fix flaky test Georgy Kirichenko
8 siblings, 2 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Appliers use asynchronous transactions to batch journal writes. All
appliers share the replicaset.applier.tx_vclock, which tracks the vclock
of rows applied but not necessarily written to the journal. Appliers use
a trigger to coordinate in case of failure - when a transaction is going
to be rolled back. Also, an applier writer condition is shared across
all appliers and signaled on commit or on a heartbeat message.
Closes: #1254
---
src/box/applier.cc | 123 +++++++++++++++++++++++++++++++----------
src/box/applier.h | 6 +-
src/box/replication.cc | 7 +++
src/box/replication.h | 14 +++++
4 files changed, 119 insertions(+), 31 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..252dd58ea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "scoped_guard.h"
STRS(applier_state, applier_STATE);
@@ -130,10 +131,10 @@ applier_writer_f(va_list ap)
* replication_timeout seconds any more.
*/
if (applier->version_id >= version_id(1, 7, 7))
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
TIMEOUT_INFINITY);
else
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
replication_timeout);
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
@@ -565,6 +566,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ diag_set(ClientError, ER_WAL_IO);
+ diag_move(&fiber()->diag, &replicaset.applier.diag);
+ trigger_run(&replicaset.applier.on_replication_fail, NULL);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+}
+
+static void
+applier_on_fail(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ if (!diag_is_empty(&replicaset.applier.diag))
+ diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+ fiber_cancel(applier->reader);
+
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -573,6 +604,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
static int
applier_apply_tx(struct stailq *rows)
{
+ struct xrow_header *first_row =
+ &stailq_first_entry(rows, struct applier_tx_row,
+ next)->row;
+ struct replica *replica = replica_by_id(first_row->replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ latch_lock(latch);
+ if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
+ first_row->lsn) {
> +		/* Check whether this is a heartbeat message and wake the writers up. */
+ if (first_row->lsn == 0)
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+ latch_unlock(latch);
+ return 0;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
@@ -581,8 +628,10 @@ applier_apply_tx(struct stailq *rows)
*/
struct txn *txn = txn_begin();
struct applier_tx_row *item;
- if (txn == NULL)
- diag_raise();
+ if (txn == NULL) {
+ latch_unlock(latch);
+ return -1;
+ }
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
int res = apply_row(row);
@@ -623,10 +672,34 @@ applier_apply_tx(struct stailq *rows)
"Replication", "distributed transactions");
goto rollback;
}
- return txn_commit(txn);
+ /* We are ready to submit txn to wal. */
+ struct trigger *on_rollback, *on_commit;
+ on_rollback = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ on_commit = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ if (on_rollback == NULL || on_commit == NULL)
+ goto rollback;
+
+ trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+ txn_on_commit(txn, on_commit);
+
+ if (txn_write(txn) < 0)
+ goto fail;
+ /* Transaction was sent to journal so promote vclock. */
+ vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
+ first_row->lsn);
+ latch_unlock(latch);
+
+ return 0;
rollback:
txn_rollback(txn);
+fail:
+ latch_unlock(latch);
fiber_gc();
return -1;
}
@@ -735,6 +808,15 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
+ /* Register a trigger to handle replication failures. */
+ struct trigger on_fail;
+ trigger_create(&on_fail, applier_on_fail, applier, NULL);
+ trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
+ auto trigger_guard = make_scoped_guard([&] {
+ trigger_clear(&on_fail);
+ });
+
+
/*
* Process a stream of rows from the binary log.
*/
@@ -763,31 +845,10 @@ applier_subscribe(struct applier *applier)
struct stailq rows;
applier_read_tx(applier, &rows);
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(first_row->replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set of changes
- * may arrive via two concurrently running appliers.
- * Hence we need a latch to strictly order all changes
- * that belong to the same server id.
- */
- latch_lock(latch);
- if (vclock_get(&replicaset.vclock, first_row->replica_id) <
- first_row->lsn &&
- applier_apply_tx(&rows) != 0) {
- latch_unlock(latch);
+ if (applier_apply_tx(&rows) != 0)
diag_raise();
- }
- latch_unlock(latch);
- if (applier->state == APPLIER_SYNC ||
- applier->state == APPLIER_FOLLOW)
- fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -872,6 +933,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
+ if (!diag_is_empty(&applier->diag)) {
+ diag_move(&applier->diag, &fiber()->diag);
+ applier_disconnect(applier, APPLIER_STOPPED);
+ break;
+ }
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -959,7 +1025,7 @@ applier_new(const char *uri)
applier->last_row_time = ev_monotonic_now(loop());
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
- fiber_cond_create(&applier->writer_cond);
+ diag_create(&applier->diag);
return applier;
}
@@ -972,7 +1038,6 @@ applier_delete(struct applier *applier)
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
fiber_cond_destroy(&applier->resume_cond);
- fiber_cond_destroy(&applier->writer_cond);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..348fdacf2 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -74,8 +74,6 @@ struct applier {
struct fiber *reader;
/** Background fiber to reply with vclock */
struct fiber *writer;
- /** Writer cond. */
- struct fiber_cond writer_cond;
/** Finite-state machine */
enum applier_state state;
/** Local time of this replica when the last row has been received */
@@ -114,8 +112,12 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
+ struct diag diag;
};
+void
+applier_init();
+
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..fd4d4e387 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);
+
+ vclock_create(&replicaset.applier.net_vclock);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+ rlist_create(&replicaset.applier.on_replication_fail);
+
+ fiber_cond_create(&replicaset.applier.commit_cond);
+ diag_create(&replicaset.applier.diag);
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..a4830f5b5 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,20 @@ struct replicaset {
* struct replica object).
*/
struct latch order_latch;
+ /*
> +	 * A vclock of the last transaction which was read
+ * from an applier connection.
+ */
+ struct vclock net_vclock;
+ /* Signaled on replicated transaction commit. */
+ struct fiber_cond commit_cond;
+ /*
+ * Trigger to fire when replication stops in case
+ * of an error.
+ */
+ struct rlist on_replication_fail;
> +	/* Diag to propagate an error across all appliers. */
+ struct diag diag;
} applier;
/** Map of all known replica_id's to correspponding replica's. */
struct replica **replica_by_id;
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel Georgy Kirichenko
@ 2019-06-20 7:41 ` Георгий Кириченко
2019-06-20 8:07 ` Konstantin Osipov
2019-06-20 16:37 ` Vladimir Davydov
2019-06-20 8:06 ` Konstantin Osipov
1 sibling, 2 replies; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 7:41 UTC (permalink / raw)
To: tarantool-patches
I'm sorry, here is the proper version of the commit message:
Appliers use asynchronous transactions to batch journal writes. All
appliers share the replicaset.applier.tx_vclock, which tracks the vclock
of rows applied but not necessarily written to the journal. Appliers use
a trigger to coordinate in case of failure - when a transaction is going
to be rolled back. Also, an applier writer condition is shared across
all appliers and signaled on commit or on a heartbeat message.
Closes: #1254
---
src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
src/box/applier.h | 9 ++-
src/box/replication.cc | 7 ++
src/box/replication.h | 14 ++++
4 files changed, 138 insertions(+), 48 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..fee49d8ca 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "scoped_guard.h"
STRS(applier_state, applier_STATE);
@@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
* replication_timeout seconds any more.
*/
if (applier->version_id >= version_id(1, 7, 7))
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
TIMEOUT_INFINITY);
else
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
replication_timeout);
+ /*
+ * Stay 'orphan' until appliers catch up with
+ * the remote vclock at the time of SUBSCRIBE
+ * and the lag is less than configured.
+ */
+ if (applier->state == APPLIER_SYNC &&
+ applier->lag <= replication_sync_lag &&
+ vclock_compare(&applier->remote_vclock_at_subscribe,
+ &replicaset.vclock) <= 0) {
+ /* Applier is synced, switch to "follow". */
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
> @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ diag_set(ClientError, ER_WAL_IO);
+ diag_move(&fiber()->diag, &replicaset.applier.diag);
+ trigger_run(&replicaset.applier.on_replication_fail, NULL);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+}
+
+static void
+applier_on_fail(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ if (!diag_is_empty(&replicaset.applier.diag))
> +		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+ fiber_cancel(applier->reader);
+
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
> @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
static int
applier_apply_tx(struct stailq *rows)
{
+ struct xrow_header *first_row =
+ &stailq_first_entry(rows, struct applier_tx_row,
+ next)->row;
+ struct replica *replica = replica_by_id(first_row->replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ latch_lock(latch);
+ if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
+ first_row->lsn) {
> +		/* Check whether this is a heartbeat message and wake the writers up. */
+ if (first_row->lsn == 0)
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+ latch_unlock(latch);
+ return 0;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
@@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
*/
struct txn *txn = txn_begin();
struct applier_tx_row *item;
- if (txn == NULL)
- diag_raise();
+ if (txn == NULL) {
+ latch_unlock(latch);
+ return -1;
+ }
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
int res = apply_row(row);
@@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
"Replication", "distributed transactions");
goto rollback;
}
- return txn_commit(txn);
+ /* We are ready to submit txn to wal. */
+ struct trigger *on_rollback, *on_commit;
+ on_rollback = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ on_commit = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ if (on_rollback == NULL || on_commit == NULL)
+ goto rollback;
+
+ trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+ txn_on_commit(txn, on_commit);
+
+ if (txn_write(txn) < 0)
+ goto fail;
+ /* Transaction was sent to journal so promote vclock. */
+ vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
+ first_row->lsn);
+ latch_unlock(latch);
+
+ return 0;
rollback:
txn_rollback(txn);
+fail:
+ latch_unlock(latch);
fiber_gc();
return -1;
}
@@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
- struct vclock remote_vclock_at_subscribe;
struct tt_uuid cluster_id = uuid_nil;
struct vclock vclock;
@@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
* the replica, and replica has to check whether
* its and master's cluster ids match.
*/
- vclock_create(&remote_vclock_at_subscribe);
+ vclock_create(&applier->remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row,
&cluster_id,
- &remote_vclock_at_subscribe);
> +					  &applier->remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
say_info("subscribed");
say_info("remote vclock %s local vclock %s",
- vclock_to_string(&remote_vclock_at_subscribe),
+ vclock_to_string(&applier->remote_vclock_at_subscribe),
vclock_to_string(&vclock));
}
/*
@@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
+ /* Register a trigger to handle replication failures. */
+ struct trigger on_fail;
+ trigger_create(&on_fail, applier_on_fail, applier, NULL);
+ trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
+ auto trigger_guard = make_scoped_guard([&] {
+ trigger_clear(&on_fail);
+ });
+
+
/*
* Process a stream of rows from the binary log.
*/
@@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Stay 'orphan' until appliers catch up with
- * the remote vclock at the time of SUBSCRIBE
- * and the lag is less than configured.
- */
- if (applier->state == APPLIER_SYNC &&
- applier->lag <= replication_sync_lag &&
- vclock_compare(&remote_vclock_at_subscribe,
- &replicaset.vclock) <= 0) {
- /* Applier is synced, switch to "follow". */
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
struct stailq rows;
applier_read_tx(applier, &rows);
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(first_row->replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set of changes
- * may arrive via two concurrently running appliers.
- * Hence we need a latch to strictly order all changes
- * that belong to the same server id.
- */
- latch_lock(latch);
- if (vclock_get(&replicaset.vclock, first_row->replica_id) <
- first_row->lsn &&
- applier_apply_tx(&rows) != 0) {
- latch_unlock(latch);
+ if (applier_apply_tx(&rows) != 0)
diag_raise();
- }
- latch_unlock(latch);
- if (applier->state == APPLIER_SYNC ||
- applier->state == APPLIER_FOLLOW)
- fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -872,6 +932,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
+ if (!diag_is_empty(&applier->diag)) {
+ diag_move(&applier->diag, &fiber()->diag);
+ applier_disconnect(applier, APPLIER_STOPPED);
+ break;
+ }
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -959,7 +1024,7 @@ applier_new(const char *uri)
applier->last_row_time = ev_monotonic_now(loop());
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
- fiber_cond_create(&applier->writer_cond);
+ diag_create(&applier->diag);
return applier;
}
@@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
fiber_cond_destroy(&applier->resume_cond);
- fiber_cond_destroy(&applier->writer_cond);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..716da32e2 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -74,8 +74,6 @@ struct applier {
struct fiber *reader;
/** Background fiber to reply with vclock */
struct fiber *writer;
- /** Writer cond. */
- struct fiber_cond writer_cond;
/** Finite-state machine */
enum applier_state state;
/** Local time of this replica when the last row has been received */
@@ -114,8 +112,15 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
+ /* Diag to raise an error. */
+ struct diag diag;
> +	/* The master's vclock at the time of subscribe. */
+ struct vclock remote_vclock_at_subscribe;
};
+void
+applier_init();
+
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..fd4d4e387 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
> 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);
+
+ vclock_create(&replicaset.applier.net_vclock);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+ rlist_create(&replicaset.applier.on_replication_fail);
+
+ fiber_cond_create(&replicaset.applier.commit_cond);
+ diag_create(&replicaset.applier.diag);
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..a4830f5b5 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,20 @@ struct replicaset {
* struct replica object).
*/
struct latch order_latch;
+ /*
> +	 * A vclock of the last transaction which was read
+ * from an applier connection.
+ */
+ struct vclock net_vclock;
+ /* Signaled on replicated transaction commit. */
+ struct fiber_cond commit_cond;
+ /*
+ * Trigger to fire when replication stops in case
+ * of an error.
+ */
+ struct rlist on_replication_fail;
> +	/* Diag to propagate an error across all appliers. */
+ struct diag diag;
} applier;
/** Map of all known replica_id's to correspponding replica's. */
struct replica **replica_by_id;
--
2.22.0
* [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-20 7:41 ` [tarantool-patches] " Георгий Кириченко
@ 2019-06-20 8:07 ` Konstantin Osipov
2019-06-20 16:37 ` Vladimir Davydov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 8:07 UTC (permalink / raw)
To: tarantool-patches
* Георгий Кириченко <georgy@tarantool.org> [19/06/20 10:44]:
> I'm sorry, here is the proper version of the commit message:
>
> Appliers use asynchronous transactions to batch journal writes. All
> appliers share the replicaset.applier.tx_vclock, which tracks the vclock
> of rows applied but not necessarily written to the journal. Appliers use
> a trigger to coordinate in case of failure - when a transaction is going
> to be rolled back. Also, an applier writer condition is shared across
> all appliers and signaled on commit or on a heartbeat message.
I only now figured out that you have no sequencer object. Don't
you think it should be a proper object with its own state? Looks
like now the sequencer state is shared across multiple objects.
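A sketch of what such an object could look like, gathering the state that the
patch currently spreads over struct replicaset (the name and field set here are
illustrative):

struct sequencer {
	/* Vclock of rows already submitted to the journal. */
	struct vclock net_vclock;
	/* Signaled when a replicated transaction is committed. */
	struct fiber_cond commit_cond;
	/* Run when any applier's transaction is rolled back. */
	struct rlist on_fail;
	/* The first error, propagated to all appliers. */
	struct diag diag;
	/* Orders changes that belong to the same server id. */
	struct latch order_latch;
};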
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-20 7:41 ` [tarantool-patches] " Георгий Кириченко
2019-06-20 8:07 ` Konstantin Osipov
@ 2019-06-20 16:37 ` Vladimir Davydov
2019-06-20 20:33 ` Георгий Кириченко
1 sibling, 1 reply; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-20 16:37 UTC (permalink / raw)
To: Георгий
Кириченко
Cc: tarantool-patches
On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> I'm sorry, here is the proper version of the commit message:
>
> Appliers use asynchronous transactions to batch journal writes. All
> appliers share the replicaset.applier.tx_vclock, which tracks the vclock
> of rows applied but not necessarily written to the journal. Appliers use
> a trigger to coordinate in case of failure - when a transaction is going
> to be rolled back. Also, an applier writer condition is shared across
> all appliers and signaled on commit or on a heartbeat message.
>
> Closes: #1254
> ---
> src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
> src/box/applier.h | 9 ++-
> src/box/replication.cc | 7 ++
> src/box/replication.h | 14 ++++
> 4 files changed, 138 insertions(+), 48 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..fee49d8ca 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -50,6 +50,7 @@
> #include "schema.h"
> #include "txn.h"
> #include "box.h"
> +#include "scoped_guard.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> * replication_timeout seconds any more.
> */
> if (applier->version_id >= version_id(1, 7, 7))
> - fiber_cond_wait_timeout(&applier->writer_cond,
> + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> TIMEOUT_INFINITY);
> else
> - fiber_cond_wait_timeout(&applier->writer_cond,
> + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> replication_timeout);
Why replace applier->writer_cond with replicaset.applier.commit_cond?
This means that even if only one applier is active, we will wake up all
of the writers on each commit, which looks strange.
> + /*
> + * Stay 'orphan' until appliers catch up with
> + * the remote vclock at the time of SUBSCRIBE
> + * and the lag is less than configured.
> + */
> + if (applier->state == APPLIER_SYNC &&
> + applier->lag <= replication_sync_lag &&
> + vclock_compare(&applier->remote_vclock_at_subscribe,
> + &replicaset.vclock) <= 0) {
> + /* Applier is synced, switch to "follow". */
> + applier_set_state(applier, APPLIER_FOLLOW);
> + }
> +
A writer is supposed to send ACKs, not change the applier state.
How did this wind up here? Can't we do this right from the on_commit
trigger?
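For illustration, moving the check into the commit trigger could look like this
(a sketch; assumes the trigger is created with the applier as its data
argument):

static void
applier_on_commit(struct trigger *trigger, void *event)
{
	(void) event;
	struct applier *applier = (struct applier *)trigger->data;
	/* Leave 'orphan' mode once this applier has caught up. */
	if (applier->state == APPLIER_SYNC &&
	    applier->lag <= replication_sync_lag &&
	    vclock_compare(&applier->remote_vclock_at_subscribe,
			   &replicaset.vclock) <= 0)
		applier_set_state(applier, APPLIER_FOLLOW);
}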
> /* Send ACKs only when in FOLLOW mode ,*/
> if (applier->state != APPLIER_SYNC &&
> applier->state != APPLIER_FOLLOW)
> > @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> next)->row.is_commit);
> }
>
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
There's no sequencer object so the names are confusing. Let's call them
applier_on_rollback/commit?
> +{
> + (void) trigger;
> + (void) event;
> + diag_set(ClientError, ER_WAL_IO);
> + diag_move(&fiber()->diag, &replicaset.applier.diag);
> + trigger_run(&replicaset.applier.on_replication_fail, NULL);
> + vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> + (void) trigger;
> + (void) event;
> + fiber_cond_broadcast(&replicaset.applier.commit_cond);
> +}
> +
> +static void
> +applier_on_fail(struct trigger *trigger, void *event)
> +{
> + (void) event;
> + struct applier *applier = (struct applier *)trigger->data;
> + if (!diag_is_empty(&replicaset.applier.diag))
> > +		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
> + fiber_cancel(applier->reader);
> +
> +}
> +
> /**
> * Apply all rows in the rows queue as a single transaction.
> *
> > @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> static int
> applier_apply_tx(struct stailq *rows)
> {
> + struct xrow_header *first_row =
> + &stailq_first_entry(rows, struct applier_tx_row,
> + next)->row;
> + struct replica *replica = replica_by_id(first_row->replica_id);
> + struct latch *latch = (replica ? &replica->order_latch :
> + &replicaset.applier.order_latch);
> + latch_lock(latch);
> + if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> + first_row->lsn) {
> > +		/* Check whether this is a heartbeat message and wake the writers up. */
> + if (first_row->lsn == 0)
> + fiber_cond_broadcast(&replicaset.applier.commit_cond);
Would be better to check that before taking the latch. We don't need the
latch to reply to a heartbeat message, do we?
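I.e., something like this at the top of applier_apply_tx(), before
latch_lock() (a sketch):

	if (first_row->lsn == 0) {
		/* A heartbeat message: wake the writers up, no latch needed. */
		fiber_cond_broadcast(&replicaset.applier.commit_cond);
		return 0;
	}
	latch_lock(latch);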
> + latch_unlock(latch);
> + return 0;
> + }
> +
> /**
> * Explicitly begin the transaction so that we can
> * control fiber->gc life cycle and, in case of apply
> @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
> */
> struct txn *txn = txn_begin();
> struct applier_tx_row *item;
> - if (txn == NULL)
> - diag_raise();
> + if (txn == NULL) {
> + latch_unlock(latch);
> + return -1;
> + }
> stailq_foreach_entry(item, rows, next) {
> struct xrow_header *row = &item->row;
> int res = apply_row(row);
> @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
> "Replication", "distributed transactions");
> goto rollback;
> }
> - return txn_commit(txn);
>
> + /* We are ready to submit txn to wal. */
> + struct trigger *on_rollback, *on_commit;
> + on_rollback = (struct trigger *)region_alloc(&txn->region,
> + sizeof(struct trigger));
> + on_commit = (struct trigger *)region_alloc(&txn->region,
> + sizeof(struct trigger));
> + if (on_rollback == NULL || on_commit == NULL)
> + goto rollback;
> +
> + trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> + txn_on_rollback(txn, on_rollback);
> +
> + trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> + txn_on_commit(txn, on_commit);
> +
> + if (txn_write(txn) < 0)
> + goto fail;
> + /* Transaction was sent to journal so promote vclock. */
> + vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> + first_row->lsn);
> + latch_unlock(latch);
> +
> + return 0;
> rollback:
> txn_rollback(txn);
> +fail:
> + latch_unlock(latch);
> fiber_gc();
> return -1;
> }
> @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
> struct ev_io *coio = &applier->io;
> struct ibuf *ibuf = &applier->ibuf;
> struct xrow_header row;
> - struct vclock remote_vclock_at_subscribe;
> struct tt_uuid cluster_id = uuid_nil;
>
> struct vclock vclock;
> @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
> * the replica, and replica has to check whether
> * its and master's cluster ids match.
> */
> - vclock_create(&remote_vclock_at_subscribe);
> + vclock_create(&applier->remote_vclock_at_subscribe);
> xrow_decode_subscribe_response_xc(&row,
> &cluster_id,
> - &remote_vclock_at_subscribe);
> > +					  &applier->remote_vclock_at_subscribe);
> /*
> * If master didn't send us its cluster id
> * assume that it has done all the checks.
> @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
>
> say_info("subscribed");
> say_info("remote vclock %s local vclock %s",
> - vclock_to_string(&remote_vclock_at_subscribe),
> + vclock_to_string(&applier->remote_vclock_at_subscribe),
> vclock_to_string(&vclock));
> }
> /*
> @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
>
> applier->lag = TIMEOUT_INFINITY;
>
> + /* Register a trigger to handle replication failures. */
> + struct trigger on_fail;
> + trigger_create(&on_fail, applier_on_fail, applier, NULL);
> + trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
Why do we need on_replication_fail trigger? AFAICS it is called from
on_rollback callback. Can't we call applier_on_fail right from there,
without the use of the intermediary?
> + auto trigger_guard = make_scoped_guard([&] {
> + trigger_clear(&on_fail);
> + });
> +
> +
> /*
> * Process a stream of rows from the binary log.
> */
> @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
> applier_set_state(applier, APPLIER_FOLLOW);
> }
>
> - /*
> - * Stay 'orphan' until appliers catch up with
> - * the remote vclock at the time of SUBSCRIBE
> - * and the lag is less than configured.
> - */
> - if (applier->state == APPLIER_SYNC &&
> - applier->lag <= replication_sync_lag &&
> - vclock_compare(&remote_vclock_at_subscribe,
> - &replicaset.vclock) <= 0) {
> - /* Applier is synced, switch to "follow". */
> - applier_set_state(applier, APPLIER_FOLLOW);
> - }
> -
> struct stailq rows;
> applier_read_tx(applier, &rows);
>
> - struct xrow_header *first_row =
> - &stailq_first_entry(&rows, struct applier_tx_row,
> - next)->row;
> applier->last_row_time = ev_monotonic_now(loop());
> - struct replica *replica = replica_by_id(first_row->replica_id);
> - struct latch *latch = (replica ? &replica->order_latch :
> - &replicaset.applier.order_latch);
> - /*
> - * In a full mesh topology, the same set of changes
> - * may arrive via two concurrently running appliers.
> - * Hence we need a latch to strictly order all changes
> - * that belong to the same server id.
> - */
Why did you remove this comment? (You didn't move it, I checked).
> - latch_lock(latch);
> - if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> - first_row->lsn &&
> - applier_apply_tx(&rows) != 0) {
> - latch_unlock(latch);
> + if (applier_apply_tx(&rows) != 0)
> diag_raise();
> - }
> - latch_unlock(latch);
>
> - if (applier->state == APPLIER_SYNC ||
> - applier->state == APPLIER_FOLLOW)
> - fiber_cond_signal(&applier->writer_cond);
> if (ibuf_used(ibuf) == 0)
> ibuf_reset(ibuf);
> fiber_gc();
> @@ -872,6 +932,11 @@ applier_f(va_list ap)
> return -1;
> }
> } catch (FiberIsCancelled *e) {
> + if (!diag_is_empty(&applier->diag)) {
> + diag_move(&applier->diag, &fiber()->diag);
Hmm, AFAIK we only need this diag for box.info.replication. Maybe it's
better to patch box.info.replication to use applier->diag instead of
applier->reader->diag so that we don't need to move errors around?
> + applier_disconnect(applier, APPLIER_STOPPED);
> + break;
> + }
> applier_disconnect(applier, APPLIER_OFF);
> break;
> } catch (SocketError *e) {
> @@ -959,7 +1024,7 @@ applier_new(const char *uri)
> applier->last_row_time = ev_monotonic_now(loop());
> rlist_create(&applier->on_state);
> fiber_cond_create(&applier->resume_cond);
> - fiber_cond_create(&applier->writer_cond);
> + diag_create(&applier->diag);
>
> return applier;
> }
> @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
> assert(applier->io.fd == -1);
> trigger_destroy(&applier->on_state);
> fiber_cond_destroy(&applier->resume_cond);
> - fiber_cond_destroy(&applier->writer_cond);
> free(applier);
> }
>
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 5bff90031..716da32e2 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -74,8 +74,6 @@ struct applier {
> struct fiber *reader;
> /** Background fiber to reply with vclock */
> struct fiber *writer;
> - /** Writer cond. */
> - struct fiber_cond writer_cond;
> /** Finite-state machine */
> enum applier_state state;
> /** Local time of this replica when the last row has been received */
> @@ -114,8 +112,15 @@ struct applier {
> bool is_paused;
> /** Condition variable signaled to resume the applier. */
> struct fiber_cond resume_cond;
> + /* Diag to raise an error. */
> + struct diag diag;
> > +	/* The master's vclock at the time of subscribe. */
> + struct vclock remote_vclock_at_subscribe;
> };
>
> +void
> +applier_init();
> +
This function isn't defined anywhere.
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 8c8a9927e..a4830f5b5 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -232,6 +232,20 @@ struct replicaset {
> * struct replica object).
> */
> struct latch order_latch;
> + /*
> > +	 * A vclock of the last transaction which was read
> + * from an applier connection.
> + */
> + struct vclock net_vclock;
Please elaborate. Can it be less than replicaset.vclock? Can it be
greater? Why?
> + /* Signaled on replicated transaction commit. */
> + struct fiber_cond commit_cond;
> + /*
> + * Trigger to fire when replication stops in case
> + * of an error.
> + */
> + struct rlist on_replication_fail;
> > +	/* Diag to propagate an error across all appliers. */
> + struct diag diag;
> } applier;
> /** Map of all known replica_id's to correspponding replica's. */
> struct replica **replica_by_id;
* Re: [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-20 16:37 ` Vladimir Davydov
@ 2019-06-20 20:33 ` Георгий Кириченко
2019-06-21 8:36 ` Vladimir Davydov
0 siblings, 1 reply; 37+ messages in thread
From: Георгий Кириченко @ 2019-06-20 20:33 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Thursday, June 20, 2019 7:37:09 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> > I'm sorry, here is the proper version of the commit message:
> >
> > Appliers use asynchronous transactions to batch journal writes. All
> > appliers share the replicaset.applier.tx_vclock, which tracks the vclock
> > of rows applied but not necessarily written to the journal. Appliers use
> > a trigger to coordinate in case of failure - when a transaction is going
> > to be rolled back. Also, an applier writer condition is shared across
> > all appliers and signaled on commit or on a heartbeat message.
> >
> > Closes: #1254
> > ---
> >
> > src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
> > src/box/applier.h | 9 ++-
> > src/box/replication.cc | 7 ++
> > src/box/replication.h | 14 ++++
> > 4 files changed, 138 insertions(+), 48 deletions(-)
> >
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 5a92f6109..fee49d8ca 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -50,6 +50,7 @@
> >
> > #include "schema.h"
> > #include "txn.h"
> > #include "box.h"
> >
> > +#include "scoped_guard.h"
> >
> > STRS(applier_state, applier_STATE);
> >
> > @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> >
> > * replication_timeout seconds any more.
> > */
> >
> > if (applier->version_id >= version_id(1, 7, 7))
> >
> > - fiber_cond_wait_timeout(&applier->writer_cond,
> > + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> >
> > TIMEOUT_INFINITY);
> >
> > else
> >
> > - fiber_cond_wait_timeout(&applier->writer_cond,
> > + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> >
> > replication_timeout);
>
> Why replace applier->writer_cond with replicaset.applier.commit_cond?
> This means that even if only one applier is active, we will wake up all
> of the writers on each commit, which looks strange.
I did it because an applier doesn't have any control over how a transaction is
finished except via an on_commit/on_rollback trigger. However, if an applier
sends nothing to commit (for instance, it could be behind the others), it still
should send an ACK. Also, I think we should update this state for any processed
transaction (even for local ones).
>
> > + /*
> > + * Stay 'orphan' until appliers catch up with
> > + * the remote vclock at the time of SUBSCRIBE
> > + * and the lag is less than configured.
> > + */
> > + if (applier->state == APPLIER_SYNC &&
> > + applier->lag <= replication_sync_lag &&
> > + vclock_compare(&applier->remote_vclock_at_subscribe,
> > + &replicaset.vclock) <= 0) {
> > + /* Applier is synced, switch to "follow". */
> > + applier_set_state(applier, APPLIER_FOLLOW);
> > + }
> > +
>
> A writer is supposed to send ACKs, not change the applier state.
> How did this wind up here? Can't we do this right from the on_commit
> trigger?
The same case as above - if an applier didn't send anything to commit (it is
behind another applier), where is a better point to update its state?
>
> > /* Send ACKs only when in FOLLOW mode ,*/
> > if (applier->state != APPLIER_SYNC &&
> >
> > applier->state != APPLIER_FOLLOW)
> >
> > @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> >
> > next)->row.is_commit);
> >
> > }
> >
> > +static void
> > +sequencer_rollback_cb(struct trigger *trigger, void *event)
>
> There's no sequencer object so the names are confusing. Let's call them
> applier_on_rollback/commit?
Accepted.
>
> > +{
> > + (void) trigger;
> > + (void) event;
> > + diag_set(ClientError, ER_WAL_IO);
> > + diag_move(&fiber()->diag, &replicaset.applier.diag);
> > + trigger_run(&replicaset.applier.on_replication_fail, NULL);
> > + vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> > +}
> > +
> > +static void
> > +sequencer_commit_cb(struct trigger *trigger, void *event)
> > +{
> > + (void) trigger;
> > + (void) event;
> > + fiber_cond_broadcast(&replicaset.applier.commit_cond);
> > +}
> > +
> > +static void
> > +applier_on_fail(struct trigger *trigger, void *event)
> > +{
> > + (void) event;
> > + struct applier *applier = (struct applier *)trigger->data;
> > + if (!diag_is_empty(&replicaset.applier.diag))
> > +		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
> > + fiber_cancel(applier->reader);
> > +
> > +}
> > +
> >
> > /**
> >
> > * Apply all rows in the rows queue as a single transaction.
> > *
> >
> > @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> >
> > static int
> > applier_apply_tx(struct stailq *rows)
> > {
> >
> > + struct xrow_header *first_row =
> > + &stailq_first_entry(rows, struct applier_tx_row,
> > + next)->row;
> > + struct replica *replica = replica_by_id(first_row->replica_id);
> > + struct latch *latch = (replica ? &replica->order_latch :
> > + &replicaset.applier.order_latch);
> > + latch_lock(latch);
> > +	if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> > + first_row->lsn) {
> > +		/* Check whether this is a heartbeat message and wake the writers up. */
> > + if (first_row->lsn == 0)
> > + fiber_cond_broadcast(&replicaset.applier.commit_cond);
>
> Would be better to check that before taking the latch. We don't need the
> latch to reply to a heartbeat message, do we?
Accepted.
>
> > + latch_unlock(latch);
> > + return 0;
> > + }
> > +
> >
> > /**
> >
> > * Explicitly begin the transaction so that we can
> > * control fiber->gc life cycle and, in case of apply
> >
> > @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
> >
> > */
> >
> > struct txn *txn = txn_begin();
> > struct applier_tx_row *item;
> >
> > - if (txn == NULL)
> > - diag_raise();
> > + if (txn == NULL) {
> > + latch_unlock(latch);
> > + return -1;
> > + }
> >
> > stailq_foreach_entry(item, rows, next) {
> >
> > struct xrow_header *row = &item->row;
> > int res = apply_row(row);
> >
> > @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
> >
> > "Replication", "distributed transactions");
> >
> > goto rollback;
> >
> > }
> >
> > - return txn_commit(txn);
> >
> > + /* We are ready to submit txn to wal. */
> > + struct trigger *on_rollback, *on_commit;
> > + on_rollback = (struct trigger *)region_alloc(&txn->region,
> > + sizeof(struct trigger));
> > + on_commit = (struct trigger *)region_alloc(&txn->region,
> > + sizeof(struct trigger));
> > + if (on_rollback == NULL || on_commit == NULL)
> > + goto rollback;
> > +
> > + trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> > + txn_on_rollback(txn, on_rollback);
> > +
> > + trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> > + txn_on_commit(txn, on_commit);
> > +
> > + if (txn_write(txn) < 0)
> > + goto fail;
> > + /* Transaction was sent to journal so promote vclock. */
> > + vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> > + first_row->lsn);
> > + latch_unlock(latch);
> > +
> > + return 0;
> >
> > rollback:
> > txn_rollback(txn);
> >
> > +fail:
> > + latch_unlock(latch);
> >
> > fiber_gc();
> > return -1;
> >
> > }
> >
> > @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
> >
> > struct ev_io *coio = &applier->io;
> > struct ibuf *ibuf = &applier->ibuf;
> > struct xrow_header row;
> >
> > - struct vclock remote_vclock_at_subscribe;
> >
> > struct tt_uuid cluster_id = uuid_nil;
> >
> > struct vclock vclock;
> >
> > @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
> >
> > * the replica, and replica has to check whether
> > * its and master's cluster ids match.
> > */
> >
> > - vclock_create(&remote_vclock_at_subscribe);
> > + vclock_create(&applier->remote_vclock_at_subscribe);
> >
> > xrow_decode_subscribe_response_xc(&row,
> >
> > &cluster_id,
> >
> > - &remote_vclock_at_subscribe);
> > +					  &applier->remote_vclock_at_subscribe);
> > >
> > /*
> >
> > * If master didn't send us its cluster id
> > * assume that it has done all the checks.
> >
> > @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
> >
> > say_info("subscribed");
> > say_info("remote vclock %s local vclock %s",
> >
> > - vclock_to_string(&remote_vclock_at_subscribe),
> > + vclock_to_string(&applier->remote_vclock_at_subscribe),
> >
> > vclock_to_string(&vclock));
> >
> > }
> > /*
> >
> > @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
> >
> > applier->lag = TIMEOUT_INFINITY;
> >
> > + /* Register a trigger to handle replication failures. */
> > + struct trigger on_fail;
> > + trigger_create(&on_fail, applier_on_fail, applier, NULL);
> > + trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
>
> Why do we need on_replication_fail trigger? AFAICS it is called from
> on_rollback callback. Can't we call applier_on_fail right from there,
> without the use of the intermediary?
Because we should cancel all appliers if anything failed (for instance, an
applier could skip a transaction and start with the next one, and then it should
be cancelled if another applier failed). We could track the applier list, but
I'm not sure it would be better.
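For comparison, the list-tracking alternative would look roughly like this (a
sketch; assumes the replicaset_foreach iterator from replication.h):

	/* Cancel every applier with the shared error, no trigger list. */
	replicaset_foreach(replica) {
		struct applier *applier = replica->applier;
		if (applier == NULL || applier->reader == NULL)
			continue;
		diag_add_error(&applier->diag,
			       diag_last_error(&replicaset.applier.diag));
		fiber_cancel(applier->reader);
	}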
>
> > + auto trigger_guard = make_scoped_guard([&] {
> > + trigger_clear(&on_fail);
> > + });
> > +
> > +
> >
> > /*
> >
> > * Process a stream of rows from the binary log.
> > */
> >
> > @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
> >
> > applier_set_state(applier, APPLIER_FOLLOW);
> >
> > }
> >
> > - /*
> > - * Stay 'orphan' until appliers catch up with
> > - * the remote vclock at the time of SUBSCRIBE
> > - * and the lag is less than configured.
> > - */
> > - if (applier->state == APPLIER_SYNC &&
> > - applier->lag <= replication_sync_lag &&
> > - vclock_compare(&remote_vclock_at_subscribe,
> > - &replicaset.vclock) <= 0) {
> > - /* Applier is synced, switch to "follow". */
> > - applier_set_state(applier, APPLIER_FOLLOW);
> > - }
> > -
> >
> > struct stailq rows;
> > applier_read_tx(applier, &rows);
> >
> > - struct xrow_header *first_row =
> > - &stailq_first_entry(&rows, struct applier_tx_row,
> > - next)->row;
> >
> > applier->last_row_time = ev_monotonic_now(loop());
> >
> > - struct replica *replica = replica_by_id(first_row->replica_id);
> > - struct latch *latch = (replica ? &replica->order_latch :
> > - &replicaset.applier.order_latch);
> > - /*
> > - * In a full mesh topology, the same set of changes
> > - * may arrive via two concurrently running appliers.
> > - * Hence we need a latch to strictly order all changes
> > - * that belong to the same server id.
> > - */
>
> Why did you remove this comment? (You didn't move it, I checked).
Accepted
>
> > - latch_lock(latch);
> > - if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> > - first_row->lsn &&
> > - applier_apply_tx(&rows) != 0) {
> > - latch_unlock(latch);
> > + if (applier_apply_tx(&rows) != 0)
> >
> > diag_raise();
> >
> > - }
> > - latch_unlock(latch);
> >
> > - if (applier->state == APPLIER_SYNC ||
> > - applier->state == APPLIER_FOLLOW)
> > - fiber_cond_signal(&applier->writer_cond);
> >
> > if (ibuf_used(ibuf) == 0)
> >
> > ibuf_reset(ibuf);
> >
> > fiber_gc();
> >
> > @@ -872,6 +932,11 @@ applier_f(va_list ap)
> >
> > return -1;
> >
> > }
> >
> > } catch (FiberIsCancelled *e) {
> >
> > + if (!diag_is_empty(&applier->diag)) {
> > + diag_move(&applier->diag, &fiber()->diag);
>
> Hmm, AFAIK we only need this diag for box.info.replication. Maybe it's
> better to patch box.info.replication to use applier->diag instead of
> applier->reader->diag so that we don't need to move errors around?
If one applier fails, then every other one is going to be cancelled with the
same error. As applier_subscribe uses exceptions, I should populate the diag
to raise it.
>
> > + applier_disconnect(applier, APPLIER_STOPPED);
> > + break;
> > + }
> >
> > applier_disconnect(applier, APPLIER_OFF);
> > break;
> >
> > } catch (SocketError *e) {
> >
> > @@ -959,7 +1024,7 @@ applier_new(const char *uri)
> >
> > applier->last_row_time = ev_monotonic_now(loop());
> > rlist_create(&applier->on_state);
> > fiber_cond_create(&applier->resume_cond);
> >
> > - fiber_cond_create(&applier->writer_cond);
> > + diag_create(&applier->diag);
> >
> > return applier;
> >
> > }
> >
> > @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
> >
> > assert(applier->io.fd == -1);
> > trigger_destroy(&applier->on_state);
> > fiber_cond_destroy(&applier->resume_cond);
> >
> > - fiber_cond_destroy(&applier->writer_cond);
> >
> > free(applier);
> >
> > }
> >
> > diff --git a/src/box/applier.h b/src/box/applier.h
> > index 5bff90031..716da32e2 100644
> > --- a/src/box/applier.h
> > +++ b/src/box/applier.h
> > @@ -74,8 +74,6 @@ struct applier {
> >
> > struct fiber *reader;
> > /** Background fiber to reply with vclock */
> > struct fiber *writer;
> >
> > - /** Writer cond. */
> > - struct fiber_cond writer_cond;
> >
> > /** Finite-state machine */
> > enum applier_state state;
> > /** Local time of this replica when the last row has been received */
> >
> > @@ -114,8 +112,15 @@ struct applier {
> >
> > bool is_paused;
> > /** Condition variable signaled to resume the applier. */
> > struct fiber_cond resume_cond;
> >
> > + /* Diag to raise an error. */
> > + struct diag diag;
> > > +	/* The master's vclock at the time of subscribe. */
> > + struct vclock remote_vclock_at_subscribe;
> >
> > };
> >
> > +void
> > +applier_init();
> > +
>
> This function isn't defined anywhere.
>
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 8c8a9927e..a4830f5b5 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -232,6 +232,20 @@ struct replicaset {
> >
> > * struct replica object).
> > */
> >
> > struct latch order_latch;
> >
> > + /*
> > > +	 * A vclock of the last transaction which was read
> > + * from an applier connection.
> > + */
> > + struct vclock net_vclock;
>
> Please elaborate. Can it be less than replicaset.vclock? Can it be
> greater? Why?
Let discuss it f2f.
>
> > + /* Signaled on replicated transaction commit. */
> > + struct fiber_cond commit_cond;
> > + /*
> > + * Trigger to fire when replication stops in case
> > + * of an error.
> > + */
> > + struct rlist on_replication_fail;
> > > +	/* Diag to propagate an error across all appliers. */
> > + struct diag diag;
> >
> > } applier;
> > /** Map of all known replica_id's to correspponding replica's. */
> > struct replica **replica_by_id;
* Re: [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-20 20:33 ` Георгий Кириченко
@ 2019-06-21 8:36 ` Vladimir Davydov
0 siblings, 0 replies; 37+ messages in thread
From: Vladimir Davydov @ 2019-06-21 8:36 UTC (permalink / raw)
To: Георгий
Кириченко
Cc: tarantool-patches
On Thu, Jun 20, 2019 at 11:33:50PM +0300, Георгий Кириченко wrote:
> On Thursday, June 20, 2019 7:37:09 PM MSK Vladimir Davydov wrote:
> > On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> > > I'm sorry, here is the proper version of the commit message:
> > >
> > > Appliers use asynchronous transactions to batch journal writes. All
> > > appliers share the replicaset.applier.tx_vclock, which tracks the vclock
> > > of rows applied but not necessarily written to the journal. Appliers use
> > > a trigger to coordinate in case of failure - when a transaction is going
> > > to be rolled back. Also, an applier writer condition is shared across
> > > all appliers and signaled on commit or on a heartbeat message.
> > >
> > > Closes: #1254
> > > ---
> > >
> > > src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
> > > src/box/applier.h | 9 ++-
> > > src/box/replication.cc | 7 ++
> > > src/box/replication.h | 14 ++++
> > > 4 files changed, 138 insertions(+), 48 deletions(-)
> > >
> > > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > > index 5a92f6109..fee49d8ca 100644
> > > --- a/src/box/applier.cc
> > > +++ b/src/box/applier.cc
> > > @@ -50,6 +50,7 @@
> > >
> > > #include "schema.h"
> > > #include "txn.h"
> > > #include "box.h"
> > >
> > > +#include "scoped_guard.h"
> > >
> > > STRS(applier_state, applier_STATE);
> > >
> > > @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> > >
> > > * replication_timeout seconds any more.
> > > */
> > >
> > > if (applier->version_id >= version_id(1, 7, 7))
> > >
> > > - fiber_cond_wait_timeout(&applier->writer_cond,
> > > + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> > >
> > > TIMEOUT_INFINITY);
> > >
> > > else
> > >
> > > - fiber_cond_wait_timeout(&applier->writer_cond,
> > > + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> > >
> > > replication_timeout);
> >
> > Why replace applier->writer_cond with replicaset.applier.commit_cond?
> > This means that even if only one applier is active, we will wake up all
> > of the writers on each commit, which looks strange.
> I did it because an applier doesn't have any control over how a transaction is
> finished except via an on_commit/on_rollback trigger.
Okay, we can wake up the appropriate applier from the trigger, can't we?
> However, if an applier sends nothing to commit (for instance, it could
> be behind the others), it still should send an ACK.
In which case we can wake up the applier from applier_apply_tx.
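For instance (a sketch; assumes applier_apply_tx() gets a struct applier *
argument back and that the per-applier writer_cond is kept, as suggested
above):

	if (vclock_get(&replicaset.applier.net_vclock,
		       first_row->replica_id) >= first_row->lsn) {
		/* Already applied via another peer: just ACK it. */
		fiber_cond_signal(&applier->writer_cond);
		return 0;
	}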
> Also, I think we should update this state for any processed transaction
> (even for local ones).
This I don't understand.
> >
> > > + /*
> > > + * Stay 'orphan' until appliers catch up with
> > > + * the remote vclock at the time of SUBSCRIBE
> > > + * and the lag is less than configured.
> > > + */
> > > + if (applier->state == APPLIER_SYNC &&
> > > + applier->lag <= replication_sync_lag &&
> > > + vclock_compare(&applier->remote_vclock_at_subscribe,
> > > + &replicaset.vclock) <= 0) {
> > > + /* Applier is synced, switch to "follow". */
> > > + applier_set_state(applier, APPLIER_FOLLOW);
> > > + }
> > > +
> >
> > A writer is supposed to send ACKs, not change the applier state.
> > How did this wind up here? Can't we do this right from the on_commit
> > trigger?
> The same case as above - if an applier didn't send anything to commit (it is
> behind another applier), where is a better point to update its state?
In applier_apply_tx or applier_subscribe?
Just that it looks really weird that the writer fiber, the sole purpose
of which is to reply with ACKs, can also update the applier state.
> > > @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
> > >
> > > applier->lag = TIMEOUT_INFINITY;
> > >
> > > + /* Register a trigger to handle replication failures. */
> > > + struct trigger on_fail;
> > > + trigger_create(&on_fail, applier_on_fail, applier, NULL);
> > > + trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
> >
> > Why do we need on_replication_fail trigger? AFAICS it is called from
> > on_rollback callback. Can't we call applier_on_fail right from there,
> > without the use of the intermediary?
> Because we should cancel all appliers if anything failed (for instance, an
> applier could skip a transaction and start with the next one, and then it should
> be cancelled if another applier failed). We could track the applier list, but
> I'm not sure it would be better.
We didn't cancel all appliers before and it worked just fine, so I fail
to understand why we need to do it now. Could you please give an example
of when something breaks because of that?
> > > diff --git a/src/box/replication.h b/src/box/replication.h
> > > index 8c8a9927e..a4830f5b5 100644
> > > --- a/src/box/replication.h
> > > +++ b/src/box/replication.h
> > > @@ -232,6 +232,20 @@ struct replicaset {
> > >
> > > * struct replica object).
> > > */
> > >
> > > struct latch order_latch;
> > >
> > > + /*
> > > +	 * A vclock of the last transaction which was read
> > > + * from an applier connection.
> > > + */
> > > + struct vclock net_vclock;
> >
> > Please elaborate. Can it be less than replicaset.vclock? Can it be
> > greater? Why?
> Let discuss it f2f.
I just want the comment improved :)
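For example, the comment could spell out the invariant (suggested wording
only):

	/*
	 * Vclock of the last transaction submitted to the journal.
	 * It may run ahead of replicaset.vclock while WAL writes
	 * are in flight, and it is reset back to replicaset.vclock
	 * when a write fails and pending transactions are rolled
	 * back.
	 */
	struct vclock net_vclock;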
* [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel Georgy Kirichenko
2019-06-20 7:41 ` [tarantool-patches] " Георгий Кириченко
@ 2019-06-20 8:06 ` Konstantin Osipov
1 sibling, 0 replies; 37+ messages in thread
From: Konstantin Osipov @ 2019-06-20 8:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/06/20 09:54]:
> applier_apply_tx(struct stailq *rows)
> {
> + struct xrow_header *first_row =
> + &stailq_first_entry(rows, struct applier_tx_row,
> + next)->row;
> + struct replica *replica = replica_by_id(first_row->replica_id);
> + struct latch *latch = (replica ? &replica->order_latch :
> + &replicaset.applier.order_latch);
> + latch_lock(latch);
Now that we have a sequencer shouldn't it handle the job of this
latch as well?
> + if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> + first_row->lsn) {
> +		/* Check whether this is a heartbeat message and wake the writers up. */
> + if (first_row->lsn == 0)
> + fiber_cond_broadcast(&replicaset.applier.commit_cond);
> + latch_unlock(latch);
> + return 0;
> + }
> +
> + /*
> +	 * A vclock of the last transaction which was read
> + * from an applier connection.
> + */
> + struct vclock net_vclock;
> + /* Signaled on replicated transaction commit. */
I don't understand this comment: what is a replicated transaction
commit? Could you please rephrase? Why do you need this cond?
Please add more comments explaining how and why.
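From the quoted hunks, commit_cond looks like the wake-up channel between commit
and the per-applier writer fibers; a hedged sketch of the waiting side, with
applier_send_ack() as a hypothetical helper:

	static int
	applier_writer_f(va_list ap)
	{
		struct applier *applier = va_arg(ap, struct applier *);
		while (!fiber_is_cancelled()) {
			/*
			 * Sleep until a replicated transaction commits
			 * or a heartbeat arrives and
			 * fiber_cond_broadcast() wakes all writers up.
			 */
			fiber_cond_wait(&replicaset.applier.commit_cond);
			applier_send_ack(applier); /* hypothetical helper */
		}
		return 0;
	}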
> + struct fiber_cond commit_cond;
> + /*
> + * Trigger to fire when replication stops in case
> + * of an error.
> + */
> + struct rlist on_replication_fail;
I would simply call it on_error or on_failure. It's already clear it's related
to the applier's work. Please explain how and why this trigger is used in the
comment.
> + /* Diag to propagate an error across all appliers. */
> + struct diag diag;
> } applier;
> /** Map of all known replica_id's to corresponding replicas. */
> struct replica **replica_by_id;
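As for the diag field, the quoted code shows only the declaration; a hedged
guess at the intended flow, using the standard diag API:

	/* On failure: save the first error for the whole replica set. */
	if (diag_is_empty(&replicaset.applier.diag))
		diag_move(diag_get(), &replicaset.applier.diag);
	/* In every cancelled applier: re-raise the shared error. */
	diag_set_error(diag_get(),
		       diag_last_error(&replicaset.applier.diag));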
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 37+ messages in thread
* [tarantool-patches] [PATCH v4 9/9] test: fix flaky test
2019-06-19 21:23 [tarantool-patches] [PATCH v4 0/9] Parallel applier Georgy Kirichenko
` (7 preceding siblings ...)
2019-06-19 21:23 ` [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel Georgy Kirichenko
@ 2019-06-19 21:23 ` Georgy Kirichenko
8 siblings, 0 replies; 37+ messages in thread
From: Georgy Kirichenko @ 2019-06-19 21:23 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
This test fails sporadically.
---
test/replication/sync.result | 7 +++++--
test/replication/sync.test.lua | 4 ++--
test/replication/transaction.result | 16 +++++++++++++++-
test/replication/transaction.test.lua | 7 ++++++-
4 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/test/replication/sync.result b/test/replication/sync.result
index eddc7cbc8..6b5a14d3f 100644
--- a/test/replication/sync.result
+++ b/test/replication/sync.result
@@ -46,7 +46,7 @@ function fill()
box.space.test:replace{i}
end
fiber.create(function()
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+ box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
test_run:wait_cond(function()
local r = box.info.replication[2]
return r ~= nil and r.downstream ~= nil and
@@ -55,7 +55,6 @@ function fill()
for i = count + 101, count + 200 do
box.space.test:replace{i}
end
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
end)
count = count + 200
end;
@@ -250,6 +249,10 @@ test_run:cmd("switch default")
---
- true
...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
+---
+- ok
+...
box.error.injection.set('ERRINJ_WAL_DELAY', true)
---
- ok
diff --git a/test/replication/sync.test.lua b/test/replication/sync.test.lua
index 52ce88fe2..f0f530ad4 100644
--- a/test/replication/sync.test.lua
+++ b/test/replication/sync.test.lua
@@ -30,7 +30,7 @@ function fill()
box.space.test:replace{i}
end
fiber.create(function()
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+ box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
test_run:wait_cond(function()
local r = box.info.replication[2]
return r ~= nil and r.downstream ~= nil and
@@ -39,7 +39,6 @@ function fill()
for i = count + 101, count + 200 do
box.space.test:replace{i}
end
- box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
end)
count = count + 200
end;
@@ -136,6 +135,7 @@ box.cfg{replication_sync_lag = 1}
box.cfg{replication_sync_timeout = 10}
test_run:cmd("switch default")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
box.error.injection.set('ERRINJ_WAL_DELAY', true)
test_run:cmd("setopt delimiter ';'")
_ = fiber.create(function()
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
index 8c2ac6ee4..c54c1e8d5 100644
--- a/test/replication/transaction.result
+++ b/test/replication/transaction.result
@@ -7,12 +7,21 @@ test_run = env.new()
box.schema.user.grant('guest', 'replication')
---
...
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+engine = test_run:get_cfg('engine')
+---
+...
+s = box.schema.space.create('test', {engine = engine})
---
...
_ = s:create_index('pk')
---
...
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+---
+...
+_ = l:create_index('pk')
+---
+...
-- transaction w/o conflict
box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
---
@@ -92,6 +101,11 @@ box.cfg{replication = replication}
---
...
-- replication stopped on the third transaction
+-- flush wal
+box.space.l_space:replace({1})
+---
+- [1]
+...
v1[1] + 2 == box.info.vclock[1]
---
- true
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
index f25a4737d..4e0323128 100644
--- a/test/replication/transaction.test.lua
+++ b/test/replication/transaction.test.lua
@@ -1,9 +1,12 @@
env = require('test_run')
test_run = env.new()
box.schema.user.grant('guest', 'replication')
+engine = test_run:get_cfg('engine')
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+s = box.schema.space.create('test', {engine = engine})
_ = s:create_index('pk')
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+_ = l:create_index('pk')
-- transaction w/o conflict
box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
@@ -37,6 +40,8 @@ replication = box.cfg.replication
box.cfg{replication = {}}
box.cfg{replication = replication}
-- replication stopped on the third transaction
+-- flush wal
+box.space.l_space:replace({1})
v1[1] + 2 == box.info.vclock[1]
box.space.test:select()
-- check replication status
--
2.22.0
^ permalink raw reply [flat|nested] 37+ messages in thread