1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-07 02:42:49 +03:00

Initial implementation for bulk operations/array binding in prepared statements

This commit is contained in:
Georg Richter
2016-07-01 10:10:11 +02:00
parent 45a635dead
commit 8e44202e74
4 changed files with 171 additions and 70 deletions

View File

@@ -165,7 +165,10 @@ enum enum_server_command
#define MARIADB_CLIENT_FLAGS 0xFFFFFFFF00000000ULL #define MARIADB_CLIENT_FLAGS 0xFFFFFFFF00000000ULL
#define MARIADB_CLIENT_PROGRESS (1ULL << 32) #define MARIADB_CLIENT_PROGRESS (1ULL << 32)
#define MARIADB_CLIENT_COM_MULTI (1ULL << 33) #define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
//#define MARIADB_CLIENT_EXTENDED_FLAGS (1ULL << 63) #define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1ULL << 34)
#define IS_MARIADB_EXTENDED_SERVER(mysql)\
!(mysql->server_capabilities & CLIENT_MYSQL)
#define MARIADB_CLIENT_SUPPORTED_FLAGS (MARIADB_CLIENT_PROGRESS |\ #define MARIADB_CLIENT_SUPPORTED_FLAGS (MARIADB_CLIENT_PROGRESS |\
MARIADB_CLIENT_COM_MULTI) MARIADB_CLIENT_COM_MULTI)

View File

@@ -57,7 +57,9 @@ enum enum_stmt_attr_type
STMT_ATTR_UPDATE_MAX_LENGTH, STMT_ATTR_UPDATE_MAX_LENGTH,
STMT_ATTR_CURSOR_TYPE, STMT_ATTR_CURSOR_TYPE,
STMT_ATTR_PREFETCH_ROWS, STMT_ATTR_PREFETCH_ROWS,
STMT_ATTR_PREBIND_PARAMS=200 STMT_ATTR_PREBIND_PARAMS=200,
STMT_ATTR_ARRAY_SIZE,
STMT_ATTR_BIND_TYPE
}; };
enum enum_cursor_type enum enum_cursor_type
@@ -68,6 +70,20 @@ enum enum_cursor_type
CURSOR_TYPE_SCROLLABLE= 4 CURSOR_TYPE_SCROLLABLE= 4
}; };
enum enum_indicator_type
{
STMT_INDICATOR_NONE=0,
STMT_INDICATOR_NULL=1,
STMT_INDICATOR_DEFAULT=2,
STMT_INDICATOR_NTS=4
};
enum enum_bind_type
{
STMT_BIND_ROW=0,
STMT_BIND_COLUMN
};
typedef enum mysql_stmt_state typedef enum mysql_stmt_state
{ {
MYSQL_STMT_INITTED = 0, MYSQL_STMT_INITTED = 0,
@@ -96,7 +112,12 @@ typedef struct st_mysql_bind
unsigned char **row); unsigned char **row);
/* output buffer length, must be set when fetching str/binary */ /* output buffer length, must be set when fetching str/binary */
unsigned long buffer_length; unsigned long buffer_length;
unsigned long offset; /* offset position for char/binary fetch */ union {
unsigned long offset; /* offset position for char/binary fetch */
struct {
uchar *indicator;
};
};
unsigned long length_value; /* Used if length is 0 */ unsigned long length_value; /* Used if length is 0 */
unsigned int flags; /* special flags, e.g. for dummy bind */ unsigned int flags; /* special flags, e.g. for dummy bind */
unsigned int pack_length; /* Internal length for packed data */ unsigned int pack_length; /* Internal length for packed data */
@@ -206,6 +227,8 @@ struct st_mysql_stmt
unsigned int execute_count;/* count how many times the stmt was executed */ unsigned int execute_count;/* count how many times the stmt was executed */
mysql_stmt_use_or_store_func default_rset_handler; mysql_stmt_use_or_store_func default_rset_handler;
struct st_mysqlnd_stmt_methods *m; struct st_mysqlnd_stmt_methods *m;
unsigned int array_size;
enum enum_bind_type bind_type;
}; };
typedef void (*ps_field_fetch_func)(MYSQL_BIND *r_param, const MYSQL_FIELD * field, unsigned char **row); typedef void (*ps_field_fetch_func)(MYSQL_BIND *r_param, const MYSQL_FIELD * field, unsigned char **row);

View File

@@ -55,6 +55,7 @@
#include <time.h> #include <time.h>
#include <mysql/client_plugin.h> #include <mysql/client_plugin.h>
#define STMT_NUM_OFS(type, a,r) ((type *)(a))[r]
#define MADB_RESET_ERROR 1 #define MADB_RESET_ERROR 1
#define MADB_RESET_LONGDATA 2 #define MADB_RESET_LONGDATA 2
#define MADB_RESET_SERVER 4 #define MADB_RESET_SERVER 4
@@ -399,34 +400,47 @@ unsigned char *mysql_net_store_length(unsigned char *packet, size_t length)
return packet + 8; return packet + 8;
} }
int store_param(MYSQL_STMT *stmt, int column, unsigned char **p) static void *ma_get_buffer_offset(MYSQL_STMT *stmt, enum enum_field_types type,
void *buffer, unsigned long row_nr)
{
if (stmt->array_size)
{
int len= mysql_ps_fetch_functions[type].pack_len;
if (len > 0)
return buffer + len * row_nr;
return ((void **)buffer)[row_nr];
}
return buffer;
}
int store_param(MYSQL_STMT *stmt, int column, unsigned char **p, unsigned long row_nr)
{ {
switch (stmt->params[column].buffer_type) { switch (stmt->params[column].buffer_type) {
case MYSQL_TYPE_TINY: case MYSQL_TYPE_TINY:
int1store(*p, *(uchar *)stmt->params[column].buffer); int1store(*p, ((uchar *)stmt->params[column].buffer)[row_nr]);
(*p) += 1; (*p) += 1;
break; break;
case MYSQL_TYPE_SHORT: case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_YEAR: case MYSQL_TYPE_YEAR:
int2store(*p, *(short *)stmt->params[column].buffer); int2store(*p, ((short *)stmt->params[column].buffer)[row_nr]);
(*p) += 2; (*p) += 2;
break; break;
case MYSQL_TYPE_FLOAT: case MYSQL_TYPE_FLOAT:
float4store(*p, *(float *)stmt->params[column].buffer); float4store(*p, ((float *)stmt->params[column].buffer)[row_nr]);
(*p) += 4; (*p) += 4;
break; break;
case MYSQL_TYPE_DOUBLE: case MYSQL_TYPE_DOUBLE:
float8store(*p, *(double *)stmt->params[column].buffer); float8store(*p, ((double *)stmt->params[column].buffer)[row_nr]);
(*p) += 8; (*p) += 8;
break; break;
case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_LONGLONG:
int8store(*p, *(unsigned long long *)stmt->params[column].buffer); int8store(*p, ((ulonglong *)stmt->params[column].buffer)[row_nr]);
(*p) += 8; (*p) += 8;
break; break;
case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONG:
case MYSQL_TYPE_INT24: case MYSQL_TYPE_INT24:
int4store(*p, *(int32 *)stmt->params[column].buffer); int4store(*p, ((int32 *)stmt->params[column].buffer)[row_nr]);
(*p) += 4; (*p)+= 4;
break; break;
case MYSQL_TYPE_TIME: case MYSQL_TYPE_TIME:
{ {
@@ -440,12 +454,11 @@ int store_param(MYSQL_STMT *stmt, int column, unsigned char **p)
8 1 second; 8 1 second;
9-13 4 second_part 9-13 4 second_part
*/ */
MYSQL_TIME *t= (MYSQL_TIME *)stmt->params[column].buffer; MYSQL_TIME *t= (MYSQL_TIME *)ma_get_buffer_offset(stmt, stmt->params[column].buffer_type,
stmt->params[column].buffer, row_nr);
char t_buffer[MAX_TIME_STR_LEN]; char t_buffer[MAX_TIME_STR_LEN];
uint len= 0; uint len= 0;
memset(t_buffer, 0, MAX_TIME_STR_LEN);
t_buffer[1]= t->neg ? 1 : 0; t_buffer[1]= t->neg ? 1 : 0;
int4store(t_buffer + 2, t->day); int4store(t_buffer + 2, t->day);
t_buffer[6]= (uchar) t->hour; t_buffer[6]= (uchar) t->hour;
@@ -478,11 +491,11 @@ int store_param(MYSQL_STMT *stmt, int column, unsigned char **p)
7 1 second 7 1 second
8-11 4 secondpart 8-11 4 secondpart
*/ */
MYSQL_TIME *t= (MYSQL_TIME *)stmt->params[column].buffer; MYSQL_TIME *t= (MYSQL_TIME *)ma_get_buffer_offset(stmt, stmt->params[column].buffer_type,
stmt->params[column].buffer, row_nr);
char t_buffer[MAX_DATETIME_STR_LEN]; char t_buffer[MAX_DATETIME_STR_LEN];
uint len; uint len;
memset(t_buffer, 0, MAX_DATETIME_STR_LEN);
int2store(t_buffer + 1, t->year); int2store(t_buffer + 1, t->year);
t_buffer[3]= (char) t->month; t_buffer[3]= (char) t->month;
t_buffer[4]= (char) t->day; t_buffer[4]= (char) t->day;
@@ -492,7 +505,7 @@ int store_param(MYSQL_STMT *stmt, int column, unsigned char **p)
if (t->second_part) if (t->second_part)
{ {
int4store(t_buffer + 8, t->second_part); int4store(t_buffer + 8, t->second_part);
len= 11; len= 12;
} }
else if (t->hour || t->minute || t->second) else if (t->hour || t->minute || t->second)
len= 7; len= 7;
@@ -514,13 +527,19 @@ int store_param(MYSQL_STMT *stmt, int column, unsigned char **p)
case MYSQL_TYPE_STRING: case MYSQL_TYPE_STRING:
case MYSQL_TYPE_DECIMAL: case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_NEWDECIMAL: case MYSQL_TYPE_NEWDECIMAL:
case MYSQL_TYPE_GEOMETRY:
{ {
ulong len= (ulong)*stmt->params[column].length; ulong len= (ulong)STMT_NUM_OFS(long, stmt->params[column].length, row_nr);
/* to is after p. The latter hasn't been moved */ /* to is after p. The latter hasn't been moved */
uchar *to = mysql_net_store_length(*p, len); uchar *to;
if ((long)len == -1)
len= strlen(ma_get_buffer_offset(stmt, stmt->params[column].buffer_type,
stmt->params[column].buffer, row_nr));
to = mysql_net_store_length(*p, len);
if (len) if (len)
memcpy(to, stmt->params[column].buffer, len); memcpy(to, ma_get_buffer_offset(stmt, stmt->params[column].buffer_type,
stmt->params[column].buffer, row_nr), len);
(*p) = to + len; (*p) = to + len;
break; break;
} }
@@ -528,9 +547,9 @@ int store_param(MYSQL_STMT *stmt, int column, unsigned char **p)
default: default:
/* unsupported parameter type */ /* unsupported parameter type */
SET_CLIENT_STMT_ERROR(stmt, CR_UNSUPPORTED_PARAM_TYPE, SQLSTATE_UNKNOWN, 0); SET_CLIENT_STMT_ERROR(stmt, CR_UNSUPPORTED_PARAM_TYPE, SQLSTATE_UNKNOWN, 0);
return(1); return 1;
} }
return(0); return 0;
} }
/* {{{ mysqlnd_stmt_execute_generate_request */ /* {{{ mysqlnd_stmt_execute_generate_request */
@@ -548,17 +567,31 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
------------------------------------------ ------------------------------------------
if (stmt->send_types_to_server): if (stmt->send_types_to_server):
param_count*2 parameter types param_count*2 parameter types
1st byte: parameter type
2nd byte flag:
unsigned flag (32768)
indicator variable exists (16384)
------------------------------------------ ------------------------------------------
Pre 10.2 protocol
n data from bind_buffer n data from bind_buffer
10.2 protocol
if indicator variable exists
1st byte: indicator variable
2nd-n: data
*/ */
size_t length= 1024; size_t length= 1024;
size_t free_bytes= 0; size_t free_bytes= 0;
size_t data_size= 0; size_t null_byte_offset;
uint i; uint i, j, num_rows= 1;
my_bool bulk_supported=
!(stmt->mysql->server_capabilities & CLIENT_MYSQL) &&
(stmt->mysql->server_capabilities & MARIADB_CLIENT_STMT_BULK_OPERATIONS);
uchar *start= NULL, *p; uchar *start= NULL, *p;
/* preallocate length bytes */ /* preallocate length bytes */
/* check: gr */ /* check: gr */
if (!(start= p= (uchar *)malloc(length))) if (!(start= p= (uchar *)malloc(length)))
@@ -570,14 +603,14 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
/* flags is 4 bytes, we store just 1 */ /* flags is 4 bytes, we store just 1 */
int1store(p, (unsigned char) stmt->flags); int1store(p, (unsigned char) stmt->flags);
p++; p++;
if (bulk_supported && stmt->array_size)
int4store(p, 1); /* and send 1 for iteration count */ num_rows= stmt->array_size;
int4store(p, num_rows);
p+= 4; p+= 4;
if (stmt->param_count) if (stmt->param_count)
{ {
size_t null_byte_offset, size_t null_count= (stmt->param_count + 7) / 8;
null_count= (stmt->param_count + 7) / 8;
free_bytes= length - (p - start); free_bytes= length - (p - start);
if (null_count + 20 > free_bytes) if (null_count + 20 > free_bytes)
@@ -589,11 +622,10 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
p= start + offset; p= start + offset;
} }
null_byte_offset = p - start; null_byte_offset= p - start;
memset(p, 0, null_count); memset(p, 0, null_count);
p += null_count; p += null_count;
int1store(p, stmt->send_types_to_server); int1store(p, stmt->send_types_to_server);
p++; p++;
@@ -608,7 +640,7 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
{ {
size_t offset= p - start; size_t offset= p - start;
length= offset + stmt->param_count * 2 + 20; length= offset + stmt->param_count * 2 + 20;
if (!(start= (uchar *)realloc((gptr)start, length))) if (!(start= (uchar *)realloc(start, length)))
goto mem_error; goto mem_error;
p= start + offset; p= start + offset;
} }
@@ -616,22 +648,45 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
{ {
/* this differs from mysqlnd, c api supports unsinged !! */ /* this differs from mysqlnd, c api supports unsinged !! */
uint buffer_type= stmt->params[i].buffer_type | (stmt->params[i].is_unsigned ? 32768 : 0); uint buffer_type= stmt->params[i].buffer_type | (stmt->params[i].is_unsigned ? 32768 : 0);
/* check if parameter requires indicator variable */
if (bulk_supported && stmt->params[i].indicator)
buffer_type|= 16384;
int2store(p, buffer_type); int2store(p, buffer_type);
p+= 2; p+= 2;
} }
} }
/* calculate data size */ /* calculate data size */
for (i = 0; i < stmt->param_count; i++) { for (j=0; j < num_rows; j++)
if (stmt->params[i].buffer && !stmt->params[i].is_null) {
stmt->params[i].is_null = &is_not_null; for (i=0; i < stmt->param_count; i++)
if (!stmt->params[i].length)
stmt->params[i].length= &stmt->params[i].length_value;
if (!(stmt->params[i].is_null && *stmt->params[i].is_null) && !stmt->params[i].long_data_used)
{ {
switch (stmt->params[i].buffer_type) { ulong size= 0;
my_bool has_data= TRUE;
uchar indicator= 0;
if (bulk_supported && stmt->params[i].indicator)
{
indicator= stmt->params[i].indicator[j];
/* check if we need to send data */
if (indicator == STMT_INDICATOR_NULL ||
indicator == STMT_INDICATOR_DEFAULT)
has_data= FALSE;
size= 1;
}
if (stmt->params[i].long_data_used)
{
has_data= FALSE;
stmt->params[i].long_data_used= 0;
}
if (has_data)
{
switch (stmt->params[i].buffer_type) {
case MYSQL_TYPE_NULL: case MYSQL_TYPE_NULL:
stmt->params[i].is_null = &is_null; if (bulk_supported)
indicator= STMT_INDICATOR_NULL;
has_data= FALSE;
break; break;
case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB: case MYSQL_TYPE_MEDIUM_BLOB:
@@ -647,50 +702,57 @@ unsigned char* mysql_stmt_execute_generate_request(MYSQL_STMT *stmt, size_t *req
case MYSQL_TYPE_ENUM: case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_BIT: case MYSQL_TYPE_BIT:
case MYSQL_TYPE_SET: case MYSQL_TYPE_SET:
data_size+= 5; /* max 8 bytes for size */ size+= 5; /* max 8 bytes for size */
data_size+= (size_t)*stmt->params[i].length; if (stmt->params[i].length[j] == -1)
size+= strlen(ma_get_buffer_offset(stmt, stmt->params[i].buffer_type, stmt->params[i].buffer,j));
else
size+= (size_t)stmt->params[i].length[j];
break; break;
default: default:
data_size+= mysql_ps_fetch_functions[stmt->params[i].buffer_type].pack_len; size+= mysql_ps_fetch_functions[stmt->params[i].buffer_type].pack_len;
break; break;
}
}
free_bytes= length - (p - start);
if (free_bytes < size + 20)
{
size_t offset= p - start;
length= MAX(2 * length, offset + size + 20);
if (!(start= (uchar *)realloc(start, length)))
goto mem_error;
p= start + offset;
}
if (bulk_supported && (indicator || stmt->params[i].indicator))
{
int1store(p, indicator);
p++;
}
if (num_rows == 1)
{
if ((stmt->params[i].is_null && *stmt->params[i].is_null) ||
stmt->params[i].buffer_type == MYSQL_TYPE_NULL ||
!stmt->params[i].buffer)
{
(start + null_byte_offset)[i/8] |= (unsigned char) (1 << (i & 7));
has_data= FALSE;
}
}
if (has_data)
{
store_param(stmt, i, &p, j);
} }
} }
} }
/* store data */
free_bytes= length - (p - start);
if (free_bytes < data_size + 20)
{
size_t offset= p - start;
length= offset + data_size + 20;
if (!(start= (uchar *)realloc((gptr)start, length)))
goto mem_error;
p= start + offset;
}
for (i = 0; i < stmt->param_count; i++)
{
if (stmt->params[i].long_data_used) {
stmt->params[i].long_data_used= 0;
}
else {
if (!stmt->params[i].buffer || *stmt->params[i].is_null || stmt->params[i].buffer_type == MYSQL_TYPE_NULL) {
(start + null_byte_offset)[i/8] |= (unsigned char) (1 << (i & 7));
} else {
store_param(stmt, i, &p);
}
}
}
} }
stmt->send_types_to_server= 0; stmt->send_types_to_server= 0;
*request_len = (size_t)(p - start); *request_len = (size_t)(p - start);
return(start); return start;
mem_error: mem_error:
SET_CLIENT_STMT_ERROR(stmt, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0); SET_CLIENT_STMT_ERROR(stmt, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
free(start); free(start);
*request_len= 0; *request_len= 0;
return(NULL); return NULL;
} }
/* }}} */ /* }}} */
@@ -723,6 +785,13 @@ my_bool STDCALL mysql_stmt_attr_get(MYSQL_STMT *stmt, enum enum_stmt_attr_type a
break; break;
case STMT_ATTR_PREBIND_PARAMS: case STMT_ATTR_PREBIND_PARAMS:
*(unsigned int *)value= stmt->param_count; *(unsigned int *)value= stmt->param_count;
break;
case STMT_ATTR_ARRAY_SIZE:
*(unsigned int *)value= stmt->array_size;
break;
case STMT_ATTR_BIND_TYPE:
*(enum enum_bind_type *)value= stmt->bind_type;
break;
default: default:
return(1); return(1);
} }
@@ -752,6 +821,11 @@ my_bool STDCALL mysql_stmt_attr_set(MYSQL_STMT *stmt, enum enum_stmt_attr_type a
case STMT_ATTR_PREBIND_PARAMS: case STMT_ATTR_PREBIND_PARAMS:
stmt->param_count= *(unsigned int *)value; stmt->param_count= *(unsigned int *)value;
break; break;
case STMT_ATTR_ARRAY_SIZE:
stmt->array_size= *(unsigned int *)value;
break;
case STMT_ATTR_BIND_TYPE:
stmt->bind_type= *(enum enum_bind_type *)value;
default: default:
SET_CLIENT_STMT_ERROR(stmt, CR_NOT_IMPLEMENTED, SQLSTATE_UNKNOWN, 0); SET_CLIENT_STMT_ERROR(stmt, CR_NOT_IMPLEMENTED, SQLSTATE_UNKNOWN, 0);
return(1); return(1);

View File

@@ -1715,6 +1715,7 @@ static int test_ps_null_param(MYSQL *mysql)
stmt= mysql_stmt_init(mysql); stmt= mysql_stmt_init(mysql);
FAIL_IF(!stmt, mysql_error(mysql)); FAIL_IF(!stmt, mysql_error(mysql));
rc= mysql_stmt_prepare(stmt, query, strlen(query)); rc= mysql_stmt_prepare(stmt, query, strlen(query));
diag("statement: %s", query);
check_stmt_rc(rc, stmt); check_stmt_rc(rc, stmt);
FAIL_IF(mysql_stmt_param_count(stmt) != 1, "param_count != 1"); FAIL_IF(mysql_stmt_param_count(stmt) != 1, "param_count != 1");