From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id A468C46970E for ; Mon, 3 Feb 2020 18:52:53 +0300 (MSK) From: Leonid Vasiliev References: <8e393243cfa7db53733b49a53d1bdfc121e12bcc.1576512515.git.lvasiliev@tarantool.org> Message-ID: <51565f78-2a3a-1e5f-50b9-d9e56c310428@tarantool.org> Date: Mon, 3 Feb 2020 18:52:52 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Language: en-US Content-Transfer-Encoding: 8bit Subject: Re: [Tarantool-patches] [PATCH V2] Fix take a task after disconnect List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: alexander.turenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org https://github.com/tarantool/queue/pull/108 On 12/17/19 11:23 AM, Leonid Vasiliev wrote: > Update > > --- > --- a/t/110-take-task-after-reconnect.t > +++ b/t/110-take-task-after-reconnect.t > @@ -36,7 +36,7 @@ local function test_take_task_after_disconnect(test) >      fiber.join(fiber_1) >      fiber.sleep(0.1) > > -    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in > release state') > +    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in > ready state') >  end > > > On 12/16/19 7:11 PM, Leonid Vasiliev wrote: >> https://github.com/tarantool/queue/issues/104 >> https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect >> >> >> --- >>   queue/abstract.lua                | 10 ++++++- >>   t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++ >>   2 files changed, 56 insertions(+), 1 deletion(-) >>   create mode 100755 t/110-take-task-after-reconnect.t >> >> diff --git a/queue/abstract.lua b/queue/abstract.lua >> index ad8817d..4e932cc 100644 >> --- a/queue/abstract.lua >> +++ b/queue/abstract.lua >> @@ -73,6 +73,7 @@ function tube.put(self, data, opts) >>   end >>   local conds = {} >> +local releasing_sessions = {} >>   function tube.take(self, timeout) >>       timeout = time(timeout or TIMEOUT_INFINITY) >> @@ -94,7 +95,13 @@ function tube.take(self, timeout) >>           conds[fid]:free() >>           box.space._queue_consumers:delete{ sid, fid } >> -        task = self.raw:take() >> +        -- We don't take a task if the session is in a disconnecting >> state >> +        if releasing_sessions[fid] == nil then >> +            task = self.raw:take() >> +        else >> +            releasing_sessions[fid] = nil >> +            return nil >> +        end >>           if task ~= nil then >>               return self.raw:normalize_task(task) >> @@ -352,6 +359,7 @@ function method._on_consumer_disconnect() >>           box.space._queue_consumers:delete{ waiter[1], waiter[2] } >>           local cond = conds[waiter[2]] >>           if cond then >> +            releasing_sessions[waiter[2]] = true >>               cond:signal(waiter[2]) >>           end >>       end >> diff --git a/t/110-take-task-after-reconnect.t >> b/t/110-take-task-after-reconnect.t >> new file mode 100755 >> index 0000000..66c4d73 >> --- /dev/null >> +++ b/t/110-take-task-after-reconnect.t >> @@ -0,0 +1,47 @@ >> +#!/usr/bin/env tarantool >> + >> +local fiber = require('fiber') >> +local netbox = require('net.box') >> +local os = require('os') >> +local queue = require('queue') >> +local test = require('tap').test() >> +local tnt = require('t.tnt') >> + >> + >> +test:plan(1) >> + >> +local listen = 'localhost:1918' >> +tnt.cfg{ listen = listen } >> + >> + >> +local function test_take_task_after_disconnect(test) >> +    test:plan(1) >> +    local driver = 'fifottl' >> +    local tube = queue.create_tube('test_tube', driver, >> +        { if_not_exists = true }) >> +    rawset(_G, 'queue', require('queue')) >> +    tube:grant('guest', { call = true }) >> +    queue.tube.test_tube:put('test_data') >> + >> +    local connection = netbox.connect(listen) >> +    local fiber_1 = fiber.create(function() >> +            connection:call('queue.tube.test_tube:take') >> +            connection:call('queue.tube.test_tube:take') >> +        end) >> + >> +    fiber.sleep(0.1) >> +    connection:close() >> +    fiber.set_joinable(fiber_1, true) >> +    fiber.kill(fiber_1) >> +    fiber.join(fiber_1) >> +    fiber.sleep(0.1) >> + >> +    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task >> in release state') >> +end >> + >> + >> +test:test('Don\'t take a task after disconnect', >> test_take_task_after_disconnect) >> + >> + >> +tnt.finish() >> +os.exit(test:check() == true and 0 or -1) >>