From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id EA33246970F for ; Wed, 27 Nov 2019 15:15:54 +0300 (MSK) From: Nikita Pettik Date: Wed, 27 Nov 2019 15:15:41 +0300 Message-Id: <2a81f02865168030c1632b4b4000ea331c84a016.1574846892.git.korablev@tarantool.org> In-Reply-To: References: In-Reply-To: References: Subject: [Tarantool-patches] [PATCH 1/6] sql: refactor resulting set metadata List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Cc: v.shpilevoy@tarantool.org Move names and types of resulting set to a separate structure. Simplify their storage by introducing separate members for name and type (previously names and types were stored in one char * array). It will allow us to add new metadata properties with ease. Needed for #4407 --- src/box/sql/delete.c | 6 ++-- src/box/sql/insert.c | 5 ++-- src/box/sql/legacy.c | 2 +- src/box/sql/pragma.c | 14 ++++----- src/box/sql/prepare.c | 9 +++--- src/box/sql/select.c | 60 ++++++++++++++++++-------------------- src/box/sql/update.c | 6 ++-- src/box/sql/vdbe.h | 28 ++++++++++-------- src/box/sql/vdbeInt.h | 8 ++++- src/box/sql/vdbeapi.c | 81 +++++++++------------------------------------------ src/box/sql/vdbeaux.c | 81 +++++++++++++++++++++++++++------------------------ 11 files changed, 124 insertions(+), 176 deletions(-) diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c index 91c2157ac..31570099b 100644 --- a/src/box/sql/delete.c +++ b/src/box/sql/delete.c @@ -418,10 +418,8 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list, parse->triggered_space != NULL) { sqlVdbeAddOp2(v, OP_ResultRow, reg_count, 1); sqlVdbeSetNumCols(v, 1); - sqlVdbeSetColName(v, 0, COLNAME_NAME, "rows deleted", - SQL_STATIC); - sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer", - SQL_STATIC); + vdbe_set_metadata_col_name(v, 0, "rows deleted"); + vdbe_set_metadata_col_type(v, 0, "integer"); } delete_from_cleanup: diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c index 70504c800..9be9c191d 100644 --- a/src/box/sql/insert.c +++ b/src/box/sql/insert.c @@ -785,9 +785,8 @@ sqlInsert(Parse * pParse, /* Parser context */ column_name = "rows replaced"; else column_name = "rows inserted"; - sqlVdbeSetColName(v, 0, COLNAME_NAME, column_name, SQL_STATIC); - sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer", - SQL_STATIC); + vdbe_set_metadata_col_name(v, 0, column_name); + vdbe_set_metadata_col_type(v, 0, "integer"); } insert_cleanup: diff --git a/src/box/sql/legacy.c b/src/box/sql/legacy.c index 0b1370f4a..ee58f1eb7 100644 --- a/src/box/sql/legacy.c +++ b/src/box/sql/legacy.c @@ -103,7 +103,7 @@ sql_exec(sql * db, /* The database on which the SQL executes */ (char *) sql_column_name(pStmt, i); - /* sqlVdbeSetColName() installs column names as UTF8 + /* vdbe_set_metadata_col_name() installs column names as UTF8 * strings so there is no way for sql_column_name() to fail. */ assert(azCols[i] != 0); diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c index 92bcf4e68..874eb93d2 100644 --- a/src/box/sql/pragma.c +++ b/src/box/sql/pragma.c @@ -120,10 +120,8 @@ vdbe_set_pragma_result_columns(struct Vdbe *v, const struct PragmaName *pragma) assert(n > 0); sqlVdbeSetNumCols(v, n); for (int i = 0, j = pragma->iPragCName; i < n; ++i) { - sqlVdbeSetColName(v, i, COLNAME_NAME, pragCName[j++], - SQL_STATIC); - sqlVdbeSetColName(v, i, COLNAME_DECLTYPE, pragCName[j++], - SQL_STATIC); + vdbe_set_metadata_col_name(v, i, pragCName[j++]); + vdbe_set_metadata_col_type(v, i, pragCName[j++]); } } @@ -168,10 +166,10 @@ vdbe_emit_pragma_status(struct Parse *parse) struct session *user_session = current_session(); sqlVdbeSetNumCols(v, 2); - sqlVdbeSetColName(v, 0, COLNAME_NAME, "pragma_name", SQL_STATIC); - sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "text", SQL_STATIC); - sqlVdbeSetColName(v, 1, COLNAME_NAME, "pragma_value", SQL_STATIC); - sqlVdbeSetColName(v, 1, COLNAME_DECLTYPE, "integer", SQL_STATIC); + vdbe_set_metadata_col_name(v, 0, "pragma_name"); + vdbe_set_metadata_col_type(v, 0, "text"); + vdbe_set_metadata_col_name(v, 1, "pragma_value"); + vdbe_set_metadata_col_type(v, 1, "integer"); parse->nMem = 2; for (int i = 0; i < ArraySize(aPragmaName); ++i) { diff --git a/src/box/sql/prepare.c b/src/box/sql/prepare.c index 0ecc676e2..2d3466cc7 100644 --- a/src/box/sql/prepare.c +++ b/src/box/sql/prepare.c @@ -146,11 +146,10 @@ sqlPrepare(sql * db, /* Database handle. */ sqlVdbeSetNumCols(sParse.pVdbe, name_count); for (int i = 0; i < name_count; i++) { int name_index = 2 * i + name_first; - sqlVdbeSetColName(sParse.pVdbe, i, COLNAME_NAME, - azColName[name_index], SQL_STATIC); - sqlVdbeSetColName(sParse.pVdbe, i, COLNAME_DECLTYPE, - azColName[name_index + 1], - SQL_STATIC); + vdbe_set_metadata_col_name(sParse.pVdbe, i, + azColName[name_index]); + vdbe_set_metadata_col_type(sParse.pVdbe, i, + azColName[name_index + 1]); } } diff --git a/src/box/sql/select.c b/src/box/sql/select.c index 8f93edd16..d6b8a158f 100644 --- a/src/box/sql/select.c +++ b/src/box/sql/select.c @@ -1747,15 +1747,18 @@ generateSortTail(Parse * pParse, /* Parsing context */ sqlVdbeResolveLabel(v, addrBreak); } -/* +/** * Generate code that will tell the VDBE the names of columns - * in the result set. This information is used to provide the - * azCol[] values in the callback. + * in the result set. This information is used to provide the + * metadata during/after statement execution. + * + * @param pParse Parsing context. + * @param pTabList List of tables. + * @param pEList Expressions defining the result set. */ static void -generateColumnNames(Parse * pParse, /* Parser context */ - SrcList * pTabList, /* List of tables */ - ExprList * pEList) /* Expressions defining the result set */ +generate_column_metadata(struct Parse *pParse, struct SrcList *pTabList, + struct ExprList *pEList) { Vdbe *v = pParse->pVdbe; int i, j; @@ -1789,12 +1792,11 @@ generateColumnNames(Parse * pParse, /* Parser context */ continue; if (p->op == TK_VARIABLE) var_pos[var_count++] = i; - sqlVdbeSetColName(v, i, COLNAME_DECLTYPE, - field_type_strs[sql_expr_type(p)], SQL_TRANSIENT); + vdbe_set_metadata_col_type(v, i, + field_type_strs[sql_expr_type(p)]); if (pEList->a[i].zName) { char *zName = pEList->a[i].zName; - sqlVdbeSetColName(v, i, COLNAME_NAME, zName, - SQL_TRANSIENT); + vdbe_set_metadata_col_name(v, i, zName); } else if (p->op == TK_COLUMN || p->op == TK_AGG_COLUMN) { char *zCol; int iCol = p->iColumn; @@ -1807,27 +1809,21 @@ generateColumnNames(Parse * pParse, /* Parser context */ assert(iCol >= 0 && iCol < (int)space_def->field_count); zCol = space_def->fields[iCol].name; if (!shortNames && !fullNames) { - sqlVdbeSetColName(v, i, COLNAME_NAME, - sqlDbStrDup(db, - pEList->a[i].zSpan), - SQL_DYNAMIC); + vdbe_set_metadata_col_name(v, i, + pEList->a[i].zSpan); } else if (fullNames) { - char *zName = 0; - zName = sqlMPrintf(db, "%s.%s", - space_def->name, zCol); - sqlVdbeSetColName(v, i, COLNAME_NAME, zName, - SQL_DYNAMIC); + const char *zName = tt_sprintf("%s.%s", + space_def->name, + zCol); + vdbe_set_metadata_col_name(v, i, zName); } else { - sqlVdbeSetColName(v, i, COLNAME_NAME, zCol, - SQL_TRANSIENT); + vdbe_set_metadata_col_name(v, i, zCol); } } else { const char *z = pEList->a[i].zSpan; - z = z == 0 ? sqlMPrintf(db, "column%d", - i + 1) : sqlDbStrDup(db, - z); - sqlVdbeSetColName(v, i, COLNAME_NAME, z, - SQL_DYNAMIC); + if (z == NULL) + z = tt_sprintf("column%d", i + 1); + vdbe_set_metadata_col_name(v, i, z); } } if (var_count == 0) @@ -2828,7 +2824,7 @@ multiSelect(Parse * pParse, /* Parsing context */ Select *pFirst = p; while (pFirst->pPrior) pFirst = pFirst->pPrior; - generateColumnNames(pParse, + generate_column_metadata(pParse, pFirst->pSrc, pFirst->pEList); } @@ -2927,9 +2923,9 @@ multiSelect(Parse * pParse, /* Parsing context */ Select *pFirst = p; while (pFirst->pPrior) pFirst = pFirst->pPrior; - generateColumnNames(pParse, - pFirst->pSrc, - pFirst->pEList); + generate_column_metadata(pParse, + pFirst->pSrc, + pFirst->pEList); } iBreak = sqlVdbeMakeLabel(v); iCont = sqlVdbeMakeLabel(v); @@ -3575,7 +3571,7 @@ multiSelectOrderBy(Parse * pParse, /* Parsing context */ Select *pFirst = pPrior; while (pFirst->pPrior) pFirst = pFirst->pPrior; - generateColumnNames(pParse, pFirst->pSrc, pFirst->pEList); + generate_column_metadata(pParse, pFirst->pSrc, pFirst->pEList); } /* Reassembly the compound query so that it will be freed correctly @@ -6433,7 +6429,7 @@ sqlSelect(Parse * pParse, /* The parser context */ /* Identify column names if results of the SELECT are to be output. */ if (rc == 0 && pDest->eDest == SRT_Output) { - generateColumnNames(pParse, pTabList, pEList); + generate_column_metadata(pParse, pTabList, pEList); } sqlDbFree(db, sAggInfo.aCol); diff --git a/src/box/sql/update.c b/src/box/sql/update.c index 6d69b7252..881f87d6f 100644 --- a/src/box/sql/update.c +++ b/src/box/sql/update.c @@ -498,10 +498,8 @@ sqlUpdate(Parse * pParse, /* The parser context */ pParse->triggered_space == NULL) { sqlVdbeAddOp2(v, OP_ResultRow, regRowCount, 1); sqlVdbeSetNumCols(v, 1); - sqlVdbeSetColName(v, 0, COLNAME_NAME, "rows updated", - SQL_STATIC); - sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer", - SQL_STATIC); + vdbe_set_metadata_col_name(v, 0, "rows updated"); + vdbe_set_metadata_col_type(v, 0, "integer"); } update_cleanup: diff --git a/src/box/sql/vdbe.h b/src/box/sql/vdbe.h index 582d48a1f..4142fb6ba 100644 --- a/src/box/sql/vdbe.h +++ b/src/box/sql/vdbe.h @@ -148,17 +148,6 @@ struct SubProgram { #define P5_ConstraintUnique 2 #define P5_ConstraintFK 4 -/* - * The Vdbe.aColName array contains 5n Mem structures, where n is the - * number of columns of data returned by the statement. - */ -#define COLNAME_NAME 0 -#define COLNAME_DECLTYPE 1 -#define COLNAME_DATABASE 2 -#define COLNAME_TABLE 3 -#define COLNAME_COLUMN 4 -#define COLNAME_N 2 /* Store the name and decltype */ - /* * The following macro converts a relative address in the p2 field * of a VdbeOp structure into a negative number. @@ -238,6 +227,10 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, struct key_def *key_def); VdbeOp *sqlVdbeGetOp(Vdbe *, int); int sqlVdbeMakeLabel(Vdbe *); void sqlVdbeRunOnlyOnce(Vdbe *); + +void +vdbe_metadata_delete(struct Vdbe *v); + void sqlVdbeDelete(Vdbe *); void sqlVdbeClearObject(sql *, Vdbe *); void sqlVdbeMakeReady(Vdbe *, Parse *); @@ -248,7 +241,18 @@ void sqlVdbeResetStepResult(Vdbe *); void sqlVdbeRewind(Vdbe *); int sqlVdbeReset(Vdbe *); void sqlVdbeSetNumCols(Vdbe *, int); -int sqlVdbeSetColName(Vdbe *, int, int, const char *, void (*)(void *)); + +/** + * Set the name of the idx'th column to be returned by the SQL + * statement. @name must be a pointer to a nul terminated string. + * This call must be made after a call to sqlVdbeSetNumCols(). + */ +int +vdbe_set_metadata_col_name(struct Vdbe *v, int col_idx, const char *name); + +int +vdbe_set_metadata_col_type(struct Vdbe *v, int col_idx, const char *type); + void sqlVdbeCountChanges(Vdbe *); sql *sqlVdbeDb(Vdbe *); void sqlVdbeSetSql(Vdbe *, const char *z, int n, int); diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h index 0f32b4cd6..9ab3753cb 100644 --- a/src/box/sql/vdbeInt.h +++ b/src/box/sql/vdbeInt.h @@ -346,6 +346,11 @@ struct ScanStatus { char *zName; /* Name of table or index */ }; +struct sql_column_metadata { + const char *name; + const char *type; +}; + /* * An instance of the virtual machine. This structure contains the complete * state of the virtual machine. @@ -394,7 +399,8 @@ struct Vdbe { Op *aOp; /* Space to hold the virtual machine's program */ Mem *aMem; /* The memory locations */ Mem **apArg; /* Arguments to currently executing user function */ - Mem *aColName; /* Column names to return */ + /** SQL metadata for SELECT queries. */ + struct sql_column_metadata *metadata; Mem *pResultSet; /* Pointer to an array of results */ VdbeCursor **apCsr; /* One element of this array for each open cursor */ Mem *aVar; /* Values for the OP_Variable opcode. */ diff --git a/src/box/sql/vdbeapi.c b/src/box/sql/vdbeapi.c index 685212d91..d746a42f2 100644 --- a/src/box/sql/vdbeapi.c +++ b/src/box/sql/vdbeapi.c @@ -725,77 +725,24 @@ sql_column_subtype(struct sql_stmt *stmt, int i) return sql_value_subtype(columnMem(stmt, i)); } -/* - * Convert the N-th element of pStmt->pColName[] into a string using - * xFunc() then return that string. If N is out of range, return 0. - * - * There are up to 5 names for each column. useType determines which - * name is returned. Here are the names: - * - * 0 The column name as it should be displayed for output - * 1 The datatype name for the column - * 2 The name of the database that the column derives from - * 3 The name of the table that the column derives from - * 4 The name of the table column that the result column derives from - * - * If the result is not a simple column reference (if it is an expression - * or a constant) then useTypes 2, 3, and 4 return NULL. - */ -static const void * -columnName(sql_stmt * pStmt, - int N, const void *(*xFunc) (Mem *), int useType) -{ - const void *ret; - Vdbe *p; - int n; - sql *db; - ret = 0; - p = (Vdbe *) pStmt; - db = p->db; - assert(db != 0); - n = sql_column_count(pStmt); - if (N < n && N >= 0) { - N += useType * n; - assert(db->mallocFailed == 0); - ret = xFunc(&p->aColName[N]); - /* A malloc may have failed inside of the xFunc() call. If this - * is the case, clear the mallocFailed flag and return NULL. - */ - if (db->mallocFailed) { - sqlOomClear(db); - ret = 0; - } - } - return ret; -} - /* * Return the name of the Nth column of the result set returned by SQL * statement pStmt. */ const char * -sql_column_name(sql_stmt * pStmt, int N) -{ - return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text, - COLNAME_NAME); -} - -const char * -sql_column_datatype(sql_stmt *pStmt, int N) +sql_column_name(sql_stmt *stmt, int n) { - return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text, - COLNAME_DECLTYPE); + struct Vdbe *p = (struct Vdbe *) stmt; + assert(n < sql_column_count(stmt) && n >= 0); + return p->metadata[n].name; } -/* - * Return the column declaration type (if applicable) of the 'i'th column - * of the result set of SQL statement pStmt. - */ const char * -sql_column_decltype(sql_stmt * pStmt, int N) +sql_column_datatype(sql_stmt *stmt, int n) { - return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text, - COLNAME_DECLTYPE); + struct Vdbe *p = (struct Vdbe *) stmt; + assert(n < sql_column_count(stmt) && n >= 0); + return p->metadata[n].type; } /******************************* sql_bind_ ************************** @@ -853,17 +800,15 @@ sql_bind_type(struct Vdbe *v, uint32_t position, const char *type) if (v->res_var_count < position) return 0; int rc = 0; - if (sqlVdbeSetColName(v, v->var_pos[position - 1], COLNAME_DECLTYPE, - type, SQL_TRANSIENT) != 0) + if (vdbe_set_metadata_col_type(v, v->var_pos[position - 1], type) != 0) rc = -1; - const char *bind_name = v->aColName[position - 1].z; + const char *bind_name = v->metadata[position - 1].name; if (strcmp(bind_name, "?") == 0) return rc; for (uint32_t i = position; i < v->res_var_count; ++i) { - if (strcmp(bind_name, v->aColName[i].z) == 0) { - if (sqlVdbeSetColName(v, v->var_pos[i], - COLNAME_DECLTYPE, type, - SQL_TRANSIENT) != 0) + if (strcmp(bind_name, v->metadata[i].name) == 0) { + if (vdbe_set_metadata_col_type(v, v->var_pos[i], + type) != 0) return -1; } } diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c index a1d658648..db11fbf33 100644 --- a/src/box/sql/vdbeaux.c +++ b/src/box/sql/vdbeaux.c @@ -1827,6 +1827,18 @@ Cleanup(Vdbe * p) p->pResultSet = 0; } +void +vdbe_metadata_delete(struct Vdbe *v) +{ + if (v->metadata != NULL) { + for (int i = 0; i < v->nResColumn; ++i) { + free((void *)v->metadata[i].name); + free((void *)v->metadata[i].type); + } + free(v->metadata); + } +} + /* * Set the number of result columns that will be returned by this SQL * statement. This is now set at compile time, rather than during @@ -1836,50 +1848,44 @@ Cleanup(Vdbe * p) void sqlVdbeSetNumCols(Vdbe * p, int nResColumn) { - int n; - sql *db = p->db; - - releaseMemArray(p->aColName, p->nResColumn * COLNAME_N); - sqlDbFree(db, p->aColName); - n = nResColumn * COLNAME_N; + vdbe_metadata_delete(p); p->nResColumn = (u16) nResColumn; - p->aColName = (Mem *) sqlDbMallocRawNN(db, sizeof(Mem) * n); - if (p->aColName == 0) + p->metadata = (struct sql_column_metadata *) + calloc(nResColumn, sizeof(struct sql_column_metadata)); + if (p->metadata == NULL) { + diag_set(OutOfMemory, + nResColumn * sizeof(struct sql_column_metadata), + "calloc", "metadata"); return; - initMemArray(p->aColName, n, p->db, MEM_Null); + } } -/* - * Set the name of the idx'th column to be returned by the SQL statement. - * zName must be a pointer to a nul terminated string. - * - * This call must be made after a call to sqlVdbeSetNumCols(). - * - * The final parameter, xDel, must be one of SQL_DYNAMIC, SQL_STATIC - * or SQL_TRANSIENT. If it is SQL_DYNAMIC, then the buffer pointed - * to by zName will be freed by sqlDbFree() when the vdbe is destroyed. - */ int -sqlVdbeSetColName(Vdbe * p, /* Vdbe being configured */ - int idx, /* Index of column zName applies to */ - int var, /* One of the COLNAME_* constants */ - const char *zName, /* Pointer to buffer containing name */ - void (*xDel) (void *)) /* Memory management strategy for zName */ -{ - int rc; - Mem *pColName; +vdbe_set_metadata_col_name(struct Vdbe *p, int idx, const char *name) +{ assert(idx < p->nResColumn); - assert(var < COLNAME_N); - if (p->db->mallocFailed) { - assert(!zName || xDel != SQL_DYNAMIC); + if (p->metadata[idx].name != NULL) + free((void *)p->metadata[idx].name); + p->metadata[idx].name = strdup(name); + if (p->metadata[idx].name == NULL) { + diag_set(OutOfMemory, strlen(name), "strdup", "name"); return -1; } - assert(p->aColName != 0); - assert(var == COLNAME_NAME || var == COLNAME_DECLTYPE); - pColName = &(p->aColName[idx + var * p->nResColumn]); - rc = sqlVdbeMemSetStr(pColName, zName, -1, 1, xDel); - assert(rc != 0 || !zName || (pColName->flags & MEM_Term) != 0); - return rc; + return 0; +} + +int +vdbe_set_metadata_col_type(struct Vdbe *p, int idx, const char *type) +{ + assert(idx < p->nResColumn); + if (p->metadata[idx].type != NULL) + free((void *)p->metadata[idx].type); + p->metadata[idx].type = strdup(type); + if (p->metadata[idx].type == NULL) { + diag_set(OutOfMemory, strlen(type), "strdup", "type"); + return -1; + } + return 0; } /* @@ -2230,7 +2236,7 @@ sqlVdbeClearObject(sql * db, Vdbe * p) { SubProgram *pSub, *pNext; assert(p->db == 0 || p->db == db); - releaseMemArray(p->aColName, p->nResColumn * COLNAME_N); + vdbe_metadata_delete(p); for (pSub = p->pProgram; pSub; pSub = pNext) { pNext = pSub->pNext; vdbeFreeOpArray(db, pSub->aOp, pSub->nOp); @@ -2242,7 +2248,6 @@ sqlVdbeClearObject(sql * db, Vdbe * p) sqlDbFree(db, p->pFree); } vdbeFreeOpArray(db, p->aOp, p->nOp); - sqlDbFree(db, p->aColName); sqlDbFree(db, p->zSql); } -- 2.15.1