[Tarantool-patches] [PATCH V2] Fix take a task after disconnect

Leonid Vasiliev lvasiliev at tarantool.org
Mon Feb 3 18:52:52 MSK 2020


https://github.com/tarantool/queue/pull/108



On 12/17/19 11:23 AM, Leonid Vasiliev wrote:
> Update: fixed the test message ('Task in release state' -> 'Task in ready state').
> 
> ---
> --- a/t/110-take-task-after-reconnect.t
> +++ b/t/110-take-task-after-reconnect.t
> @@ -36,7 +36,7 @@ local function test_take_task_after_disconnect(test)
>       fiber.join(fiber_1)
>       fiber.sleep(0.1)
> 
> -    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state')
> +    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in ready state')
>   end
> 
> 
> On 12/16/19 7:11 PM, Leonid Vasiliev wrote:
>> https://github.com/tarantool/queue/issues/104
>> https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect 
>>
>>
>> ---
>>   queue/abstract.lua                | 10 ++++++-
>>   t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++
>>   2 files changed, 56 insertions(+), 1 deletion(-)
>>   create mode 100755 t/110-take-task-after-reconnect.t
>>
>> diff --git a/queue/abstract.lua b/queue/abstract.lua
>> index ad8817d..4e932cc 100644
>> --- a/queue/abstract.lua
>> +++ b/queue/abstract.lua
>> @@ -73,6 +73,7 @@ function tube.put(self, data, opts)
>>   end
>>   local conds = {}
>> +local releasing_sessions = {}
>>   function tube.take(self, timeout)
>>       timeout = time(timeout or TIMEOUT_INFINITY)
>> @@ -94,7 +95,13 @@ function tube.take(self, timeout)
>>           conds[fid]:free()
>>           box.space._queue_consumers:delete{ sid, fid }
>> -        task = self.raw:take()
>> +        -- We don't take a task if the session is in a disconnecting state
>> +        if releasing_sessions[fid] == nil then
>> +            task = self.raw:take()
>> +        else
>> +            releasing_sessions[fid] = nil
>> +            return nil
>> +        end
>>           if task ~= nil then
>>               return self.raw:normalize_task(task)
>> @@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
>>           box.space._queue_consumers:delete{ waiter[1], waiter[2] }
>>           local cond = conds[waiter[2]]
>>           if cond then
>> +            releasing_sessions[waiter[2]] = true
>>               cond:signal(waiter[2])
>>           end
>>       end
>> diff --git a/t/110-take-task-after-reconnect.t b/t/110-take-task-after-reconnect.t
>> new file mode 100755
>> index 0000000..66c4d73
>> --- /dev/null
>> +++ b/t/110-take-task-after-reconnect.t
>> @@ -0,0 +1,47 @@
>> +#!/usr/bin/env tarantool
>> +
>> +local fiber = require('fiber')
>> +local netbox = require('net.box')
>> +local os = require('os')
>> +local queue = require('queue')
>> +local test = require('tap').test()
>> +local tnt = require('t.tnt')
>> +
>> +
>> +test:plan(1)
>> +
>> +local listen = 'localhost:1918'
>> +tnt.cfg{ listen = listen }
>> +
>> +
>> +local function test_take_task_after_disconnect(test)
>> +    test:plan(1)
>> +    local driver = 'fifottl'
>> +    local tube = queue.create_tube('test_tube', driver,
>> +        { if_not_exists = true })
>> +    rawset(_G, 'queue', require('queue'))
>> +    tube:grant('guest', { call = true })
>> +    queue.tube.test_tube:put('test_data')
>> +
>> +    local connection = netbox.connect(listen)
>> +    local fiber_1 = fiber.create(function()
>> +            connection:call('queue.tube.test_tube:take')
>> +            connection:call('queue.tube.test_tube:take')
>> +        end)
>> +
>> +    fiber.sleep(0.1)
>> +    connection:close()
>> +    fiber.set_joinable(fiber_1, true)
>> +    fiber.kill(fiber_1)
>> +    fiber.join(fiber_1)
>> +    fiber.sleep(0.1)
>> +
>> +    test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state')
>> +end
>> +
>> +
>> +test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
>> +
>> +
>> +tnt.finish()
>> +os.exit(test:check() == true and 0 or -1)
>>


More information about the Tarantool-patches mailing list