From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 6754846971A for ; Wed, 11 Dec 2019 14:47:57 +0300 (MSK) From: Leonid Vasiliev Date: Wed, 11 Dec 2019 14:47:55 +0300 Message-Id: Subject: [Tarantool-patches] [PATCH] Fix task release in session disconnect List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: alexander.turenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org https://github.com/tarantool/queue/issues/103 https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-103-wrong-session-id --- queue/abstract.lua | 12 ++++++--- t/110-my_test.t | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 4 deletions(-) create mode 100755 t/110-my_test.t diff --git a/queue/abstract.lua b/queue/abstract.lua index ad8817d..243fa60 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -151,18 +151,22 @@ function tube.ack(self, id) return result end -function tube.release(self, id, opts) +local function tube_release_internal(self, id, opts, session_id) opts = opts or {} - local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id} + local _taken = box.space._queue_taken:get{session_id, self.tube_id, id} if _taken == nil then error("Task was not taken in the session") end - box.space._queue_taken:delete{session.id(), self.tube_id, id} + box.space._queue_taken:delete{session_id, self.tube_id, id} self:peek(id) return self.raw:normalize_task(self.raw:release(id, opts)) end +function tube.release(self, id, opts) + return tube_release_internal(self, id, opts, session.id()) +end + function tube.peek(self, id) local task = self.raw:peek(id) if task == nil then @@ -371,7 +375,7 @@ function method._on_consumer_disconnect() log.warn("Consumer %s disconnected, release task %s(%s)", id, task[3], tube[1]) - queue.tube[tube[1]]:release(task[3]) + tube_release_internal(queue.tube[tube[1]], task[3], nil, id) end end end diff --git a/t/110-my_test.t b/t/110-my_test.t new file mode 100755 index 0000000..ef10f09 --- /dev/null +++ b/t/110-my_test.t @@ -0,0 +1,66 @@ +#!/usr/bin/env tarantool + +local fiber = require('fiber') +local netbox = require('net.box') +local os = require('os') +local queue = require('queue') +local test = require('tap').test() +local tnt = require('t.tnt') + + +local tube + + +local function check_result() + test:plan(2) + if tube == nil then + os.exit(-1) + end + + local ok, res = pcall(tube.drop, tube) + test:is(ok, true, 'drop empty queue') + test:is(res, true, 'tube:drop() result is true') + + tnt.finish() + os.exit(test:check() == true and 0 or -1) +end + + +local function test_lost_session_id_after_yield(test) + -- See + -- https://github.com/tarantool/queue/issues/103 + -- https://github.com/tarantool/tarantool/issues/4627 + + -- We must check the results of a test after + -- the queue._on_consumer_disconnect trigger + -- has been done. + -- The type of a triggers queue is LIFO + box.session.on_disconnect(check_result) + + local listen = 'localhost:1918' + tnt.cfg{ listen = listen } + + local driver = 'fifottl' + tube = queue.create_tube('test_tube', driver, + { if_not_exists = true }) + + rawset(_G, 'queue', require('queue')) + tube:grant('guest', { call = true }) + + -- Needed for yielding into + -- the queue._on_consumer_disconnect trigger + queue.tube.test_tube:put('1') + queue.tube.test_tube:put('2') + local connection = netbox.connect(listen) + connection:call('queue.tube.test_tube:take') + connection:call('queue.tube.test_tube:take') + + connection:close() + + fiber.sleep(5) + -- Fail. Trigger check_result() is a valid exit point + os.exit(-1) +end + + +test:test('Lost a session id after yield', test_lost_session_id_after_yield) -- 2.17.1