1
0
mirror of https://github.com/MariaDB/server.git synced 2025-09-03 20:43:11 +03:00
This commit is contained in:
joreland@mysql.com
2004-11-11 10:45:45 +01:00
39 changed files with 535 additions and 1366 deletions

View File

@@ -118,7 +118,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
theStatus = GetValue;
theOperationType = OpenScanRequest;
theNdbCon->theMagicNumber = 0xFE11DF;
theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
return 0;
}
@@ -199,6 +199,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return 0;
}//if
theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
req->apiConnectPtr = theNdbCon->theTCConPtr;
req->tableId = m_accessTable->m_tableId;
@@ -219,16 +220,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->transId1 = (Uint32) transId;
req->transId2 = (Uint32) (transId >> 32);
NdbApiSignal* tSignal =
theFirstKEYINFO;
theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal());
NdbApiSignal* tSignal = theSCAN_TABREQ->next();
if(!tSignal)
{
theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
}
theLastKEYINFO = tSignal;
tSignal->setSignal(GSN_KEYINFO);
theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
theTotalNrOfKeyWordInSignal= 0;
getFirstATTRINFOScan();
return getResultSet();
}
@@ -348,60 +350,6 @@ NdbScanOperation::getFirstATTRINFOScan()
#define FAKE_PTR 2
#define API_PTR 3
/*
* After setBound() are done, move the accumulated ATTRINFO signals to
* a separate list. Then continue with normal scan.
*/
#if 0
int
NdbIndexScanOperation::saveBoundATTRINFO()
{
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
theBoundATTRINFO = theFirstATTRINFO;
theTotalBoundAI_Len = theTotalCurrAI_Len;
theTotalCurrAI_Len = 5;
theBoundATTRINFO->setData(theTotalBoundAI_Len, 4);
theBoundATTRINFO->setData(0, 5);
theBoundATTRINFO->setData(0, 6);
theBoundATTRINFO->setData(0, 7);
theBoundATTRINFO->setData(0, 8);
theStatus = GetValue;
int res = getFirstATTRINFOScan();
/**
* Define each key with getValue (if ordered)
* unless the one's with EqBound
*/
if(!res && m_ordered){
/**
* If setBound EQ
*/
Uint32 i = 0;
while(theTupleKeyDefined[i][0] == SETBOUND_EQ)
i++;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt - i;
for(; i<cnt; i++){
const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
UintPtr newVal = UintPtr(tmp);
theTupleKeyDefined[i][0] = FAKE_PTR;
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
}
}
return res;
}
#endif
#define WAITFOR_SCAN_TIMEOUT 120000
int
@@ -683,12 +631,14 @@ void NdbScanOperation::release()
for(Uint32 i = 0; i<m_allocated_receivers; i++){
m_receivers[i]->release();
}
NdbOperation::release();
if(theSCAN_TABREQ)
{
theNdb->releaseSignal(theSCAN_TABREQ);
theSCAN_TABREQ = 0;
}
NdbOperation::release();
}
/***************************************************************************
@@ -783,10 +733,6 @@ NdbScanOperation::doSendScan(int aProcessorId)
assert(theSCAN_TABREQ != NULL);
tSignal = theSCAN_TABREQ;
if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) {
setErrorCode(4001);
return -1;
}
Uint32 tupKeyLen = theTupKeyLen;
Uint32 len = theTotalNrOfKeyWordInSignal;
@@ -798,6 +744,10 @@ NdbScanOperation::doSendScan(int aProcessorId)
// we created the ATTRINFO signals after the SCAN_TABREQ signal.
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
Uint32 tmp = req->requestInfo;
ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
req->distributionKey = theDistributionKey;
tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
TransporterFacade *tp = TransporterFacade::instance();
LinearSectionPtr ptr[3];
@@ -814,8 +764,8 @@ NdbScanOperation::doSendScan(int aProcessorId)
tSignal = theLastKEYINFO;
tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
assert(theFirstKEYINFO != NULL);
tSignal = theFirstKEYINFO;
assert(theSCAN_TABREQ->next() != NULL);
tSignal = theSCAN_TABREQ->next();
NdbApiSignal* last;
do {
@@ -942,7 +892,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
if(i < len){
NdbApiSignal* tSignal = theNdb->getSignal();
newOp->theFirstKEYINFO = tSignal;
newOp->theTCREQ->next(tSignal);
Uint32 left = len - i;
while(tSignal && left > KeyInfo::DataLength){
@@ -1077,37 +1027,51 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
Uint32 currLen = theTotalNrOfKeyWordInSignal;
Uint32 remaining = KeyInfo::DataLength - currLen;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
bool tDistrKey = tAttrInfo->m_distributionKey;
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint32 xfrmData[2000];
if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
len = aValue != NULL ? sizeInBytes : 0;
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
return -1;
}
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint64 xfrmData[1001];
if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
((Uint32*)xfrmData)[len >> 2] = 0;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, len);
while (n < len)
((uchar*)xfrmData)[n++] = 0x20;
if(len & 3)
{
len += (4 - (len & 3));
}
aValue = (char*)xfrmData;
}
// insert attribute header
len = aValue != NULL ? sizeInBytes : 0;
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords);
const Uint32 ahValue = ah.m_value;
const bool aligned = (UintPtr(aValue) & 3) == 0;
const Uint32 align = (UintPtr(aValue) & 7);
const bool aligned = (tDistrKey && type == BoundEQ) ?
(align == 0) : (align & 3) == 0;
const bool nobytes = (len & 0x3) == 0;
const Uint32 totalLen = 2 + sizeInWords;
Uint32 tupKeyLen = theTupKeyLen;
if(remaining > totalLen && aligned && nobytes){
if(remaining > totalLen && aligned && nobytes){
Uint32 * dst = theKEYINFOptr + currLen;
* dst ++ = type;
* dst ++ = ahValue;
@@ -1115,12 +1079,12 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theTotalNrOfKeyWordInSignal = currLen + totalLen;
} else {
if(!aligned || !nobytes){
Uint32 tempData[2002];
Uint32 *tempData = (Uint32*)xfrmData;
tempData[0] = type;
tempData[1] = ahValue;
tempData[2 + (len >> 2)] = 0;
memcpy(tempData+2, aValue, len);
while ((len & 0x3) != 0)
((char*)&tempData[2])[len++] = 0;
insertBOUNDS(tempData, 2+sizeInWords);
} else {
Uint32 buf[2] = { type, ahValue };
@@ -1139,11 +1103,11 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
* so it's safe to use [tIndexAttrId]
* (instead of looping as is NdbOperation::equal_impl)
*/
if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){
theNoOfTupKeyDefined++;
theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
if(type == BoundEQ && tDistrKey)
{
theNoOfTupKeyLeft--;
return handle_distribution_key((Uint64*)aValue, sizeInWords);
}
return 0;
} else {
setErrorCodeAbort(4228); // XXX wrong code
@@ -1559,10 +1523,12 @@ NdbIndexScanOperation::reset_bounds(){
theError.code = 0;
reset_receivers(theParallelism, m_ordered);
theLastKEYINFO = theFirstKEYINFO;
theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData;
theLastKEYINFO = theSCAN_TABREQ->next();
theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
theTupKeyLen = 0;
theTotalNrOfKeyWordInSignal = 0;
theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
theDistrKeyIndicator_ = 0;
m_transConnection
->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
this);