* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-06 13:05 Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
0 siblings, 2 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
This patchset introduces transactional replication and consists of two
commits:
* the first one forms transaction boundaries in an xstream
* the second one forms transactions in applier buffers and then
applies them with correct begin/commit boundaries.
Note: distributed transactions are not supported, so the journal forms a
separate transaction for all local trigger effects.
Changes in v2:
- Fixed local transaction extraction
Georgy Kirichenko (2):
Journal transaction boundaries
Transaction support for applier
src/box/applier.cc | 202 ++++++++++++++++++++++++++-----------
src/box/iproto_constants.h | 3 +
src/box/wal.c | 36 ++++++-
src/box/xrow.c | 38 +++++++
src/box/xrow.h | 5 +-
test/unit/xrow.cc | 3 +
test/vinyl/errinj.result | 8 +-
test/vinyl/info.result | 38 +++----
test/vinyl/layout.result | 24 ++---
9 files changed, 263 insertions(+), 94 deletions(-)
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
@ 2019-01-06 13:05 ` Georgy Kirichenko
2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions cannot be interleaved in a
replication stream. Also, distributed transactions are not supported yet.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 202 ++++++++++++++++++++++++++++++++-------------
1 file changed, 145 insertions(+), 57 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6c0eb45d5..7e208aaa2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
#include "error.h"
#include "session.h"
#include "cfg.h"
+#include "txn.h"
STRS(applier_state, applier_STATE);
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+ struct obuf *data_buf)
+{
+ struct xrow_header *row;
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+ int64_t txn_id = 0;
+ uint32_t txn_replica_id = 0;
+
+ do {
+ row = (struct xrow_header *)ibuf_alloc(row_buf,
+ sizeof(struct xrow_header));
+ if (row == NULL) {
+ diag_set(OutOfMemory, sizeof(struct xrow_header),
+ "slab", "struct xrow_header");
+ goto error;
+ }
+
+ double timeout = replication_disconnect_timeout();
+ try {
+ /* TODO: we should have a C version of this function. */
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+ } catch (...) {
+ goto error;
+ }
+
+ if (iproto_type_is_error(row->type)) {
+ xrow_decode_error(row);
+ goto error;
+ }
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ diag_set(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ goto error;
+ }
+ if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+ /*
+ * First row in a transaction. In order to enforce
+ * consistency check that first row lsn and replica id
+ * match with transaction.
+ */
+ txn_id = row->lsn;
+ txn_replica_id = row->replica_id;
+ }
+ if (txn_id != row->txn_id ||
+ txn_replica_id != row->txn_replica_id) {
+ /* We are not able to handle interleaving transactions. */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "replications",
+ "interleaving transactions");
+ goto error;
+ }
+
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+
+ if (row->body->iov_base != NULL) {
+ void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+ if (new_base == NULL) {
+ diag_set(OutOfMemory, row->body->iov_len,
+ "slab", "xrow_data");
+ goto error;
+ }
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ row->body->iov_base = new_base;
+ }
+
+ } while (row->txn_last == 0);
+
+ return 0;
+error:
+ ibuf_reset(row_buf);
+ obuf_reset(data_buf);
+ return -1;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
struct vclock remote_vclock_at_subscribe;
+ struct ibuf row_buf;
+ struct obuf data_buf;
+ ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+ obuf_create(&data_buf, &cord()->slabc, 0x10000);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&replicaset.vclock);
@@ -475,80 +576,67 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
+ if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+ diag_raise();
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct txn *txn = NULL;
+ struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+ struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
- applier->lag = ev_now(loop()) - row.tm;
+ applier->lag = ev_now(loop()) - last_row->tm;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->txn_replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set
- * of changes may arrive via two
- * concurrently running appliers. Thanks
- * to vclock_follow() above, the first row
- * in the set will be skipped - but the
- * remaining may execute out of order,
- * when the following xstream_write()
- * yields on WAL. Hence we need a latch to
- * strictly order all changes which belong
- * to the same server id.
- */
latch_lock(latch);
+ /* First row identifies a transaction. */
+ assert(first_row->lsn == first_row->txn_id);
+ assert(first_row->replica_id == first_row->txn_replica_id);
if (vclock_get(&replicaset.applier.vclock,
- row.replica_id) < row.lsn) {
+ first_row->replica_id) < first_row->lsn) {
/* Preserve old lsn value. */
int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
- row.replica_id);
- vclock_follow_xrow(&replicaset.applier.vclock, &row);
- int res = xstream_write(applier->subscribe_stream, &row);
- struct error *e = diag_last_error(diag_get());
- if (res != 0 && e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- /**
- * Silently skip ER_TUPLE_FOUND error if such
- * option is set in config.
- */
- diag_clear(diag_get());
- row.type = IPROTO_NOP;
- row.bodycnt = 0;
- res = xstream_write(applier->subscribe_stream,
- &row);
+ first_row->replica_id);
+
+ struct xrow_header *row = first_row;
+ if (first_row != last_row)
+ txn = txn_begin(false);
+ int res = 0;
+ while (row <= last_row && res == 0) {
+ vclock_follow_xrow(&replicaset.applier.vclock, row);
+ res = xstream_write(applier->subscribe_stream, row);
+ struct error *e;
+ if (res != 0 &&
+ (e = diag_last_error(diag_get()))->type ==
+ &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ /**
+ * Silently skip ER_TUPLE_FOUND error
+ * if such option is set in config.
+ */
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = xstream_write(applier->subscribe_stream,
+ row);
+ }
+ ++row;
}
+ if (res == 0 && txn != NULL)
+ res = txn_commit(txn);
if (res != 0) {
/* Rollback lsn to have a chance for a retry. */
vclock_set(&replicaset.applier.vclock,
- row.replica_id, old_lsn);
+ first_row->replica_id, old_lsn);
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
latch_unlock(latch);
diag_raise();
}
}
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
latch_unlock(latch);
/*
* Stay 'orphan' until appliers catch up with
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-11 13:30 ` Georgy Kirichenko
1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-11 13:30 UTC (permalink / raw)
To: tarantool-patches
[-- Attachment #1: Type: text/plain, Size: 1236 bytes --]
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries
Issue: https://github.com/tarantool/tarantool/issues/2798
On Sunday, January 6, 2019 4:05:51 PM MSK Georgy Kirichenko wrote:
> This patchset introduces transactional replication and consists of two
> commits:
> * the first one forms transaction boundaries in an xstream
> * the second one forms transactions in applier buffers and then
> applies them with correct begin/commit boundaries.
>
> Note: distributed transactions are not supported, so the journal forms a
> separate transaction for all local trigger effects.
>
> Changes in v2:
> - Fixed local transaction extraction
>
> Georgy Kirichenko (2):
> Journal transaction boundaries
> Transaction support for applier
>
> src/box/applier.cc | 202 ++++++++++++++++++++++++++-----------
> src/box/iproto_constants.h | 3 +
> src/box/wal.c | 36 ++++++-
> src/box/xrow.c | 38 +++++++
> src/box/xrow.h | 5 +-
> test/unit/xrow.cc | 3 +
> test/vinyl/errinj.result | 8 +-
> test/vinyl/info.result | 38 +++----
> test/vinyl/layout.result | 24 ++---
> 9 files changed, 263 insertions(+), 94 deletions(-)
[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]
^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-28 13:35 ` Vladimir Davydov
0 siblings, 0 replies; 5+ messages in thread
From: Vladimir Davydov @ 2019-01-28 13:35 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Tue, Jan 22, 2019 at 01:57:37PM +0300, Georgy Kirichenko wrote:
> The applier fetches incoming rows to form a transaction and then applies it.
> The implementation assumes that transactions cannot be interleaved in a
> replication stream. Also, distributed transactions are not supported yet.
>
> Closes: #2798
> Needed for: #980
> ---
> src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
> 1 file changed, 148 insertions(+), 59 deletions(-)
Without a test, this patch is inadmissible. Vlad mentioned that he has
some tests left from his old implementation. Please salvage those.
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index adbe88679..0e3832ad8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
> #include "error.h"
> #include "session.h"
> #include "cfg.h"
> +#include "txn.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -380,6 +381,102 @@ applier_join(struct applier *applier)
> applier_set_state(applier, APPLIER_READY);
> }
>
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> + struct obuf *data_buf)
> +{
> + struct xrow_header *row;
> + struct ev_io *coio = &applier->io;
> + struct ibuf *ibuf = &applier->ibuf;
> + int64_t txn_id = 0;
> + uint32_t txn_replica_id = 0;
> +
> + do {
> + row = (struct xrow_header *)ibuf_alloc(row_buf,
> + sizeof(struct xrow_header));
> + if (row == NULL) {
> + diag_set(OutOfMemory, sizeof(struct xrow_header),
> + "slab", "struct xrow_header");
> + goto error;
> + }
> +
> + double timeout = replication_disconnect_timeout();
> + try {
> + /* TODO: we should have a C version of this function. */
> + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> + } catch (...) {
> + goto error;
> + }
> +
> + if (iproto_type_is_error(row->type)) {
> + xrow_decode_error(row);
> + goto error;
> + }
> +
> + /* Replication request. */
> + if (row->replica_id == REPLICA_ID_NIL ||
> + row->replica_id >= VCLOCK_MAX) {
> + /*
> + * A safety net, this can only occur
> + * if we're fed a strangely broken xlog.
> + */
> + diag_set(ClientError, ER_UNKNOWN_REPLICA,
> + int2str(row->replica_id),
> + tt_uuid_str(&REPLICASET_UUID));
> + goto error;
> + }
> + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> + /*
> + * First row in a transaction. In order to enforce
> + * consistency check that first row lsn and replica id
> + * match with transaction.
> + */
> + txn_id = row->lsn;
> + txn_replica_id = row->replica_id;
> + }
> + if (txn_id != row->txn_id ||
> + txn_replica_id != row->txn_replica_id) {
> + /* We are not able to handle interleaving transactions. */
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "replications",
> + "interleaving transactions");
> + goto error;
> + }
Accumulating rows feels like the iproto realm. I don't think that it's a
good idea to implement a dirty ad-hoc solution for this. Instead we
should move applier to iproto IMO. This would probably allow us to reuse
the code for interactive iproto transactions - the two issues look very
similar to me and I think we should use the same protocol and code to
get them both working.
> +
> +
> + applier->lag = ev_now(loop()) - row->tm;
> + applier->last_row_time = ev_monotonic_now(loop());
> +
> + if (row->body->iov_base != NULL) {
> + void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> + if (new_base == NULL) {
> + diag_set(OutOfMemory, row->body->iov_len,
> + "slab", "xrow_data");
> + goto error;
> + }
> + memcpy(new_base, row->body->iov_base, row->body->iov_len);
> + row->body->iov_base = new_base;
> + }
> +
> + } while (row->txn_last == 0);
> +
> + return 0;
> +error:
> + ibuf_reset(row_buf);
> + obuf_reset(data_buf);
> + return -1;
> +}
> +
> /**
> * Execute and process SUBSCRIBE request (follow updates from a master).
> */
> @@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
> struct ibuf *ibuf = &applier->ibuf;
> struct xrow_header row;
> struct vclock remote_vclock_at_subscribe;
> + struct ibuf row_buf;
> + struct obuf data_buf;
> + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> + obuf_create(&data_buf, &cord()->slabc, 0x10000);
>
> xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> &replicaset.vclock);
> @@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
> applier_set_state(applier, APPLIER_FOLLOW);
> }
>
> - /*
> - * Tarantool < 1.7.7 does not send periodic heartbeat
> - * messages so we can't assume that if we haven't heard
> - * from the master for quite a while the connection is
> - * broken - the master might just be idle.
> - */
> - if (applier->version_id < version_id(1, 7, 7)) {
> - coio_read_xrow(coio, ibuf, &row);
> - } else {
> - double timeout = replication_disconnect_timeout();
> - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> - }
> + if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> + diag_raise();
>
> - if (iproto_type_is_error(row.type))
> - xrow_decode_error_xc(&row); /* error */
> - /* Replication request. */
> - if (row.replica_id == REPLICA_ID_NIL ||
> - row.replica_id >= VCLOCK_MAX) {
> - /*
> - * A safety net, this can only occur
> - * if we're fed a strangely broken xlog.
> - */
> - tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> - int2str(row.replica_id),
> - tt_uuid_str(&REPLICASET_UUID));
> - }
> + struct txn *txn = NULL;
> + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>
> - applier->lag = ev_now(loop()) - row.tm;
> + applier->lag = ev_now(loop()) - last_row->tm;
> applier->last_row_time = ev_monotonic_now(loop());
> - struct replica *replica = replica_by_id(row.replica_id);
> + struct replica *replica = replica_by_id(first_row->txn_replica_id);
> struct latch *latch = (replica ? &replica->order_latch :
> &replicaset.applier.order_latch);
> - /*
> - * In a full mesh topology, the same set
> - * of changes may arrive via two
> - * concurrently running appliers. Thanks
> - * to vclock_follow() above, the first row
> - * in the set will be skipped - but the
> - * remaining may execute out of order,
> - * when the following xstream_write()
> - * yields on WAL. Hence we need a latch to
> - * strictly order all changes which belong
> - * to the same server id.
> - */
> latch_lock(latch);
> + /* First row identifies a transaction. */
> + assert(first_row->lsn == first_row->txn_id);
> + assert(first_row->replica_id == first_row->txn_replica_id);
> if (vclock_get(&replicaset.applier.vclock,
> - row.replica_id) < row.lsn) {
> - if (row.replica_id == instance_id &&
> + first_row->replica_id) < first_row->lsn) {
> + if (first_row->replica_id == instance_id &&
> vclock_get(&replicaset.vclock, instance_id) <
> - row.lsn) {
> + first_row->lsn) {
> /* Local row returned back. */
> goto done;
> }
> /* Preserve old lsn value. */
> int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> - row.replica_id);
> - vclock_follow_xrow(&replicaset.applier.vclock, &row);
> - int res = xstream_write(applier->subscribe_stream, &row);
> - struct error *e = diag_last_error(diag_get());
> - if (res != 0 && e->type == &type_ClientError &&
> - box_error_code(e) == ER_TUPLE_FOUND &&
> - replication_skip_conflict) {
> - /**
> - * Silently skip ER_TUPLE_FOUND error if such
> - * option is set in config.
> - */
> - diag_clear(diag_get());
> - row.type = IPROTO_NOP;
> - row.bodycnt = 0;
> - res = xstream_write(applier->subscribe_stream,
> - &row);
> + first_row->replica_id);
> +
> + struct xrow_header *row = first_row;
> + if (first_row != last_row)
> + txn = txn_begin(false);
So we have xstream_write to hide box internals, but we still use
txn_begin/commit. This looks ugly. We should encapsulate those somehow
as well, I guess.
> + int res = 0;
> + while (row <= last_row && res == 0) {
> + vclock_follow_xrow(&replicaset.applier.vclock, row);
> + res = xstream_write(applier->subscribe_stream, row);
> + struct error *e;
> + if (res != 0 &&
> + (e = diag_last_error(diag_get()))->type ==
> + &type_ClientError &&
> + box_error_code(e) == ER_TUPLE_FOUND &&
> + replication_skip_conflict) {
> + /**
> + * Silently skip ER_TUPLE_FOUND error
> + * if such option is set in config.
> + */
> + diag_clear(diag_get());
> + row->type = IPROTO_NOP;
> + row->bodycnt = 0;
> + res = xstream_write(applier->subscribe_stream,
> + row);
> + }
> + ++row;
> }
> + if (res == 0 && txn != NULL)
> + res = txn_commit(txn);
> +
> if (res != 0) {
> /* Rollback lsn to have a chance for a retry. */
> vclock_set(&replicaset.applier.vclock,
> - row.replica_id, old_lsn);
> + first_row->replica_id, old_lsn);
> + obuf_reset(&data_buf);
> + ibuf_reset(&row_buf);
> latch_unlock(latch);
> diag_raise();
> }
> }
> done:
> + obuf_reset(&data_buf);
> + ibuf_reset(&row_buf);
> latch_unlock(latch);
> /*
> * Stay 'orphan' until appliers catch up with
^ permalink raw reply [flat|nested] 5+ messages in thread
* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
2019-01-22 10:57 [tarantool-patches] " Georgy Kirichenko
@ 2019-01-22 10:57 ` Georgy Kirichenko
2019-01-28 13:35 ` Vladimir Davydov
0 siblings, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions cannot be interleaved in a
replication stream. Also, distributed transactions are not supported yet.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
1 file changed, 148 insertions(+), 59 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index adbe88679..0e3832ad8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
#include "error.h"
#include "session.h"
#include "cfg.h"
+#include "txn.h"
STRS(applier_state, applier_STATE);
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+ struct obuf *data_buf)
+{
+ struct xrow_header *row;
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+ int64_t txn_id = 0;
+ uint32_t txn_replica_id = 0;
+
+ do {
+ row = (struct xrow_header *)ibuf_alloc(row_buf,
+ sizeof(struct xrow_header));
+ if (row == NULL) {
+ diag_set(OutOfMemory, sizeof(struct xrow_header),
+ "slab", "struct xrow_header");
+ goto error;
+ }
+
+ double timeout = replication_disconnect_timeout();
+ try {
+ /* TODO: we should have a C version of this function. */
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+ } catch (...) {
+ goto error;
+ }
+
+ if (iproto_type_is_error(row->type)) {
+ xrow_decode_error(row);
+ goto error;
+ }
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ diag_set(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ goto error;
+ }
+ if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+ /*
+ * First row in a transaction. In order to enforce
+ * consistency check that first row lsn and replica id
+ * match with transaction.
+ */
+ txn_id = row->lsn;
+ txn_replica_id = row->replica_id;
+ }
+ if (txn_id != row->txn_id ||
+ txn_replica_id != row->txn_replica_id) {
+ /* We are not able to handle interleaving transactions. */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "replications",
+ "interleaving transactions");
+ goto error;
+ }
+
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+
+ if (row->body->iov_base != NULL) {
+ void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+ if (new_base == NULL) {
+ diag_set(OutOfMemory, row->body->iov_len,
+ "slab", "xrow_data");
+ goto error;
+ }
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ row->body->iov_base = new_base;
+ }
+
+ } while (row->txn_last == 0);
+
+ return 0;
+error:
+ ibuf_reset(row_buf);
+ obuf_reset(data_buf);
+ return -1;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
struct vclock remote_vclock_at_subscribe;
+ struct ibuf row_buf;
+ struct obuf data_buf;
+ ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+ obuf_create(&data_buf, &cord()->slabc, 0x10000);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&replicaset.vclock);
@@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
+ if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+ diag_raise();
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct txn *txn = NULL;
+ struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+ struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
- applier->lag = ev_now(loop()) - row.tm;
+ applier->lag = ev_now(loop()) - last_row->tm;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->txn_replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set
- * of changes may arrive via two
- * concurrently running appliers. Thanks
- * to vclock_follow() above, the first row
- * in the set will be skipped - but the
- * remaining may execute out of order,
- * when the following xstream_write()
- * yields on WAL. Hence we need a latch to
- * strictly order all changes which belong
- * to the same server id.
- */
latch_lock(latch);
+ /* First row identifies a transaction. */
+ assert(first_row->lsn == first_row->txn_id);
+ assert(first_row->replica_id == first_row->txn_replica_id);
if (vclock_get(&replicaset.applier.vclock,
- row.replica_id) < row.lsn) {
- if (row.replica_id == instance_id &&
+ first_row->replica_id) < first_row->lsn) {
+ if (first_row->replica_id == instance_id &&
vclock_get(&replicaset.vclock, instance_id) <
- row.lsn) {
+ first_row->lsn) {
/* Local row returned back. */
goto done;
}
/* Preserve old lsn value. */
int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
- row.replica_id);
- vclock_follow_xrow(&replicaset.applier.vclock, &row);
- int res = xstream_write(applier->subscribe_stream, &row);
- struct error *e = diag_last_error(diag_get());
- if (res != 0 && e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- /**
- * Silently skip ER_TUPLE_FOUND error if such
- * option is set in config.
- */
- diag_clear(diag_get());
- row.type = IPROTO_NOP;
- row.bodycnt = 0;
- res = xstream_write(applier->subscribe_stream,
- &row);
+ first_row->replica_id);
+
+ struct xrow_header *row = first_row;
+ if (first_row != last_row)
+ txn = txn_begin(false);
+ int res = 0;
+ while (row <= last_row && res == 0) {
+ vclock_follow_xrow(&replicaset.applier.vclock, row);
+ res = xstream_write(applier->subscribe_stream, row);
+ struct error *e;
+ if (res != 0 &&
+ (e = diag_last_error(diag_get()))->type ==
+ &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ /**
+ * Silently skip ER_TUPLE_FOUND error
+ * if such option is set in config.
+ */
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = xstream_write(applier->subscribe_stream,
+ row);
+ }
+ ++row;
}
+ if (res == 0 && txn != NULL)
+ res = txn_commit(txn);
+
if (res != 0) {
/* Rollback lsn to have a chance for a retry. */
vclock_set(&replicaset.applier.vclock,
- row.replica_id, old_lsn);
+ first_row->replica_id, old_lsn);
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
latch_unlock(latch);
diag_raise();
}
}
done:
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
latch_unlock(latch);
/*
* Stay 'orphan' until appliers catch up with
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
end of thread, other threads:[~2019-01-28 13:35 UTC | newest]
Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-22 10:57 [tarantool-patches] " Georgy Kirichenko
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-28 13:35 ` Vladimir Davydov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox