[tarantool-patches] Re: [PATCH v2 4/4] replication: use wal memory buffer to fetch rows
Georgy Kirichenko
georgy at tarantool.org
Mon Sep 23 12:35:54 MSK 2019
On Sunday, September 22, 2019 7:14:13 PM MSK Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> On 18/09/2019 11:36, Georgy Kirichenko wrote:
> > A relay tries to create a wal memory cursor and follow them to relay
> > data to it's replica.
>
> 'follow them to relay data to replica'? Sorry, can't parse. Please,
> rephrase.
>
> > If a relay failed to attach to a wal memory buffer
> > or went out of the buffer
> > then the relay recovers xlogs from files and
> > the makes a new try to attach.
>
> 'the makes'? Can't parse, please rephrase. Just 'makes'?
> Or 'then makes'?
>
> > Closes: #3794
>
> Please, no ':'.
Ok
>
> Don't we need a docbot request for this feature? Is it documented
> anywhere that our replication always reads from disk?
Oh, I will ask whether this should be documented.
>
> See 29 comments below.
>
> > ---
> >
> > src/box/relay.cc | 169 +++++++++---------
> > src/box/wal.c | 155 ++++++++++++++++
> >
> > + relay->r = NULL;
>
> 1. Why? 'relay' is allocated using 'calloc' a few lines
> above, 'r' is NULL already.
Accepted
>
> > return relay;
> > + if (relay->status_msg.msg.route != NULL)
> > + continue;
>
> 2. What is happening below? And how can msg.route be NULL?
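We sometimes use this pattern to mark a cbus message as processed: the route
is reset to NULL once the message is handled, so a non-NULL route means the
previous status message is still in flight.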
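A minimal sketch of this pattern, assuming a simplified message type rather than the real `struct cmsg`: the handler at the end of the route resets `route` to NULL, and the sender skips sending while the previous message is still in flight. `status_msg`, `status_delivered` and `try_send_status` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a cbus route (a list of hops). */
struct hop {
	int dummy;
};

/* Simplified message: a non-NULL route means "in flight". */
struct status_msg {
	const struct hop *route;
	long vclock_sum;
};

static const struct hop status_route[] = { {0} };

/* Runs at the end of the route: mark the message processed. */
static void
status_delivered(struct status_msg *msg)
{
	msg->route = NULL;
}

/*
 * Returns false while the previous message is still in
 * flight, because the message object must not be reused yet.
 */
static bool
try_send_status(struct status_msg *msg, long vclock_sum)
{
	if (msg->route != NULL)
		return false;
	msg->route = status_route;
	msg->vclock_sum = vclock_sum;
	/* A real implementation would push msg to a pipe here. */
	return true;
}
```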
>
> > + struct vclock *send_vclock;
> > + if (relay->version_id < version_id(1, 7, 4))
> > + send_vclock = &relay->r->vclock;
>
> 3. I know this is old code that existed before your patch and that it
> will be dropped, but why do you send relay->r->vclock without decoding
> it anywhere? Above you decoded a received vclock into relay->recv_vclock.
> relay->r->vclock is not updated, and nonetheless you send it.
Before 1.7.4, the recovery vclock was used as the replica vclock, so recovery
does the work.
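A sketch of the version check in the quoted diff, assuming a `version_id()` that packs one byte per component so that plain integer comparison orders versions. `vclock_stub` and `choose_send_vclock` are hypothetical stand-ins for the relay fields.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Assumed encoding: one byte per component, so comparing the
 * packed integers compares versions lexicographically.
 */
static inline uint32_t
version_id(uint32_t major, uint32_t minor, uint32_t patch)
{
	return (major << 16) | (minor << 8) | patch;
}

/* Hypothetical stand-in for struct vclock. */
struct vclock_stub {
	int64_t sum;
};

/*
 * Replicas older than 1.7.4 get the recovery vclock, newer
 * ones get the vclock they reported themselves.
 */
static const struct vclock_stub *
choose_send_vclock(uint32_t replica_version,
		   const struct vclock_stub *recovery_vclock,
		   const struct vclock_stub *recv_vclock)
{
	if (replica_version < version_id(1, 7, 4))
		return recovery_vclock;
	return recv_vclock;
}
```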
>
> > + else
> > + send_vclock = &relay->recv_vclock;
> > + if (vclock_sum(&relay->status_msg.vclock) ==
> > + vclock_sum(send_vclock))
> > + continue;
>
> 4. How is it possible that a vclock you are going to send and
> a previously sent vclock are the same?
For instance, when a relay receives a heartbeat answer and the vclock has not
changed.
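A sketch of why comparing sums is enough to skip a redundant status message, assuming the vclock about to be sent is componentwise greater than or equal to the last one sent (LSNs only grow). Under that assumption, equal sums imply that no component moved. `vclock_stub`, `VCLOCK_MAX` and `status_changed` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed number of vclock components. */
#define VCLOCK_MAX 32

struct vclock_stub {
	int64_t lsn[VCLOCK_MAX];
};

static int64_t
vclock_stub_sum(const struct vclock_stub *v)
{
	int64_t sum = 0;
	for (int i = 0; i < VCLOCK_MAX; i++)
		sum += v->lsn[i];
	return sum;
}

/*
 * Assumes to_send >= last_sent componentwise. Then equal sums
 * mean no component advanced, e.g. after a heartbeat answer,
 * and the status message can be skipped.
 */
static bool
status_changed(const struct vclock_stub *last_sent,
	       const struct vclock_stub *to_send)
{
	return vclock_stub_sum(last_sent) != vclock_stub_sum(to_send);
}
```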
>
> > + static const struct cmsg_hop route[] = {
> > - * Run the event loop until the connection is broken
> > - * or an error occurs.
> > - */
>
> 5. Is this comment wrong now? Why did you drop it?
Sorry
>
> > while (!fiber_is_cancelled()) {
> >
> > + relay_set_error(relay, diag_last_error(&fiber()->diag));
> > + break;
> >
> > }
>
> 6. I just realized that data appears in the buffer before it
> is written to disk. Can't it happen that something is encoded to
> the buffer, sent to a replica, and then fails to be written to
> WAL?
WAL does not yield between the memory and the disk writes, so it is not
possible to see non-flushed data here.
>
> > - cmsg_init(&relay->status_msg.msg, route);
> > - vclock_copy(&relay->status_msg.vclock, send_vclock);
> > - relay->status_msg.relay = relay;
> > - cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
> > + /* Recover xlogs from files. */
>
> 7. Please describe in a comment here when such a switch happens,
> and why it is needed to recover both from memory and from files.
Ok
>
> > + try {
> >
> > + vclock_copy(&relay->relay_vclock, replica_clock);
>
> 8. The comment above clearly is not about this line. Please, put it
> either above the comment, or after the 'if' below.
Ok
>
> > if (replica->gc == NULL) {
> >
> > replica->gc = gc_consumer_register(replica_clock, "replica %s",
> >
> > tt_uuid_str(&replica->uuid));
> >
> > @@ -567,8 +554,13 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> > });
> >
> > vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> >
> > - relay->r = recovery_new(cfg_gets("wal_dir"), false,
> > - replica_clock);
> > + relay->r = NULL;
>
> 9. Why? If relay->r is not null, then it is a leak. If it is null,
> then you don't need to nullify it again.
Ok, I will check it once more.
>
> > + relay->wal_dir = strdup(cfg_gets("wal_dir"));
> > + if (relay->wal_dir == NULL) {
> > + diag_set(OutOfMemory, strlen(cfg_gets("wal_dir")),
>
> 10. I know, I am a fucking bore, but 'strlen() + 1', for terminating
> zero :)
))
>
> > + "runtime", "wal_dir");
>
> 11. There is a strict rule for the arguments we pass to
> diag_set(OutOfMemory, ...): size, allocator function, variable name.
> The allocator function here is 'strdup', not 'runtime'.
Ok
>
> > + diag_raise();
>
> 12. diag_set + diag_raise = tnt_raise. Please, replace.
I hope this will change to diag_set + return -1 in the near future.
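A sketch of the allocation check with the fixes from points 10 and 11 applied, written in the "set the diagnostic and return -1" style mentioned above instead of raising. `set_oom` and `dup_wal_dir` are hypothetical; `set_oom` only stands in for `diag_set(OutOfMemory, ...)`, and just its argument order (size, allocator, variable name) follows the review.

```c
#define _POSIX_C_SOURCE 200809L /* for strdup() */
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the OutOfMemory diagnostic. */
struct oom_info {
	size_t amount;
	const char *allocator;
	const char *object;
};

static struct oom_info last_oom;

static void
set_oom(size_t amount, const char *allocator, const char *object)
{
	last_oom.amount = amount;
	last_oom.allocator = allocator;
	last_oom.object = object;
}

/*
 * Duplicate the WAL directory path. On failure, record the
 * size really requested (including the terminating zero),
 * the allocator that failed and the variable name, then
 * return -1. On success the caller owns *out and frees it.
 */
static int
dup_wal_dir(const char *wal_dir, char **out)
{
	*out = strdup(wal_dir);
	if (*out == NULL) {
		set_oom(strlen(wal_dir) + 1, "strdup", "wal_dir");
		return -1;
	}
	return 0;
}
```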
>
> > + }
> >
> > vclock_copy(&relay->tx.vclock, replica_clock);
> > relay->version_id = replica_version_id;
> >
> > @@ -612,7 +604,10 @@ static void
> >
> > relay_send_row(struct xstream *stream, struct xrow_header *packet)
> > {
> >
> > struct relay *relay = container_of(stream, struct relay, stream);
> >
> > - assert(iproto_type_is_dml(packet->type));
> > + if (packet->type != IPROTO_OK) {
>
> 13. How can it be an error? We don't write errors to WAL, and packet here
> is a row, written to WAL. No? What is more, if it is not 'OK', below
> you may treat it as 'NOP' accidentally.
IPROTO_OK here means a heartbeat message. Any other type must be a DML
request, which is verified with an assert.
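A sketch of the dispatch described above, using simplified row types instead of the real IPROTO codes: heartbeats leave the relay vclock untouched, while every other row must be DML and advances the vclock component of its origin replica. All names here are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define REPLICA_MAX 32

/* Simplified row types for the sketch. */
enum row_type { ROW_HEARTBEAT, ROW_INSERT, ROW_DELETE };

struct row {
	enum row_type type;
	uint32_t replica_id;
	int64_t lsn;
};

static bool
row_is_dml(enum row_type type)
{
	return type == ROW_INSERT || type == ROW_DELETE;
}

/*
 * Heartbeats carry no LSN, so they must not move the relay
 * vclock. Anything else must be a DML row, and the relay
 * vclock follows it.
 */
static void
relay_account_row(int64_t relay_vclock[REPLICA_MAX],
		  const struct row *row)
{
	if (row->type == ROW_HEARTBEAT)
		return;
	assert(row_is_dml(row->type));
	assert(row->replica_id < REPLICA_MAX);
	assert(row->lsn > relay_vclock[row->replica_id]);
	relay_vclock[row->replica_id] = row->lsn;
}
```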
>
> > + assert(iproto_type_is_dml(packet->type));
> > + vclock_follow_xrow(&relay->relay_vclock, packet);
> > + }
> > /*
> >
> > * Transform replica local requests to IPROTO_NOP so as to
> > * promote vclock on the replica without actually modifying
> >
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index e77bd1ae1..0d900270c 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -159,6 +159,8 @@ struct wal_writer
> >
> > struct rlist watchers;
> > /** Xrow buffer. */
> > struct xrow_buf xrow_buf;
> >
> > + /* xrow buffer condition signaled when buffer write was done. */
>
> 14. Please, use a capital letter to start a sentence, and
> keep 66 symbols line width for comments.
Ok
>
> 15. Why do you need each write signaled?
To wake all relays up. And there is a mistake: I broadcast the condition only
when a whole batch has been written out, not on each write.
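A sketch of a per-batch wakeup. `fiber_cond` is Tarantool-specific and works between fibers of one cord, so this sketch uses a POSIX `pthread_cond_t` as a stand-in: the writer publishes a whole batch and broadcasts once, and a relay waits in a loop until the LSN it needs has been written. The create/destroy pair also illustrates point 18. All names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for the WAL writer state shared with relays. */
struct wal_sketch {
	pthread_mutex_t mutex;
	/* Broadcast once per written batch. */
	pthread_cond_t write_cond;
	/* Last LSN that relays may read. */
	int64_t written_lsn;
	/* Number of broadcasts, for illustration. */
	int broadcasts;
};

struct relay_arg {
	struct wal_sketch *wal;
	int64_t wait_lsn;
};

static void
wal_sketch_create(struct wal_sketch *w)
{
	pthread_mutex_init(&w->mutex, NULL);
	pthread_cond_init(&w->write_cond, NULL);
	w->written_lsn = 0;
	w->broadcasts = 0;
}

static void
wal_sketch_destroy(struct wal_sketch *w)
{
	pthread_cond_destroy(&w->write_cond);
	pthread_mutex_destroy(&w->mutex);
}

/* Publish a whole batch, then wake every waiting relay once. */
static void
wal_sketch_write_batch(struct wal_sketch *w, int row_count)
{
	pthread_mutex_lock(&w->mutex);
	w->written_lsn += row_count;
	w->broadcasts++;
	pthread_cond_broadcast(&w->write_cond);
	pthread_mutex_unlock(&w->mutex);
}

/* Relay side: sleep until the requested LSN is written. */
static void *
relay_wait(void *p)
{
	struct relay_arg *arg = p;
	struct wal_sketch *w = arg->wal;
	pthread_mutex_lock(&w->mutex);
	/* Loop: a wakeup may be spurious or for an earlier batch. */
	while (w->written_lsn < arg->wait_lsn)
		pthread_cond_wait(&w->write_cond, &w->mutex);
	pthread_mutex_unlock(&w->mutex);
	return NULL;
}
```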
>
> 16. If it is signaled on write, then I guess the name could
> be more clear? For example, xrow_buf_write_cond?
Accepted
>
> 17. If this is an xrow_buf's cond, then why is it out of
> the xrow_buf structure? Otherwise it is not
> 'xrow_buf_write_cond', but rather just 'write_cond'.
Because it is not the buffer's responsibility: WAL coordinates its memory
buffer and the relays. Also, I found that an xrow_buf on_commit trigger would
be too complicated.
>
> > + struct fiber_cond xrow_buf_cond;
> >
> > + fiber_cond_create(&writer->xrow_buf_cond);
>
> 18. Where do you call fiber_cond_destroy()?
Accepted
>
> > /** Initialize eio in this thread */
> > coio_enable();
> > + }
> > + /* Check if the row is already sent. */
>
> 19. This comment says what you do, but not why. This is
> what is called a narrative comment; such comments should be
> avoided because they never help. How can a row be already sent,
> if you've read it just now?
Ok
>
> > + if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> > + row = &hearth_beat;
>
> 20. Unfortunately, it is not safe to keep a pointer to an
> object outside its scope. The heartbeat's memory may be reused
> later.
Definitely yes, it is my fault.
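A sketch of one way to address point 20, assuming simplified row and vclock types: the substitute heartbeat lives in storage owned by the caller, so the returned pointer stays valid after the function returns. The "already covered by the replica vclock" check mirrors the quoted diff; the names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

#define REPLICA_MAX 32

struct row {
	int is_heartbeat;
	uint32_t replica_id;
	int64_t lsn;
};

/*
 * Return the row to relay, or a heartbeat in place of a row
 * the replica already has (its vclock covers the row's LSN).
 * The heartbeat is written into caller-provided storage, so
 * the result never points at this function's stack frame.
 */
static const struct row *
row_or_heartbeat(const int64_t replica_vclock[REPLICA_MAX],
		 const struct row *row, struct row *heartbeat)
{
	assert(row->replica_id < REPLICA_MAX);
	if (replica_vclock[row->replica_id] >= row->lsn) {
		heartbeat->is_heartbeat = 1;
		heartbeat->replica_id = 0;
		heartbeat->lsn = 0;
		return heartbeat;
	}
	return row;
}
```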
>
> > + } else
> > + continue;
> > rotate_vy_log();
> >
> > +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> > +
> > +/**
> > + * Struct wal memory relay structure. */
>
> 21. Writing good comments may be harder than writing
> code, but that does not mean it should not be done.
> Please write a proper comment.
Ok
>
> 22. We write the closing */ on a separate line. The only
> exception is when the whole comment, including /**, fits on
> one line.
Ok
>
> > +struct wal_relay {
> > + struct cmsg base;
> > + /* Cbus pipe to wal cord. */
> > + struct cpipe wal_pipe;
> > + /* Cbus pipe from wal cord. */
> > + struct cpipe relay_pipe;
>
> 23. What is a purpose of wal_pipe, what is a purpose
> of relay_pipe, what messages do they deliver?
> I think it should be written here in a comment.
Ok
>
> > +
> > + /* Vclock to start relaying. */
> > + struct vclock *vclock;
> > + /* Callback to call for each relaying row. */
> > + wal_relay_cb on_wal_relay;
>
> 24. Why is that callback needed?
> I think it should be written here in a comment.
Ok
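A sketch of how a per-row callback with a `cb_data` pointer is typically driven, with the same shape as `wal_relay_cb` but a simplified row type. Presumably the relay's callback would send each row to the replica; here the callback only records the last LSN and fails at LSN 3 to show early termination. `feed_rows` and `remember_lsn` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified row; stands in for struct xrow_header. */
struct row {
	int64_t lsn;
};

/* Same shape as wal_relay_cb in the diff. */
typedef int (*row_cb)(struct row *row, void *data);

/*
 * Feed rows to the callback in order and stop at the first
 * non-zero return code, propagating it to the caller.
 */
static int
feed_rows(struct row *rows, size_t count, row_cb cb, void *data)
{
	for (size_t i = 0; i < count; i++) {
		int rc = cb(&rows[i], data);
		if (rc != 0)
			return rc;
	}
	return 0;
}

/* Example callback: cb_data points at the caller's LSN slot. */
static int
remember_lsn(struct row *row, void *data)
{
	int64_t *last_lsn = data;
	*last_lsn = row->lsn;
	/* Simulate a send error at LSN 3. */
	return row->lsn >= 3 ? -1 : 0;
}
```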
>
> > + /* Data pointer to pass to callback. */
> > + void *cb_data;
> > + /* Relaying fiber. */
> > + struct fiber *fiber;
> > + /* Message to cancel relaying fiber. */
> > + struct cmsg cancel_msg;
> > + /* Fiber condition to wait until relaying was stopped. */
> > + struct fiber_cond done_cond;
> > + /* Turns to true when relaying was stopped. */
> > + bool done;
>
> 25. Then I guess it is 'is_stopped', no? The word 'done' is
> too generic, it means nothing. What is 'done'? Also, we use the
> 'is_' prefix for flags (or another more suitable verb like
> 'has_', 'are_', 'was_', etc).
Ok
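A trimmed-down sketch of the flag after the rename, also following the comment conventions from points 14, 21 and 22 (capitalized sentences, closing `*/` on its own line for multi-line comments). The struct and function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/**
 * Hypothetical, trimmed-down relay state used only to show
 * the naming and comment conventions discussed above.
 */
struct wal_relay_sketch {
	/** Set once the relaying fiber has finished. */
	bool is_stopped;
	/** Result of relaying: 0 on success, -1 on error. */
	int rc;
};

/** Record the outcome and mark relaying as stopped. */
static void
wal_relay_sketch_stop(struct wal_relay_sketch *r, int rc)
{
	r->rc = rc;
	r->is_stopped = true;
}
```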
>
> > + /* Return code. */
> > + int rc;
> > + /* Diagnostic area. */
> > + struct diag diag;
> > +};
> > +
> > +
>
> 26. Please, keep one empty line between declarations.
Ok
>
> > +int
> > +wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
> > + wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name);
>
> 27. This function definitely is not a one-liner for which a
> comment could be omitted. Please write what this function does,
> why, which non-trivial arguments it takes, etc.
Ok
>
> > +
> >
> > #if defined(__cplusplus)
> > } /* extern "C" */
> > #endif /* defined(__cplusplus) */
> >
> > diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
> > index b3b1280e7..b7e6d769b 100644
> > --- a/src/lib/core/cbus.c
> > +++ b/src/lib/core/cbus.c
> > @@ -284,6 +284,9 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
> > /* Trigger task processing when the queue becomes non-empty. */
> > bool output_was_empty;
> >
> > + int old_cancel_state;
> > + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
>
> 28. Why?
Sorry, this is an artifact
>
> > +
> >
> > tt_pthread_mutex_lock(&endpoint->mutex);
> > output_was_empty = stailq_empty(&endpoint->output);
> > /** Flush input */
> >
> > @@ -297,6 +300,7 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
> > ev_async_send(endpoint->consumer, &endpoint->async);
> >
> > }
> >
> > + pthread_setcancelstate(old_cancel_state, NULL);
> >
> > }
>
> 29. I will review the tests later, once I fully understand the
> code. But so few tests for such a serious feature do not look like
> enough. Could you test it more rigorously?
The issue is that this patchset does not change Tarantool's behavior, so there
are no visible changes to test. However, I will try to implement some hacks
and an errinj test to show that in-memory replication is working.