* [Tarantool-patches] [PATCH V2] Fix take a task after disconnect
@ 2019-12-16 16:11 Leonid Vasiliev
2019-12-17 8:23 ` Leonid Vasiliev
0 siblings, 1 reply; 3+ messages in thread
From: Leonid Vasiliev @ 2019-12-16 16:11 UTC (permalink / raw)
To: alexander.turenko; +Cc: tarantool-patches
https://github.com/tarantool/queue/issues/104
https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect
---
queue/abstract.lua | 10 ++++++-
t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++
2 files changed, 56 insertions(+), 1 deletion(-)
create mode 100755 t/110-take-task-after-reconnect.t
diff --git a/queue/abstract.lua b/queue/abstract.lua
index ad8817d..4e932cc 100644
--- a/queue/abstract.lua
+++ b/queue/abstract.lua
@@ -73,6 +73,7 @@ function tube.put(self, data, opts)
end
local conds = {}
+local releasing_sessions = {}
function tube.take(self, timeout)
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -94,7 +95,13 @@ function tube.take(self, timeout)
conds[fid]:free()
box.space._queue_consumers:delete{ sid, fid }
- task = self.raw:take()
+ -- We don't take a task if the session is in a disconnecting state
+ if releasing_sessions[fid] == nil then
+ task = self.raw:take()
+ else
+ releasing_sessions[fid] = nil
+ return nil
+ end
if task ~= nil then
return self.raw:normalize_task(task)
@@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
local cond = conds[waiter[2]]
if cond then
+ releasing_sessions[waiter[2]] = true
cond:signal(waiter[2])
end
end
diff --git a/t/110-take-task-after-reconnect.t b/t/110-take-task-after-reconnect.t
new file mode 100755
index 0000000..66c4d73
--- /dev/null
+++ b/t/110-take-task-after-reconnect.t
@@ -0,0 +1,47 @@
+#!/usr/bin/env tarantool
+
+local fiber = require('fiber')
+local netbox = require('net.box')
+local os = require('os')
+local queue = require('queue')
+local test = require('tap').test()
+local tnt = require('t.tnt')
+
+
+test:plan(1)
+
+local listen = 'localhost:1918'
+tnt.cfg{ listen = listen }
+
+
+local function test_take_task_after_disconnect(test)
+ test:plan(1)
+ local driver = 'fifottl'
+ local tube = queue.create_tube('test_tube', driver,
+ { if_not_exists = true })
+ rawset(_G, 'queue', require('queue'))
+ tube:grant('guest', { call = true })
+ queue.tube.test_tube:put('test_data')
+
+ local connection = netbox.connect(listen)
+ local fiber_1 = fiber.create(function()
+ connection:call('queue.tube.test_tube:take')
+ connection:call('queue.tube.test_tube:take')
+ end)
+
+ fiber.sleep(0.1)
+ connection:close()
+ fiber.set_joinable(fiber_1, true)
+ fiber.kill(fiber_1)
+ fiber.join(fiber_1)
+ fiber.sleep(0.1)
+
+ test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state')
+end
+
+
+test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
+
+
+tnt.finish()
+os.exit(test:check() == true and 0 or -1)
--
2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [Tarantool-patches] [PATCH V2] Fix take a task after disconnect
2019-12-16 16:11 [Tarantool-patches] [PATCH V2] Fix take a task after disconnect Leonid Vasiliev
@ 2019-12-17 8:23 ` Leonid Vasiliev
2020-02-03 15:52 ` Leonid Vasiliev
0 siblings, 1 reply; 3+ messages in thread
From: Leonid Vasiliev @ 2019-12-17 8:23 UTC (permalink / raw)
To: alexander.turenko; +Cc: tarantool-patches
Update
---
--- a/t/110-take-task-after-reconnect.t
+++ b/t/110-take-task-after-reconnect.t
@@ -36,7 +36,7 @@ local function test_take_task_after_disconnect(test)
fiber.join(fiber_1)
fiber.sleep(0.1)
- test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in
release state')
+ test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in
ready state')
end
On 12/16/19 7:11 PM, Leonid Vasiliev wrote:
> https://github.com/tarantool/queue/issues/104
> https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect
>
> ---
> queue/abstract.lua | 10 ++++++-
> t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++
> 2 files changed, 56 insertions(+), 1 deletion(-)
> create mode 100755 t/110-take-task-after-reconnect.t
>
> diff --git a/queue/abstract.lua b/queue/abstract.lua
> index ad8817d..4e932cc 100644
> --- a/queue/abstract.lua
> +++ b/queue/abstract.lua
> @@ -73,6 +73,7 @@ function tube.put(self, data, opts)
> end
>
> local conds = {}
> +local releasing_sessions = {}
>
> function tube.take(self, timeout)
> timeout = time(timeout or TIMEOUT_INFINITY)
> @@ -94,7 +95,13 @@ function tube.take(self, timeout)
> conds[fid]:free()
> box.space._queue_consumers:delete{ sid, fid }
>
> - task = self.raw:take()
> + -- We don't take a task if the session is in a disconnecting state
> + if releasing_sessions[fid] == nil then
> + task = self.raw:take()
> + else
> + releasing_sessions[fid] = nil
> + return nil
> + end
>
> if task ~= nil then
> return self.raw:normalize_task(task)
> @@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
> box.space._queue_consumers:delete{ waiter[1], waiter[2] }
> local cond = conds[waiter[2]]
> if cond then
> + releasing_sessions[waiter[2]] = true
> cond:signal(waiter[2])
> end
> end
> diff --git a/t/110-take-task-after-reconnect.t b/t/110-take-task-after-reconnect.t
> new file mode 100755
> index 0000000..66c4d73
> --- /dev/null
> +++ b/t/110-take-task-after-reconnect.t
> @@ -0,0 +1,47 @@
> +#!/usr/bin/env tarantool
> +
> +local fiber = require('fiber')
> +local netbox = require('net.box')
> +local os = require('os')
> +local queue = require('queue')
> +local test = require('tap').test()
> +local tnt = require('t.tnt')
> +
> +
> +test:plan(1)
> +
> +local listen = 'localhost:1918'
> +tnt.cfg{ listen = listen }
> +
> +
> +local function test_take_task_after_disconnect(test)
> + test:plan(1)
> + local driver = 'fifottl'
> + local tube = queue.create_tube('test_tube', driver,
> + { if_not_exists = true })
> + rawset(_G, 'queue', require('queue'))
> + tube:grant('guest', { call = true })
> + queue.tube.test_tube:put('test_data')
> +
> + local connection = netbox.connect(listen)
> + local fiber_1 = fiber.create(function()
> + connection:call('queue.tube.test_tube:take')
> + connection:call('queue.tube.test_tube:take')
> + end)
> +
> + fiber.sleep(0.1)
> + connection:close()
> + fiber.set_joinable(fiber_1, true)
> + fiber.kill(fiber_1)
> + fiber.join(fiber_1)
> + fiber.sleep(0.1)
> +
> + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in release state')
> +end
> +
> +
> +test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
> +
> +
> +tnt.finish()
> +os.exit(test:check() == true and 0 or -1)
>
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [Tarantool-patches] [PATCH V2] Fix take a task after disconnect
2019-12-17 8:23 ` Leonid Vasiliev
@ 2020-02-03 15:52 ` Leonid Vasiliev
0 siblings, 0 replies; 3+ messages in thread
From: Leonid Vasiliev @ 2020-02-03 15:52 UTC (permalink / raw)
To: alexander.turenko; +Cc: tarantool-patches
https://github.com/tarantool/queue/pull/108
On 12/17/19 11:23 AM, Leonid Vasiliev wrote:
> Update
>
> ---
> --- a/t/110-take-task-after-reconnect.t
> +++ b/t/110-take-task-after-reconnect.t
> @@ -36,7 +36,7 @@ local function test_take_task_after_disconnect(test)
> fiber.join(fiber_1)
> fiber.sleep(0.1)
>
> - test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in
> release state')
> + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in
> ready state')
> end
>
>
> On 12/16/19 7:11 PM, Leonid Vasiliev wrote:
>> https://github.com/tarantool/queue/issues/104
>> https://github.com/tarantool/queue/tree/lvasiliev/gh-queue-104-take-task-after-disconnect
>>
>>
>> ---
>> queue/abstract.lua | 10 ++++++-
>> t/110-take-task-after-reconnect.t | 47 +++++++++++++++++++++++++++++++
>> 2 files changed, 56 insertions(+), 1 deletion(-)
>> create mode 100755 t/110-take-task-after-reconnect.t
>>
>> diff --git a/queue/abstract.lua b/queue/abstract.lua
>> index ad8817d..4e932cc 100644
>> --- a/queue/abstract.lua
>> +++ b/queue/abstract.lua
>> @@ -73,6 +73,7 @@ function tube.put(self, data, opts)
>> end
>> local conds = {}
>> +local releasing_sessions = {}
>> function tube.take(self, timeout)
>> timeout = time(timeout or TIMEOUT_INFINITY)
>> @@ -94,7 +95,13 @@ function tube.take(self, timeout)
>> conds[fid]:free()
>> box.space._queue_consumers:delete{ sid, fid }
>> - task = self.raw:take()
>> + -- We don't take a task if the session is in a disconnecting
>> state
>> + if releasing_sessions[fid] == nil then
>> + task = self.raw:take()
>> + else
>> + releasing_sessions[fid] = nil
>> + return nil
>> + end
>> if task ~= nil then
>> return self.raw:normalize_task(task)
>> @@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
>> box.space._queue_consumers:delete{ waiter[1], waiter[2] }
>> local cond = conds[waiter[2]]
>> if cond then
>> + releasing_sessions[waiter[2]] = true
>> cond:signal(waiter[2])
>> end
>> end
>> diff --git a/t/110-take-task-after-reconnect.t
>> b/t/110-take-task-after-reconnect.t
>> new file mode 100755
>> index 0000000..66c4d73
>> --- /dev/null
>> +++ b/t/110-take-task-after-reconnect.t
>> @@ -0,0 +1,47 @@
>> +#!/usr/bin/env tarantool
>> +
>> +local fiber = require('fiber')
>> +local netbox = require('net.box')
>> +local os = require('os')
>> +local queue = require('queue')
>> +local test = require('tap').test()
>> +local tnt = require('t.tnt')
>> +
>> +
>> +test:plan(1)
>> +
>> +local listen = 'localhost:1918'
>> +tnt.cfg{ listen = listen }
>> +
>> +
>> +local function test_take_task_after_disconnect(test)
>> + test:plan(1)
>> + local driver = 'fifottl'
>> + local tube = queue.create_tube('test_tube', driver,
>> + { if_not_exists = true })
>> + rawset(_G, 'queue', require('queue'))
>> + tube:grant('guest', { call = true })
>> + queue.tube.test_tube:put('test_data')
>> +
>> + local connection = netbox.connect(listen)
>> + local fiber_1 = fiber.create(function()
>> + connection:call('queue.tube.test_tube:take')
>> + connection:call('queue.tube.test_tube:take')
>> + end)
>> +
>> + fiber.sleep(0.1)
>> + connection:close()
>> + fiber.set_joinable(fiber_1, true)
>> + fiber.kill(fiber_1)
>> + fiber.join(fiber_1)
>> + fiber.sleep(0.1)
>> +
>> + test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task
>> in release state')
>> +end
>> +
>> +
>> +test:test('Don\'t take a task after disconnect',
>> test_take_task_after_disconnect)
>> +
>> +
>> +tnt.finish()
>> +os.exit(test:check() == true and 0 or -1)
>>
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2020-02-03 15:52 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-12-16 16:11 [Tarantool-patches] [PATCH V2] Fix take a task after disconnect Leonid Vasiliev
2019-12-17 8:23 ` Leonid Vasiliev
2020-02-03 15:52 ` Leonid Vasiliev
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox