mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
Fix for Bug #18184 SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked
This commit is contained in:
@ -63,3 +63,62 @@ pk u o
|
|||||||
5 5 5
|
5 5 5
|
||||||
insert into t1 values (1,1,1);
|
insert into t1 values (1,1,1);
|
||||||
drop table t1;
|
drop table t1;
|
||||||
|
create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
|
||||||
|
insert into t1 values (1,'one'), (2,'two'),(3,"three");
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
x y
|
||||||
|
1 one
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
x y
|
||||||
|
2 two
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||||
|
rollback;
|
||||||
|
commit;
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' or y = 'three' for update;
|
||||||
|
x y
|
||||||
|
3 three
|
||||||
|
1 one
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
x y
|
||||||
|
2 two
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||||
|
rollback;
|
||||||
|
commit;
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 lock in share mode;
|
||||||
|
x y
|
||||||
|
1 one
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 lock in share mode;
|
||||||
|
x y
|
||||||
|
1 one
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
x y
|
||||||
|
2 two
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||||
|
rollback;
|
||||||
|
commit;
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' or y = 'three' lock in share mode;
|
||||||
|
x y
|
||||||
|
3 three
|
||||||
|
1 one
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' lock in share mode;
|
||||||
|
x y
|
||||||
|
1 one
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
x y
|
||||||
|
2 two
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||||
|
rollback;
|
||||||
|
commit;
|
||||||
|
drop table t1;
|
||||||
|
@ -69,4 +69,80 @@ insert into t1 values (1,1,1);
|
|||||||
|
|
||||||
drop table t1;
|
drop table t1;
|
||||||
|
|
||||||
|
# Lock for update
|
||||||
|
|
||||||
|
create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
|
||||||
|
|
||||||
|
insert into t1 values (1,'one'), (2,'two'),(3,"three");
|
||||||
|
|
||||||
|
# PK access
|
||||||
|
connection con1;
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
|
||||||
|
connection con2;
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
--error 1205
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
rollback;
|
||||||
|
|
||||||
|
connection con1;
|
||||||
|
commit;
|
||||||
|
|
||||||
|
# scan
|
||||||
|
connection con1;
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' or y = 'three' for update;
|
||||||
|
|
||||||
|
connection con2;
|
||||||
|
begin;
|
||||||
|
# Have to check with pk access here since scans take locks on
|
||||||
|
# all rows and then release them in chunks
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
--error 1205
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
rollback;
|
||||||
|
|
||||||
|
connection con1;
|
||||||
|
commit;
|
||||||
|
|
||||||
|
# share locking
|
||||||
|
|
||||||
|
# PK access
|
||||||
|
connection con1;
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 lock in share mode;
|
||||||
|
|
||||||
|
connection con2;
|
||||||
|
begin;
|
||||||
|
select * from t1 where x = 1 lock in share mode;
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
--error 1205
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
rollback;
|
||||||
|
|
||||||
|
connection con1;
|
||||||
|
commit;
|
||||||
|
|
||||||
|
# scan
|
||||||
|
connection con1;
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' or y = 'three' lock in share mode;
|
||||||
|
|
||||||
|
connection con2;
|
||||||
|
begin;
|
||||||
|
select * from t1 where y = 'one' lock in share mode;
|
||||||
|
# Have to check with pk access here since scans take locks on
|
||||||
|
# all rows and then release them in chunks
|
||||||
|
select * from t1 where x = 2 for update;
|
||||||
|
--error 1205
|
||||||
|
select * from t1 where x = 1 for update;
|
||||||
|
rollback;
|
||||||
|
|
||||||
|
connection con1;
|
||||||
|
commit;
|
||||||
|
|
||||||
|
drop table t1;
|
||||||
|
|
||||||
# End of 4.1 tests
|
# End of 4.1 tests
|
||||||
|
@ -45,14 +45,15 @@ public:
|
|||||||
NdbResultSet* readTuples(LockMode = LM_Read,
|
NdbResultSet* readTuples(LockMode = LM_Read,
|
||||||
Uint32 batch = 0,
|
Uint32 batch = 0,
|
||||||
Uint32 parallel = 0,
|
Uint32 parallel = 0,
|
||||||
bool order_by = false);
|
bool order_by = false,
|
||||||
|
bool keyinfo = false);
|
||||||
|
|
||||||
inline NdbResultSet* readTuples(int parallell){
|
inline NdbResultSet* readTuples(int parallell){
|
||||||
return readTuples(LM_Read, 0, parallell, false);
|
return readTuples(LM_Read, 0, parallell);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline NdbResultSet* readTuplesExclusive(int parallell = 0){
|
inline NdbResultSet* readTuplesExclusive(int parallell = 0){
|
||||||
return readTuples(LM_Exclusive, 0, parallell, false);
|
return readTuples(LM_Exclusive, 0, parallell);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -101,6 +101,27 @@ public:
|
|||||||
*/
|
*/
|
||||||
int restart(bool forceSend = false);
|
int restart(bool forceSend = false);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock current row by transfering scan operation to a locking transaction.
|
||||||
|
* Use this function
|
||||||
|
* when a scan has found a record that you want to lock.
|
||||||
|
* 1. Start a new transaction.
|
||||||
|
* 2. Call the function takeOverForUpdate using your new transaction
|
||||||
|
* as parameter, all the properties of the found record will be copied
|
||||||
|
* to the new transaction.
|
||||||
|
* 3. When you execute the new transaction, the lock held by the scan will
|
||||||
|
* be transferred to the new transaction(it's taken over).
|
||||||
|
*
|
||||||
|
* @note You must have started the scan with openScanExclusive
|
||||||
|
* or explictly have requested keyinfo to be able to lock
|
||||||
|
* the found tuple.
|
||||||
|
*
|
||||||
|
* @param lockingTrans the locking transaction connection.
|
||||||
|
* @return an NdbOperation or NULL.
|
||||||
|
*/
|
||||||
|
NdbOperation* lockTuple();
|
||||||
|
NdbOperation* lockTuple(NdbConnection* lockingTrans);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transfer scan operation to an updating transaction. Use this function
|
* Transfer scan operation to an updating transaction. Use this function
|
||||||
* when a scan has found a record that you want to update.
|
* when a scan has found a record that you want to update.
|
||||||
|
@ -64,14 +64,16 @@ public:
|
|||||||
* Tuples are not stored in NdbResultSet until execute(NoCommit)
|
* Tuples are not stored in NdbResultSet until execute(NoCommit)
|
||||||
* has been executed and nextResult has been called.
|
* has been executed and nextResult has been called.
|
||||||
*
|
*
|
||||||
|
* @param keyinfo Return primary key, needed to be able to call lockTuple
|
||||||
* @param parallel Scan parallelism
|
* @param parallel Scan parallelism
|
||||||
* @param batch No of rows to fetch from each fragment at a time
|
* @param batch No of rows to fetch from each fragment at a time
|
||||||
* @param LockMode Scan lock handling
|
* @param LockMode Scan lock handling
|
||||||
* @returns NdbResultSet.
|
* @returns NdbResultSet.
|
||||||
* @note specifying 0 for batch and parallall means max performance
|
* @note specifying 0 for batch and parallell means max performance
|
||||||
*/
|
*/
|
||||||
NdbResultSet* readTuples(LockMode = LM_Read,
|
NdbResultSet* readTuples(LockMode = LM_Read,
|
||||||
Uint32 batch = 0, Uint32 parallel = 0);
|
Uint32 batch = 0, Uint32 parallel = 0,
|
||||||
|
bool keyinfo = false);
|
||||||
|
|
||||||
inline NdbResultSet* readTuples(int parallell){
|
inline NdbResultSet* readTuples(int parallell){
|
||||||
return readTuples(LM_Read, 0, parallell);
|
return readTuples(LM_Read, 0, parallell);
|
||||||
|
@ -72,6 +72,17 @@ void NdbResultSet::close(bool forceSend)
|
|||||||
m_operation->closeScan(forceSend, true);
|
m_operation->closeScan(forceSend, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NdbOperation*
|
||||||
|
NdbResultSet::lockTuple(){
|
||||||
|
return lockTuple(m_operation->m_transConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOperation*
|
||||||
|
NdbResultSet::lockTuple(NdbConnection* takeOverTrans){
|
||||||
|
return m_operation->takeOverScanOp(NdbOperation::ReadRequest,
|
||||||
|
takeOverTrans);
|
||||||
|
}
|
||||||
|
|
||||||
NdbOperation*
|
NdbOperation*
|
||||||
NdbResultSet::updateTuple(){
|
NdbResultSet::updateTuple(){
|
||||||
return updateTuple(m_operation->m_transConnection);
|
return updateTuple(m_operation->m_transConnection);
|
||||||
|
@ -126,7 +126,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
|
|||||||
|
|
||||||
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||||
Uint32 batch,
|
Uint32 batch,
|
||||||
Uint32 parallel)
|
Uint32 parallel,
|
||||||
|
bool keyinfo)
|
||||||
{
|
{
|
||||||
m_ordered = 0;
|
m_ordered = 0;
|
||||||
|
|
||||||
@ -170,7 +171,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_keyInfo = lockExcl ? 1 : 0;
|
m_keyInfo = (keyinfo || lockExcl) ? 1 : 0;
|
||||||
|
|
||||||
bool range = false;
|
bool range = false;
|
||||||
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
|
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
|
||||||
@ -956,18 +957,28 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
|
|||||||
if (newOp == NULL){
|
if (newOp == NULL){
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
if (!m_keyInfo)
|
||||||
|
{
|
||||||
|
// Cannot take over lock if no keyinfo was requested
|
||||||
|
setErrorCodeAbort(4604);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pTrans->theSimpleState = 0;
|
pTrans->theSimpleState = 0;
|
||||||
|
|
||||||
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
|
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
|
||||||
|
|
||||||
newOp->theTupKeyLen = len;
|
newOp->theTupKeyLen = len;
|
||||||
newOp->theOperationType = opType;
|
newOp->theOperationType = opType;
|
||||||
if (opType == DeleteRequest) {
|
switch (opType) {
|
||||||
newOp->theStatus = GetValue;
|
case (ReadRequest):
|
||||||
} else {
|
newOp->theLockMode = theLockMode;
|
||||||
newOp->theStatus = SetValue;
|
// Fall through
|
||||||
|
case (DeleteRequest):
|
||||||
|
newOp->theStatus = GetValue;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
newOp->theStatus = SetValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
const Uint32 * src = (Uint32*)tRecAttr->aRef();
|
const Uint32 * src = (Uint32*)tRecAttr->aRef();
|
||||||
const Uint32 tScanInfo = src[len] & 0x3FFFF;
|
const Uint32 tScanInfo = src[len] & 0x3FFFF;
|
||||||
const Uint32 tTakeOverNode = src[len] >> 20;
|
const Uint32 tTakeOverNode = src[len] >> 20;
|
||||||
@ -1241,8 +1252,9 @@ NdbResultSet*
|
|||||||
NdbIndexScanOperation::readTuples(LockMode lm,
|
NdbIndexScanOperation::readTuples(LockMode lm,
|
||||||
Uint32 batch,
|
Uint32 batch,
|
||||||
Uint32 parallel,
|
Uint32 parallel,
|
||||||
bool order_by){
|
bool order_by,
|
||||||
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
|
bool keyinfo){
|
||||||
|
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0, keyinfo);
|
||||||
if(rs && order_by){
|
if(rs && order_by){
|
||||||
m_ordered = 1;
|
m_ordered = 1;
|
||||||
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
|
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
|
||||||
|
@ -281,7 +281,7 @@ ErrorBundle ErrorCodes[] = {
|
|||||||
{ 4601, AE, "Transaction is not started"},
|
{ 4601, AE, "Transaction is not started"},
|
||||||
{ 4602, AE, "You must call getNdbOperation before executeScan" },
|
{ 4602, AE, "You must call getNdbOperation before executeScan" },
|
||||||
{ 4603, AE, "There can only be ONE operation in a scan transaction" },
|
{ 4603, AE, "There can only be ONE operation in a scan transaction" },
|
||||||
{ 4604, AE, "takeOverScanOp, opType must be UpdateRequest or DeleteRequest" },
|
{ 4604, AE, "takeOverScanOp, to take over a scanned row one must explicitly request keyinfo in readTuples call" },
|
||||||
{ 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
|
{ 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
|
||||||
{ 4607, AE, "There may only be one operation in a scan transaction"},
|
{ 4607, AE, "There may only be one operation in a scan transaction"},
|
||||||
{ 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
|
{ 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/* Copyright (C) 2000-2003 MySQL AB
|
/* Copyright (C) 2000-2003 MySQL AB
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or modify
|
This program is free software; you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
@ -1043,12 +1043,23 @@ void ha_ndbcluster::release_metadata()
|
|||||||
|
|
||||||
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
|
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
|
||||||
{
|
{
|
||||||
|
DBUG_ENTER("ha_ndbcluster::get_ndb_lock_type");
|
||||||
if (type >= TL_WRITE_ALLOW_WRITE)
|
if (type >= TL_WRITE_ALLOW_WRITE)
|
||||||
return NdbOperation::LM_Exclusive;
|
{
|
||||||
else if (uses_blob_value(m_retrieve_all_fields))
|
DBUG_PRINT("info", ("Using exclusive lock"));
|
||||||
return NdbOperation::LM_Read;
|
DBUG_RETURN(NdbOperation::LM_Exclusive);
|
||||||
|
}
|
||||||
|
else if (type == TL_READ_WITH_SHARED_LOCKS ||
|
||||||
|
uses_blob_value(m_retrieve_all_fields))
|
||||||
|
{
|
||||||
|
DBUG_PRINT("info", ("Using read lock"));
|
||||||
|
DBUG_RETURN(NdbOperation::LM_Read);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
return NdbOperation::LM_CommittedRead;
|
{
|
||||||
|
DBUG_PRINT("info", ("Using committed read"));
|
||||||
|
DBUG_RETURN(NdbOperation::LM_CommittedRead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static const ulong index_type_flags[]=
|
static const ulong index_type_flags[]=
|
||||||
@ -1384,12 +1395,35 @@ inline int ha_ndbcluster::next_result(byte *buf)
|
|||||||
|
|
||||||
if (!cursor)
|
if (!cursor)
|
||||||
DBUG_RETURN(HA_ERR_END_OF_FILE);
|
DBUG_RETURN(HA_ERR_END_OF_FILE);
|
||||||
|
|
||||||
|
if (m_lock_tuple)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
|
||||||
|
(SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
|
||||||
|
LOCK WITH SHARE MODE) and row was not explictly unlocked
|
||||||
|
with unlock_row() call
|
||||||
|
*/
|
||||||
|
NdbConnection *trans= m_active_trans;
|
||||||
|
NdbOperation *op;
|
||||||
|
// Lock row
|
||||||
|
DBUG_PRINT("info", ("Keeping lock on scanned row"));
|
||||||
|
|
||||||
|
if (!(op= m_active_cursor->lockTuple()))
|
||||||
|
{
|
||||||
|
m_lock_tuple= false;
|
||||||
|
ERR_RETURN(trans->getNdbError());
|
||||||
|
}
|
||||||
|
m_ops_pending++;
|
||||||
|
}
|
||||||
|
m_lock_tuple= false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If this an update or delete, call nextResult with false
|
If this an update or delete, call nextResult with false
|
||||||
to process any records already cached in NdbApi
|
to process any records already cached in NdbApi
|
||||||
*/
|
*/
|
||||||
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
|
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
|
||||||
|
m_lock.type != TL_READ_WITH_SHARED_LOCKS;
|
||||||
do {
|
do {
|
||||||
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
|
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
|
||||||
/*
|
/*
|
||||||
@ -1407,9 +1441,16 @@ inline int ha_ndbcluster::next_result(byte *buf)
|
|||||||
{
|
{
|
||||||
// One more record found
|
// One more record found
|
||||||
DBUG_PRINT("info", ("One more record found"));
|
DBUG_PRINT("info", ("One more record found"));
|
||||||
|
|
||||||
unpack_record(buf);
|
unpack_record(buf);
|
||||||
table->status= 0;
|
table->status= 0;
|
||||||
|
/*
|
||||||
|
Explicitly lock tuple if "select for update" or
|
||||||
|
"select lock in share mode"
|
||||||
|
*/
|
||||||
|
m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
|
||||||
|
||
|
||||||
|
m_lock.type == TL_READ_WITH_SHARED_LOCKS);
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
else if (check == 1 || check == 2)
|
else if (check == 1 || check == 2)
|
||||||
@ -1444,7 +1485,6 @@ inline int ha_ndbcluster::next_result(byte *buf)
|
|||||||
contact_ndb= (check == 2);
|
contact_ndb= (check == 2);
|
||||||
}
|
}
|
||||||
} while (check == 2);
|
} while (check == 2);
|
||||||
|
|
||||||
table->status= STATUS_NOT_FOUND;
|
table->status= STATUS_NOT_FOUND;
|
||||||
if (check == -1)
|
if (check == -1)
|
||||||
DBUG_RETURN(ndb_err(trans));
|
DBUG_RETURN(ndb_err(trans));
|
||||||
@ -1679,10 +1719,11 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
|
|||||||
restart= false;
|
restart= false;
|
||||||
NdbOperation::LockMode lm=
|
NdbOperation::LockMode lm=
|
||||||
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
|
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
|
||||||
|
bool need_pk = (lm == NdbOperation::LM_Read);
|
||||||
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
|
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
|
||||||
m_index[active_index].index,
|
m_index[active_index].index,
|
||||||
(const NDBTAB *) m_table)) ||
|
(const NDBTAB *) m_table)) ||
|
||||||
!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
|
!(cursor= op->readTuples(lm, 0, parallelism, sorted, need_pk)))
|
||||||
ERR_RETURN(trans->getNdbError());
|
ERR_RETURN(trans->getNdbError());
|
||||||
m_active_cursor= cursor;
|
m_active_cursor= cursor;
|
||||||
} else {
|
} else {
|
||||||
@ -1817,8 +1858,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
|
|||||||
|
|
||||||
NdbOperation::LockMode lm=
|
NdbOperation::LockMode lm=
|
||||||
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
|
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
|
||||||
|
bool need_pk = (lm == NdbOperation::LM_Read);
|
||||||
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
|
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
|
||||||
!(cursor= op->readTuples(lm, 0, parallelism)))
|
!(cursor= op->readTuples(lm, 0, parallelism, need_pk)))
|
||||||
ERR_RETURN(trans->getNdbError());
|
ERR_RETURN(trans->getNdbError());
|
||||||
m_active_cursor= cursor;
|
m_active_cursor= cursor;
|
||||||
DBUG_RETURN(define_read_attrs(buf, op));
|
DBUG_RETURN(define_read_attrs(buf, op));
|
||||||
@ -2082,6 +2124,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
|
|||||||
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
|
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
|
||||||
if (!(op= cursor->updateTuple()))
|
if (!(op= cursor->updateTuple()))
|
||||||
ERR_RETURN(trans->getNdbError());
|
ERR_RETURN(trans->getNdbError());
|
||||||
|
m_lock_tuple= false;
|
||||||
m_ops_pending++;
|
m_ops_pending++;
|
||||||
if (uses_blob_value(FALSE))
|
if (uses_blob_value(FALSE))
|
||||||
m_blobs_pending= TRUE;
|
m_blobs_pending= TRUE;
|
||||||
@ -2157,6 +2200,7 @@ int ha_ndbcluster::delete_row(const byte *record)
|
|||||||
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
|
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
|
||||||
if (cursor->deleteTuple() != 0)
|
if (cursor->deleteTuple() != 0)
|
||||||
ERR_RETURN(trans->getNdbError());
|
ERR_RETURN(trans->getNdbError());
|
||||||
|
m_lock_tuple= false;
|
||||||
m_ops_pending++;
|
m_ops_pending++;
|
||||||
|
|
||||||
no_uncommitted_rows_update(-1);
|
no_uncommitted_rows_update(-1);
|
||||||
@ -2439,6 +2483,12 @@ int ha_ndbcluster::index_init(uint index)
|
|||||||
{
|
{
|
||||||
DBUG_ENTER("index_init");
|
DBUG_ENTER("index_init");
|
||||||
DBUG_PRINT("enter", ("index: %u", index));
|
DBUG_PRINT("enter", ("index: %u", index));
|
||||||
|
/*
|
||||||
|
Locks are are explicitly released in scan
|
||||||
|
unless m_lock.type == TL_READ_HIGH_PRIORITY
|
||||||
|
and no sub-sequent call to unlock_row()
|
||||||
|
*/
|
||||||
|
m_lock_tuple= false;
|
||||||
DBUG_RETURN(handler::index_init(index));
|
DBUG_RETURN(handler::index_init(index));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2683,7 +2733,7 @@ int ha_ndbcluster::close_scan()
|
|||||||
if (!cursor)
|
if (!cursor)
|
||||||
DBUG_RETURN(1);
|
DBUG_RETURN(1);
|
||||||
|
|
||||||
|
m_lock_tuple= false;
|
||||||
if (m_ops_pending)
|
if (m_ops_pending)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@ -3383,6 +3433,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
|||||||
since ndb does not currently does not support table locking
|
since ndb does not currently does not support table locking
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
void ha_ndbcluster::unlock_row()
|
||||||
|
{
|
||||||
|
DBUG_ENTER("unlock_row");
|
||||||
|
|
||||||
|
DBUG_PRINT("info", ("Unlocking row"));
|
||||||
|
m_lock_tuple= false;
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
int ha_ndbcluster::start_stmt(THD *thd)
|
int ha_ndbcluster::start_stmt(THD *thd)
|
||||||
{
|
{
|
||||||
int error=0;
|
int error=0;
|
||||||
|
@ -120,6 +120,7 @@ class ha_ndbcluster: public handler
|
|||||||
int extra_opt(enum ha_extra_function operation, ulong cache_size);
|
int extra_opt(enum ha_extra_function operation, ulong cache_size);
|
||||||
int reset();
|
int reset();
|
||||||
int external_lock(THD *thd, int lock_type);
|
int external_lock(THD *thd, int lock_type);
|
||||||
|
void unlock_row();
|
||||||
int start_stmt(THD *thd);
|
int start_stmt(THD *thd);
|
||||||
const char * table_type() const;
|
const char * table_type() const;
|
||||||
const char ** bas_ext() const;
|
const char ** bas_ext() const;
|
||||||
@ -223,6 +224,7 @@ class ha_ndbcluster: public handler
|
|||||||
char m_tabname[FN_HEADLEN];
|
char m_tabname[FN_HEADLEN];
|
||||||
ulong m_table_flags;
|
ulong m_table_flags;
|
||||||
THR_LOCK_DATA m_lock;
|
THR_LOCK_DATA m_lock;
|
||||||
|
bool m_lock_tuple;
|
||||||
NDB_SHARE *m_share;
|
NDB_SHARE *m_share;
|
||||||
NDB_INDEX_DATA m_index[MAX_KEY];
|
NDB_INDEX_DATA m_index[MAX_KEY];
|
||||||
// NdbRecAttr has no reference to blob
|
// NdbRecAttr has no reference to blob
|
||||||
|
Reference in New Issue
Block a user