1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-31 22:22:30 +03:00

query_id and my_xid -> ulonglong

fix for binlog+autocommit+tclog
comments, style fixes


libmysqld/libmysqld.rc:
  Change mode to -rw-rw-r--
libmysqld/resource.h:
  Change mode to -rw-rw-r--
configure.in:
  check for getpagesize
include/my_global.h:
  typo ?
include/my_pthread.h:
  bug in thread_safe_decrement_and_test()
mysql-test/r/bdb.result:
  results updated
mysql-test/r/innodb.result:
  results updated
mysql-test/r/mix_innodb_myisam_binlog.result:
  results updated
mysql-test/r/rpl_relayrotate.result:
  results updated
sql/ha_berkeley.cc:
  style fixes
sql/ha_innodb.cc:
  fixes to follow innodb coding style
sql/handler.cc:
  more comments. XA COMMIT ONE PHASE fix.
sql/handler.h:
  my_xid -> ulonglong. comments
sql/item_func.cc:
  DO RELEASE_LOCK("...") is no cache_stmt
sql/log.cc:
  comments, better error messages
sql/log_event.cc:
  even in autocommit mode we may need to cache_stmt
  xid is ulonglong
sql/log_event.h:
  more comments.
sql/mysql_priv.h:
  query_id is ulonglong
sql/mysqld.cc:
  default value for --log-tc changed
sql/share/errmsg.txt:
  better error messages
sql/sql_class.h:
  cleanup, comments
sql/sql_delete.cc:
  deleting from temporary tables is not always transactional
sql/sql_insert.cc:
  insert into temporary table is not always transactional
sql/sql_load.cc:
  load data into temp table is not always transactional
sql/sql_parse.cc:
  comments. bad merge fixed. xa_state_names[]
sql/sql_table.cc:
  create/drop temp table is not always transactional
sql/sql_update.cc:
  update temp table is not always transactional
This commit is contained in:
unknown
2005-01-27 22:38:56 +01:00
parent a7401bf7cc
commit 042448aa33
27 changed files with 377 additions and 301 deletions

View File

@@ -75,6 +75,7 @@ handlerton *binlog_init()
static int binlog_close_connection(THD *thd)
{
IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ASSERT(mysql_bin_log.is_open() && !my_b_tell(trans_log));
close_cached_file(trans_log);
my_free((gptr)trans_log, MYF(0));
return 0;
@@ -109,7 +110,7 @@ static int binlog_commit(THD *thd, bool all)
IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ENTER("binlog_commit");
DBUG_ASSERT(mysql_bin_log.is_open() &&
(all || !(thd->options & OPTION_NOT_AUTOCOMMIT)));
(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))));
if (!my_b_tell(trans_log))
{
@@ -1247,7 +1248,7 @@ void MYSQL_LOG::new_file(bool need_lock)
{
pthread_mutex_lock(&LOCK_log);
pthread_mutex_lock(&LOCK_index);
}
}
safe_mutex_assert_owner(&LOCK_log);
safe_mutex_assert_owner(&LOCK_index);
@@ -1567,10 +1568,14 @@ bool MYSQL_LOG::write(Log_event* event_info)
goto err;
}
trans_log->end_of_file= max_binlog_cache_size;
trans_register_ha(thd, TRUE, &binlog_hton);
trans_register_ha(thd,
thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
&binlog_hton);
}
else if (!my_b_tell(trans_log))
trans_register_ha(thd, TRUE, &binlog_hton);
trans_register_ha(thd,
thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
&binlog_hton);
file= trans_log;
}
else if (trans_log && my_b_tell(trans_log))
@@ -2057,8 +2062,7 @@ void MYSQL_LOG::close(uint exiting)
{
my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
char flags=LOG_EVENT_BINLOG_CLOSED_F;
my_pwrite(log_file.file, &flags, 1,
BIN_LOG_HEADER_SIZE + FLAGS_OFFSET, MYF(0));
my_pwrite(log_file.file, &flags, 1, offset, MYF(0));
}
if (my_close(log_file.file,MYF(0)) < 0 && ! write_error)
@@ -2382,26 +2386,33 @@ void sql_print_information(const char *format, ...)
(usable size of the first page is smaller because of log header)
there's PAGE control structure for each page
each page (or rather PAGE control structure) can be in one of three
states - active, in_sync, pool.
there could be only one page in active or in_sync states,
states - active, syncing, pool.
there could be only one page in active or syncing states,
but many in pool - pool is fifo queue.
usual lifecycle of a page is pool->active->in_sync->pool
usual lifecycle of a page is pool->active->syncing->pool
"active" page - is a page where new xid's are logged.
the page stays active as long as in_sync slot is taken.
"in_sync" page is being synced to disk. no new xid can be added to it.
the page stays active as long as syncing slot is taken.
"syncing" page is being synced to disk. no new xid can be added to it.
when the sync is done the page is moved to a pool and an active page
becomes "in_sync".
becomes "syncing".
when a xid is added to an active page, the thread of this xid waits for
a page's condition until the page is synced. when in_sync slot becomes vacant
one of these waiters is awaken to take care of syncing. it syncs the page
and signals all waiters that the page is synced.
the result of such an architecture is a natural "commit grouping" -
If commits are coming faster than the system can sync, they do not
stall. Instead, all commit that came since the last sync are
logged to the same page, and they all are synced with the next -
one - sync. Thus, thought individual commits are delayed, throughput
is not decreasing.
when a xid is added to an active page, the thread of this xid waits
for a page's condition until the page is synced. when syncing slot
becomes vacant one of these waiters is awaken to take care of syncing.
it syncs the page and signals all waiters that the page is synced.
PAGE::waiters is used to count these waiters, and a page may never
become active again until waiters==0 (that is all waiters from the
previous sync have noticed the sync was completed)
note, that the page becomes "dirty" and has to be synced only when
a new xid is added into it. Removing a xid from a page does not make it
note, that the page becomes "dirty" and has to be synced only when a
new xid is added into it. Removing a xid from a page does not make it
dirty - we don't sync removals to disk.
*/
#define TC_LOG_HEADER_SIZE (sizeof(tc_log_magic)+1)
@@ -2433,35 +2444,38 @@ int TC_LOG_MMAP::open(const char *opt_name)
#endif
fn_format(logname,opt_name,mysql_data_home,"",MY_UNPACK_FILENAME);
fd=my_open(logname, O_RDWR, MYF(0)); // TODO use O_CREAT and check len==0 ?
fd= my_open(logname, O_RDWR, MYF(0));
if (fd == -1)
{
if (my_errno != ENOENT)
goto err;
if (using_heuristic_recover())
return 1;
fd=my_open(logname, O_RDWR|O_CREAT, MYF(MY_WME));
fd= my_create(logname, O_RDWR, 0, MYF(MY_WME));
if (fd == -1)
goto err;
inited=1;
file_length=opt_tc_log_size;
file_length= opt_tc_log_size;
if (my_chsize(fd, file_length, 0, MYF(MY_WME)))
goto err;
}
else
{
inited=1;
crashed=TRUE;
inited= 1;
crashed= TRUE;
sql_print_information("Recovering after a crash");
if (tc_heuristic_recover)
{
sql_print_error("Cannot perform automatic crash recovery when "
"--tc-heuristic-recover is used");
goto err;
}
file_length = my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE));
file_length= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE));
if (file_length == MY_FILEPOS_ERROR || file_length % tc_log_page_size)
goto err;
}
data=(uchar *)my_mmap(0, file_length, PROT_READ|PROT_WRITE,
data= (uchar *)my_mmap(0, file_length, PROT_READ|PROT_WRITE,
MAP_NOSYNC|MAP_SHARED, fd, 0);
if (data == MAP_FAILED)
{
@@ -2486,18 +2500,15 @@ int TC_LOG_MMAP::open(const char *opt_name)
pg->end=(my_xid *)(pg->start + tc_log_page_size);
pg->size=pg->free=tc_log_page_size/sizeof(my_xid);
}
pages[0].start=(my_xid *)(data+TC_LOG_HEADER_SIZE);
pages[0].size=pages[0].free=
(tc_log_page_size-TC_LOG_HEADER_SIZE)/sizeof(my_xid);
pages[0].start=pages[0].end-pages[0].size;
pages[npages-1].next=0;
inited=4;
if (crashed)
{
sql_print_error("Recovering after a crash");
if (recover())
if (crashed && recover())
goto err;
}
memcpy(data, tc_log_magic, sizeof(tc_log_magic));
data[sizeof(tc_log_magic)]= total_ha_2pc;
my_msync(fd, data, tc_log_page_size, MS_SYNC);
@@ -2511,7 +2522,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
inited=6;
in_sync= 0;
syncing= 0;
active=pages;
pool=pages+1;
pool_last=pages+npages-1;
@@ -2524,6 +2535,8 @@ err:
}
/*
there is no active page, let's got one from the pool
two strategies here:
1. take the first from the pool
2. if there're waiters - take the one with the most free space
@@ -2536,13 +2549,16 @@ void TC_LOG_MMAP::get_active_from_pool()
PAGE **p, **best_p=0;
int best_free;
if (syncing)
pthread_mutex_lock(&LOCK_pool);
do
{
best_p= p= &pool;
if ((*p)->waiters == 0)
break;
if ((*p)->waiters == 0) // can the first page be used ?
break; // yes - take it.
best_free=0;
best_free=0; // no - trying second strategy
for (p=&(*p)->next; *p; p=&(*p)->next)
{
if ((*p)->waiters == 0 && (*p)->free > best_free)
@@ -2555,16 +2571,19 @@ void TC_LOG_MMAP::get_active_from_pool()
while ((*best_p == 0 || best_free == 0) && overflow());
active=*best_p;
if (active->free == active->size)
if (active->free == active->size) // we've chosen an empty page
{
tc_log_cur_pages_used++;
set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used);
}
if ((*best_p)->next)
if ((*best_p)->next) // unlink the page from the pool
*best_p=(*best_p)->next;
else
pool_last=*best_p;
if (syncing)
pthread_mutex_unlock(&LOCK_pool);
}
int TC_LOG_MMAP::overflow()
@@ -2576,14 +2595,22 @@ int TC_LOG_MMAP::overflow()
*/
tc_log_page_waits++;
pthread_cond_wait(&COND_pool, &LOCK_pool);
return 1; // returns always 1
return 1; // always return 1
}
/*
all access to active page is serialized but it's not a problem, as we're
assuming that fsync() will be a bottleneck anyway.
That is, parallelizing writes to log pages we'll decrease number of threads
waiting for a page, but then all these threads will be waiting for fsync()
all access to active page is serialized but it's not a problem, as
we're assuming that fsync() will be a main bottleneck.
That is, parallelizing writes to log pages we'll decrease number of
threads waiting for a page, but then all these threads will be waiting
for a fsync() anyway
RETURN
0 - error
otherwise - "cookie", a number that will be passed as an argument
to unlog() call. tc_log can define it any way it wants,
and use for whatever purposes. TC_LOG_MMAP sets it
to the position in memory where xid was logged to.
*/
int TC_LOG_MMAP::log(THD *thd, my_xid xid)
@@ -2594,42 +2621,50 @@ int TC_LOG_MMAP::log(THD *thd, my_xid xid)
pthread_mutex_lock(&LOCK_active);
/*
if active page is full - just wait...
frankly speaking, active->free here accessed outside of mutex
protection, but it's safe, because it only means we may miss an
unlog() for the active page, and we're not waiting for it here -
unlog() does not signal COND_active.
*/
while (unlikely(active && active->free == 0))
pthread_cond_wait(&COND_active, &LOCK_active);
/* no active page ? take one from the pool */
if (active == 0)
{
lock_queue(&LOCK_pool);
get_active_from_pool();
unlock_queue(&LOCK_pool);
}
p=active;
pthread_mutex_lock(&p->lock);
/* searching for an empty slot */
while (*p->ptr)
{
p->ptr++;
DBUG_ASSERT(p->ptr < p->end); // because p->free > 0
}
cookie= (ulong)((uchar *)p->ptr - data); // Can never be zero
/* found! store xid there and mark the page dirty */
cookie= (ulong)((uchar *)p->ptr - data); // can never be zero
*p->ptr++= xid;
p->free--;
p->state= DIRTY;
/* to sync or not to sync - this is the question */
pthread_mutex_unlock(&LOCK_active);
pthread_mutex_lock(&LOCK_sync);
pthread_mutex_unlock(&p->lock);
if (in_sync)
if (syncing)
{ // somebody's syncing. let's wait
pthread_mutex_unlock(&LOCK_active);
p->waiters++;
do
{
/*
note - it must be while(), not do ... while() here
as p->state may be not DIRTY when we come here
*/
while (p->state == DIRTY && syncing)
pthread_cond_wait(&p->cond, &LOCK_sync);
} while (p->state == DIRTY && in_sync);
p->waiters--;
err= p->state == ERROR;
if (p->state != DIRTY) // page was synced
@@ -2640,9 +2675,10 @@ int TC_LOG_MMAP::log(THD *thd, my_xid xid)
goto done; // we're done
}
} // page was not synced! do it now
in_sync=p; // place is vacant - take it
DBUG_ASSERT(active == p);
active=0;
DBUG_ASSERT(active == p && syncing == 0);
pthread_mutex_lock(&LOCK_active);
syncing=p; // place is vacant - take it
active=0; // page is not active anymore
pthread_cond_broadcast(&COND_active); // in case somebody's waiting
pthread_mutex_unlock(&LOCK_active);
pthread_mutex_unlock(&LOCK_sync);
@@ -2656,30 +2692,36 @@ int TC_LOG_MMAP::sync()
{
int err;
DBUG_ASSERT(in_sync != active);
DBUG_ASSERT(syncing != active);
/*
sit down and relax - this can take a while...
note - no locks are held at this point
*/
err= my_msync(fd, in_sync->start, 1, MS_SYNC);
err= my_msync(fd, syncing->start, 1, MS_SYNC);
/* page is synced. let's move it to the pool */
pthread_mutex_lock(&LOCK_pool);
pool_last->next=in_sync;
pool_last=in_sync;
in_sync->next=0;
in_sync->state= err ? ERROR : POOL;
pthread_cond_broadcast(&in_sync->cond); // signal "sync done"
pool_last->next=syncing;
pool_last=syncing;
syncing->next=0;
syncing->state= err ? ERROR : POOL;
pthread_cond_broadcast(&syncing->cond); // signal "sync done"
pthread_cond_signal(&COND_pool); // in case somebody's waiting
pthread_mutex_unlock(&LOCK_pool);
/* marking 'syncing' slot free */
pthread_mutex_lock(&LOCK_sync);
in_sync=0;
syncing=0;
pthread_cond_signal(&active->cond); // wake up a new syncer
pthread_mutex_unlock(&LOCK_sync);
return err;
}
/*
erase xid from the page, update page free space counters/pointers.
cookie points directly to the memory where xid was logged
*/
void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
{
PAGE *p=pages+(cookie/tc_log_page_size);
@@ -2693,10 +2735,10 @@ void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
p->free++;
DBUG_ASSERT(p->free <= p->size);
set_if_smaller(p->ptr, x);
if (p->free == p->size)
if (p->free == p->size) // the page is completely empty
statistic_decrement(tc_log_cur_pages_used, &LOCK_status);
if (p->waiters == 0) // the page is in pool and ready to rock
pthread_cond_signal(&COND_pool); // ping ... in case somebody's waiting
pthread_cond_signal(&COND_pool); // ping ... for overflow()
pthread_mutex_unlock(&p->lock);
}
@@ -2747,8 +2789,10 @@ int TC_LOG_MMAP::recover()
*/
if (data[sizeof(tc_log_magic)] != total_ha_2pc)
{
sql_print_error("For recovery to work all storage engines and binary log "
"must have exactly the same settings as before the crash!");
sql_print_error("Recovery failed! You must have enabled "
"exactly %d storage engines that support "
"two-phase commit protocol",
data[sizeof(tc_log_magic)]);
goto err1;
}
@@ -2793,6 +2837,7 @@ int TC_LOG::using_heuristic_recover()
}
/****** transaction coordinator log for 2pc - binlog() based solution ******/
#define TC_LOG_BINLOG MYSQL_LOG
/*
TODO keep in-memory list of prepared transactions
@@ -2801,9 +2846,10 @@ int TC_LOG::using_heuristic_recover()
but let's check the behaviour of tc_log_page_waits first!
*/
int MYSQL_LOG::open(const char *opt_name)
int TC_LOG_BINLOG::open(const char *opt_name)
{
LOG_INFO log_info, new_log_info;
int error;
DBUG_ASSERT(total_ha_2pc > 1);
@@ -2821,8 +2867,11 @@ int MYSQL_LOG::open(const char *opt_name)
let's keep it happy.
*/
if (find_log_pos(&new_log_info, NullS, 1))
if ((error= find_log_pos(&new_log_info, NullS, 1)))
{
sql_print_error("find_log_pos() failed (error: %d)", error);
goto err; // er ? where's the current entry ?
}
if (strcmp(log_file_name, new_log_info.log_file_name))
{
@@ -2830,9 +2879,8 @@ int MYSQL_LOG::open(const char *opt_name)
char last_event_type=UNKNOWN_EVENT;
IO_CACHE log;
File file;
int error;
Log_event *ev;
Format_description_log_event fdle(4);
Log_event *ev=0;
Format_description_log_event fdle(BINLOG_VERSION);
if (! fdle.is_valid())
goto err;
@@ -2842,8 +2890,11 @@ int MYSQL_LOG::open(const char *opt_name)
log_info.index_file_offset=new_log_info.index_file_offset;
log_info.index_file_start_offset=new_log_info.index_file_offset;
strcpy(log_info.log_file_name, new_log_info.log_file_name);
if (find_next_log(&new_log_info, 1))
goto err; // er ? where's the current entry ?
if ((error= find_next_log(&new_log_info, 1)))
{
sql_print_error("find_log_pos() failed (error: %d)", error);
goto err; // er ? where's the current entry ?
}
} while (strcmp(log_file_name, new_log_info.log_file_name));
if ((file= open_binlog(&log, log_info.log_file_name, &errmsg)) < 0)
@@ -2853,11 +2904,14 @@ int MYSQL_LOG::open(const char *opt_name)
}
if (((ev= Log_event::read_log_event(&log, 0, &fdle))) &&
(ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
(ev->flags & LOG_EVENT_BINLOG_CLOSED_F))
error=0;
else
error= recover(&log, (Format_description_log_event *)ev);
(ev->get_type_code() == FORMAT_DESCRIPTION_EVENT))
{
if (ev->flags & LOG_EVENT_BINLOG_CLOSED_F)
error=0;
else
error= recover(&log, (Format_description_log_event *)ev);
}
// else nothing to do (probably MySQL 4.x binlog)
delete ev;
end_io_cache(&log);
@@ -2873,15 +2927,22 @@ err:
return 1;
}
void MYSQL_LOG::close()
/* this is called on shutdown, after ha_panic */
void TC_LOG_BINLOG::close()
{
DBUG_ASSERT(prepared_xids==0);
pthread_mutex_destroy(&LOCK_prep_xids);
pthread_cond_destroy (&COND_prep_xids);
}
/* TODO group commit */
int MYSQL_LOG::log(THD *thd, my_xid xid)
/*
TODO group commit
RETURN
0 - error
1 - success
*/
int TC_LOG_BINLOG::log(THD *thd, my_xid xid)
{
Xid_log_event xle(thd, xid);
if (xle.write((IO_CACHE*)thd->ha_data[binlog_hton.slot]))
@@ -2890,13 +2951,13 @@ int MYSQL_LOG::log(THD *thd, my_xid xid)
return !binlog_commit(thd,1); // invert return value
}
void MYSQL_LOG::unlog(ulong cookie, my_xid xid)
void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{
if (thread_safe_dec_and_test(prepared_xids, &LOCK_prep_xids))
pthread_cond_signal(&COND_prep_xids);
}
int MYSQL_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
{
Log_event *ev;
HASH xids;