From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id E8E4846970E for ; Tue, 17 Dec 2019 11:23:40 +0300 (MSK) References: <8e393243cfa7db53733b49a53d1bdfc121e12bcc.1576512515.git.lvasiliev@tarantool.org> From: Leonid Vasiliev Message-ID: Date: Tue, 17 Dec 2019 11:23:38 +0300 MIME-Version: 1.0 In-Reply-To: <8e393243cfa7db53733b49a53d1bdfc121e12bcc.1576512515.git.lvasiliev@tarantool.org> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH V2] Fix take a task after disconnect List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: alexander.turenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org Update --- --- a/t/110-take-task-after-reconnect.t +++ b/t/110-take-task-after-reconnect.t @@ -36,7 +36,7 @@ local function test_take_task_after_disconnect(test) fiber.join(fiber_1) fiber.sleep(0.1) - test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state') + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in ready state') end On 12/16/19 7:11 PM, Leonid Vasiliev wrote: > https://github.com/tarantool/queue/issues/104 > https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect > > --- > queue/abstract.lua | 10 ++++++- > t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++ > 2 files changed, 56 insertions(+), 1 deletion(-) > create mode 100755 t/110-take-task-after-reconnect.t > > diff --git a/queue/abstract.lua b/queue/abstract.lua > index ad8817d..4e932cc 100644 > --- a/queue/abstract.lua > +++ b/queue/abstract.lua > @@ -73,6 +73,7 @@ function tube.put(self, data, opts) > end > > local conds = {} > +local releasing_sessions = {} > > function tube.take(self, timeout) > timeout = time(timeout or TIMEOUT_INFINITY) > @@ -94,7 +95,13 @@ function tube.take(self, timeout) > conds[fid]:free() > box.space._queue_consumers:delete{ sid, fid } > > - task = self.raw:take() > + -- We don't take a task if the session is in a disconnecting state > + if releasing_sessions[fid] == nil then > + task = self.raw:take() > + else > + releasing_sessions[fid] = nil > + return nil > + end > > if task ~= nil then > return self.raw:normalize_task(task) > @@ -352,6 +359,7 @@ function method._on_consumer_disconnect() > box.space._queue_consumers:delete{ waiter[1], waiter[2] } > local cond = conds[waiter[2]] > if cond then > + releasing_sessions[waiter[2]] = true > cond:signal(waiter[2]) > end > end > diff --git a/t/110-take-task-after-reconnect.t b/t/110-take-task-after-reconnect.t > new file mode 100755 > index 0000000..66c4d73 > --- /dev/null > +++ b/t/110-take-task-after-reconnect.t > @@ -0,0 +1,47 @@ > +#!/usr/bin/env tarantool > + > +local fiber = require('fiber') > +local netbox = require('net.box') > +local os = require('os') > +local queue = require('queue') > +local test = require('tap').test() > +local tnt = require('t.tnt') > + > + > +test:plan(1) > + > +local listen = 'localhost:1918' > +tnt.cfg{ listen = listen } > + > + > +local function test_take_task_after_disconnect(test) > + test:plan(1) > + local driver = 'fifottl' > + local tube = queue.create_tube('test_tube', driver, > + { if_not_exists = true }) > + rawset(_G, 'queue', require('queue')) > + tube:grant('guest', { call = true }) > + queue.tube.test_tube:put('test_data') > + > + local connection = netbox.connect(listen) > + local fiber_1 = fiber.create(function() > + connection:call('queue.tube.test_tube:take') > + connection:call('queue.tube.test_tube:take') > + end) > + > + fiber.sleep(0.1) > + connection:close() > + fiber.set_joinable(fiber_1, true) > + fiber.kill(fiber_1) > + fiber.join(fiber_1) > + fiber.sleep(0.1) > + > + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state') > +end > + > + > +test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect) > + > + > +tnt.finish() > +os.exit(test:check() == true and 0 or -1) >