1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-267 DML support

* DML writes for multi-block dictionary (blob) entries now work
* PrimProc fixed so that the first block of a multi-block entry is read
correctly
* Performance optimisation (removed string copy onto the stack) for new
dictionary entries
This commit is contained in:
Andrew Hutchings
2017-03-18 14:31:29 +00:00
parent c08d03fba4
commit aea729fe7d
14 changed files with 76 additions and 52 deletions

View File

@ -282,7 +282,7 @@ namespace ddlpackageprocessor
dictStruct.treeOid = colType.ddn.treeOID;
dictStruct.listOid = colType.ddn.listOID;
dictStruct.dctnryOid = colType.ddn.dictOID;
memcpy(dictTuple.sigValue, data.c_str(), data.length());
dictTuple.sigValue = data.c_str();
dictTuple.sigSize = data.length();
int error = NO_ERROR;
if ( NO_ERROR != (error = fWriteEngine->tokenize( fTxnID, dictStruct, dictTuple)) )

View File

@ -673,7 +673,7 @@ boost::any DDLPackageProcessor::tokenizeData(execplan::CalpontSystemCatalog::SCN
//added for multifiles per oid
dictStruct.columnOid = colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, str.c_str(), str.length());
dictTuple.sigValue = (unsigned char*)str.c_str();
dictTuple.sigSize = str.length();
int error = NO_ERROR;
if (NO_ERROR != (error = fWriteEngine.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file

View File

@ -131,7 +131,7 @@ boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SC
dictStruct.dctnryOid = colType.ddn.dictOID;
//cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, data.c_str(), data.length());
dictTuple.sigValue = data.c_str();
dictTuple.sigSize = data.length();
int error = NO_ERROR;
if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) )

View File

@ -332,7 +332,7 @@ void DictStep::_execute()
i = 0;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
/* When this is used as a filter, the strings can be thrown out. JLF currently
@ -399,7 +399,7 @@ void DictStep::_project()
i = 0;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams *) (primMsg->tokens);
@ -456,7 +456,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
//cout << "DS: projectingToRG rids: " << bpp->ridCount << endl;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams *) (primMsg->tokens);
@ -499,7 +499,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
}
if (((int64_t)primMsg->LBID)<0 && o_lbid>0)
primMsg->LBID = o_lbid;
primMsg->LBID = o_lbid & 0xFFFFFFFFFL;
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);

View File

@ -630,7 +630,7 @@ void Dctnry::insertDctnry2(Signature& sig)
sig.token.fbo = m_curLbid;
sig.token.op = m_curOp;
sig.token.spare = 0U;
sig.token.bc = 0U;
}
/*******************************************************************************
@ -705,8 +705,7 @@ int Dctnry::insertDctnry(const char* buf,
// it is too late to reject the row. However, as a precaution, we
// still check against max size & set to null token if needed.
if ((curSig.size == 0) ||
(curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) ||
(curSig.size > MAX_SIGNATURE_SIZE))
(curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET))
{
if (m_defVal.length() > 0) // use default string if available
{
@ -736,7 +735,8 @@ int Dctnry::insertDctnry(const char* buf,
}
//...Search for the string in our string cache
if (m_arraySize < MAX_STRING_CACHE_SIZE)
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
//Stats::startParseEvent("getTokenFromArray");
found = getTokenFromArray(curSig);
@ -778,7 +778,9 @@ int Dctnry::insertDctnry(const char* buf,
}
//...Add string to cache, if we have not exceeded cache limit
if (m_arraySize < MAX_STRING_CACHE_SIZE)
// Don't cache big blobs
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache( curSig );
}
@ -885,7 +887,8 @@ int Dctnry::insertDctnry(const char* buf,
startPos++;
//...Add string to cache, if we have not exceeded cache limit
if (m_arraySize < MAX_STRING_CACHE_SIZE)
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache( curSig );
}
@ -943,9 +946,12 @@ int Dctnry::insertDctnry(const int& sgnature_size,
int i;
unsigned char* value = NULL;
int size;
if (sgnature_size > MAX_SIGNATURE_SIZE)
int write_size;
bool lbid_in_token = false;
// Round down for safety. In theory we can take 262143 * 8176 bytes
if (sgnature_size > (2100000000))
{
return ERR_DICT_SIZE_GT_8000;
return ERR_DICT_SIZE_GT_2G;
}
if (sgnature_size == 0)
{
@ -960,23 +966,41 @@ int Dctnry::insertDctnry(const int& sgnature_size,
size = sgnature_size;
value = (unsigned char*)sgnature_value;
token.bc = 0;
for (i = m_lastFbo; i < m_numBlocks; i++)
{
// @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
if( (m_freeSpace>= (size + HDR_UNIT_SIZE)) &&
if( ((m_freeSpace>= (size + m_totalHdrBytes)) ||
((size > 8176) && (m_freeSpace > m_totalHdrBytes))) &&
(m_curOp < (MAX_OP_COUNT-1)) )
{ // found the perfect block; signature size fit in this block
insertDctnryHdr(m_curBlock.data, size);
insertSgnture(m_curBlock.data, size, value);
if (size > (m_freeSpace - m_totalHdrBytes))
{
write_size = (m_freeSpace - m_totalHdrBytes);
}
else
{
write_size = size;
}
insertDctnryHdr(m_curBlock.data, write_size);
insertSgnture(m_curBlock.data, write_size, value);
size -= write_size;
value += write_size;
m_curBlock.state = BLK_WRITE;
// We only want the start LBID for a multi-block dict in the token
if (!lbid_in_token)
{
token.fbo = m_curLbid;
token.op = m_curOp;
token.spare = 0;
lbid_in_token = true;
}
if (size > 0)
token.bc++;
m_lastFbo = i;
m_curFbo = m_lastFbo;
if (m_curOp < (MAX_OP_COUNT-1))
if ((m_curOp < (MAX_OP_COUNT-1)) && (size <= 0))
return NO_ERROR;
}//end Found

View File

@ -2304,7 +2304,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string
//It's the same string for each column, so we just need one dictionary struct
memset(&dictTuple, 0, sizeof(dictTuple));
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
@ -2639,7 +2639,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string &
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
if (idbdatafile::IDBPolicy::useHdfs())
@ -2855,7 +2855,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
//int error = NO_ERROR;
//if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
@ -3028,7 +3028,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
//Tokenize the data value
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
/*
@ -3079,7 +3079,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
//It's the same string for each column, so we just need one dictionary struct
memset(&dictTuple, 0, sizeof(dictTuple));
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
@ -3921,7 +3921,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs
else
{
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@ -3967,7 +3967,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs
WriteEngine::DctnryTuple dctnryTuple;
if(defaultvalue.length() > 0)
{
memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}
@ -4187,7 +4187,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
dictStruct.dctnryOid = column1.colType.ddn.dictOID;
dictStruct.columnOid = column1.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, colNewName.c_str(), colNewName.length());
dictTuple.sigValue = (unsigned char*)colNewName.c_str();
dictTuple.sigSize = colNewName.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@ -4238,7 +4238,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
colValuesList.push_back(aColList1);
WriteEngine::DctnryTuple dctnryTuple;
boost::to_lower(colNewName);
memcpy(dctnryTuple.sigValue, colNewName.c_str(), colNewName.length());
dctnryTuple.sigValue = (unsigned char*)colNewName.c_str();
dctnryTuple.sigSize = colNewName.length();
dctnryTuple.isNull = false;
dctColList = dctnryTuple;
@ -4388,7 +4388,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
else
{
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@ -4437,7 +4437,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
if(defaultvalue.length() > 0)
{
memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}

View File

@ -2027,7 +2027,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
@ -2204,7 +2204,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
@ -2254,7 +2254,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
@ -2294,7 +2294,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);

View File

@ -1587,7 +1587,7 @@ int ChunkManager::calculateHeaderSize(int width)
int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
int rowsPerFile = rowsPerExtent * extentsPerFile;
int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12
//TODO: temporary fix for Blob
// BLOB is 1 string per block
if (stringsPerBlock == 0)
stringsPerBlock = 1;
int blocksNeeded = rowsPerFile / stringsPerBlock;

View File

@ -135,7 +135,7 @@ WErrorCodes::WErrorCodes() : fErrorCodes()
// Dictionary error
fErrorCodes[ERR_DICT_NO_SPACE_INSERT] = " no space for a dictionary insert";
fErrorCodes[ERR_DICT_SIZE_GT_8000] = " the dictionary size was >8000";
fErrorCodes[ERR_DICT_SIZE_GT_2G] = " the dictionary size was > 2GB";
fErrorCodes[ERR_DICT_NO_OP_DELETE] = " in the dictionary no op delete";
fErrorCodes[ERR_DICT_NO_OFFSET_DELETE] = " a dictionary bad Delete offset";
fErrorCodes[ERR_DICT_INVALID_HDR] = " a dictionary bad Delete Hdr";

View File

@ -231,7 +231,7 @@ namespace WriteEngine
// Dictionary error
//--------------------------------------------------------------------------
const int ERR_DICT_NO_SPACE_INSERT= ERR_DCTNRYBASE+ 1; // ins no space
const int ERR_DICT_SIZE_GT_8000 = ERR_DCTNRYBASE+ 2; // ins size >8000
const int ERR_DICT_SIZE_GT_2G = ERR_DCTNRYBASE+ 2; // ins size >8000
const int ERR_DICT_NO_OP_DELETE = ERR_DCTNRYBASE+ 3; // del no op
const int ERR_DICT_NO_OFFSET_DELETE=ERR_DCTNRYBASE+ 4; // del bad offset
const int ERR_DICT_INVALID_HDR = ERR_DCTNRYBASE+ 5; // Delete Hdr

View File

@ -290,7 +290,7 @@ namespace WriteEngine
struct DctnryTuple /** @brief Dictionary Tuple struct*/
{
unsigned char sigValue[MAX_SIGNATURE_SIZE]; /** @brief dictionary signature value*/
unsigned char *sigValue; /** @brief dictionary signature value*/
int sigSize; /** @brief dictionary signature size */
Token token; /** @brief dictionary token */
bool isNull;

View File

@ -45,12 +45,12 @@ namespace WriteEngine
struct Token {
uint64_t op : 10; // ordinal position within a block
uint64_t fbo : 36; // file block number
uint64_t spare : 18; // spare
uint64_t bc : 18; // block count
Token() // constructor, set to null value
{
op = 0x3FE;
fbo = 0xFFFFFFFFFLL;
spare = 0x3FFFF;
bc = 0x3FFFF;
}
};

View File

@ -692,7 +692,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
dctnryStruct.colWidth = dictColWidth;
dctnryStruct.fCompressionType = column.compressionType;
DctnryTuple dctnryTuple;
memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length());
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
dctnryTuple.sigSize = defaultValStr.length();
rc = dctnry->openDctnry(dctnryStruct.dctnryOid,
@ -761,7 +761,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
dctnryStruct.colWidth = dictColWidth;
dctnryStruct.fCompressionType = column.compressionType;
DctnryTuple dctnryTuple;
memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length());
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
//WriteEngineWrapper wrapper;
dctnryTuple.sigSize = defaultValStr.length();
//rc = wrapper.tokenize(txnid, dctnryStruct, dctnryTuple);

View File

@ -624,7 +624,7 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
DctnryTuple dctnryTuple;
DctColTupleList dctColTuples;
memcpy(dctnryTuple.sigValue, tmpStr.c_str(), tmpStr.length());
dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
dctnryTuple.sigSize = tmpStr.length();
dctnryTuple.isNull = true;
dctColTuples.push_back (dctnryTuple);
@ -1200,7 +1200,7 @@ timer.stop("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
@ -1248,7 +1248,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
@ -1710,7 +1710,7 @@ timer.start("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
@ -1776,7 +1776,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
@ -2333,7 +2333,7 @@ timer.stop("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid,
@ -2403,7 +2403,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid,