From b4a8349217abe5727047751ce55fe09f620b3669 Mon Sep 17 00:00:00 2001 Message-Id: From: Nikita Pettik Date: Fri, 22 May 2020 04:38:45 +0300 Subject: [PATCH] vinyl: handle invalid upserts in background squash process When L0 collects more than certain number of upserts (to be more precise VY_UPSERT_THRESHOLD == 128), background process is launched to squash them. During this squash, new statements could be inserted into L0. Until committed prepared statements feature lsn = MAX_LSN as a special marker. It is assumed that all upserts will be successfully squashed. As a result statements with given key in L0 will be able to have only MAX_LSN lsn (except for the squash result). However, it is not so if at least one upsert is failed to be applied: it will get stack in L0 until dump and it has normal (i.e. < MAX_LSN) lsn. So the assertion which verifies that lsn of upsert in L0 is greater than MAX_LSN is wrong. Moreover, if we execute enough (> 128) invalid upserts, they will prevent from squashing normal upserts. So let's skip invalid upserts and don't account them while calculating number of upserts residing in L0. Follow-up #1622 --- src/box/vinyl.c | 19 ++- .../vinyl/gh-1622-skip-invalid-upserts.result | 134 ++++++++++++++++++ .../gh-1622-skip-invalid-upserts.test.lua | 51 +++++++ 3 files changed, 200 insertions(+), 4 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 55a4c8fb7..b39e3b9d8 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -3539,10 +3539,21 @@ vy_squash_process(struct vy_squash *squash) if (vy_tuple_compare(result, mem_stmt, lsm->cmp_def) != 0 || vy_stmt_type(mem_stmt) != IPROTO_UPSERT) break; - assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN); - vy_stmt_set_n_upserts((struct tuple *)mem_stmt, n_upserts); - if (n_upserts <= VY_UPSERT_THRESHOLD) - ++n_upserts; + /* + * Upsert was not applied in vy_history_apply(), + * but it still resides in tree memory. Ignore + * such statements and do not account them while + * counting upserts. Otherwise invalid upserts will + * get stack and won't allow other upserts to squash. + */ + if (vy_stmt_lsn(mem_stmt) >= MAX_LSN) { + vy_stmt_set_n_upserts((struct tuple *) mem_stmt, + n_upserts); + if (n_upserts <= VY_UPSERT_THRESHOLD) + ++n_upserts; + } else { + vy_stmt_set_n_upserts((struct tuple *) mem_stmt, 0); + } vy_mem_tree_iterator_prev(&mem->tree, &mem_itr); } diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.result b/test/vinyl/gh-1622-skip-invalid-upserts.result index b59ed53dc..12ca65c19 100644 --- a/test/vinyl/gh-1622-skip-invalid-upserts.result +++ b/test/vinyl/gh-1622-skip-invalid-upserts.result @@ -130,6 +130,140 @@ s:select{} | - - [2, 13] | ... +-- Try to insert too many invalid upserts in a row so that +-- they trigger squashing procedure. +-- +fiber = require('fiber') + | --- + | ... +s:drop() + | --- + | ... + +s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 }) + | --- + | ... +pk = s:create_index('pk') + | --- + | ... +s:replace({1, 1}) + | --- + | - [1, 1] + | ... + +squashed_before = pk:stat().upsert.squashed + | --- + | ... +applied_before = pk:stat().upsert.applied + | --- + | ... +len_before = s:len() + | --- + | ... +upsert_cnt = 129 + | --- + | ... +for i =1 , upsert_cnt do s:upsert({1, 1}, {{'=', 3, 5}}) end + | --- + | ... +-- 1 squash is accounted in vy_squash_process() unconditionally. +-- +while pk:stat().upsert.squashed ~= squashed_before + 1 do fiber.sleep(0.01) end + | --- + | ... +-- Make sure invalid upserts are committed into tree. +-- +assert(s:len() == len_before + upsert_cnt) + | --- + | - true + | ... +assert(pk:stat().upsert.applied == applied_before) + | --- + | - true + | ... +-- Adding new invalid upserts will not result in squash until +-- we reach VY_UPSERT_THRESHOLD (128) limit again. +-- +s:upsert({1, 1}, {{'=', 3, 5}}) + | --- + | ... +assert(pk:stat().upsert.squashed == squashed_before + 1) + | --- + | - true + | ... +for i = 1, upsert_cnt - 3 do s:upsert({1, 1}, {{'=', 3, 5}}) end + | --- + | ... +assert(pk:stat().upsert.squashed == squashed_before + 1) + | --- + | - true + | ... +s:upsert({1, 1}, {{'=', 3, 5}}) + | --- + | ... +while pk:stat().upsert.squashed ~= squashed_before + 2 do fiber.sleep(0.01) end + | --- + | ... +assert(pk:stat().upsert.squashed == squashed_before + 2) + | --- + | - true + | ... +assert(pk:stat().upsert.applied == applied_before) + | --- + | - true + | ... +box.snapshot() + | --- + | - ok + | ... +-- No one upsert is applied so number of rows should not change. +-- +assert(s:len() == len_before) + | --- + | - true + | ... +assert(pk:stat().upsert.squashed == squashed_before + 2) + | --- + | - true + | ... +assert(pk:stat().upsert.applied == applied_before) + | --- + | - true + | ... +-- Now let's mix valid and invalid upserts. Make sure that only +-- valid are accounted during squash/apply. +-- +for i = 1, upsert_cnt do if i % 2 == 0 then s:upsert({1, 1}, {{'=', 3, 5}}) else s:upsert({1, 1}, {{'+', 2, 1}}) end end + | --- + | ... +while pk:stat().upsert.squashed ~= squashed_before + 3 do fiber.sleep(0.01) end + | --- + | ... +assert(s:len() == len_before + upsert_cnt) + | --- + | - true + | ... +assert(pk:stat().upsert.applied == math.floor(upsert_cnt/2) + 1) + | --- + | - true + | ... +box.snapshot() + | --- + | - ok + | ... +assert(s:len() == len_before) + | --- + | - true + | ... +assert(pk:stat().upsert.squashed == squashed_before + 3) + | --- + | - true + | ... +assert(pk:stat().upsert.applied == math.floor(upsert_cnt/2) + 1) + | --- + | - true + | ... + s:drop() | --- | ... diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua index eb93393be..27c168fcb 100644 --- a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua +++ b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua @@ -47,4 +47,55 @@ s:select{} box.snapshot() s:select{} +-- Try to insert too many invalid upserts in a row so that +-- they trigger squashing procedure. +-- +fiber = require('fiber') +s:drop() + +s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 }) +pk = s:create_index('pk') +s:replace({1, 1}) + +squashed_before = pk:stat().upsert.squashed +applied_before = pk:stat().upsert.applied +len_before = s:len() +upsert_cnt = 129 +for i =1 , upsert_cnt do s:upsert({1, 1}, {{'=', 3, 5}}) end +-- 1 squash is accounted in vy_squash_process() unconditionally. +-- +while pk:stat().upsert.squashed ~= squashed_before + 1 do fiber.sleep(0.01) end +-- Make sure invalid upserts are committed into tree. +-- +assert(s:len() == len_before + upsert_cnt) +assert(pk:stat().upsert.applied == applied_before) +-- Adding new invalid upserts will not result in squash until +-- we reach VY_UPSERT_THRESHOLD (128) limit again. +-- +s:upsert({1, 1}, {{'=', 3, 5}}) +assert(pk:stat().upsert.squashed == squashed_before + 1) +for i = 1, upsert_cnt - 3 do s:upsert({1, 1}, {{'=', 3, 5}}) end +assert(pk:stat().upsert.squashed == squashed_before + 1) +s:upsert({1, 1}, {{'=', 3, 5}}) +while pk:stat().upsert.squashed ~= squashed_before + 2 do fiber.sleep(0.01) end +assert(pk:stat().upsert.squashed == squashed_before + 2) +assert(pk:stat().upsert.applied == applied_before) +box.snapshot() +-- No one upsert is applied so number of rows should not change. +-- +assert(s:len() == len_before) +assert(pk:stat().upsert.squashed == squashed_before + 2) +assert(pk:stat().upsert.applied == applied_before) +-- Now let's mix valid and invalid upserts. Make sure that only +-- valid are accounted during squash/apply. +-- +for i = 1, upsert_cnt do if i % 2 == 0 then s:upsert({1, 1}, {{'=', 3, 5}}) else s:upsert({1, 1}, {{'+', 2, 1}}) end end +while pk:stat().upsert.squashed ~= squashed_before + 3 do fiber.sleep(0.01) end +assert(s:len() == len_before + upsert_cnt) +assert(pk:stat().upsert.applied == math.floor(upsert_cnt/2) + 1) +box.snapshot() +assert(s:len() == len_before) +assert(pk:stat().upsert.squashed == squashed_before + 3) +assert(pk:stat().upsert.applied == math.floor(upsert_cnt/2) + 1) + s:drop() \ No newline at end of file -- 2.17.1