You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-267 multi-block support for PrimProc and bulk
* Adds multi-block bulk write support * Adds PrimProc multi-block read support * Allows the functions length() and hex() to work with BLOB columns
This commit is contained in:
@ -64,7 +64,7 @@ namespace
|
||||
const int START_HDR1 = // start loc of 2nd offset (HDR1)
|
||||
HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
|
||||
const int PSEUDO_COL_WIDTH = DICT_COL_WIDTH; // used to convert row count to block count
|
||||
|
||||
const int MAX_BLOB_SIZE = 2100000000; // for safety, we use an 18bit block count of 8KB blocks
|
||||
}
|
||||
|
||||
namespace WriteEngine
|
||||
@ -620,17 +620,104 @@ bool Dctnry::getTokenFromArray(Signature& sig)
|
||||
* token - token that was assigned to the inserted signature
|
||||
*
|
||||
* RETURN:
|
||||
* none
|
||||
* success - successfully write the signature to the block
|
||||
* failure - failed to extend/create an extent for the block
|
||||
******************************************************************************/
|
||||
void Dctnry::insertDctnry2(Signature& sig)
|
||||
int Dctnry::insertDctnry2(Signature& sig)
|
||||
{
|
||||
insertDctnryHdr(m_curBlock.data,
|
||||
sig.size);
|
||||
insertSgnture(m_curBlock.data, sig.size, (unsigned char*)sig.signature);
|
||||
int rc = 0;
|
||||
int write_size;
|
||||
bool lbid_in_token;
|
||||
|
||||
sig.token.fbo = m_curLbid;
|
||||
sig.token.op = m_curOp;
|
||||
sig.token.bc = 0U;
|
||||
sig.token.bc = 0;
|
||||
|
||||
while (sig.size > 0)
|
||||
{
|
||||
if (sig.size > (m_freeSpace - m_totalHdrBytes))
|
||||
{
|
||||
write_size = (m_freeSpace - m_totalHdrBytes);
|
||||
}
|
||||
else
|
||||
{
|
||||
write_size = sig.size;
|
||||
}
|
||||
|
||||
insertDctnryHdr(m_curBlock.data, write_size);
|
||||
insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature);
|
||||
|
||||
sig.size -= write_size;
|
||||
sig.signature += write_size;
|
||||
|
||||
if (!lbid_in_token)
|
||||
{
|
||||
sig.token.fbo = m_curLbid;
|
||||
sig.token.op = m_curOp;
|
||||
lbid_in_token = true;
|
||||
}
|
||||
|
||||
if (sig.size > 0)
|
||||
{
|
||||
CommBlock cb;
|
||||
cb.file.oid = m_dctnryOID;
|
||||
cb.file.pFile = m_dFile;
|
||||
sig.token.bc++;
|
||||
|
||||
RETURN_ON_ERROR( writeDBFile(cb, &m_curBlock, m_curLbid) );
|
||||
memset( m_curBlock.data, 0, sizeof(m_curBlock.data));
|
||||
memcpy( m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
|
||||
m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
|
||||
m_curBlock.state = BLK_WRITE;
|
||||
m_curOp =0;
|
||||
m_lastFbo++;
|
||||
m_curFbo = m_lastFbo;
|
||||
|
||||
//...Expand current extent if it is an abbreviated initial extent
|
||||
if ((m_curFbo == m_numBlocks) &&
|
||||
(m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
|
||||
{
|
||||
RETURN_ON_ERROR( expandDctnryExtent() );
|
||||
}
|
||||
|
||||
//...Allocate a new extent if we have reached the last block in the
|
||||
// current extent.
|
||||
if (m_curFbo == m_numBlocks)
|
||||
{//last block
|
||||
//for roll back the extent to use
|
||||
//Save those empty extents in case of failure to rollback
|
||||
std::vector<ExtentInfo> dictExtentInfo;
|
||||
ExtentInfo info;
|
||||
info.oid = m_dctnryOID;
|
||||
info.partitionNum = m_partition;
|
||||
info.segmentNum = m_segment;
|
||||
info.dbRoot = m_dbRoot;
|
||||
info.hwm = m_hwm;
|
||||
info.newFile = false;
|
||||
dictExtentInfo.push_back (info);
|
||||
LBID_t startLbid;
|
||||
// Add an extent.
|
||||
rc = createDctnry(m_dctnryOID,
|
||||
0, // dummy column width
|
||||
m_dbRoot,
|
||||
m_partition,
|
||||
m_segment,
|
||||
startLbid,
|
||||
false) ;
|
||||
if ( rc != NO_ERROR )
|
||||
{
|
||||
//roll back the extent
|
||||
BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(
|
||||
dictExtentInfo);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
RETURN_ON_ERROR( BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID,
|
||||
m_partition, m_segment,
|
||||
m_curFbo, m_curLbid) );
|
||||
m_curBlock.lbid = m_curLbid;
|
||||
|
||||
}
|
||||
}
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
@ -705,7 +792,8 @@ int Dctnry::insertDctnry(const char* buf,
|
||||
// it is too late to reject the row. However, as a precaution, we
|
||||
// still check against max size & set to null token if needed.
|
||||
if ((curSig.size == 0) ||
|
||||
(curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET))
|
||||
(curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) ||
|
||||
(curSig.size > MAX_BLOB_SIZE))
|
||||
{
|
||||
if (m_defVal.length() > 0) // use default string if available
|
||||
{
|
||||
@ -735,6 +823,7 @@ int Dctnry::insertDctnry(const char* buf,
|
||||
}
|
||||
|
||||
//...Search for the string in our string cache
|
||||
//if it fits into one block (< 8KB)
|
||||
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
|
||||
(curSig.size <= MAX_SIGNATURE_SIZE))
|
||||
{
|
||||
@ -750,14 +839,15 @@ int Dctnry::insertDctnry(const char* buf,
|
||||
}
|
||||
//Stats::stopParseEvent("getTokenFromArray");
|
||||
}
|
||||
totalUseSize = HDR_UNIT_SIZE + curSig.size;
|
||||
totalUseSize = m_totalHdrBytes + curSig.size;
|
||||
|
||||
//...String not found in cache, so proceed.
|
||||
// If room is available in current block then insert into block.
|
||||
// @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
|
||||
if( (totalUseSize <= m_freeSpace) &&
|
||||
if( ((totalUseSize <= m_freeSpace) ||
|
||||
((curSig.size > 8176) && (m_freeSpace > m_totalHdrBytes))) &&
|
||||
(m_curOp < (MAX_OP_COUNT-1)) ) {
|
||||
insertDctnry2(curSig); //m_freeSpace updated!
|
||||
RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated!
|
||||
m_curBlock.state = BLK_WRITE;
|
||||
memcpy( pOut + outOffset, &curSig.token, 8 );
|
||||
outOffset += 8;
|
||||
@ -880,7 +970,7 @@ int Dctnry::insertDctnry(const char* buf,
|
||||
// we need to add the string to the new block.
|
||||
if (!found)
|
||||
{
|
||||
insertDctnry2(curSig); //m_freeSpace updated!
|
||||
RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated!
|
||||
m_curBlock.state = BLK_WRITE;
|
||||
memcpy( pOut + outOffset, &curSig.token, 8 );
|
||||
outOffset += 8;
|
||||
@ -949,7 +1039,7 @@ int Dctnry::insertDctnry(const int& sgnature_size,
|
||||
int write_size;
|
||||
bool lbid_in_token = false;
|
||||
// Round down for safety. In theory we can take 262143 * 8176 bytes
|
||||
if (sgnature_size > (2100000000))
|
||||
if (sgnature_size > MAX_BLOB_SIZE)
|
||||
{
|
||||
return ERR_DICT_SIZE_GT_2G;
|
||||
}
|
||||
|
@ -239,7 +239,7 @@ protected:
|
||||
// insertDctnryHdr inserts the new value info into the header.
|
||||
// insertSgnture inserts the new value into the block.
|
||||
//
|
||||
void insertDctnry2(Signature& sig);
|
||||
int insertDctnry2(Signature& sig);
|
||||
void insertDctnryHdr( unsigned char* blockBuf, const int& size);
|
||||
void insertSgnture(unsigned char* blockBuf,
|
||||
const int& size, unsigned char*value);
|
||||
|
Reference in New Issue
Block a user