From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 3335726940 for ; Wed, 13 Feb 2019 03:33:11 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id RM1TGF8Ndu-x for ; Wed, 13 Feb 2019 03:33:11 -0500 (EST) Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id E14F62677E for ; Wed, 13 Feb 2019 03:33:10 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v2 1/2] Lightweight vclock_create and vclock_copy Date: Wed, 13 Feb 2019 11:35:16 +0300 Message-Id: <7d849e07ce43fe0c1fa72d5c04ee329ace13c3a8.1550046797.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Modify only needed part of a vclock structure instead of full structure writing. Follow-up: #2283 --- src/box/vclock.c | 5 +++-- src/box/vclock.h | 37 +++++++++++++++++++++++++++---------- src/box/xrow.h | 4 ++-- test/unit/vclock.cc | 2 +- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/box/vclock.c b/src/box/vclock.c index b5eb2800b..d4b2ba759 100644 --- a/src/box/vclock.c +++ b/src/box/vclock.c @@ -41,7 +41,7 @@ vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn) { assert(lsn >= 0); assert(replica_id < VCLOCK_MAX); - int64_t prev_lsn = vclock->lsn[replica_id]; + int64_t prev_lsn = vclock_get(vclock, replica_id); assert(lsn > prev_lsn); /* Easier add each time than check. */ vclock->map |= 1 << replica_id; @@ -159,7 +159,8 @@ vclock_from_string(struct vclock *vclock, const char *str) errno = 0; lsn = strtoll(p, (char **) &p, 10); if (errno != 0 || lsn < 0 || lsn > INT64_MAX || - replica_id >= VCLOCK_MAX || vclock->lsn[replica_id] > 0) + replica_id >= VCLOCK_MAX || + vclock_get(vclock, replica_id) > 0) goto error; vclock->map |= 1 << replica_id; vclock->lsn[replica_id] = lsn; diff --git a/src/box/vclock.h b/src/box/vclock.h index 111e29160..a59b2bddb 100644 --- a/src/box/vclock.h +++ b/src/box/vclock.h @@ -48,7 +48,7 @@ extern "C" { enum { /** - * The maximum number of components in vclock + * The maximum number of components in vclock, should be power of two. */ VCLOCK_MAX = 32, @@ -129,7 +129,9 @@ vclock_iterator_next(struct vclock_iterator *it) static inline void vclock_create(struct vclock *vclock) { - memset(vclock, 0, sizeof(*vclock)); + vclock->signature = 0; + vclock->map = 0; + vclock->lsn[0] = 0; } /** @@ -139,8 +141,9 @@ vclock_create(struct vclock *vclock) static inline void vclock_clear(struct vclock *vclock) { - memset(vclock, 0, sizeof(*vclock)); vclock->signature = -1; + vclock->map = 0; + vclock->lsn[0] = 0; } /** @@ -156,16 +159,24 @@ vclock_is_set(const struct vclock *vclock) static inline int64_t vclock_get(const struct vclock *vclock, uint32_t replica_id) { - if (replica_id >= VCLOCK_MAX) - return 0; - return vclock->lsn[replica_id]; + /* + * If replica_id does not fit in VCLOCK_MAX - 1 then + * let result be undefined. + */ + replica_id &= VCLOCK_MAX - 1; + /* Evaluate a bitmask to avoid branching. */ + int64_t mask = 0 - ((vclock->map >> replica_id) & 0x1); + return mask & vclock->lsn[replica_id]; } static inline int64_t vclock_inc(struct vclock *vclock, uint32_t replica_id) { /* Easier add each time than check. */ - vclock->map |= 1 << replica_id; + if (((vclock->map >> replica_id) & 0x01) == 0) { + vclock->lsn[replica_id] = 0; + vclock->map |= 1 << replica_id; + } vclock->signature++; return ++vclock->lsn[replica_id]; } @@ -173,7 +184,13 @@ vclock_inc(struct vclock *vclock, uint32_t replica_id) static inline void vclock_copy(struct vclock *dst, const struct vclock *src) { - *dst = *src; + /* + * Set the last bit of map because bit_clz_u32 returns + * undefined result if zero passed. + */ + unsigned int max_pos = VCLOCK_MAX - bit_clz_u32(src->map | 0x01); + memcpy(dst, src, offsetof(struct vclock, lsn) + + sizeof(*dst->lsn) * max_pos); } static inline uint32_t @@ -253,8 +270,8 @@ vclock_compare(const struct vclock *a, const struct vclock *b) for (size_t replica_id = bit_iterator_next(&it); replica_id < VCLOCK_MAX; replica_id = bit_iterator_next(&it)) { - int64_t lsn_a = a->lsn[replica_id]; - int64_t lsn_b = b->lsn[replica_id]; + int64_t lsn_a = vclock_get(a, replica_id); + int64_t lsn_b = vclock_get(b, replica_id); le = le && lsn_a <= lsn_b; ge = ge && lsn_a >= lsn_b; if (!ge && !le) diff --git a/src/box/xrow.h b/src/box/xrow.h index 719add4f0..2fce83bbc 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -631,7 +631,7 @@ vclock_follow_xrow(struct vclock* vclock, const struct xrow_header *row) { assert(row); assert(row->replica_id < VCLOCK_MAX); - if (row->lsn <= vclock->lsn[row->replica_id]) { + if (row->lsn <= vclock_get(vclock, row->replica_id)) { struct request req; const char *req_str = "n/a"; if (xrow_decode_dml((struct xrow_header *)row, &req, 0) == 0) @@ -640,7 +640,7 @@ vclock_follow_xrow(struct vclock* vclock, const struct xrow_header *row) panic("LSN for %u is used twice or COMMIT order is broken: " "confirmed: %lld, new: %lld, req: %s", (unsigned) row->replica_id, - (long long) vclock->lsn[row->replica_id], + (long long) vclock_get(vclock, row->replica_id), (long long) row->lsn, req_str); } diff --git a/test/unit/vclock.cc b/test/unit/vclock.cc index 8498eba3b..6a1d3bc27 100644 --- a/test/unit/vclock.cc +++ b/test/unit/vclock.cc @@ -308,7 +308,7 @@ test_fromstring_one(const char *str, uint32_t count, const int64_t *lsns) vclock_create(&check); for (uint32_t node_id = 0; node_id < count; node_id++) { if (lsns[node_id] >= 0) - check.lsn[node_id] = lsns[node_id]; + vclock_follow(&check, node_id, lsns[node_id]); } return (rc != 0 || vclock_compare(&vclock, &check) != 0); -- 2.20.1