1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-08 14:02:17 +03:00

[MDEV-30366] Bulk unitary result flag client implementation part.

With MDEV-30366, server now permit to send a result-set containing generated id and Affected rows for each bulk operation. This feature can be enabled with option MARIADB_OPT_BULK_UNIT_RESULTS when server supports it.
This commit is contained in:
rusher
2024-03-06 16:03:55 +01:00
parent 4a74f8784d
commit abce07da2a
8 changed files with 385 additions and 8 deletions

View File

@@ -73,6 +73,7 @@ struct st_mysql_options_extension {
unsigned int tls_cipher_strength;
char *tls_version;
my_bool read_only;
my_bool bulk_unit_results;
char *connection_handler;
my_bool (*set_option)(MYSQL *mysql, const char *config_option, const char *config_value);
MA_HASHTBL userdata;

View File

@@ -177,6 +177,8 @@ enum enum_server_command
#define MARIADB_CLIENT_EXTENDED_METADATA (1ULL << 35)
/* Do not resend metadata for prepared statements, since 10.6*/
#define MARIADB_CLIENT_CACHE_METADATA (1ULL << 36)
/* permit sending unit result-set for BULK commands */
#define MARIADB_CLIENT_BULK_UNIT_RESULTS (1ULL << 37)
#define IS_MARIADB_EXTENDED_SERVER(mysql)\
(!(mysql->server_capabilities & CLIENT_MYSQL))
@@ -184,7 +186,8 @@ enum enum_server_command
#define MARIADB_CLIENT_SUPPORTED_FLAGS (MARIADB_CLIENT_PROGRESS |\
MARIADB_CLIENT_STMT_BULK_OPERATIONS|\
MARIADB_CLIENT_EXTENDED_METADATA|\
MARIADB_CLIENT_CACHE_METADATA)
MARIADB_CLIENT_CACHE_METADATA|\
MARIADB_CLIENT_BULK_UNIT_RESULTS)
#define CLIENT_SUPPORTED_FLAGS (CLIENT_MYSQL |\
CLIENT_FOUND_ROWS |\
@@ -225,6 +228,9 @@ enum enum_server_command
#define CLIENT_DEFAULT_FLAGS ((CLIENT_SUPPORTED_FLAGS & ~CLIENT_COMPRESS)\
& ~CLIENT_SSL)
#define CLIENT_DEFAULT_EXTENDED_FLAGS (MARIADB_CLIENT_SUPPORTED_FLAGS &\
~MARIADB_CLIENT_BULK_UNIT_RESULTS)
#define SERVER_STATUS_IN_TRANS 1 /* Transaction has started */
#define SERVER_STATUS_AUTOCOMMIT 2 /* Server in auto_commit mode */
#define SERVER_MORE_RESULTS_EXIST 8

View File

@@ -34,6 +34,12 @@
((stmt)->mysql->extension->mariadb_server_capabilities & \
(MARIADB_CLIENT_STMT_BULK_OPERATIONS >> 32))))
#define MARIADB_STMT_BULK_UNIT_RESULTS_SUPPORTED(stmt)\
((stmt)->mysql && \
(!((stmt)->mysql->server_capabilities & CLIENT_MYSQL) &&\
((stmt)->mysql->extension->mariadb_client_flag & \
(MARIADB_CLIENT_BULK_UNIT_RESULTS >> 32))))
#define CLEAR_CLIENT_STMT_ERROR(a) \
do { \
(a)->last_errno= 0;\
@@ -88,7 +94,7 @@ enum enum_indicator_type
bulk PS flags
*/
#define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128
#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64
#define STMT_BULK_FLAG_SEND_UNIT_RESULTS 64
typedef enum mysql_stmt_state
{

View File

@@ -257,7 +257,8 @@ extern const char *SQLSTATE_UNKNOWN;
MARIADB_OPT_RESTRICTED_AUTH,
MARIADB_OPT_RPL_REGISTER_REPLICA,
MARIADB_OPT_STATUS_CALLBACK,
MARIADB_OPT_SERVER_PLUGINS
MARIADB_OPT_SERVER_PLUGINS,
MARIADB_OPT_BULK_UNIT_RESULTS
};
enum mariadb_value {

View File

@@ -693,6 +693,7 @@ struct st_default_options mariadb_defaults[] =
{{MYSQL_OPT_SSL_ENFORCE}, MARIADB_OPTION_BOOL, "ssl-enforce"},
{{MARIADB_OPT_RESTRICTED_AUTH}, MARIADB_OPTION_STR, "restricted-auth"},
{{.option_func=parse_connection_string}, MARIADB_OPTION_FUNC, "connection"},
{{MARIADB_OPT_BULK_UNIT_RESULTS}, MARIADB_OPTION_BOOL, "bulk-unit-results"},
/* Aliases */
{{MARIADB_OPT_SCHEMA}, MARIADB_OPTION_STR, "db"},
{{MARIADB_OPT_UNIXSOCKET}, MARIADB_OPTION_STR, "unix_socket"},
@@ -3830,6 +3831,9 @@ mysql_optionsv(MYSQL *mysql,enum mysql_option option, ...)
}
}
break;
case MARIADB_OPT_BULK_UNIT_RESULTS:
OPT_SET_EXTENDED_VALUE_INT(&mysql->options, bulk_unit_results, *(my_bool *)arg1);
break;
default:
va_end(ap);
SET_CLIENT_ERROR(mysql, CR_NOT_IMPLEMENTED, SQLSTATE_UNKNOWN, 0);
@@ -4052,6 +4056,9 @@ mysql_get_optionv(MYSQL *mysql, enum mysql_option option, void *arg, ...)
case MARIADB_OPT_SKIP_READ_RESPONSE:
*((my_bool*)arg)= mysql->options.extension ? mysql->options.extension->skip_read_response : 0;
break;
case MARIADB_OPT_BULK_UNIT_RESULTS:
*((my_bool *)arg)= mysql->options.extension ? mysql->options.extension->bulk_unit_results : 0;
break;
default:
va_end(ap);
SET_CLIENT_ERROR(mysql, CR_NOT_IMPLEMENTED, SQLSTATE_UNKNOWN, 0);

View File

@@ -911,7 +911,7 @@ unsigned char* ma_stmt_execute_generate_bulk_request(MYSQL_STMT *stmt, size_t *r
0 4 Statement id
4 2 Flags (cursor type):
STMT_BULK_FLAG_CLIENT_SEND_TYPES = 128
STMT_BULK_FLAG_INSERT_ID_REQUEST = 64
STMT_BULK_FLAG_SEND_UNIT_RESULTS = 64
-----------------------------------------
if (stmt->send_types_to_server):
for (i=0; i < param_count; i++)
@@ -964,6 +964,9 @@ unsigned char* ma_stmt_execute_generate_bulk_request(MYSQL_STMT *stmt, size_t *r
/* todo: request to return auto generated ids */
if (stmt->send_types_to_server)
flags|= STMT_BULK_FLAG_CLIENT_SEND_TYPES;
if (MARIADB_STMT_BULK_UNIT_RESULTS_SUPPORTED(stmt))
flags|= STMT_BULK_FLAG_SEND_UNIT_RESULTS;
int2store(p, flags);
p+=2;

View File

@@ -338,9 +338,11 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
if (!(mysql->server_capabilities & CLIENT_MYSQL))
{
uint server_extended_cap= mysql->extension->mariadb_server_capabilities;
uint client_extended_cap= (uint)(MARIADB_CLIENT_SUPPORTED_FLAGS >> 32);
ulonglong client_extended_flag = CLIENT_DEFAULT_EXTENDED_FLAGS;
if (mysql->options.extension && mysql->options.extension->bulk_unit_results)
client_extended_flag|= MARIADB_CLIENT_BULK_UNIT_RESULTS;
mysql->extension->mariadb_client_flag=
server_extended_cap & client_extended_cap;
server_extended_cap & (long)(client_extended_flag >> 32);
int4store(buff + 28, mysql->extension->mariadb_client_flag);
}
end= buff+32;

View File

@@ -21,6 +21,12 @@
static my_bool bulk_enabled= 0;
#define SERVER_SUPPORT_BULK_UNIT_RESULTS(mysql)\
(!(mysql->server_capabilities & CLIENT_MYSQL) &&\
(mysql->extension->mariadb_server_capabilities & \
(MARIADB_CLIENT_BULK_UNIT_RESULTS >> 32)))
char *rand_str(size_t length) {
const char charset[] = "0123456789"
"abcdefghijklmnopqrstuvwxyz"
@@ -57,11 +63,14 @@ static int bulk1(MYSQL *mysql)
MYSQL_RES *res;
MYSQL_ROW row;
unsigned int intval;
my_bool bool_val;
if (!bulk_enabled)
return SKIP;
rc= mysql_select_db(mysql, "testc");
mysql_get_option(mysql, MARIADB_OPT_BULK_UNIT_RESULTS, &bool_val);
FAIL_IF(bool_val, "bool_val == true");
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk1");
check_mysql_rc(rc, mysql);
@@ -1067,6 +1076,345 @@ static int test_mdev16593(MYSQL *mysql)
return OK;
}
static int bulk_with_unit_result_insert(MYSQL *my)
{
my_bool unique_result= 1;
my_bool bool_val;
MYSQL *mysql;
MYSQL_STMT *stmt;
unsigned int array_size= TEST_ARRAY_SIZE;
int rc, rowcount= 0;
unsigned int i;
char **buffer;
unsigned long *lengths;
MYSQL_BIND bind[1];
MYSQL_RES *res;
MYSQL_ROW row;
MYSQL_BIND bind_out[2];
int id, affected_rows = 0;
int expectedId = 1;
unsigned int intval;
SKIP_MAXSCALE;
if (!SERVER_SUPPORT_BULK_UNIT_RESULTS(my))
{
diag("Server doesn't support bulk unit results");
return SKIP;
}
mysql= mysql_init(NULL);
stmt= mysql_stmt_init(mysql);
mysql_options(mysql, MARIADB_OPT_BULK_UNIT_RESULTS, &unique_result);
FAIL_IF(!my_test_connect(mysql, hostname, username, password, schema,
port, socketname, 0), mysql_error(mysql));
mysql_get_option(mysql, MARIADB_OPT_BULK_UNIT_RESULTS, &bool_val);
FAIL_UNLESS(bool_val, "bool_val != true");
if (!bulk_enabled)
return SKIP;
rc= mysql_select_db(mysql, "testc");
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_insert");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE bulk_with_unit_result_insert (a int NOT NULL AUTO_INCREMENT, b VARCHAR(255), PRIMARY KEY (a)) engine=InnoDB");
check_mysql_rc(rc, mysql);
rc= mysql_stmt_prepare(stmt, SL("INSERT INTO bulk_with_unit_result_insert(b) VALUES (?)"));
check_stmt_rc(rc, stmt);
/* allocate memory */
buffer= calloc(TEST_ARRAY_SIZE, sizeof(char *));
lengths= (unsigned long *)calloc(sizeof(long), TEST_ARRAY_SIZE);
for (i=0; i < TEST_ARRAY_SIZE; i++)
{
buffer[i]= rand_str(254);
lengths[i]= -1;
}
memset(bind, 0, sizeof(MYSQL_BIND) * 1);
memset(bind_out, '\0', sizeof(bind_out));
bind[0].buffer_type= MYSQL_TYPE_STRING;
bind[0].buffer= (void *)buffer;
bind[0].length= (unsigned long *)lengths;
bind_out[0].buffer_type= MYSQL_TYPE_LONG;
bind_out[0].buffer= (void*) &id;
bind_out[1].buffer_type= MYSQL_TYPE_LONG;
bind_out[1].buffer= (void*) &affected_rows;
rc= mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, &array_size);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_bind_param(stmt, bind);
check_stmt_rc(rc, stmt);
for (i=0; i < 100; i++)
{
rc= mysql_stmt_execute(stmt);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_bind_result(stmt, bind_out);
check_stmt_rc(rc, stmt);
rowcount= 0;
while (mysql_stmt_fetch(stmt) != MYSQL_NO_DATA)
{
rowcount++;
// diag("id:%llu expected %llu", id, expectedId);
FAIL_UNLESS(id == expectedId, "id != expectedId");
expectedId++;
// diag("affected_rows:%llu", affected_rows);
FAIL_UNLESS(affected_rows == 1, "affected_rows != 1");
}
// test can be improved depending on auto_increment_increment/auto_increment_offset...
expectedId = expectedId + 1023;
FAIL_IF(rowcount != TEST_ARRAY_SIZE, "rowcount != TEST_ARRAY_SIZE");
}
for (i=0; i < array_size; i++)
free(buffer[i]);
free(buffer);
free(lengths);
rc= mysql_stmt_close(stmt);
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "SELECT COUNT(*) FROM bulk_with_unit_result_insert");
check_mysql_rc(rc, mysql);
res= mysql_store_result(mysql);
row= mysql_fetch_row(res);
intval= atoi(row[0]);
mysql_free_result(res);
FAIL_IF(intval != array_size * 100, "Expected 102400 rows");
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_insert");
check_mysql_rc(rc, mysql);
mysql_close(mysql);
check_mysql_rc(rc, my);
return OK;
}
static int bulk_with_unit_result_delete(MYSQL *my)
{
my_bool unique_result= 1;
unsigned int array_size= 5;
int rc, rowcount= 0;
unsigned int i, j;
MYSQL_BIND bind[1];
MYSQL_RES *res;
MYSQL_ROW row;
MYSQL_BIND bind_out[2];
unsigned int *vals;
int id, affected_rows = 0;
unsigned int intval;
MYSQL *mysql;
MYSQL_STMT *stmt;
SKIP_MAXSCALE;
if (!SERVER_SUPPORT_BULK_UNIT_RESULTS(my))
{
diag("Server doesn't support bulk unit results");
return SKIP;
}
mysql= mysql_init(NULL);
stmt= mysql_stmt_init(mysql);
mysql_options(mysql, MARIADB_OPT_BULK_UNIT_RESULTS, &unique_result);
FAIL_IF(!my_test_connect(mysql, hostname, username, password, schema,
port, socketname, 0), mysql_error(mysql));
if (!bulk_enabled)
return SKIP;
rc= mysql_select_db(mysql, "testc");
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_delete");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE bulk_with_unit_result_delete (a int NOT NULL AUTO_INCREMENT, b VARCHAR(255), PRIMARY KEY (a))");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "INSERT INTO bulk_with_unit_result_delete(b) (SELECT CONCAT(seq, 'test') FROM seq_1_to_100)");
check_mysql_rc(rc, mysql);
rc= mysql_stmt_prepare(stmt, SL("DELETE FROM bulk_with_unit_result_delete WHERE a = ?"));
check_stmt_rc(rc, stmt);
memset(bind_out, '\0', sizeof(bind_out));
bind_out[0].buffer_type= MYSQL_TYPE_LONG;
bind_out[0].buffer= (void*) &id;
bind_out[1].buffer_type= MYSQL_TYPE_LONG;
bind_out[1].buffer= (void*) &affected_rows;
rc= mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, &array_size);
check_stmt_rc(rc, stmt);
vals= (unsigned int *)calloc(sizeof(int), 5);
memset(bind, 0, sizeof(MYSQL_BIND) * 1);
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= vals;
for (i=0; i < 10; i++)
{
for (j=0; j < 5; j++)
vals[j]= 1 + j * 2 + i * 10;
rc= mysql_stmt_bind_param(stmt, bind);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_execute(stmt);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_bind_result(stmt, bind_out);
check_stmt_rc(rc, stmt);
rowcount= 0;
while (mysql_stmt_fetch(stmt) != MYSQL_NO_DATA)
{
rowcount++;
FAIL_UNLESS(id == 0, "id != 0");
FAIL_UNLESS(affected_rows == 1, "affected_rows != 1");
}
// test can be improved depending on auto_increment_increment/auto_increment_offset...
FAIL_UNLESS(rowcount == 5, "rowcount != 5");
}
rc= mysql_stmt_close(stmt);
check_mysql_rc(rc, mysql);
free(vals);
rc= mysql_query(mysql, "SELECT a FROM bulk_with_unit_result_delete");
check_mysql_rc(rc, mysql);
res= mysql_store_result(mysql);
rowcount= 0;
for (i=1; i < 51; i++)
{
row=mysql_fetch_row(res);
intval = atoi(row[0]);
FAIL_UNLESS(intval == i * 2, "intval != i * 2");
}
mysql_free_result(res);
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_delete");
check_mysql_rc(rc, mysql);
mysql_close(mysql);
check_mysql_rc(rc, my);
return OK;
}
static int bulk_with_unit_result_update(MYSQL *my)
{
my_bool unique_result= 1;
unsigned int array_size= 5;
int rc, rowcount= 0;
unsigned int i, j;
MYSQL_BIND bind[1];
MYSQL_RES *res;
MYSQL_ROW row;
MYSQL_BIND bind_out[2];
unsigned int *vals;
int id, affected_rows = 0;
char str[50];
MYSQL *mysql;
MYSQL_STMT *stmt;
SKIP_MAXSCALE;
if (!SERVER_SUPPORT_BULK_UNIT_RESULTS(my))
{
diag("Server doesn't support bulk unit results");
return SKIP;
}
mysql= mysql_init(NULL);
stmt= mysql_stmt_init(mysql);
mysql_options(mysql, MARIADB_OPT_BULK_UNIT_RESULTS, &unique_result);
FAIL_IF(!my_test_connect(mysql, hostname, username, password, schema,
port, socketname, 0), mysql_error(mysql));
if (!bulk_enabled)
return SKIP;
rc= mysql_select_db(mysql, "testc");
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_update");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE bulk_with_unit_result_update (a int NOT NULL AUTO_INCREMENT, b VARCHAR(255), PRIMARY KEY (a))");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "INSERT INTO bulk_with_unit_result_update(b) (SELECT CONCAT(seq, 'test') FROM seq_1_to_100)");
check_mysql_rc(rc, mysql);
rc= mysql_stmt_prepare(stmt, SL("UPDATE bulk_with_unit_result_update SET b=CONCAT(b,'added') WHERE a = ?"));
check_stmt_rc(rc, stmt);
memset(bind_out, '\0', sizeof(bind_out));
bind_out[0].buffer_type= MYSQL_TYPE_LONG;
bind_out[0].buffer= (void*) &id;
bind_out[1].buffer_type= MYSQL_TYPE_LONG;
bind_out[1].buffer= (void*) &affected_rows;
rc= mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, &array_size);
check_stmt_rc(rc, stmt);
vals= (unsigned int *)calloc(sizeof(int), 5);
memset(bind, 0, sizeof(MYSQL_BIND) * 1);
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= vals;
for (i=0; i < 10; i++)
{
for (j=0; j < 5; j++)
vals[j]= 1 + j * 2 + i * 10;
rc= mysql_stmt_bind_param(stmt, bind);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_execute(stmt);
check_stmt_rc(rc, stmt);
rc= mysql_stmt_bind_result(stmt, bind_out);
check_stmt_rc(rc, stmt);
rowcount= 0;
while (mysql_stmt_fetch(stmt) != MYSQL_NO_DATA)
{
rowcount++;
FAIL_UNLESS(id == 0, "id != 0");
FAIL_UNLESS(affected_rows == 1, "affected_rows != 1");
}
// test can be improved depending on auto_increment_increment/auto_increment_offset...
FAIL_UNLESS(rowcount == 5, "rowcount != 5");
}
rc= mysql_stmt_close(stmt);
check_mysql_rc(rc, mysql);
free(vals);
rc= mysql_query(mysql, "SELECT b FROM bulk_with_unit_result_update");
check_mysql_rc(rc, mysql);
res= mysql_store_result(mysql);
rowcount= 0;
for (i=1; i < 101; i++)
{
row=mysql_fetch_row(res);
if (i % 2 == 0) {
sprintf(str, "%dtest", i);
} else {
sprintf(str, "%dtestadded", i);
}
FAIL_IF(strcmp(row[0], str) != 0, "strcmp(row[0], str) != 0");
}
mysql_free_result(res);
rc= mysql_query(mysql, "DROP TABLE IF EXISTS bulk_with_unit_result_update");
check_mysql_rc(rc, mysql);
mysql_close(mysql);
check_mysql_rc(rc, my);
return OK;
}
struct my_tests_st my_tests[] = {
{"check_bulk", check_bulk, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_mdev16593", test_mdev16593, TEST_CONNECTION_NEW, 0, NULL, NULL},
@@ -1083,6 +1431,9 @@ struct my_tests_st my_tests[] = {
{"bulk4", bulk4, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"bulk_null", bulk_null, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"bulk_skip_row", bulk_skip_row, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"bulk_with_unit_result_insert", bulk_with_unit_result_insert, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"bulk_with_unit_result_delete", bulk_with_unit_result_delete, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"bulk_with_unit_result_update", bulk_with_unit_result_update, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{NULL, NULL, 0, 0, NULL, NULL}
};