1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-498. Add the knob to disable segment|dict file preallocation. Dict files extension uses fallocate() if possible.

This commit is contained in:
Roman Nozdrin
2018-03-06 14:20:46 +03:00
parent 1d9f47a55c
commit 81fe7fa1a9
9 changed files with 230 additions and 130 deletions

View File

@ -329,6 +329,7 @@ int Dctnry::expandDctnryExtent()
blksToAdd,
m_dctnryHeader2,
m_totalHdrBytes,
true,
true );
if (rc != NO_ERROR)

View File

@ -1017,7 +1017,7 @@ int FileOp::addExtentExactFile(
* headers will be included "if" it is a compressed file.
* bExpandExtent (in) - Expand existing extent, or initialize a new one
* bAbbrevExtent(in) - if creating new extent, is it an abbreviated extent
* bOptExtension(in) - full sized segment file allocation optimization flag
* bOptExtension(in) - use fallocate() to extend the file if it is possible.
* RETURN:
* returns ERR_FILE_WRITE if an error occurs,
* else returns NO_ERROR.
@ -1046,7 +1046,8 @@ int FileOp::initColumnExtent(
}
// @bug5769 Don't initialize extents or truncate db files on HDFS
if (idbdatafile::IDBPolicy::useHdfs())
// MCOL-498 We don't need sequential segment files if a PM uses SSD either.
if (idbdatafile::IDBPolicy::useHdfs() || !idbdatafile::IDBPolicy::PreallocSpace())
{
//@Bug 3219. update the compression header after the extent is expanded.
if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
@ -1100,85 +1101,91 @@ int FileOp::initColumnExtent(
Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT);
else
Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT);
Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT);
#endif
int savedErrno = 0;
// Try to fallocate the space - fallback to write if fallocate has failed
// MCOL-498 Try to preallocate the space, fallback to write if fallocate has failed
if (!bOptExtension || pFile->fallocate(0, currFileSize, writeSize))
{
savedErrno = errno;
// Log the failed fallocate() call result
if ( bOptExtension )
{
std::ostringstream oss;
std::string errnoMsg;
Convertor::mapErrnoToString(savedErrno, errnoMsg);
oss << "FileOp::initColumnExtent(): fallocate(" << currFileSize <<
", " << writeSize << "): errno = " << savedErrno <<
": " << errnoMsg;
logging::Message::Args args;
args.add(oss.str());
SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO,
logging::M0006);
}
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT);
#endif
// Allocate buffer, store it in scoped_array to insure it's deletion.
// Create scope {...} to manage deletion of writeBuf.
// Save errno of the failed fallocate() to log it later.
savedErrno = errno;
unsigned char* writeBuf = new unsigned char[writeSize];
boost::scoped_array<unsigned char> writeBufPtr( writeBuf );
{
setEmptyBuf( writeBuf, writeSize, emptyVal, width );
unsigned char* writeBuf = new unsigned char[writeSize];
boost::scoped_array<unsigned char> writeBufPtr( writeBuf );
setEmptyBuf( writeBuf, writeSize, emptyVal, width );
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT);
if (bExpandExtent)
Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT);
else
Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT);
#endif
//std::ostringstream oss;
//oss << "initColExtent: width-" << width <<
//"; loopCount-" << loopCount <<
//"; writeSize-" << writeSize;
//std::cout << oss.str() << std::endl;
if (remWriteSize > 0)
{
if ( pFile->write( writeBuf, remWriteSize ) != remWriteSize )
{
return ERR_FILE_WRITE;
}
}
for (int j = 0; j < loopCount; j++)
{
if ( pFile->write( writeBuf, writeSize ) != writeSize )
{
return ERR_FILE_WRITE;
}
}
}
//@Bug 3219. update the compression header after the extent is expanded.
if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
{
updateColumnExtent(pFile, nBlocks);
}
// @bug 2378. Synchronize here to avoid write buffer pile up too much,
// which could cause controllernode to timeout later when it needs to
// save a snapshot.
pFile->flush();
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT);
if (bExpandExtent)
Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT);
Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT);
else
Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT);
Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT);
#endif
//std::ostringstream oss;
//oss << "initColExtent: width-" << width <<
//"; loopCount-" << loopCount <<
//"; writeSize-" << writeSize;
//std::cout << oss.str() << std::endl;
if (remWriteSize > 0)
{
if ( pFile->write( writeBuf, remWriteSize ) != remWriteSize )
{
return ERR_FILE_WRITE;
}
}
for (int j = 0; j < loopCount; j++)
{
if ( pFile->write( writeBuf, writeSize ) != writeSize )
{
return ERR_FILE_WRITE;
}
}
}
//@Bug 3219. update the compression header after the extent is expanded.
if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
{
updateColumnExtent(pFile, nBlocks);
}
// @bug 2378. Synchronize here to avoid write buffer pile up too much,
// which could cause controllernode to timeout later when it needs to
// save a snapshot.
pFile->flush();
#ifdef PROFILE
if (bExpandExtent)
Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT);
else
Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT);
#endif
// Log the fallocate() call result
std::ostringstream oss;
std::string errnoMsg;
Convertor::mapErrnoToString(savedErrno, errnoMsg);
oss << "FileOp::initColumnExtent(): fallocate(" << currFileSize <<
", " << writeSize << "): errno = " << savedErrno <<
": " << errnoMsg;
logging::Message::Args args;
args.add(oss.str());
SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO,
logging::M0006);
}
return NO_ERROR;
@ -1794,6 +1801,7 @@ int FileOp::writeHeaders(IDBDataFile* pFile, const char* controlHdr,
* blockHdrInit(in) - data used to initialize each block
* blockHdrInitSize(in) - number of bytes in blockHdrInit
* bExpandExtent (in) - Expand existing extent, or initialize a new one
* bOptExtension(in) - use fallocate() to extend the file if it is possible.
* RETURN:
* returns ERR_FILE_WRITE if an error occurs,
* else returns NO_ERROR.
@ -1804,10 +1812,13 @@ int FileOp::initDctnryExtent(
int nBlocks,
unsigned char* blockHdrInit,
int blockHdrInitSize,
bool bExpandExtent )
bool bExpandExtent,
bool bOptExtension )
{
off64_t currFileSize = pFile->size();
// @bug5769 Don't initialize extents or truncate db files on HDFS
if (idbdatafile::IDBPolicy::useHdfs())
// MCOL-498 We don't need sequential segment files if a PM uses SSD either.
if (idbdatafile::IDBPolicy::useHdfs() || !idbdatafile::IDBPolicy::PreallocSpace())
{
if (m_compressionType)
updateDctnryExtent(pFile, nBlocks);
@ -1841,86 +1852,107 @@ int FileOp::initDctnryExtent(
// Allocate a buffer, initialize it, and use it to create the extent
idbassert(dbRoot > 0);
#ifdef PROFILE
#ifdef PROFILE
if (bExpandExtent)
Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
else
Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
#endif
boost::mutex::scoped_lock lk(*m_DbRootAddExtentMutexes[dbRoot]);
#ifdef PROFILE
boost::mutex::scoped_lock lk(*m_DbRootAddExtentMutexes[dbRoot]);
#ifdef PROFILE
if (bExpandExtent)
Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
else
Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT);
#endif
// Allocate buffer, and store in scoped_array to insure it's deletion.
// Create scope {...} to manage deletion of writeBuf.
int savedErrno = 0;
// MCOL-498 Try to preallocate the space, fallback to write if fallocate
// has failed
if (!bOptExtension || pFile->fallocate(0, currFileSize, writeSize))
{
unsigned char* writeBuf = new unsigned char[writeSize];
boost::scoped_array<unsigned char> writeBufPtr( writeBuf );
memset(writeBuf, 0, writeSize);
for (int i = 0; i < nBlocks; i++)
// Log the failed fallocate() call result
if ( bOptExtension )
{
memcpy( writeBuf + (i * BYTE_PER_BLOCK),
blockHdrInit,
blockHdrInitSize );
std::ostringstream oss;
std::string errnoMsg;
Convertor::mapErrnoToString(savedErrno, errnoMsg);
oss << "FileOp::initColumnExtent(): fallocate(" << currFileSize <<
", " << writeSize << "): errno = " << savedErrno <<
": " << errnoMsg;
logging::Message::Args args;
args.add(oss.str());
SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO,
logging::M0006);
}
// Allocate buffer, and store in scoped_array to insure it's deletion.
// Create scope {...} to manage deletion of writeBuf.
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT);
#endif
unsigned char* writeBuf = new unsigned char[writeSize];
boost::scoped_array<unsigned char> writeBufPtr( writeBuf );
memset(writeBuf, 0, writeSize);
for (int i = 0; i < nBlocks; i++)
{
memcpy( writeBuf + (i * BYTE_PER_BLOCK),
blockHdrInit,
blockHdrInitSize );
}
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT);
if (bExpandExtent)
Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
else
Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT);
#endif
//std::ostringstream oss;
//oss << "initDctnryExtent: width-8(assumed)" <<
//"; loopCount-" << loopCount <<
//"; writeSize-" << writeSize;
//std::cout << oss.str() << std::endl;
if (remWriteSize > 0)
{
if (pFile->write( writeBuf, remWriteSize ) != remWriteSize)
{
return ERR_FILE_WRITE;
}
}
for (int j = 0; j < loopCount; j++)
{
if (pFile->write( writeBuf, writeSize ) != writeSize)
{
return ERR_FILE_WRITE;
}
}
}
if (m_compressionType)
updateDctnryExtent(pFile, nBlocks);
// Synchronize to avoid write buffer pile up too much, which could cause
// controllernode to timeout later when it needs to save a snapshot.
pFile->flush();
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT);
if (bExpandExtent)
Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
else
Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT);
Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT);
#endif
//std::ostringstream oss;
//oss << "initDctnryExtent: width-8(assumed)" <<
//"; loopCount-" << loopCount <<
//"; writeSize-" << writeSize;
//std::cout << oss.str() << std::endl;
if (remWriteSize > 0)
{
if (pFile->write( writeBuf, remWriteSize ) != remWriteSize)
{
return ERR_FILE_WRITE;
}
}
for (int j = 0; j < loopCount; j++)
{
if (pFile->write( writeBuf, writeSize ) != writeSize)
{
return ERR_FILE_WRITE;
}
}
}
if (m_compressionType)
updateDctnryExtent(pFile, nBlocks);
// Synchronize to avoid write buffer pile up too much, which could cause
// controllernode to timeout later when it needs to save a snapshot.
pFile->flush();
#ifdef PROFILE
if (bExpandExtent)
Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
else
Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT);
#endif
}
return NO_ERROR;

View File

@ -324,13 +324,15 @@ public:
* @param blockHdrInit(in) - data used to initialize each block header
* @param blockHdrInitSize(in) - number of bytes in blockHdrInit
* @param bExpandExtent (in) - Expand existing extent, or initialize new one
* @param bOptExtension (in) - use fallocate() to extend the file if it is possible.
*/
EXPORT int initDctnryExtent( IDBDataFile* pFile,
uint16_t dbRoot,
int nBlocks,
unsigned char* blockHdrInit,
int blockHdrInitSize,
bool bExpandExtent );
bool bExpandExtent,
bool bOptExtension = false );
/**
* @brief Check whether it is an directory
@ -500,6 +502,7 @@ private:
// bNewFile (in) - Adding extent to new file
// bExpandExtent (in) - Expand existing extent, or initialize new one
// bAbbrevExtent (in) - If adding new extent, is it abbreviated
// bOptExtension(in) - use fallocate() to extend the file if it is possible.
int initColumnExtent( IDBDataFile* pFile,
uint16_t dbRoot,
int nBlocks,