1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

XA (not completely polished out yet)

This commit is contained in:
serg@serg.mylan
2005-01-16 13:16:23 +01:00
parent 184cbf6d37
commit 1034677f94
72 changed files with 3384 additions and 2291 deletions

View File

@ -102,11 +102,32 @@ static int write_status(DB *status_block, char *buff, uint length);
static void update_status(BDB_SHARE *share, TABLE *table);
static void berkeley_noticecall(DB_ENV *db_env, db_notices notice);
static int berkeley_close_connection(THD *thd);
static int berkeley_commit(THD *thd, bool all);
static int berkeley_rollback(THD *thd, bool all);
static handlerton berkeley_hton = {
0, /* slot */
0, /* savepoint size */
berkeley_close_connection,
NULL, /* savepoint_set */
NULL, /* savepoint_rollback */
NULL, /* savepoint_release */
berkeley_commit,
berkeley_rollback,
NULL, /* prepare */
NULL, /* recover */
};
typedef struct st_berkeley_trx_data {
DB_TXN *all;
DB_TXN *stmt;
uint bdb_lock_count;
} berkeley_trx_data;
/* General functions */
bool berkeley_init(void)
handlerton *berkeley_init(void)
{
DBUG_ENTER("berkeley_init");
@ -135,7 +156,7 @@ bool berkeley_init(void)
berkeley_log_file_size= max(berkeley_log_file_size, 10*1024*1024L);
if (db_env_create(&db_env,0))
DBUG_RETURN(1); /* purecov: inspected */
DBUG_RETURN(0);
db_env->set_errcall(db_env,berkeley_print_error);
db_env->set_errpfx(db_env,"bdb");
db_env->set_noticecall(db_env, berkeley_noticecall);
@ -163,14 +184,15 @@ bool berkeley_init(void)
DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN |
DB_CREATE | DB_THREAD, 0666))
{
db_env->close(db_env,0); /* purecov: inspected */
db_env=0; /* purecov: inspected */
db_env->close(db_env,0);
db_env=0;
DBUG_RETURN(0);
}
(void) hash_init(&bdb_open_tables,system_charset_info,32,0,0,
(hash_get_key) bdb_get_key,0,0);
pthread_mutex_init(&bdb_mutex,MY_MUTEX_INIT_FAST);
DBUG_RETURN(db_env == 0);
DBUG_RETURN(&berkeley_hton);
}
@ -188,6 +210,11 @@ bool berkeley_end(void)
DBUG_RETURN(error != 0);
}
static int berkeley_close_connection(THD *thd)
{
my_free((gptr)thd->ha_data[berkeley_hton.slot], MYF(0));
}
bool berkeley_flush_logs()
{
int error;
@ -206,26 +233,33 @@ bool berkeley_flush_logs()
DBUG_RETURN(result);
}
int berkeley_commit(THD *thd, void *trans)
static int berkeley_commit(THD *thd, bool all)
{
DBUG_ENTER("berkeley_commit");
DBUG_PRINT("trans",("ending transaction %s",
trans == thd->transaction.stmt.bdb_tid ? "stmt" : "all"));
int error=txn_commit((DB_TXN*) trans,0);
DBUG_PRINT("trans",("ending transaction %s", all ? "all" : "stmt"));
berkeley_trx_data *trx=(berkeley_trx_data *)thd->ha_data[berkeley_hton.slot];
int error=txn_commit(all ? trx->all : trx->stmt,0);
#ifndef DBUG_OFF
if (error)
DBUG_PRINT("error",("error: %d",error)); /* purecov: inspected */
DBUG_PRINT("error",("error: %d",error));
#endif
if (all)
trx->all=0;
else
trx->stmt=0;
DBUG_RETURN(error);
}
int berkeley_rollback(THD *thd, void *trans)
static int berkeley_rollback(THD *thd, bool all)
{
DBUG_ENTER("berkeley_rollback");
DBUG_PRINT("trans",("aborting transaction %s",
trans == thd->transaction.stmt.bdb_tid ? "stmt" : "all"));
int error=txn_abort((DB_TXN*) trans);
DBUG_PRINT("trans",("aborting transaction %s", all ? "all" : "stmt"));
berkeley_trx_data *trx=(berkeley_trx_data *)thd->ha_data[berkeley_hton.slot];
int error=txn_abort(all ? trx->all : trx->stmt);
if (all)
trx->all=0;
else
trx->stmt=0;
DBUG_RETURN(error);
}
@ -1818,52 +1852,56 @@ int ha_berkeley::reset(void)
int ha_berkeley::external_lock(THD *thd, int lock_type)
{
int error=0;
berkeley_trx_data *trx=(berkeley_trx_data *)thd->ha_data[berkeley_hton.slot];
DBUG_ENTER("ha_berkeley::external_lock");
if (!trx)
{
thd->ha_data[berkeley_hton.slot]= trx= (berkeley_trx_data *)
my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
if (!trx)
DBUG_RETURN(1);
}
if (lock_type != F_UNLCK)
{
if (!thd->transaction.bdb_lock_count++)
if (!trx->bdb_lock_count++)
{
DBUG_ASSERT(thd->transaction.stmt.bdb_tid == 0);
DBUG_ASSERT(trx->stmt == 0);
transaction=0; // Safety
/* First table lock, start transaction */
if ((thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN |
OPTION_TABLE_LOCK)) &&
!thd->transaction.all.bdb_tid)
OPTION_TABLE_LOCK)) && !trx->all)
{
/* We have to start a master transaction */
DBUG_PRINT("trans",("starting transaction all: options: 0x%lx",
(ulong) thd->options));
if ((error=txn_begin(db_env, 0,
(DB_TXN**) &thd->transaction.all.bdb_tid,
0)))
if ((error=txn_begin(db_env, 0, &trx->all, 0)))
{
thd->transaction.bdb_lock_count--; // We didn't get the lock /* purecov: inspected */
DBUG_RETURN(error); /* purecov: inspected */
trx->bdb_lock_count--; // We didn't get the lock
DBUG_RETURN(error);
}
trans_register_ha(thd, TRUE, &berkeley_hton);
if (thd->in_lock_tables)
DBUG_RETURN(0); // Don't create stmt trans
}
DBUG_PRINT("trans",("starting transaction stmt"));
if ((error=txn_begin(db_env,
(DB_TXN*) thd->transaction.all.bdb_tid,
(DB_TXN**) &thd->transaction.stmt.bdb_tid,
0)))
if ((error=txn_begin(db_env, trx->all, &trx->stmt, 0)))
{
/* We leave the possible master transaction open */
thd->transaction.bdb_lock_count--; // We didn't get the lock /* purecov: inspected */
DBUG_RETURN(error); /* purecov: inspected */
trx->bdb_lock_count--; // We didn't get the lock
DBUG_RETURN(error);
}
trans_register_ha(thd, FALSE, &berkeley_hton);
}
transaction= (DB_TXN*) thd->transaction.stmt.bdb_tid;
transaction= trx->stmt;
}
else
{
lock.type=TL_UNLOCK; // Unlocked
thread_safe_add(share->rows, changed_rows, &share->mutex);
changed_rows=0;
if (!--thd->transaction.bdb_lock_count)
if (!--trx->bdb_lock_count)
{
if (thd->transaction.stmt.bdb_tid)
if (trx->stmt)
{
/*
F_UNLOCK is done without a transaction commit / rollback.
@ -1871,9 +1909,8 @@ int ha_berkeley::external_lock(THD *thd, int lock_type)
We must in this case commit the work to keep the row locks
*/
DBUG_PRINT("trans",("commiting non-updating transaction"));
error=txn_commit((DB_TXN*) thd->transaction.stmt.bdb_tid,0);
thd->transaction.stmt.bdb_tid=0;
transaction=0;
error= txn_commit(trx->stmt,0);
trx->stmt= transaction= 0;
}
}
}
@ -1891,14 +1928,19 @@ int ha_berkeley::start_stmt(THD *thd)
{
int error=0;
DBUG_ENTER("ha_berkeley::start_stmt");
if (!thd->transaction.stmt.bdb_tid)
berkeley_trx_data *trx=(berkeley_trx_data *)thd->ha_data[berkeley_hton.slot];
/*
note that trx->stmt may have been already initialized as start_stmt()
is called for *each table* not for each storage engine,
and there could be many bdb tables referenced in the query
*/
if (!trx->stmt)
{
DBUG_PRINT("trans",("starting transaction stmt"));
error=txn_begin(db_env, (DB_TXN*) thd->transaction.all.bdb_tid,
(DB_TXN**) &thd->transaction.stmt.bdb_tid,
0);
error=txn_begin(db_env, trx->all, &trx->stmt, 0);
trans_register_ha(thd, FALSE, &berkeley_hton);
}
transaction= (DB_TXN*) thd->transaction.stmt.bdb_tid;
transaction= trx->stmt;
DBUG_RETURN(error);
}
@ -2234,6 +2276,7 @@ int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
uint i;
DB_BTREE_STAT *stat=0;
DB_TXN_STAT *txn_stat_ptr= 0;
berkeley_trx_data *trx=(berkeley_trx_data *)thd->ha_data[berkeley_hton.slot];
/*
Original bdb documentation says:
@ -2249,11 +2292,8 @@ int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
{
DB_TXN_ACTIVE *atxn_stmt= 0, *atxn_all= 0;
DB_TXN *txn_all= (DB_TXN*) thd->transaction.all.bdb_tid;
u_int32_t all_id= txn_all->id(txn_all);
DB_TXN *txn_stmt= (DB_TXN*) thd->transaction.stmt.bdb_tid;
u_int32_t stmt_id= txn_stmt->id(txn_stmt);
u_int32_t all_id= trx->all->id(trx->all);
u_int32_t stmt_id= trx->stmt->id(trx->stmt);
DB_TXN_ACTIVE *cur= txn_stat_ptr->st_txnarray;
DB_TXN_ACTIVE *end= cur + txn_stat_ptr->st_nactive;