From b582ea2bd4c5e901b41f6506cc94cec5a1a33cb6 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 9 Nov 2005 05:53:34 -0800 Subject: [PATCH] Re-applying the work initially done by Brian, and since worked upon by me previously in several separate patches to the 5.1 parent but never pushed. WL#2952 - add simple single-table only transactions to federated. sql/ha_federated.cc: added handlerton functions for commit and rollback, added handler methods for same. sql/ha_federated.h: added member variable for transaction data (linked list of federated handlers used in transaction) and member functions for support commit and rollback. mysql-test/r/federated_transactions.result: New BitKeeper file ``mysql-test/r/federated_transactions.result'' mysql-test/t/federated_transactions.test: New BitKeeper file ``mysql-test/t/federated_transactions.test'' --- mysql-test/r/federated_transactions.result | 49 ++++++ mysql-test/t/federated_transactions.test | 37 +++++ sql/ha_federated.cc | 167 ++++++++++++++++++++- sql/ha_federated.h | 15 +- 4 files changed, 257 insertions(+), 11 deletions(-) create mode 100644 mysql-test/r/federated_transactions.result create mode 100644 mysql-test/t/federated_transactions.test diff --git a/mysql-test/r/federated_transactions.result b/mysql-test/r/federated_transactions.result new file mode 100644 index 00000000000..403b65b5484 --- /dev/null +++ b/mysql-test/r/federated_transactions.result @@ -0,0 +1,49 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +stop slave; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +DROP TABLE IF EXISTS federated.t1; +Warnings: +Note 1051 Unknown table 't1' +CREATE TABLE federated.t1 ( +`id` int(20) NOT NULL, +`name` varchar(32) NOT NULL default '' + ) +DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB; +DROP TABLE IF EXISTS federated.t1; +Warnings: +Note 1051 Unknown table 't1' +CREATE TABLE federated.t1 ( +`id` int(20) NOT NULL, +`name` varchar(32) NOT NULL default '' + ) +ENGINE="FEDERATED" DEFAULT CHARSET=latin1 +CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'; +set autocommit=0; +INSERT INTO federated.t1 (id, name) VALUES (1, 'foo'); +INSERT INTO federated.t1 (id, name) VALUES (2, 'fee'); +COMMIT; +INSERT INTO federated.t1 (id, name) VALUES (3, 'fie'); +INSERT INTO federated.t1 (id, name) VALUES (4, 'fum'); +ROLLBACK; +set autocommit=1; +INSERT INTO federated.t1 (id, name) VALUES (5, 'foe'); +INSERT INTO federated.t1 (id, name) VALUES (6, 'fig'); +SELECT * FROM federated.t1; +id name +1 foo +2 fee +5 foe +6 fig +DELETE FROM federated.t1; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; diff --git a/mysql-test/t/federated_transactions.test b/mysql-test/t/federated_transactions.test new file mode 100644 index 00000000000..46359b8e024 --- /dev/null +++ b/mysql-test/t/federated_transactions.test @@ -0,0 +1,37 @@ +source include/federated.inc; + +connection slave; +DROP TABLE IF EXISTS federated.t1; +#SHOW ENGINES; +CREATE TABLE federated.t1 ( + `id` int(20) NOT NULL, + `name` varchar(32) NOT NULL default '' + ) + DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB; + +connection master; +DROP TABLE IF EXISTS federated.t1; +# # correct connection, same named tables +--replace_result $SLAVE_MYPORT SLAVE_PORT +eval CREATE TABLE federated.t1 ( + `id` int(20) NOT NULL, + `name` varchar(32) NOT NULL default '' + ) + ENGINE="FEDERATED" DEFAULT CHARSET=latin1 + CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'; + +set autocommit=0; +INSERT INTO federated.t1 (id, name) VALUES (1, 'foo'); +INSERT INTO federated.t1 (id, name) VALUES (2, 'fee'); +COMMIT; +INSERT INTO federated.t1 (id, name) VALUES (3, 'fie'); +INSERT INTO federated.t1 (id, name) VALUES (4, 'fum'); +ROLLBACK; +set autocommit=1; +INSERT INTO federated.t1 (id, name) VALUES (5, 'foe'); +INSERT INTO federated.t1 (id, name) VALUES (6, 'fig'); + +SELECT * FROM federated.t1; +DELETE FROM federated.t1; + +source include/federated_cleanup.inc; diff --git a/sql/ha_federated.cc b/sql/ha_federated.cc index 8a26b9d8fb3..57dc51edb90 100644 --- a/sql/ha_federated.cc +++ b/sql/ha_federated.cc @@ -363,9 +363,9 @@ static int federated_init= FALSE; // Variable for checking the // init state of hash /* Static declaration for handerton */ - static handler *federated_create_handler(TABLE *table); - +static int federated_commit(THD *thd, bool all); +static int federated_rollback(THD *thd, bool all); /* Federated storage engine handlerton */ @@ -381,8 +381,8 @@ handlerton federated_hton= { NULL, /* savepoint */ NULL, /* rollback to savepoint */ NULL, /* release savepoint */ - NULL, /* commit */ - NULL, /* rollback */ + federated_commit, /* commit */ + federated_rollback, /* rollback */ NULL, /* prepare */ NULL, /* recover */ NULL, /* commit_by_xid */ @@ -647,8 +647,8 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, share->port= 0; share->socket= 0; - DBUG_PRINT("info", ("Length %d \n", table->s->connect_string.length)); - DBUG_PRINT("info", ("String %.*s \n", table->s->connect_string.length, + DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length)); + DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length, table->s->connect_string.str)); share->scheme= my_strdup_with_length((const byte*)table->s-> connect_string.str, @@ -740,7 +740,7 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, DBUG_PRINT("info", ("scheme %s username %s password %s \ - hostname %s port %d database %s tablename %s\n", + hostname %s port %d database %s tablename %s", share->scheme, share->username, share->password, share->hostname, share->port, share->database, share->table_name)); @@ -760,7 +760,9 @@ ha_federated::ha_federated(TABLE *table_arg) :handler(&federated_hton, table_arg), mysql(0), stored_result(0), scan_flag(0), ref_length(sizeof(MYSQL_ROW_OFFSET)), current_position(0) -{} +{ + trx_next= 0; +} /* @@ -1488,6 +1490,7 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) with transactions */ mysql->reconnect= 1; + DBUG_RETURN(0); } @@ -2633,3 +2636,151 @@ bool ha_federated::get_error_message(int error, String* buf) DBUG_RETURN(FALSE); } +int ha_federated::external_lock(THD *thd, int lock_type) +{ + int error= 0; + ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot]; + DBUG_ENTER("ha_federated::external_lock"); + + if (lock_type != F_UNLCK) + { + DBUG_PRINT("info",("federated not lock F_UNLCK")); + if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + { + DBUG_PRINT("info",("federated autocommit")); + /* + This means we are doing an autocommit + */ + error= connection_autocommit(TRUE); + if (error) + { + DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error)); + DBUG_RETURN(error); + } + trans_register_ha(thd, FALSE, &federated_hton); + } + else + { + DBUG_PRINT("info",("not autocommit")); + if (!trx) + { + /* + This is where a transaction gets its start + */ + error= connection_autocommit(FALSE); + if (error) + { + DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error)); + DBUG_RETURN(error); + } + thd->ha_data[federated_hton.slot]= this; + trans_register_ha(thd, TRUE, &federated_hton); + /* + Send a lock table to the remote end. + We do not support this at the moment + */ + if (thd->options & (OPTION_TABLE_LOCK)) + { + DBUG_PRINT("info", ("We do not support lock table yet")); + } + } + else + { + ha_federated *ptr; + for (ptr= trx; ptr; ptr= ptr->trx_next) + if (ptr == this) + break; + else if (!ptr->trx_next) + ptr->trx_next= this; + } + } + } + DBUG_RETURN(0); +} + + +static int federated_commit(THD *thd, bool all) +{ + int return_val= 0; + ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot]; + DBUG_ENTER("federated_commit"); + + if (all) + { + int error= 0; + ha_federated *ptr, *old= NULL; + for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next) + { + if (old) + old->trx_next= NULL; + error= ptr->connection_commit(); + if (error && !return_val); + return_val= error; + } + thd->ha_data[federated_hton.slot]= NULL; + } + + DBUG_PRINT("info", ("error val: %d", return_val)); + DBUG_RETURN(return_val); +} + + +static int federated_rollback(THD *thd, bool all) +{ + int return_val= 0; + ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot]; + DBUG_ENTER("federated_rollback"); + + if (all) + { + int error= 0; + ha_federated *ptr, *old= NULL; + for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next) + { + if (old) + old->trx_next= NULL; + error= ptr->connection_rollback(); + if (error && !return_val) + return_val= error; + } + thd->ha_data[federated_hton.slot]= NULL; + } + + DBUG_PRINT("info", ("error val: %d", return_val)); + DBUG_RETURN(return_val); +} + +int ha_federated::connection_commit() +{ + DBUG_ENTER("ha_federated::connection_commit"); + DBUG_RETURN(execute_simple_query("COMMIT", 6)); +} + + +int ha_federated::connection_rollback() +{ + DBUG_ENTER("ha_federated::connection_rollback"); + DBUG_RETURN(execute_simple_query("ROLLBACK", 8)); +} + + +int ha_federated::connection_autocommit(bool state) +{ + const char *text; + DBUG_ENTER("ha_federated::connection_autocommit"); + text= (state == true) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0"; + DBUG_RETURN(execute_simple_query(text, 16)); +} + + +int ha_federated::execute_simple_query(const char *query, int len) +{ + DBUG_ENTER("ha_federated::execute_simple_query"); + + if (mysql_real_query(mysql, query, len)) + { + DBUG_RETURN(stash_remote_error()); + } + DBUG_RETURN(0); +} + diff --git a/sql/ha_federated.h b/sql/ha_federated.h index 52f4fad9a27..4cca27b3900 100644 --- a/sql/ha_federated.h +++ b/sql/ha_federated.h @@ -174,11 +174,13 @@ private: public: ha_federated(TABLE *table_arg); - ~ha_federated() - { - } + ~ha_federated() {} /* The name that will be used for display purposes */ const char *table_type() const { return "FEDERATED"; } + /* + Next pointer used in transaction + */ + ha_federated *trx_next; /* The name of the index type that will be used for display don't implement this method unless you really have indexes @@ -298,7 +300,14 @@ public: THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type); //required virtual bool get_error_message(int error, String *buf); + int external_lock(THD *thd, int lock_type); + int connection_commit(); + int connection_rollback(); + bool has_transactions() { return 1; } + int connection_autocommit(bool state); + int execute_simple_query(const char *query, int len); }; bool federated_db_init(void); int federated_db_end(ha_panic_function type); +