From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 8F46146970E for ; Mon, 16 Dec 2019 19:11:15 +0300 (MSK) From: Leonid Vasiliev Date: Mon, 16 Dec 2019 19:11:12 +0300 Message-Id: <8e393243cfa7db53733b49a53d1bdfc121e12bcc.1576512515.git.lvasiliev@tarantool.org> Subject: [Tarantool-patches] [PATCH V2] Fix take a task after disconnect List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: alexander.turenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org https://github.com/tarantool/queue/issues/104 https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect --- queue/abstract.lua | 10 ++++++- t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100755 t/110-take-task-after-reconnect.t diff --git a/queue/abstract.lua b/queue/abstract.lua index ad8817d..4e932cc 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -73,6 +73,7 @@ function tube.put(self, data, opts) end local conds = {} +local releasing_sessions = {} function tube.take(self, timeout) timeout = time(timeout or TIMEOUT_INFINITY) @@ -94,7 +95,13 @@ function tube.take(self, timeout) conds[fid]:free() box.space._queue_consumers:delete{ sid, fid } - task = self.raw:take() + -- We don't take a task if the session is in a disconnecting state + if releasing_sessions[fid] == nil then + task = self.raw:take() + else + releasing_sessions[fid] = nil + return nil + end if task ~= nil then return self.raw:normalize_task(task) @@ -352,6 +359,7 @@ function method._on_consumer_disconnect() box.space._queue_consumers:delete{ waiter[1], waiter[2] } local cond = conds[waiter[2]] if cond then + releasing_sessions[waiter[2]] = true cond:signal(waiter[2]) end end diff --git a/t/110-take-task-after-reconnect.t b/t/110-take-task-after-reconnect.t new file mode 100755 index 0000000..66c4d73 --- /dev/null +++ b/t/110-take-task-after-reconnect.t @@ -0,0 +1,47 @@ +#!/usr/bin/env tarantool + +local fiber = require('fiber') +local netbox = require('net.box') +local os = require('os') +local queue = require('queue') +local test = require('tap').test() +local tnt = require('t.tnt') + + +test:plan(1) + +local listen = 'localhost:1918' +tnt.cfg{ listen = listen } + + +local function test_take_task_after_disconnect(test) + test:plan(1) + local driver = 'fifottl' + local tube = queue.create_tube('test_tube', driver, + { if_not_exists = true }) + rawset(_G, 'queue', require('queue')) + tube:grant('guest', { call = true }) + queue.tube.test_tube:put('test_data') + + local connection = netbox.connect(listen) + local fiber_1 = fiber.create(function() + connection:call('queue.tube.test_tube:take') + connection:call('queue.tube.test_tube:take') + end) + + fiber.sleep(0.1) + connection:close() + fiber.set_joinable(fiber_1, true) + fiber.kill(fiber_1) + fiber.join(fiber_1) + fiber.sleep(0.1) + + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state') +end + + +test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect) + + +tnt.finish() +os.exit(test:check() == true and 0 or -1) -- 2.17.1