[Tarantool-patches] [PATCH 1/6] sql: refactor resulting set metadata
Nikita Pettik
korablev at tarantool.org
Wed Nov 27 15:15:41 MSK 2019
Move names and types of the result set to a separate structure. Simplify
their storage by introducing separate members for name and type
(previously names and types were stored together in a single Mem array,
Vdbe.aColName). This will allow us to add new metadata properties with ease.
Needed for #4407
---
src/box/sql/delete.c | 6 ++--
src/box/sql/insert.c | 5 ++--
src/box/sql/legacy.c | 2 +-
src/box/sql/pragma.c | 14 ++++-----
src/box/sql/prepare.c | 9 +++---
src/box/sql/select.c | 60 ++++++++++++++++++--------------------
src/box/sql/update.c | 6 ++--
src/box/sql/vdbe.h | 28 ++++++++++--------
src/box/sql/vdbeInt.h | 8 ++++-
src/box/sql/vdbeapi.c | 81 +++++++++------------------------------------------
src/box/sql/vdbeaux.c | 81 +++++++++++++++++++++++++++------------------------
11 files changed, 124 insertions(+), 176 deletions(-)
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 91c2157ac..31570099b 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -418,10 +418,8 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
parse->triggered_space != NULL) {
sqlVdbeAddOp2(v, OP_ResultRow, reg_count, 1);
sqlVdbeSetNumCols(v, 1);
- sqlVdbeSetColName(v, 0, COLNAME_NAME, "rows deleted",
- SQL_STATIC);
- sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer",
- SQL_STATIC);
+ vdbe_set_metadata_col_name(v, 0, "rows deleted");
+ vdbe_set_metadata_col_type(v, 0, "integer");
}
delete_from_cleanup:
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 70504c800..9be9c191d 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -785,9 +785,8 @@ sqlInsert(Parse * pParse, /* Parser context */
column_name = "rows replaced";
else
column_name = "rows inserted";
- sqlVdbeSetColName(v, 0, COLNAME_NAME, column_name, SQL_STATIC);
- sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer",
- SQL_STATIC);
+ vdbe_set_metadata_col_name(v, 0, column_name);
+ vdbe_set_metadata_col_type(v, 0, "integer");
}
insert_cleanup:
diff --git a/src/box/sql/legacy.c b/src/box/sql/legacy.c
index 0b1370f4a..ee58f1eb7 100644
--- a/src/box/sql/legacy.c
+++ b/src/box/sql/legacy.c
@@ -103,7 +103,7 @@ sql_exec(sql * db, /* The database on which the SQL executes */
(char *)
sql_column_name(pStmt,
i);
- /* sqlVdbeSetColName() installs column names as UTF8
+ /* vdbe_set_metadata_col_name() installs column names as UTF8
* strings so there is no way for sql_column_name() to fail.
*/
assert(azCols[i] != 0);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index 92bcf4e68..874eb93d2 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -120,10 +120,8 @@ vdbe_set_pragma_result_columns(struct Vdbe *v, const struct PragmaName *pragma)
assert(n > 0);
sqlVdbeSetNumCols(v, n);
for (int i = 0, j = pragma->iPragCName; i < n; ++i) {
- sqlVdbeSetColName(v, i, COLNAME_NAME, pragCName[j++],
- SQL_STATIC);
- sqlVdbeSetColName(v, i, COLNAME_DECLTYPE, pragCName[j++],
- SQL_STATIC);
+ vdbe_set_metadata_col_name(v, i, pragCName[j++]);
+ vdbe_set_metadata_col_type(v, i, pragCName[j++]);
}
}
@@ -168,10 +166,10 @@ vdbe_emit_pragma_status(struct Parse *parse)
struct session *user_session = current_session();
sqlVdbeSetNumCols(v, 2);
- sqlVdbeSetColName(v, 0, COLNAME_NAME, "pragma_name", SQL_STATIC);
- sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "text", SQL_STATIC);
- sqlVdbeSetColName(v, 1, COLNAME_NAME, "pragma_value", SQL_STATIC);
- sqlVdbeSetColName(v, 1, COLNAME_DECLTYPE, "integer", SQL_STATIC);
+ vdbe_set_metadata_col_name(v, 0, "pragma_name");
+ vdbe_set_metadata_col_type(v, 0, "text");
+ vdbe_set_metadata_col_name(v, 1, "pragma_value");
+ vdbe_set_metadata_col_type(v, 1, "integer");
parse->nMem = 2;
for (int i = 0; i < ArraySize(aPragmaName); ++i) {
diff --git a/src/box/sql/prepare.c b/src/box/sql/prepare.c
index 0ecc676e2..2d3466cc7 100644
--- a/src/box/sql/prepare.c
+++ b/src/box/sql/prepare.c
@@ -146,11 +146,10 @@ sqlPrepare(sql * db, /* Database handle. */
sqlVdbeSetNumCols(sParse.pVdbe, name_count);
for (int i = 0; i < name_count; i++) {
int name_index = 2 * i + name_first;
- sqlVdbeSetColName(sParse.pVdbe, i, COLNAME_NAME,
- azColName[name_index], SQL_STATIC);
- sqlVdbeSetColName(sParse.pVdbe, i, COLNAME_DECLTYPE,
- azColName[name_index + 1],
- SQL_STATIC);
+ vdbe_set_metadata_col_name(sParse.pVdbe, i,
+ azColName[name_index]);
+ vdbe_set_metadata_col_type(sParse.pVdbe, i,
+ azColName[name_index + 1]);
}
}
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 8f93edd16..d6b8a158f 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -1747,15 +1747,18 @@ generateSortTail(Parse * pParse, /* Parsing context */
sqlVdbeResolveLabel(v, addrBreak);
}
-/*
+/**
* Generate code that will tell the VDBE the names of columns
- * in the result set. This information is used to provide the
- * azCol[] values in the callback.
+ * in the result set. This information is used to provide the
+ * metadata during/after statement execution.
+ *
+ * @param pParse Parsing context.
+ * @param pTabList List of tables.
+ * @param pEList Expressions defining the result set.
*/
static void
-generateColumnNames(Parse * pParse, /* Parser context */
- SrcList * pTabList, /* List of tables */
- ExprList * pEList) /* Expressions defining the result set */
+generate_column_metadata(struct Parse *pParse, struct SrcList *pTabList,
+ struct ExprList *pEList)
{
Vdbe *v = pParse->pVdbe;
int i, j;
@@ -1789,12 +1792,11 @@ generateColumnNames(Parse * pParse, /* Parser context */
continue;
if (p->op == TK_VARIABLE)
var_pos[var_count++] = i;
- sqlVdbeSetColName(v, i, COLNAME_DECLTYPE,
- field_type_strs[sql_expr_type(p)], SQL_TRANSIENT);
+ vdbe_set_metadata_col_type(v, i,
+ field_type_strs[sql_expr_type(p)]);
if (pEList->a[i].zName) {
char *zName = pEList->a[i].zName;
- sqlVdbeSetColName(v, i, COLNAME_NAME, zName,
- SQL_TRANSIENT);
+ vdbe_set_metadata_col_name(v, i, zName);
} else if (p->op == TK_COLUMN || p->op == TK_AGG_COLUMN) {
char *zCol;
int iCol = p->iColumn;
@@ -1807,27 +1809,21 @@ generateColumnNames(Parse * pParse, /* Parser context */
assert(iCol >= 0 && iCol < (int)space_def->field_count);
zCol = space_def->fields[iCol].name;
if (!shortNames && !fullNames) {
- sqlVdbeSetColName(v, i, COLNAME_NAME,
- sqlDbStrDup(db,
- pEList->a[i].zSpan),
- SQL_DYNAMIC);
+ vdbe_set_metadata_col_name(v, i,
+ pEList->a[i].zSpan);
} else if (fullNames) {
- char *zName = 0;
- zName = sqlMPrintf(db, "%s.%s",
- space_def->name, zCol);
- sqlVdbeSetColName(v, i, COLNAME_NAME, zName,
- SQL_DYNAMIC);
+ const char *zName = tt_sprintf("%s.%s",
+ space_def->name,
+ zCol);
+ vdbe_set_metadata_col_name(v, i, zName);
} else {
- sqlVdbeSetColName(v, i, COLNAME_NAME, zCol,
- SQL_TRANSIENT);
+ vdbe_set_metadata_col_name(v, i, zCol);
}
} else {
const char *z = pEList->a[i].zSpan;
- z = z == 0 ? sqlMPrintf(db, "column%d",
- i + 1) : sqlDbStrDup(db,
- z);
- sqlVdbeSetColName(v, i, COLNAME_NAME, z,
- SQL_DYNAMIC);
+ if (z == NULL)
+ z = tt_sprintf("column%d", i + 1);
+ vdbe_set_metadata_col_name(v, i, z);
}
}
if (var_count == 0)
@@ -2828,7 +2824,7 @@ multiSelect(Parse * pParse, /* Parsing context */
Select *pFirst = p;
while (pFirst->pPrior)
pFirst = pFirst->pPrior;
- generateColumnNames(pParse,
+ generate_column_metadata(pParse,
pFirst->pSrc,
pFirst->pEList);
}
@@ -2927,9 +2923,9 @@ multiSelect(Parse * pParse, /* Parsing context */
Select *pFirst = p;
while (pFirst->pPrior)
pFirst = pFirst->pPrior;
- generateColumnNames(pParse,
- pFirst->pSrc,
- pFirst->pEList);
+ generate_column_metadata(pParse,
+ pFirst->pSrc,
+ pFirst->pEList);
}
iBreak = sqlVdbeMakeLabel(v);
iCont = sqlVdbeMakeLabel(v);
@@ -3575,7 +3571,7 @@ multiSelectOrderBy(Parse * pParse, /* Parsing context */
Select *pFirst = pPrior;
while (pFirst->pPrior)
pFirst = pFirst->pPrior;
- generateColumnNames(pParse, pFirst->pSrc, pFirst->pEList);
+ generate_column_metadata(pParse, pFirst->pSrc, pFirst->pEList);
}
/* Reassembly the compound query so that it will be freed correctly
@@ -6433,7 +6429,7 @@ sqlSelect(Parse * pParse, /* The parser context */
/* Identify column names if results of the SELECT are to be output.
*/
if (rc == 0 && pDest->eDest == SRT_Output) {
- generateColumnNames(pParse, pTabList, pEList);
+ generate_column_metadata(pParse, pTabList, pEList);
}
sqlDbFree(db, sAggInfo.aCol);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 6d69b7252..881f87d6f 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -498,10 +498,8 @@ sqlUpdate(Parse * pParse, /* The parser context */
pParse->triggered_space == NULL) {
sqlVdbeAddOp2(v, OP_ResultRow, regRowCount, 1);
sqlVdbeSetNumCols(v, 1);
- sqlVdbeSetColName(v, 0, COLNAME_NAME, "rows updated",
- SQL_STATIC);
- sqlVdbeSetColName(v, 0, COLNAME_DECLTYPE, "integer",
- SQL_STATIC);
+ vdbe_set_metadata_col_name(v, 0, "rows updated");
+ vdbe_set_metadata_col_type(v, 0, "integer");
}
update_cleanup:
diff --git a/src/box/sql/vdbe.h b/src/box/sql/vdbe.h
index 582d48a1f..4142fb6ba 100644
--- a/src/box/sql/vdbe.h
+++ b/src/box/sql/vdbe.h
@@ -148,17 +148,6 @@ struct SubProgram {
#define P5_ConstraintUnique 2
#define P5_ConstraintFK 4
-/*
- * The Vdbe.aColName array contains 5n Mem structures, where n is the
- * number of columns of data returned by the statement.
- */
-#define COLNAME_NAME 0
-#define COLNAME_DECLTYPE 1
-#define COLNAME_DATABASE 2
-#define COLNAME_TABLE 3
-#define COLNAME_COLUMN 4
-#define COLNAME_N 2 /* Store the name and decltype */
-
/*
* The following macro converts a relative address in the p2 field
* of a VdbeOp structure into a negative number.
@@ -238,6 +227,10 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, struct key_def *key_def);
VdbeOp *sqlVdbeGetOp(Vdbe *, int);
int sqlVdbeMakeLabel(Vdbe *);
void sqlVdbeRunOnlyOnce(Vdbe *);
+
+void
+vdbe_metadata_delete(struct Vdbe *v);
+
void sqlVdbeDelete(Vdbe *);
void sqlVdbeClearObject(sql *, Vdbe *);
void sqlVdbeMakeReady(Vdbe *, Parse *);
@@ -248,7 +241,18 @@ void sqlVdbeResetStepResult(Vdbe *);
void sqlVdbeRewind(Vdbe *);
int sqlVdbeReset(Vdbe *);
void sqlVdbeSetNumCols(Vdbe *, int);
-int sqlVdbeSetColName(Vdbe *, int, int, const char *, void (*)(void *));
+
+/**
+ * Set the name of the col_idx'th column to be returned by the SQL
+ * statement. @a name must be a pointer to a nul-terminated string.
+ * This call must be made after a call to sqlVdbeSetNumCols().
+ */
+int
+vdbe_set_metadata_col_name(struct Vdbe *v, int col_idx, const char *name);
+
+int
+vdbe_set_metadata_col_type(struct Vdbe *v, int col_idx, const char *type);
+
void sqlVdbeCountChanges(Vdbe *);
sql *sqlVdbeDb(Vdbe *);
void sqlVdbeSetSql(Vdbe *, const char *z, int n, int);
diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h
index 0f32b4cd6..9ab3753cb 100644
--- a/src/box/sql/vdbeInt.h
+++ b/src/box/sql/vdbeInt.h
@@ -346,6 +346,11 @@ struct ScanStatus {
char *zName; /* Name of table or index */
};
+struct sql_column_metadata {
+ const char *name;
+ const char *type;
+};
+
/*
* An instance of the virtual machine. This structure contains the complete
* state of the virtual machine.
@@ -394,7 +399,8 @@ struct Vdbe {
Op *aOp; /* Space to hold the virtual machine's program */
Mem *aMem; /* The memory locations */
Mem **apArg; /* Arguments to currently executing user function */
- Mem *aColName; /* Column names to return */
+ /** SQL metadata for SELECT queries. */
+ struct sql_column_metadata *metadata;
Mem *pResultSet; /* Pointer to an array of results */
VdbeCursor **apCsr; /* One element of this array for each open cursor */
Mem *aVar; /* Values for the OP_Variable opcode. */
diff --git a/src/box/sql/vdbeapi.c b/src/box/sql/vdbeapi.c
index 685212d91..d746a42f2 100644
--- a/src/box/sql/vdbeapi.c
+++ b/src/box/sql/vdbeapi.c
@@ -725,77 +725,24 @@ sql_column_subtype(struct sql_stmt *stmt, int i)
return sql_value_subtype(columnMem(stmt, i));
}
-/*
- * Convert the N-th element of pStmt->pColName[] into a string using
- * xFunc() then return that string. If N is out of range, return 0.
- *
- * There are up to 5 names for each column. useType determines which
- * name is returned. Here are the names:
- *
- * 0 The column name as it should be displayed for output
- * 1 The datatype name for the column
- * 2 The name of the database that the column derives from
- * 3 The name of the table that the column derives from
- * 4 The name of the table column that the result column derives from
- *
- * If the result is not a simple column reference (if it is an expression
- * or a constant) then useTypes 2, 3, and 4 return NULL.
- */
-static const void *
-columnName(sql_stmt * pStmt,
- int N, const void *(*xFunc) (Mem *), int useType)
-{
- const void *ret;
- Vdbe *p;
- int n;
- sql *db;
- ret = 0;
- p = (Vdbe *) pStmt;
- db = p->db;
- assert(db != 0);
- n = sql_column_count(pStmt);
- if (N < n && N >= 0) {
- N += useType * n;
- assert(db->mallocFailed == 0);
- ret = xFunc(&p->aColName[N]);
- /* A malloc may have failed inside of the xFunc() call. If this
- * is the case, clear the mallocFailed flag and return NULL.
- */
- if (db->mallocFailed) {
- sqlOomClear(db);
- ret = 0;
- }
- }
- return ret;
-}
-
/*
* Return the name of the Nth column of the result set returned by SQL
* statement pStmt.
*/
const char *
-sql_column_name(sql_stmt * pStmt, int N)
-{
- return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text,
- COLNAME_NAME);
-}
-
-const char *
-sql_column_datatype(sql_stmt *pStmt, int N)
+sql_column_name(sql_stmt *stmt, int n)
{
- return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text,
- COLNAME_DECLTYPE);
+ struct Vdbe *p = (struct Vdbe *) stmt;
+ assert(n < sql_column_count(stmt) && n >= 0);
+ return p->metadata[n].name;
}
-/*
- * Return the column declaration type (if applicable) of the 'i'th column
- * of the result set of SQL statement pStmt.
- */
const char *
-sql_column_decltype(sql_stmt * pStmt, int N)
+sql_column_datatype(sql_stmt *stmt, int n)
{
- return columnName(pStmt, N, (const void *(*)(Mem *))sql_value_text,
- COLNAME_DECLTYPE);
+ struct Vdbe *p = (struct Vdbe *) stmt;
+ assert(n < sql_column_count(stmt) && n >= 0);
+ return p->metadata[n].type;
}
/******************************* sql_bind_ **************************
@@ -853,17 +800,15 @@ sql_bind_type(struct Vdbe *v, uint32_t position, const char *type)
if (v->res_var_count < position)
return 0;
int rc = 0;
- if (sqlVdbeSetColName(v, v->var_pos[position - 1], COLNAME_DECLTYPE,
- type, SQL_TRANSIENT) != 0)
+ if (vdbe_set_metadata_col_type(v, v->var_pos[position - 1], type) != 0)
rc = -1;
- const char *bind_name = v->aColName[position - 1].z;
+ const char *bind_name = v->metadata[position - 1].name;
if (strcmp(bind_name, "?") == 0)
return rc;
for (uint32_t i = position; i < v->res_var_count; ++i) {
- if (strcmp(bind_name, v->aColName[i].z) == 0) {
- if (sqlVdbeSetColName(v, v->var_pos[i],
- COLNAME_DECLTYPE, type,
- SQL_TRANSIENT) != 0)
+ if (strcmp(bind_name, v->metadata[i].name) == 0) {
+ if (vdbe_set_metadata_col_type(v, v->var_pos[i],
+ type) != 0)
return -1;
}
}
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index a1d658648..db11fbf33 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -1827,6 +1827,18 @@ Cleanup(Vdbe * p)
p->pResultSet = 0;
}
+void
+vdbe_metadata_delete(struct Vdbe *v)
+{
+ if (v->metadata != NULL) {
+ for (int i = 0; i < v->nResColumn; ++i) {
+ free((void *)v->metadata[i].name);
+ free((void *)v->metadata[i].type);
+ }
+ free(v->metadata);
+ }
+}
+
/*
* Set the number of result columns that will be returned by this SQL
* statement. This is now set at compile time, rather than during
@@ -1836,50 +1848,44 @@ Cleanup(Vdbe * p)
void
sqlVdbeSetNumCols(Vdbe * p, int nResColumn)
{
- int n;
- sql *db = p->db;
-
- releaseMemArray(p->aColName, p->nResColumn * COLNAME_N);
- sqlDbFree(db, p->aColName);
- n = nResColumn * COLNAME_N;
+ vdbe_metadata_delete(p);
p->nResColumn = (u16) nResColumn;
- p->aColName = (Mem *) sqlDbMallocRawNN(db, sizeof(Mem) * n);
- if (p->aColName == 0)
+ p->metadata = (struct sql_column_metadata *)
+ calloc(nResColumn, sizeof(struct sql_column_metadata));
+ if (p->metadata == NULL) {
+ diag_set(OutOfMemory,
+ nResColumn * sizeof(struct sql_column_metadata),
+ "calloc", "metadata");
return;
- initMemArray(p->aColName, n, p->db, MEM_Null);
+ }
}
-/*
- * Set the name of the idx'th column to be returned by the SQL statement.
- * zName must be a pointer to a nul terminated string.
- *
- * This call must be made after a call to sqlVdbeSetNumCols().
- *
- * The final parameter, xDel, must be one of SQL_DYNAMIC, SQL_STATIC
- * or SQL_TRANSIENT. If it is SQL_DYNAMIC, then the buffer pointed
- * to by zName will be freed by sqlDbFree() when the vdbe is destroyed.
- */
int
-sqlVdbeSetColName(Vdbe * p, /* Vdbe being configured */
- int idx, /* Index of column zName applies to */
- int var, /* One of the COLNAME_* constants */
- const char *zName, /* Pointer to buffer containing name */
- void (*xDel) (void *)) /* Memory management strategy for zName */
-{
- int rc;
- Mem *pColName;
+vdbe_set_metadata_col_name(struct Vdbe *p, int idx, const char *name)
+{
assert(idx < p->nResColumn);
- assert(var < COLNAME_N);
- if (p->db->mallocFailed) {
- assert(!zName || xDel != SQL_DYNAMIC);
+ if (p->metadata[idx].name != NULL)
+ free((void *)p->metadata[idx].name);
+ p->metadata[idx].name = strdup(name);
+ if (p->metadata[idx].name == NULL) {
+ diag_set(OutOfMemory, strlen(name), "strdup", "name");
return -1;
}
- assert(p->aColName != 0);
- assert(var == COLNAME_NAME || var == COLNAME_DECLTYPE);
- pColName = &(p->aColName[idx + var * p->nResColumn]);
- rc = sqlVdbeMemSetStr(pColName, zName, -1, 1, xDel);
- assert(rc != 0 || !zName || (pColName->flags & MEM_Term) != 0);
- return rc;
+ return 0;
+}
+
+int
+vdbe_set_metadata_col_type(struct Vdbe *p, int idx, const char *type)
+{
+ assert(idx < p->nResColumn);
+ if (p->metadata[idx].type != NULL)
+ free((void *)p->metadata[idx].type);
+ p->metadata[idx].type = strdup(type);
+ if (p->metadata[idx].type == NULL) {
+ diag_set(OutOfMemory, strlen(type), "strdup", "type");
+ return -1;
+ }
+ return 0;
}
/*
@@ -2230,7 +2236,7 @@ sqlVdbeClearObject(sql * db, Vdbe * p)
{
SubProgram *pSub, *pNext;
assert(p->db == 0 || p->db == db);
- releaseMemArray(p->aColName, p->nResColumn * COLNAME_N);
+ vdbe_metadata_delete(p);
for (pSub = p->pProgram; pSub; pSub = pNext) {
pNext = pSub->pNext;
vdbeFreeOpArray(db, pSub->aOp, pSub->nOp);
@@ -2242,7 +2248,6 @@ sqlVdbeClearObject(sql * db, Vdbe * p)
sqlDbFree(db, p->pFree);
}
vdbeFreeOpArray(db, p->aOp, p->nOp);
- sqlDbFree(db, p->aColName);
sqlDbFree(db, p->zSql);
}
--
2.15.1
More information about the Tarantool-patches
mailing list