From mboxrd@z Thu Jan  1 00:00:00 1970
Date: Sun, 23 Jan 2022 17:17:55 +0300
From: Mergen Imeev via Tarantool-patches
Reply-To: Mergen Imeev
To: Vladislav Shpilevoy
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions
Message-ID: <20220123141755.GA103585@tarantool.org>
In-Reply-To: <0875aeb4-255d-ce3a-7244-69193cf2f334@tarantool.org>
References: <04154369ec1ff8a1eaf7c9ea1ed37e1fcd1a7120.1642167504.git.imeevma@gmail.com> <0875aeb4-255d-ce3a-7244-69193cf2f334@tarantool.org>
List-Id: Tarantool development patches

Hi! Thank you for the review! The diff, the new patch, and a patch that adds
a new utility function are below.

On Tue, Jan 18, 2022 at 01:10:04AM +0100, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
>
> What happened to the idea we had about SQL iterators? Then people would
> be able to iterate with yields and to make any kind of aggregation without
> having to create functions for that.
>
I think this idea is actually very similar to the definition of CURSOR in
the standard. Here is what the standard says about CURSORs:
"A cursor is a mechanism by which the rows of a table may be acted on (e.g.,
returned to a host programming language) one at a time."

I will try to suggest this idea. However, the current issue has a rather
strict deadline, which is quite near, and I don't believe that introducing
CURSORs is a problem that can be solved in a few weeks.

Also, about my current implementation. I don't like it, because it makes too
many changes to BOX that are not related to BOX itself. I think there is a
much simpler implementation. As you probably noticed, the current
implementation requires non-persistent functions and C functions to define
two implementations: one named after the function itself and one whose name
is the function name with "_finalize" appended. Do you think this rule can be
made more general? I suggest creating two tuples in _func instead of one when
an aggregate function is created with box.schema.func.create(). In this case,
there will be almost no changes in alter.cc or in BOX as a whole, and the
logic will become much simpler. Also, I think we won't need to add new
opcodes to the VDBE, since we can just reuse OP_FunctionByName.

The problem with this approach is that the function name with "_finalize"
appended will be occupied. Also, this could lead to mixed combinations of
STEP and FINALIZE functions; for example, STEP could be persistent while
FINALIZE is non-persistent. However, to create such combinations, the user
must insert tuples into _func directly. I am not sure if this is a feature
or a bug.
What do you think?

> See 8 comments below.
>
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index 65c1cb952..f1dfcd807 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3276,6 +3276,85 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
> >  	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
> >  }
> >  
> > +static int
> > +func_def_check_body(struct tuple *tuple)
> > +{
> > +	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	enum mp_type type = mp_typeof(*field);
> > +	if (type == MP_STR)
> > +		return 0;
> > +	if (type != MP_MAP) {
> > +		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
>
> 1. 'Map or string'.
>
Fixed.

> > +			 mp_type_strs[type]);
> > +		return -1;
> > +	}
> > +	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
> > +	if (agg == NULL)
> > +		return -1;
> > +	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
> > +		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +		diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +			 "only aggregate functions can have map as body");
> > +		return -1;
> > +	}
> > +
> > +	bool has_step = false;
> > +	bool has_finalize = false;
> > +	if (mp_decode_map(&field) != 2)
> > +		goto error;
> > +	for (int i = 0; i < 2; ++i) {
> > +		const char *value;
> > +		uint32_t size;
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		value = mp_decode_str(&field, &size);
> > +		if (size == strlen("step")) {
> > +			if (strncmp(value, "step", size) != 0)
> > +				goto error;
>
> 2. I would rather propose to introduce a function to compare
> 0-terminated string with non-terminated one. It is needed quite
> often apparently. For instance,
>
>     strlcmp(const char *l, const char *r, size_t r_len);
>
I added a new patch. I decided to include it here instead of sending a new
version, since the changes it introduces are quite small.
Here it is:

commit b30a92bdd47d49aed1b82a99dae0f9472e320ee8
Author: Mergen Imeev
Date:   Sun Jan 23 14:58:33 2022 +0300

util: introduce strlcmp()

Utility function strlcmp() was introduced to simplify comparisons between
null-terminated strings and non-null-terminated strings.

diff --git a/src/lib/core/util.c b/src/lib/core/util.c
index f29886105..035e207b1 100644
--- a/src/lib/core/util.c
+++ b/src/lib/core/util.c
@@ -264,6 +264,15 @@ strlcpy(char *dst, const char *src, size_t size)
 }
 #endif
 
+int
+strlcmp(const char *l, const char *r, size_t r_len)
+{
+	size_t l_len = strlen(l);
+	if (l_len != r_len)
+		return l_len > r_len ? 1 : -1;
+	return strncmp(l, r, r_len);
+}
+
 int
 utf8_check_printable(const char *start, size_t length)
 {
diff --git a/src/trivia/util.h b/src/trivia/util.h
index 3d853f612..ee1b88609 100644
--- a/src/trivia/util.h
+++ b/src/trivia/util.h
@@ -490,6 +490,19 @@ size_t
 strlcpy(char *dst, const char *src, size_t size);
 #endif
 
+/**
+ * Compare two strings, the first of which is null-terminated and the second
+ * may not be null-terminated.
+ *
+ * @param l the first string, must be null-terminated.
+ * @param r the second string.
+ * @param r_len length of the second string.
+ *
+ * @return 0 if equal; otherwise orders by length, then as strncmp().
+ */
+int
+strlcmp(const char *l, const char *r, size_t r_len);
+
 /**
  * Check that @a str is valid utf-8 sequence and can be printed
  * unescaped.
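For reference, below is a minimal standalone sketch of the contract strlcmp() is meant to provide. It assumes, as in the patch, that strings of different length are ordered by length rather than lexicographically; `strlcmp_sketch` is a hypothetical name used only for illustration. One pitfall is worth pointing out. With size_t operands, an expression such as `l_len - r_len > 0` is true whenever the lengths differ, because unsigned subtraction wraps around instead of going negative. For that reason the sketch compares the lengths directly.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical sketch: compare the null-terminated string `l` with a
 * buffer `r` of `r_len` bytes that need not be null-terminated (for
 * example, a string decoded from MsgPack). Strings of different length
 * are ordered by length; equal-length strings are ordered by strncmp().
 */
static int
strlcmp_sketch(const char *l, const char *r, size_t r_len)
{
	assert(l != NULL && r != NULL);
	size_t l_len = strlen(l);
	/*
	 * Compare the sizes directly: for size_t operands,
	 * `l_len - r_len > 0` holds whenever l_len != r_len.
	 */
	if (l_len != r_len)
		return l_len > r_len ? 1 : -1;
	/* Same length: strncmp() reads at most r_len bytes of `r`. */
	return strncmp(l, r, r_len);
}
```

In func_def_check_body() the helper is only used for equality checks, as in `strlcmp("step", value, size) == 0` on a string decoded with mp_decode_str(). The ordering of unequal strings therefore does not matter there.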
> > +			has_step = true;
> > +		} else if (size == strlen("finalize")) {
> > +			if (strncmp(value, "finalize", size) != 0)
> > +				goto error;
> > +			has_finalize = true;
> > +		}
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		mp_next(&field);
> > +	}
> > +	if (has_step && has_finalize)
> > +		return 0;
> > +error:
> > +	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +	diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +		 "body of aggregate function should be map that contains "
> > +		 "exactly two string fields: 'step' and 'finalize'");
> > +	return -1;
> > +}
> > +
> > +static const char *
> > +func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
>
> 3. Please, lets split that function in 2 maybe. Its invocation with
> hardcoded 'true'/'false' in the end gives hard time understanding what
> these values mean.
>
Fixed.

> > +{
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	if (mp_typeof(*field) == MP_STR) {
> > +		if (is_step)
> > +			return mp_decode_str(&field, body_len);
> > +		*body_len = 0;
> > +		return field;
>
> 4. You return len as 0, but the returned value != NULL? Why?
>
A NULL return value means that an error has occurred. A non-NULL return
value with a length of 0 means that a non-persistent function is being
defined, so there is no body.

> > +	}

> > @@ -3568,6 +3697,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
> >  		struct func *func = func_new(def);
> >  		if (func == NULL)
> >  			return -1;
> > +		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
> > +			struct func_def *fin_def =
> > +				func_fin_def_new(new_tuple, def);
> > +			if (fin_def == NULL)
> > +				return -1;
> > +			struct func *func_fin = func_new(fin_def);
> > +			if (func_fin == NULL)
> > +				return -1;
> > +			func_fin_cache_insert(func_fin);
>
> 5. What happens if WAL write fails and on_create_func_rollback() is called?
>
I missed this earlier.
Do you think it's better to introduce a new field in struct func/struct
func_def, or to malloc()/free() the data for trigger->data?

> > diff --git a/src/box/schema.h b/src/box/schema.h
> > index d3bbdd590..2f020ee96 100644
> > --- a/src/box/schema.h
> > +++ b/src/box/schema.h
> > @@ -102,6 +102,15 @@ func_by_id(uint32_t fid);
> >  struct func *
> >  func_by_name(const char *name, uint32_t name_len);
> >  
> > +void
> > +func_fin_cache_insert(struct func *func);
> > +
> > +void
> > +func_fin_cache_delete(uint32_t fid);
> > +
> > +struct func *
> > +func_fin_by_id(uint32_t fid);
>
> 6. Lets leave some comments about what is 'func fin'.
>
Added a comment to func_fin_cache_insert().

> > diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> > index 2e33a31d2..c77cd1c9f 100644
> > --- a/src/box/sql/select.c
> > +++ b/src/box/sql/select.c
> > @@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
> >  			pParse->is_aborted = true;
> >  			return;
> >  		}
> > -		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > -		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > -		sql_expr_type_cache_change(pParse, regAgg, nArg);
> > -		sqlReleaseTempRange(pParse, regAgg, nArg);
> > +		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
> > +			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > +			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > +		} else {
> > +			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
> > +				      pF->iMem);
> > +			const char *name = pF->func->def->name;
> > +			int len = pF->func->def->name_len;
> > +			char *str = sqlDbStrNDup(pParse->db, name, len);
> > +			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
>
> 7. Was it possible to unify how builtin and user-defined functions
> are called? In the same opcode without any special 'if's.
>
I think something similar might be possible if we used struct
func_sql_builtin somewhere before these lines are called.
However, some preparation would be needed first; for example, we would have
to force the user to re-prepare the statement if something changes in _func.

> > diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> > new file mode 100755
> > index 000000000..d5b845761
> > --- /dev/null
> > +++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> 8. It might also be good to have a test with step/finalize functions
> throwing a Lua error from their body.

Added.

Diff:

diff --git a/src/box/alter.cc b/src/box/alter.cc
index f1dfcd807..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3286,7 +3286,7 @@ func_def_check_body(struct tuple *tuple)
 	if (type == MP_STR)
 		return 0;
 	if (type != MP_MAP) {
-		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
 			 mp_type_strs[type]);
 		return -1;
 	}
@@ -3310,15 +3310,12 @@ func_def_check_body(struct tuple *tuple)
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		value = mp_decode_str(&field, &size);
-		if (size == strlen("step")) {
-			if (strncmp(value, "step", size) != 0)
-				goto error;
+		if (strlcmp("step", value, size) == 0)
 			has_step = true;
-		} else if (size == strlen("finalize")) {
-			if (strncmp(value, "finalize", size) != 0)
-				goto error;
+		else if (strlcmp("finalize", value, size) == 0)
 			has_finalize = true;
-		}
+		else
+			goto error;
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		mp_next(&field);
@@ -3334,13 +3331,29 @@ error:
 }
 
 static const char *
-func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
 {
 	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
 	assert(field != NULL);
 	if (mp_typeof(*field) == MP_STR) {
-		if (is_step)
-			return mp_decode_str(&field, body_len);
 		*body_len = 0;
 		return field;
 	}
@@ -3348,7 +3361,7 @@ func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
 	mp_decode_map(&field);
 	uint32_t size;
 	mp_decode_str(&field, &size);
-	if (size == (is_step ? strlen("step") : strlen("finalize")))
+	if (size == strlen("finalize"))
 		return mp_decode_str(&field, body_len);
 	mp_next(&field);
 	mp_next(&field);
@@ -3376,7 +3389,7 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (field_count > BOX_FUNC_FIELD_BODY) {
 		if (func_def_check_body(tuple) != 0)
 			return NULL;
-		body = func_def_get_agg_body(tuple, &body_len, true);
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3628,7 +3641,7 @@ func_fin_def_new(struct tuple *tuple, const struct func_def *def)
 	const char *name = tt_sprintf("%s_finalize", def->name);
 	size_t name_len = strlen(name);
 	uint32_t body_len;
-	const char *body = func_def_get_agg_body(tuple, &body_len, false);
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
 	if (body == NULL)
 		return NULL;
 	if ((body_len == 0 && def->body != NULL) ||
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 873b9f9f4..e2253caa1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1316,7 +1316,7 @@ case OP_UserAggStep: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
@@ -1360,7 +1360,7 @@ case OP_UserAggFinal: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
index d5b845761..b207257c6 100755
--- a/test/sql-tap/gh-2579-custom-aggregate.test.lua
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -3,7 +3,7 @@ local build_path = os.getenv("BUILDDIR")
 package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
 
 local test = require("sqltester")
-test:plan(13)
+test:plan(15)
 
 -- Make sure that non-persistent aggregate functions are working as expected.
 local step = function(x, y)
@@ -229,4 +229,50 @@ test:do_test(
         "be map that contains exactly two string fields: 'step' and 'finalize'"
     })
 
+--
+-- Make sure that user-defined aggregate functions that are throwing an error
+-- are working correctly.
+--
+local body_s10 = {
+    step = [[function(x) error("some error") return 0 end]],
+    finalize = [[function(x) return 0 end]]
+}
+
+box.schema.func.create('S10', {aggregate = 'group', returns = 'number',
+                               body = body_s10, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-14",
+    [[
+        SELECT s10();
+    ]], {
+        1, '[string "return function(x) error("some error") return..."]:1: '..
+        'some error'
+    })
+
+local body_s11 = {
+    step = [[function(x) return 0 end]],
+    finalize = [[function(x) error("another error") return 0 end]]
+}
+
+box.schema.func.create('S11', {aggregate = 'group', returns = 'number',
+                               body = body_s11, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-15",
+    [[
+        SELECT s11();
+    ]], {
+        1, '[string "return function(x) error("another error") ret..."]:1: '..
+        'another error'
+    })
+
+box.space._func.index[2]:delete("S1")
+box.space._func.index[2]:delete("S2")
+box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3")
+box.space._func.index[2]:delete("S4")
+box.space._func.index[2]:delete("S10")
+box.space._func.index[2]:delete("S11")
+box.space.T:drop()
+
 test:finish_test()
diff --git a/src/box/schema.h b/src/box/schema.h
index 2f020ee96..186fa9db7 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,12 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+/**
+ * Insert a new function object for finalize of aggregate function in the
+ * function cache.
+ *
+ * @param func Function object to insert.
+ */
 void
 func_fin_cache_insert(struct func *func);
 

New patch:

commit 1caf17400eae504a192b6e49cba0d254a4a510ea
Author: Mergen Imeev
Date:   Tue Jan 11 08:48:58 2022 +0300

sql: introduce user-defined aggregate functions

This patch introduces user-defined SQL aggregate functions and an
interface for creating them.

Closes #2579

@TarantoolBot document
Title: User-defined aggregate functions

User-defined aggregate functions are now available. There are three types
of user-defined aggregate functions: persistent functions, non-persistent
functions, and C functions. Every user-defined aggregate function consists
of two functions: a "step" function, which is executed for each record in
the group, and a "finalize" function, which is executed after all records
in the group have been processed.

The "step" function has one more argument than specified in "param_list"
in the definition. This extra argument is the current state of the
aggregate function and must be the first argument; the value returned by
the "step" function becomes the new state. The "finalize" function has
exactly one argument, which is the current state of the aggregate function.
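As an illustration only, the calling order above can be modelled in a few lines of C. This is a hypothetical sketch, not Tarantool's API for C aggregate functions: `struct avg_state`, `avg_step()`, `avg_finalize()` and `run_avg_group()` are made-up names that mirror the Lua F0/F1 examples. The sketch makes three assumptions. The state starts out empty, like nil in Lua. "step" is called once per record with the state as its first argument, and its result is kept as the new state. "finalize" is called once with the final state.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical state of an AVG-like aggregate (mirrors {sum, count}). */
struct avg_state {
	double sum;
	long count;
	/* 0 plays the role of the nil state seen by the first "step" call. */
	int is_set;
};

/* "step": the state comes first, then the declared argument. */
static struct avg_state
avg_step(struct avg_state state, long y)
{
	if (!state.is_set) {
		state.sum = 0;
		state.count = 0;
		state.is_set = 1;
	}
	state.sum += y;
	state.count += 1;
	/* The returned value is kept as the new state. */
	return state;
}

/* "finalize": receives only the state and produces the result. */
static double
avg_finalize(struct avg_state state)
{
	/* Like the Lua examples, this sketch assumes a non-empty group. */
	assert(state.is_set && state.count > 0);
	return state.sum / state.count;
}

/* Drive one group: "step" once per record, then "finalize" once. */
static double
run_avg_group(const long *rows, size_t n)
{
	struct avg_state state = {0, 0, 0};
	for (size_t i = 0; i < n; i++)
		state = avg_step(state, rows[i]);
	return avg_finalize(state);
}
```

For the rows 1, 2 and 3 this model yields 2, which is the average the F0 example computes.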
To use a C or a non-persistent user-defined aggregate function, you must
define a "step" function whose name is equal to the name of the aggregate
function, and a "finalize" function whose name is the name of the aggregate
function with "_finalize" appended. For non-persistent functions, both
functions must be global in Lua; for C functions, functions with these names
must be defined in the appropriate .so file.

Example of a non-persistent aggregate function:

```
F0 = function(x, y)
    if x == nil then
        x = {sum = 0, count = 0}
    end
    x.sum = x.sum + y
    x.count = x.count + 1
    return x
end

F0_finalize = function(x)
    return x.sum / x.count
end

box.schema.func.create('F0', {aggregate = 'group', returns = 'number',
                       param_list = {'integer'}, exports = {'SQL'}})
box.execute([[SELECT f0(column_3) FROM (VALUES(1), (2), (3));]])
```

To use a persistent user-defined aggregate function, you must define the
"body" field in the function definition as a map with two fields, "step"
and "finalize", following the rules described above.

Example of a persistent aggregate function:

```
body = {
    step = [[function(x, y)
        if x == nil then
            x = {sum = 0, count = 0}
        end
        x.sum = x.sum + y
        x.count = x.count + 1
        return x
    end]],
    finalize = [[function(x)
        return x.sum / x.count
    end]]
}

box.schema.func.create('F1', {aggregate = 'group', returns = 'number',
                       body = body, param_list = {'integer'},
                       exports = {'SQL'}})
box.execute([[SELECT f1(column_3) FROM (VALUES(1), (2), (3));]])
```

diff --git a/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
new file mode 100644
index 000000000..68adad478
--- /dev/null
+++ b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
@@ -0,0 +1,4 @@
+## feature/sql
+
+* Custom SQL aggregate functions and an interface to create them are now
+  available (gh-2579).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3276,6 +3276,98 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
 	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
 }
 
+static int
+func_def_check_body(struct tuple *tuple)
+{
+	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	enum mp_type type = mp_typeof(*field);
+	if (type == MP_STR)
+		return 0;
+	if (type != MP_MAP) {
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
+			 mp_type_strs[type]);
+		return -1;
+	}
+	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
+	if (agg == NULL)
+		return -1;
+	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
+		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+		diag_set(ClientError, ER_CREATE_FUNCTION, name,
+			 "only aggregate functions can have map as body");
+		return -1;
+	}
+
+	bool has_step = false;
+	bool has_finalize = false;
+	if (mp_decode_map(&field) != 2)
+		goto error;
+	for (int i = 0; i < 2; ++i) {
+		const char *value;
+		uint32_t size;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		value = mp_decode_str(&field, &size);
+		if (strlcmp("step", value, size) == 0)
+			has_step = true;
+		else if (strlcmp("finalize", value, size) == 0)
+			has_finalize = true;
+		else
+			goto error;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		mp_next(&field);
+	}
+	if (has_step && has_finalize)
+		return 0;
+error:
+	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+	diag_set(ClientError, ER_CREATE_FUNCTION, name,
+		 "body of aggregate function should be map that contains "
+		 "exactly two string fields: 'step' and 'finalize'");
+	return -1;
+}
+
+static const char *
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR) {
+		*body_len = 0;
+		return field;
+	}
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("finalize"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
 /** Create a function definition from tuple. */
 static struct func_def *
 func_def_new_from_tuple(struct tuple *tuple)
@@ -3295,7 +3387,9 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (identifier_check(name, name_len) != 0)
 		return NULL;
 	if (field_count > BOX_FUNC_FIELD_BODY) {
-		body = tuple_field_str(tuple, BOX_FUNC_FIELD_BODY, &body_len);
+		if (func_def_check_body(tuple) != 0)
+			return NULL;
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3498,6 +3592,11 @@ func_def_new_from_tuple(struct tuple *tuple)
 		def->exports.lua = true;
 		def->param_count = 0;
 	}
+	if (def->aggregate == FUNC_AGGREGATE_GROUP && def->exports.lua != 0) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "aggregate function can only be accessed in SQL");
+		return NULL;
+	}
 	if (func_def_check(def) != 0)
 		return NULL;
 	def_guard.is_active = false;
@@ -3536,6 +3635,49 @@ on_drop_func_rollback(struct trigger *trigger, void * /* event */)
 	return 0;
 }
 
+struct func_def *
+func_fin_def_new(struct tuple *tuple, const struct func_def *def)
+{
+	const char *name = tt_sprintf("%s_finalize", def->name);
+	size_t name_len = strlen(name);
+	uint32_t body_len;
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
+	if (body == NULL)
+		return NULL;
+	if ((body_len == 0 && def->body != NULL) ||
+	    (def->body == NULL && body_len != 0)) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "step or finalize of aggregate function is undefined");
+		return NULL;
+	}
+	uint32_t len = sizeof(struct func_def) + name_len + 1;
+	if (body_len > 0)
+		len += body_len + 1;
+	struct func_def *fin_def = (struct func_def *)malloc(len);
+	fin_def->fid = def->fid;
+	fin_def->uid = def->uid;
+	if (body_len > 0) {
+		fin_def->body = fin_def->name + name_len + 1;
+		memcpy(fin_def->body, body, body_len);
+		fin_def->body[body_len] = '\0';
+	} else {
+		fin_def->body = NULL;
+	}
+	fin_def->comment = NULL;
+	fin_def->setuid = def->setuid;
+	fin_def->is_deterministic = def->is_deterministic;
+	fin_def->is_sandboxed = def->is_sandboxed;
+	fin_def->param_count = 0;
+	fin_def->returns = def->returns;
+	fin_def->aggregate = FUNC_AGGREGATE_GROUP;
+	fin_def->language = def->language;
+	fin_def->name_len = name_len;
+	fin_def->exports = def->exports;
+	fin_def->opts = def->opts;
+	memcpy(fin_def->name, name, name_len + 1);
+	return fin_def;
+}
+
 /**
  * A trigger invoked on replace in a space containing
  * functions on which there were defined any grants.
@@ -3568,6 +3710,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		struct func *func = func_new(def);
 		if (func == NULL)
 			return -1;
+		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
+			struct func_def *fin_def =
+				func_fin_def_new(new_tuple, def);
+			if (fin_def == NULL)
+				return -1;
+			struct func *func_fin = func_new(fin_def);
+			if (func_fin == NULL)
+				return -1;
+			func_fin_cache_insert(func_fin);
+		}
 		def_guard.is_active = false;
 		func_cache_insert(func);
 		on_rollback->data = func;
@@ -3610,6 +3762,8 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 			txn_alter_trigger_new(on_drop_func_rollback, old_func);
 		if (on_commit == NULL || on_rollback == NULL)
 			return -1;
+		if (old_func->def->aggregate == FUNC_AGGREGATE_GROUP)
+			func_fin_cache_delete(old_func->def->fid);
 		func_cache_delete(old_func->def->fid);
 		txn_stmt_on_commit(stmt, on_commit);
 		txn_stmt_on_rollback(stmt, on_rollback);
@@ -3622,9 +3776,13 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		 * definition to support upgrade script.
 		 */
 		struct func_def *old_def = NULL, *new_def = NULL;
-		auto guard = make_scoped_guard([&old_def, &new_def] {
+		struct func_def *new_fin_def = NULL;
+		struct func_def *old_fin_def = NULL;
+		auto guard = make_scoped_guard([&] {
 			free(old_def);
 			free(new_def);
+			free(new_fin_def);
+			free(old_fin_def);
 		});
 		old_def = func_def_new_from_tuple(old_tuple);
 		new_def = func_def_new_from_tuple(new_tuple);
@@ -3635,6 +3793,18 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 				  "alter");
 			return -1;
 		}
+		if (new_def->aggregate != FUNC_AGGREGATE_GROUP)
+			return 0;
+		new_fin_def = func_fin_def_new(new_tuple, new_def);
+		old_fin_def = func_fin_def_new(old_tuple, old_def);
+		if (new_fin_def == NULL || old_fin_def == NULL)
+			return -1;
+		if (func_def_cmp(new_fin_def, old_fin_def) != 0) {
+			diag_set(ClientError, ER_UNSUPPORTED, "function",
+				  "alter");
+			return -1;
+		}
+		return 0;
 	}
 	return 0;
 }
diff --git a/src/box/bootstrap.snap b/src/box/bootstrap.snap
index 018670d2a..610e513fd 100644
Binary files a/src/box/bootstrap.snap and b/src/box/bootstrap.snap differ
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 99b65b0f5..8c328852d 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2597,7 +2597,8 @@ box.schema.func.create = function(name, opts)
     opts = opts or {}
     check_param_table(opts, { setuid = 'boolean',
                               if_not_exists = 'boolean',
-                              language = 'string', body = 'string',
+                              language = 'string', body = 'any',
+                              aggregate = 'string',
                               is_deterministic = 'boolean',
                               is_sandboxed = 'boolean',
                               is_multikey = 'boolean',
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index f9e6543c2..0e2c4a091 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -1212,6 +1212,21 @@ local function upgrade_to_2_9_1()
     remove_sql_builtin_functions_from_func()
 end
 
+--------------------------------------------------------------------------------
+-- Tarantool 2.10.1
+-------------------------------------------------------------------------------- +local function change_func_body_field_type() + local _func = box.space._func + local format = _func:format() + -- Change type of 'body' field from 'string' to 'any'. + format[6].type = 'any' + _func:format(format) +end + +local function upgrade_to_2_10_1() + change_func_body_field_type() +end + -------------------------------------------------------------------------------- local handlers = { @@ -1229,6 +1244,7 @@ local handlers = { {version = mkversion(2, 3, 1), func = upgrade_to_2_3_1, auto = true}, {version = mkversion(2, 7, 1), func = upgrade_to_2_7_1, auto = true}, {version = mkversion(2, 9, 1), func = upgrade_to_2_9_1, auto = true}, + {version = mkversion(2, 10, 1), func = upgrade_to_2_10_1, auto = true}, } -- Schema version of the snapshot. diff --git a/src/box/schema.cc b/src/box/schema.cc index c248d4ee4..eb1b4a7dd 100644 --- a/src/box/schema.cc +++ b/src/box/schema.cc @@ -60,6 +60,7 @@ static struct mh_i32ptr_t *spaces; static struct mh_strnptr_t *spaces_by_name; static struct mh_i32ptr_t *funcs; static struct mh_strnptr_t *funcs_by_name; +static struct mh_i32ptr_t *funcs_fin; static struct mh_i32ptr_t *sequences; /** Public change counter. On its update clients need to fetch * new space data from the instance. 
*/ @@ -379,6 +380,7 @@ schema_init(void) spaces_by_name = mh_strnptr_new(); funcs = mh_i32ptr_new(); funcs_by_name = mh_strnptr_new(); + funcs_fin = mh_i32ptr_new(); sequences = mh_i32ptr_new(); /* * Create surrogate space objects for the mandatory system @@ -540,6 +542,14 @@ schema_free(void) func_delete(func); } mh_i32ptr_delete(funcs); + while (mh_size(funcs_fin) > 0) { + mh_int_t i = mh_first(funcs_fin); + struct func *func = + (struct func *)mh_i32ptr_node(funcs_fin, i)->val; + func_fin_cache_delete(func->def->fid); + func_delete(func); + } + mh_i32ptr_delete(funcs_fin); while (mh_size(sequences) > 0) { mh_int_t i = mh_first(sequences); @@ -597,6 +607,33 @@ func_by_name(const char *name, uint32_t name_len) return (struct func *) mh_strnptr_node(funcs_by_name, func)->val; } +void +func_fin_cache_insert(struct func *func) +{ + uint32_t fid = func->def->fid; + assert(func_fin_by_id(fid) == NULL); + const struct mh_i32ptr_node_t node = {fid, func}; + mh_i32ptr_put(funcs_fin, &node, NULL, NULL); +} + +void +func_fin_cache_delete(uint32_t fid) +{ + mh_int_t k = mh_i32ptr_find(funcs_fin, fid, NULL); + if (k == mh_end(funcs_fin)) + return; + mh_i32ptr_del(funcs_fin, k, NULL); +} + +struct func * +func_fin_by_id(uint32_t fid) +{ + mh_int_t func = mh_i32ptr_find(funcs_fin, fid, NULL); + if (func == mh_end(funcs_fin)) + return NULL; + return (struct func *)mh_i32ptr_node(funcs_fin, func)->val; +} + int schema_find_grants(const char *type, uint32_t id, bool *out) { diff --git a/src/box/schema.h b/src/box/schema.h index d3bbdd590..186fa9db7 100644 --- a/src/box/schema.h +++ b/src/box/schema.h @@ -102,6 +102,21 @@ func_by_id(uint32_t fid); struct func * func_by_name(const char *name, uint32_t name_len); +/** + * Insert the finalize function object of an aggregate function into + * the cache of finalize functions. + * + * @param func Function object to insert.
+ */ +void +func_fin_cache_insert(struct func *func); + +void +func_fin_cache_delete(uint32_t fid); + +struct func * +func_fin_by_id(uint32_t fid); + /** Call a visitor function on every space in the space cache. */ int space_foreach(int (*func)(struct space *sp, void *udata), void *udata); diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c index eb169aeb8..46ffe377c 100644 --- a/src/box/sql/expr.c +++ b/src/box/sql/expr.c @@ -5469,7 +5469,18 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr) (pExpr, EP_xIsSelect)); pItem = &pAggInfo->aFunc[i]; pItem->pExpr = pExpr; - pItem->iMem = ++pParse->nMem; + int n = pExpr->x.pList == NULL ? + 0 : pExpr->x.pList->nExpr; + /* + * Allocate 1 MEM for + * accumulator and next n MEMs + * for arguments. This makes it + * easier to pass these n + 1 + * MEMs to the user-defined + * aggregate function. + */ + pItem->iMem = pParse->nMem + 1; + pParse->nMem += n + 1; assert(!ExprHasProperty (pExpr, EP_IntValue)); pItem->func = @@ -5479,12 +5490,6 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr) true; return WRC_Abort; } - assert(pItem->func->def-> - language == - FUNC_LANGUAGE_SQL_BUILTIN && - pItem->func->def-> - aggregate == - FUNC_AGGREGATE_GROUP); if (pExpr->flags & EP_Distinct) { pItem->iDistinct = pParse->nTab++; diff --git a/src/box/sql/func.c b/src/box/sql/func.c index b69bf7fd6..493b69a4c 100644 --- a/src/box/sql/func.c +++ b/src/box/sql/func.c @@ -2072,9 +2072,12 @@ uint32_t sql_func_flags(const char *name) { struct sql_func_dictionary *dict = built_in_func_get(name); - if (dict == NULL) + if (dict != NULL) + return dict->flags; + struct func *func = func_by_name(name, strlen(name)); + if (func == NULL || func->def->aggregate != FUNC_AGGREGATE_GROUP) return 0; - return dict->flags; + return SQL_FUNC_AGG; } static struct func_vtab func_sql_builtin_vtab; diff --git a/src/box/sql/select.c b/src/box/sql/select.c index 2e33a31d2..c77cd1c9f 100644 --- a/src/box/sql/select.c +++ b/src/box/sql/select.c @@ -4648,8 +4648,6 @@ 
is_simple_count(struct Select *select, struct AggInfo *agg_info) return NULL; if (NEVER(agg_info->nFunc == 0)) return NULL; - assert(agg_info->aFunc->func->def->language == - FUNC_LANGUAGE_SQL_BUILTIN); if (memcmp(agg_info->aFunc->func->def->name, "COUNT", 5) != 0 || (agg_info->aFunc->pExpr->x.pList != NULL && agg_info->aFunc->pExpr->x.pList->nExpr > 0)) @@ -5575,9 +5573,17 @@ finalizeAggFunctions(Parse * pParse, AggInfo * pAggInfo) for (i = 0, pF = pAggInfo->aFunc; i < pAggInfo->nFunc; i++, pF++) { ExprList *pList = pF->pExpr->x.pList; assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect)); - sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem, - pList ? pList->nExpr : 0); - sqlVdbeAppendP4(v, pF->func, P4_FUNC); + if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) { + sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem, + pList != NULL ? pList->nExpr : 0); + sqlVdbeAppendP4(v, pF->func, P4_FUNC); + } else { + sqlVdbeAddOp1(v, OP_UserAggFinal, pF->iMem); + const char *name = pF->func->def->name; + int len = pF->func->def->name_len; + char *str = sqlDbStrNDup(pParse->db, name, len); + sqlVdbeAppendP4(v, str, P4_DYNAMIC); + } } } @@ -5604,7 +5610,7 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo) assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect)); if (pList) { nArg = pList->nExpr; - regAgg = sqlGetTempRange(pParse, nArg); + regAgg = pF->iMem + 1; sqlExprCodeExprList(pParse, pList, regAgg, 0, SQL_ECEL_DUP); } else { @@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo) pParse->is_aborted = true; return; } - sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem); - sqlVdbeAppendP4(v, ctx, P4_FUNCCTX); - sql_expr_type_cache_change(pParse, regAgg, nArg); - sqlReleaseTempRange(pParse, regAgg, nArg); + if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) { + sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem); + sqlVdbeAppendP4(v, ctx, P4_FUNCCTX); + } else { + sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg, + pF->iMem); + const char *name = pF->func->def->name; 
+ int len = pF->func->def->name_len; + char *str = sqlDbStrNDup(pParse->db, name, len); + sqlVdbeAppendP4(v, str, P4_DYNAMIC); + } if (addrNext) { sqlVdbeResolveLabel(v, addrNext); sqlExprCacheClear(pParse); diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c index 24cb28260..e2253caa1 100644 --- a/src/box/sql/vdbe.c +++ b/src/box/sql/vdbe.c @@ -1289,6 +1289,93 @@ case OP_FunctionByName: { break; } +/* + * Opcode: UserAggStep P1 P2 P3 P4 * + * Synopsis: accum=r[P3] step(r[P2@P1]) + * + * Execute the step function for a user-defined aggregate function. The step + * function is called with P1 + 1 arguments: the accumulator followed by the + * P1 aggregate arguments. P4 is the name of the aggregate function (a + * P4_DYNAMIC string). Register P3 is the accumulator. + * + * The P1 arguments are taken from register P2 and its successors. The P2 + * register must be immediately after the P3 register. + */ +case OP_UserAggStep: { + assert(pOp->p4type == P4_DYNAMIC); + assert(pOp->p1 == 0 || pOp->p3 == pOp->p2 - 1); + struct func *func = func_by_name(pOp->p4.z, strlen(pOp->p4.z)); + if (func == NULL) { + diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z); + goto abort_due_to_error; + } + + struct Mem *argv = &aMem[pOp->p3]; + struct port args; + struct port ret; + + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1); + if (func_call(func, &args, &ret) != 0) { + region_truncate(region, region_svp); + goto abort_due_to_error; + } + + uint32_t size; + struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size); + assert(size == 1); + bool is_error = mem == NULL || mem_copy(&aMem[pOp->p3], mem) != 0; + port_destroy(&ret); + region_truncate(region, region_svp); + if (is_error) + goto abort_due_to_error; + break; +} + +/* Opcode: UserAggFinal P1 * * P4 * + * Synopsis: r[P1] = finalize(r[P1]) + * + * Execute the finalize function for a user-defined aggregate function.
P4 is the + * name of the aggregate function (a P4_DYNAMIC string); its finalize function + * is looked up by the ID of that function. Register P1 + * is the accumulator. The result will be written to register P1. + */ +case OP_UserAggFinal: { + assert(pOp->p1 > 0 && pOp->p1 <= (p->nMem + 1 - p->nCursor)); + assert(pOp->p4type == P4_DYNAMIC); + struct func *step_func = func_by_name(pOp->p4.z, strlen(pOp->p4.z)); + if (step_func == NULL) { + diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z); + goto abort_due_to_error; + } + struct func *func = func_fin_by_id(step_func->def->fid); + if (func == NULL) { + diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z); + goto abort_due_to_error; + } + + struct Mem *argv = &aMem[pOp->p1]; + struct port args; + struct port ret; + + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + port_vdbemem_create(&args, (struct sql_value *)argv, 1); + if (func_call(func, &args, &ret) != 0) { + region_truncate(region, region_svp); + goto abort_due_to_error; + } + + uint32_t size; + struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size); + assert(size == 1); + bool is_error = mem == NULL || mem_copy(&aMem[pOp->p1], mem) != 0; + port_destroy(&ret); + region_truncate(region, region_svp); + if (is_error) + goto abort_due_to_error; + break; +} + /* Opcode: BitAnd P1 P2 P3 * * * Synopsis: r[P3]=r[P1]&r[P2] * diff --git a/test/box-py/bootstrap.result b/test/box-py/bootstrap.result index cea440c64..78d444bb0 100644 --- a/test/box-py/bootstrap.result +++ b/test/box-py/bootstrap.result @@ -4,7 +4,7 @@ box.internal.bootstrap() box.space._schema:select{} --- - - ['max_id', 511] - - ['version', 2, 9, 1] + - ['version', 2, 10, 1] ...
box.space._cluster:select{} --- @@ -54,7 +54,7 @@ box.space._space:select{} - [296, 1, '_func', 'memtx', 0, {}, [{'name': 'id', 'type': 'unsigned'}, {'name': 'owner', 'type': 'unsigned'}, {'name': 'name', 'type': 'string'}, {'name': 'setuid', 'type': 'unsigned'}, {'name': 'language', 'type': 'string'}, {'name': 'body', - 'type': 'string'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list', + 'type': 'any'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list', 'type': 'array'}, {'name': 'returns', 'type': 'string'}, {'name': 'aggregate', 'type': 'string'}, {'name': 'sql_data_access', 'type': 'string'}, {'name': 'is_deterministic', 'type': 'boolean'}, {'name': 'is_sandboxed', 'type': 'boolean'}, {'name': 'is_null_call', diff --git a/test/sql-tap/CMakeLists.txt b/test/sql-tap/CMakeLists.txt index c4ec1214a..136a517d4 100644 --- a/test/sql-tap/CMakeLists.txt +++ b/test/sql-tap/CMakeLists.txt @@ -3,6 +3,8 @@ build_module(gh-5938-wrong-string-length gh-5938-wrong-string-length.c) target_link_libraries(gh-5938-wrong-string-length msgpuck) build_module(gh-6024-funcs-return-bin gh-6024-funcs-return-bin.c) target_link_libraries(gh-6024-funcs-return-bin msgpuck) +build_module(gh-2579-custom-aggregate gh-2579-custom-aggregate.c) +target_link_libraries(gh-2579-custom-aggregate msgpuck) build_module(sql_uuid sql_uuid.c) target_link_libraries(sql_uuid msgpuck core) build_module(decimal decimal.c) diff --git a/test/sql-tap/gh-2579-custom-aggregate.c b/test/sql-tap/gh-2579-custom-aggregate.c new file mode 100644 index 000000000..5552effc5 --- /dev/null +++ b/test/sql-tap/gh-2579-custom-aggregate.c @@ -0,0 +1,48 @@ +#include "msgpuck.h" +#include "module.h" +#include "mp_decimal.h" +#include "lua/tnt_msgpuck.h" +#include "mp_uuid.h" + +enum { + BUF_SIZE = 512, +}; + +int +S3(box_function_ctx_t *ctx, const char *args, const char *args_end) +{ + (void)args_end; + uint32_t arg_count = mp_decode_array(&args); + if (arg_count != 2) { + return 
box_error_set(__FILE__, __LINE__, ER_PROC_C, + "invalid argument count"); + } + int sum = 0; + if (mp_typeof(*args) != MP_UINT) + mp_decode_nil(&args); + else + sum = mp_decode_uint(&args); + sum += mp_decode_uint(&args); + char res[BUF_SIZE]; + char *end = mp_encode_uint(res, sum); + box_return_mp(ctx, res, end); + return 0; +} + +int +S3_finalize(box_function_ctx_t *ctx, const char *args, const char *args_end) +{ + (void)args_end; + uint32_t arg_count = mp_decode_array(&args); + if (arg_count != 1) { + return box_error_set(__FILE__, __LINE__, ER_PROC_C, + "invalid argument count"); + } + int sum = 0; + if (mp_typeof(*args) == MP_UINT) + sum = mp_decode_uint(&args); + char res[BUF_SIZE]; + char *end = mp_encode_double(res, sum / 10.0); + box_return_mp(ctx, res, end); + return 0; +} diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua new file mode 100755 index 000000000..b207257c6 --- /dev/null +++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua @@ -0,0 +1,278 @@ +#!/usr/bin/env tarantool +local build_path = os.getenv("BUILDDIR") +package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath + +local test = require("sqltester") +test:plan(15) + +-- Make sure that non-persistent aggregate functions are working as expected. 
+local step = function(x, y) + if x == nil then + x = {sum = 0, count = 0} + end + x.sum = x.sum + y + x.count = x.count + 1 + return x +end + +local finalize = function(x) + return x.sum / x.count +end + +rawset(_G, 'S1', step) +rawset(_G, 'S1_finalize', finalize) +box.schema.func.create('S1', {aggregate = 'group', returns = 'number', + param_list = {'integer'}, exports = {'SQL'}}) + +test:execsql([[ + CREATE TABLE t(i INTEGER PRIMARY KEY AUTOINCREMENT, a INT); + INSERT INTO t(a) VALUES(1), (1), (2), (2), (3); +]]) + +test:do_execsql_test( + "gh-2579-1", + [[ + SELECT avg(a), s1(a) FROM t; + ]], { + 1, 1.8 + }) + +test:do_execsql_test( + "gh-2579-2", + [[ + SELECT avg(distinct a), s1(distinct a) FROM t; + ]], { + 2, 2 + }) + +-- Make sure that persistent aggregate functions are working as expected. +local body_s2 = { + step = [[function(x, y) + if x == nil then + x = {sum = 0, count = 0} + end + x.sum = x.sum + y + x.count = x.count + 1 + return x + end]], + finalize = [[function(x) + return x.sum / x.count + end]] +} + +box.schema.func.create('S2', {aggregate = 'group', returns = 'number', + body = body_s2, param_list = {'integer'}, + exports = {'SQL'}}) + +test:do_execsql_test( + "gh-2579-3", + [[ + SELECT avg(a), s2(a) FROM t; + ]], { + 1, 1.8 + }) + +test:do_execsql_test( + "gh-2579-4", + [[ + SELECT avg(distinct a), s2(distinct a) FROM t; + ]], { + 2, 2 + }) + +-- Make sure that C aggregate functions are working as expected. +box.schema.func.create("gh-2579-custom-aggregate.S3", { + language = "C", aggregate = 'group', param_list = {"integer"}, + returns = "number", exports = {"SQL"}, +}) + +test:do_execsql_test( + "gh-2579-5", + [[ + SELECT "gh-2579-custom-aggregate.S3"(a) FROM t; + ]], { + 0.9 + }) + +test:do_execsql_test( + "gh-2579-6", + [[ + SELECT "gh-2579-custom-aggregate.S3"(distinct a) FROM t; + ]], { + 0.6 + }) + +-- +-- Make sure that user-defined aggregate functions can have more than one +-- argument. 
+-- +local body_s4 = { + step = [[function(x, y, z) + if x == nil then + x = {sum = 0, count = 0} + end + x.sum = x.sum + y + z + x.count = x.count + 1 + return x + end]], + finalize = [[function(x) + return x.sum / x.count + end]] +} +box.schema.func.create('S4', {aggregate = 'group', returns = 'number', + param_list = {'integer', 'integer'}, + body = body_s4, exports = {'SQL'}}) + +test:do_execsql_test( + "gh-2579-7", + [[ + SELECT s4(a, i) FROM t; + ]], { + 4.8 + }) + +-- +-- Make sure that user-defined aggregate functions with more than one argument +-- cannot work with DISTINCT. +-- +test:do_catchsql_test( + "gh-2579-8", + [[ + SELECT s4(distinct a, i) FROM t; + ]], { + 1, "DISTINCT aggregates must have exactly one argument" + }) + +-- Make sure that user-defined aggregate functions are not available in Lua. +test:do_test( + "gh-2579-9", + function() + local def = {aggregate = 'group', returns = 'number', + param_list = {'integer'}, exports = {'LUA', 'SQL'}} + local res = {pcall(box.schema.func.create, 'S5', def)} + return {tostring(res[2])} + end, { + "Failed to create function 'S5': aggregate function can only be ".. + "accessed in SQL" + }) + +-- +-- Make sure that STEP and FINALIZE of a user-defined aggregate function are +-- required to be either both persistent or both non-persistent. +-- +test:do_test( + "gh-2579-10", + function() + local body = "function(x) return x / 1000 end" + local def = {aggregate = 'group', returns = 'number', body = body, + param_list = {'integer'}, exports = {'SQL'}} + local res = {pcall(box.schema.func.create, 'S6', def)} + return {tostring(res[2])} + end, { + "Failed to create function 'S6': step or finalize of aggregate ".. + "function is undefined" + }) + +-- +-- Make sure that the body of a user-defined aggregate function is required to +-- have exactly two fields: "step" and "finalize".
+-- +test:do_test( + "gh-2579-11", + function() + local body = {step = "function(x) return x / 1000 end"} + local def = {aggregate = 'group', returns = 'number', body = body, + param_list = {'integer'}, exports = {'SQL'}} + local res = {pcall(box.schema.func.create, 'S7', def)} + return {tostring(res[2])} + end, { + "Failed to create function 'S7': body of aggregate function should ".. + "be map that contains exactly two string fields: 'step' and 'finalize'" + }) + +test:do_test( + "gh-2579-12", + function() + local body = {finalize = "function(x) return x / 1000 end"} + local def = {aggregate = 'group', returns = 'number', body = body, + param_list = {'integer'}, exports = {'SQL'}} + local res = {pcall(box.schema.func.create, 'S8', def)} + return {tostring(res[2])} + end, { + "Failed to create function 'S8': body of aggregate function should ".. + "be map that contains exactly two string fields: 'step' and 'finalize'" + }) + +test:do_test( + "gh-2579-13", + function() + local body = { + step = [[function(x, y, z) + if x == nil then + x = {sum = 0, count = 0} + end + x.sum = x.sum + y + z + x.count = x.count + 1 + return x + end]], + finalize = [[function(x) + return x.sum / x.count + end]], + something = 0 + } + local def = {aggregate = 'group', returns = 'number', body = body, + param_list = {'integer'}, exports = {'SQL'}} + local res = {pcall(box.schema.func.create, 'S9', def)} + return {tostring(res[2])} + end, { + "Failed to create function 'S9': body of aggregate function should ".. + "be map that contains exactly two string fields: 'step' and 'finalize'" + }) + +-- +-- Make sure that user-defined aggregate functions that are throwing an error +-- are working correctly. 
+-- +local body_s10 = { + step = [[function(x) error("some error") return 0 end]], + finalize = [[function(x) return 0 end]] +} + +box.schema.func.create('S10', {aggregate = 'group', returns = 'number', + body = body_s10, exports = {'SQL'}}) + +test:do_catchsql_test( + "gh-2579-14", + [[ + SELECT s10(); + ]], { + 1, '[string "return function(x) error("some error") return..."]:1: '.. + 'some error' + }) + +local body_s11 = { + step = [[function(x) return 0 end]], + finalize = [[function(x) error("another error") return 0 end]] +} + +box.schema.func.create('S11', {aggregate = 'group', returns = 'number', + body = body_s11, exports = {'SQL'}}) + +test:do_catchsql_test( + "gh-2579-15", + [[ + SELECT s11(); + ]], { + 1, '[string "return function(x) error("another error") ret..."]:1: '.. + 'another error' + }) + +box.space._func.index[2]:delete("S1") +box.space._func.index[2]:delete("S2") +box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3") +box.space._func.index[2]:delete("S4") +box.space._func.index[2]:delete("S10") +box.space._func.index[2]:delete("S11") +box.space.T:drop() + +test:finish_test()