diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index b5610bf11..60bc2d9fc 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -102,130 +102,162 @@ 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON 0.0.0.0 8620 + ON C diff --git a/utils/idbdatafile/BufferedFile.cpp b/utils/idbdatafile/BufferedFile.cpp index 15066456f..ae12d0fd4 100644 --- a/utils/idbdatafile/BufferedFile.cpp +++ b/utils/idbdatafile/BufferedFile.cpp @@ -287,7 +287,7 @@ int BufferedFile::fallocate(int mode, off64_t offset, off64_t length) ret = ::fallocate( fileno(m_fp), mode, offset, length ); savedErrno = errno; - if ( ret == -1 && IDBLogger::isEnabled() ) + if ( IDBLogger::isEnabled() ) { IDBLogger::logNoArg(m_fname, this, "fallocate", errno); } diff --git a/utils/idbdatafile/CMakeLists.txt b/utils/idbdatafile/CMakeLists.txt index e06bf1fc8..fd81d8da8 100644 --- a/utils/idbdatafile/CMakeLists.txt +++ b/utils/idbdatafile/CMakeLists.txt @@ -16,7 +16,7 @@ set(idbdatafile_LIB_SRCS add_library(idbdatafile SHARED ${idbdatafile_LIB_SRCS}) -target_link_libraries(idbdatafile ${NETSNMP_LIBRARIES}) +target_link_libraries(idbdatafile ${NETSNMP_LIBRARIES} ${ENGINE_OAM_LIBS}) set_target_properties(idbdatafile PROPERTIES VERSION 1.0.0 SOVERSION 1) diff --git a/utils/idbdatafile/IDBPolicy.cpp b/utils/idbdatafile/IDBPolicy.cpp index 4d0679308..a8c194918 100644 --- a/utils/idbdatafile/IDBPolicy.cpp +++ b/utils/idbdatafile/IDBPolicy.cpp @@ -18,12 +18,14 @@ #include #include #include 
+#include #include #include #include // to_upper #include #include "configcpp.h" // for Config +#include "oamcache.h" #include "IDBPolicy.h" #include "PosixFileSystem.h" //#include "HdfsFileSystem.h" @@ -48,6 +50,7 @@ int64_t IDBPolicy::s_hdfsRdwrBufferMaxSize = 0; std::string IDBPolicy::s_hdfsRdwrScratch; bool IDBPolicy::s_configed = false; boost::mutex IDBPolicy::s_mutex; +bool IDBPolicy::s_PreallocSpace = true; void IDBPolicy::init( bool bEnableLogging, bool bUseRdwrMemBuffer, const string& hdfsRdwrScratch, int64_t hdfsRdwrBufferMaxSize ) { @@ -224,6 +227,23 @@ void IDBPolicy::configIDBPolicy() string scratch = cf->getConfig("SystemConfig", "hdfsRdwrScratch"); string hdfsRdwrScratch = tmpDir + scratch; + // MCOL-498. Set the PMSX.PreallocSpace knob, where X is a PM number, + // to OFF to disable file space preallocation. The feature is used in the FileOp code + // and is enabled by default for backward compatibility. + oam::OamCache* oamcache = oam::OamCache::makeOamCache(); + int PMId = oamcache->getLocalPMId(); + char configSectionPref[] = "PMS"; + char configSection[sizeof(configSectionPref)+oam::MAX_MODULE_ID_SIZE]; + ::memset(configSection, 0, sizeof(configSection)); + sprintf(configSection, "%s%d", configSectionPref, PMId); + string PreallocSpace = cf->getConfig(configSection, "PreallocSpace"); + + if ( PreallocSpace.length() != 0 ) + { + boost::to_upper(PreallocSpace); + s_PreallocSpace = ( PreallocSpace != "OFF" ); + } + IDBPolicy::init( idblog, bUseRdwrMemBuffer, hdfsRdwrScratch, hdfsRdwrBufferMaxSize ); s_configed = true; diff --git a/utils/idbdatafile/IDBPolicy.h b/utils/idbdatafile/IDBPolicy.h index 333b71808..5212f3b11 100644 --- a/utils/idbdatafile/IDBPolicy.h +++ b/utils/idbdatafile/IDBPolicy.h @@ -80,6 +80,11 @@ public: */ static bool useHdfs(); + /** + * Accessor method that returns whether file space preallocation is enabled + */ + static bool PreallocSpace(); + /** * Accessor method that returns whether to use HDFS memory buffers */ @@ -134,6 +139,7 @@ private: 
static bool isLocalFile( const std::string& path ); static bool s_usehdfs; + static bool s_PreallocSpace; static bool s_bUseRdwrMemBuffer; static std::string s_hdfsRdwrScratch; static int64_t s_hdfsRdwrBufferMaxSize; @@ -153,6 +159,12 @@ bool IDBPolicy::useHdfs() return s_usehdfs; } +inline +bool IDBPolicy::PreallocSpace() +{ + return s_PreallocSpace; +} + inline bool IDBPolicy::useRdwrMemBuffer() { diff --git a/utils/idbdatafile/UnbufferedFile.cpp b/utils/idbdatafile/UnbufferedFile.cpp index 5f7072aa8..200b583b4 100644 --- a/utils/idbdatafile/UnbufferedFile.cpp +++ b/utils/idbdatafile/UnbufferedFile.cpp @@ -337,7 +337,7 @@ int UnbufferedFile::fallocate(int mode, off64_t offset, off64_t length) ret = ::fallocate( m_fd, mode, offset, length ); savedErrno = errno; - if ( ret == -1 && IDBLogger::isEnabled() ) + if ( IDBLogger::isEnabled() ) { IDBLogger::logNoArg(m_fname, this, "fallocate", errno); } diff --git a/writeengine/dictionary/we_dctnry.cpp b/writeengine/dictionary/we_dctnry.cpp index 4e93ca60a..c5461d4e5 100644 --- a/writeengine/dictionary/we_dctnry.cpp +++ b/writeengine/dictionary/we_dctnry.cpp @@ -329,6 +329,7 @@ int Dctnry::expandDctnryExtent() blksToAdd, m_dctnryHeader2, m_totalHdrBytes, + true, true ); if (rc != NO_ERROR) diff --git a/writeengine/shared/we_fileop.cpp b/writeengine/shared/we_fileop.cpp index 939450980..0b94d0d81 100644 --- a/writeengine/shared/we_fileop.cpp +++ b/writeengine/shared/we_fileop.cpp @@ -1017,7 +1017,7 @@ int FileOp::addExtentExactFile( * headers will be included "if" it is a compressed file. * bExpandExtent (in) - Expand existing extent, or initialize a new one * bAbbrevExtent(in) - if creating new extent, is it an abbreviated extent - * bOptExtension(in) - full sized segment file allocation optimization flag + * bOptExtension(in) - use fallocate() to extend the file if it is possible. * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. 
@@ -1046,7 +1046,8 @@ int FileOp::initColumnExtent( } // @bug5769 Don't initialize extents or truncate db files on HDFS - if (idbdatafile::IDBPolicy::useHdfs()) + // MCOL-498 We don't need sequential segment files if a PM uses SSD either. + if (idbdatafile::IDBPolicy::useHdfs() || !idbdatafile::IDBPolicy::PreallocSpace()) { //@Bug 3219. update the compression header after the extent is expanded. if ((!bNewFile) && (m_compressionType) && (bExpandExtent)) @@ -1100,85 +1101,91 @@ int FileOp::initColumnExtent( Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT); else Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT); - - Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT); #endif + int savedErrno = 0; - // Try to fallocate the space - fallback to write if fallocate has failed + // MCOL-498 Try to preallocate the space, fallback to write if fallocate has failed if (!bOptExtension || pFile->fallocate(0, currFileSize, writeSize)) { + savedErrno = errno; + // Log the failed fallocate() call result + if ( bOptExtension ) + { + std::ostringstream oss; + std::string errnoMsg; + Convertor::mapErrnoToString(savedErrno, errnoMsg); + oss << "FileOp::initColumnExtent(): fallocate(" << currFileSize << + ", " << writeSize << "): errno = " << savedErrno << + ": " << errnoMsg; + logging::Message::Args args; + args.add(oss.str()); + SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, + logging::M0006); + } + +#ifdef PROFILE + Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT); +#endif // Allocate buffer, store it in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of writeBuf. - // Save errno of the failed fallocate() to log it later. 
- savedErrno = errno; - unsigned char* writeBuf = new unsigned char[writeSize]; - boost::scoped_array writeBufPtr( writeBuf ); + { - setEmptyBuf( writeBuf, writeSize, emptyVal, width ); + unsigned char* writeBuf = new unsigned char[writeSize]; + boost::scoped_array writeBufPtr( writeBuf ); + + setEmptyBuf( writeBuf, writeSize, emptyVal, width ); + + #ifdef PROFILE + Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT); + + if (bExpandExtent) + Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT); + else + Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT); + + #endif + + //std::ostringstream oss; + //oss << "initColExtent: width-" << width << + //"; loopCount-" << loopCount << + //"; writeSize-" << writeSize; + //std::cout << oss.str() << std::endl; + if (remWriteSize > 0) + { + if ( pFile->write( writeBuf, remWriteSize ) != remWriteSize ) + { + return ERR_FILE_WRITE; + } + } + + for (int j = 0; j < loopCount; j++) + { + if ( pFile->write( writeBuf, writeSize ) != writeSize ) + { + return ERR_FILE_WRITE; + } + } + } + + //@Bug 3219. update the compression header after the extent is expanded. + if ((!bNewFile) && (m_compressionType) && (bExpandExtent)) + { + updateColumnExtent(pFile, nBlocks); + } + + // @bug 2378. Synchronize here to avoid write buffer pile up too much, + // which could cause controllernode to timeout later when it needs to + // save a snapshot. 
+ pFile->flush(); #ifdef PROFILE - Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT); - if (bExpandExtent) - Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT); + Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT); else - Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT); - + Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT); #endif - //std::ostringstream oss; - //oss << "initColExtent: width-" << width << - //"; loopCount-" << loopCount << - //"; writeSize-" << writeSize; - //std::cout << oss.str() << std::endl; - if (remWriteSize > 0) - { - if ( pFile->write( writeBuf, remWriteSize ) != remWriteSize ) - { - return ERR_FILE_WRITE; - } - } - - for (int j = 0; j < loopCount; j++) - { - if ( pFile->write( writeBuf, writeSize ) != writeSize ) - { - return ERR_FILE_WRITE; - } - } } - - //@Bug 3219. update the compression header after the extent is expanded. - if ((!bNewFile) && (m_compressionType) && (bExpandExtent)) - { - updateColumnExtent(pFile, nBlocks); - } - - // @bug 2378. Synchronize here to avoid write buffer pile up too much, - // which could cause controllernode to timeout later when it needs to - // save a snapshot. 
- pFile->flush(); - -#ifdef PROFILE - - if (bExpandExtent) - Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT); - else - Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT); - -#endif - // Log the fallocate() call result - std::ostringstream oss; - std::string errnoMsg; - Convertor::mapErrnoToString(savedErrno, errnoMsg); - oss << "FileOp::initColumnExtent(): fallocate(" << currFileSize << - ", " << writeSize << "): errno = " << savedErrno << - ": " << errnoMsg; - logging::Message::Args args; - args.add(oss.str()); - SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, - logging::M0006); - } return NO_ERROR; @@ -1794,6 +1801,7 @@ int FileOp::writeHeaders(IDBDataFile* pFile, const char* controlHdr, * blockHdrInit(in) - data used to initialize each block * blockHdrInitSize(in) - number of bytes in blockHdrInit * bExpandExtent (in) - Expand existing extent, or initialize a new one + * bOptExtension(in) - use fallocate() to extend the file if it is possible. * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. @@ -1804,10 +1812,13 @@ int FileOp::initDctnryExtent( int nBlocks, unsigned char* blockHdrInit, int blockHdrInitSize, - bool bExpandExtent ) + bool bExpandExtent, + bool bOptExtension ) { + off64_t currFileSize = pFile->size(); // @bug5769 Don't initialize extents or truncate db files on HDFS - if (idbdatafile::IDBPolicy::useHdfs()) + // MCOL-498 We don't need sequential segment files if a PM uses SSD either. 
+ if (idbdatafile::IDBPolicy::useHdfs() || !idbdatafile::IDBPolicy::PreallocSpace()) { if (m_compressionType) updateDctnryExtent(pFile, nBlocks); @@ -1841,86 +1852,109 @@ int FileOp::initDctnryExtent( // Allocate a buffer, initialize it, and use it to create the extent idbassert(dbRoot > 0); -#ifdef PROFILE +#ifdef PROFILE if (bExpandExtent) Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT); else Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT); - #endif - boost::mutex::scoped_lock lk(*m_DbRootAddExtentMutexes[dbRoot]); -#ifdef PROFILE + boost::mutex::scoped_lock lk(*m_DbRootAddExtentMutexes[dbRoot]); + +#ifdef PROFILE if (bExpandExtent) Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT); else Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT); - - Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT); #endif - - // Allocate buffer, and store in scoped_array to insure it's deletion. - // Create scope {...} to manage deletion of writeBuf. + int savedErrno = 0; + // MCOL-498 Try to preallocate the space, fallback to write if fallocate + // has failed + if (!bOptExtension || pFile->fallocate(0, currFileSize, writeSize)) { - unsigned char* writeBuf = new unsigned char[writeSize]; - boost::scoped_array writeBufPtr( writeBuf ); - - memset(writeBuf, 0, writeSize); - - for (int i = 0; i < nBlocks; i++) + // Save errno of the failed fallocate() right away, before a later call can overwrite it + savedErrno = errno; + // Log the failed fallocate() call result + if ( bOptExtension ) { - memcpy( writeBuf + (i * BYTE_PER_BLOCK), - blockHdrInit, - blockHdrInitSize ); + std::ostringstream oss; + std::string errnoMsg; + Convertor::mapErrnoToString(savedErrno, errnoMsg); + oss << "FileOp::initDctnryExtent(): fallocate(" << currFileSize << + ", " << writeSize << "): errno = " << savedErrno << + ": " << errnoMsg; + logging::Message::Args args; + args.add(oss.str()); + SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, + logging::M0006); } + // Allocate buffer, and store in scoped_array to insure it's deletion. 
+ // Create scope {...} to manage deletion of writeBuf. + { +#ifdef PROFILE + Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT); +#endif + + unsigned char* writeBuf = new unsigned char[writeSize]; + boost::scoped_array writeBufPtr( writeBuf ); + + memset(writeBuf, 0, writeSize); + + for (int i = 0; i < nBlocks; i++) + { + memcpy( writeBuf + (i * BYTE_PER_BLOCK), + blockHdrInit, + blockHdrInitSize ); + } + +#ifdef PROFILE + Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT); + + if (bExpandExtent) + Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT); + else + Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT); +#endif + + //std::ostringstream oss; + //oss << "initDctnryExtent: width-8(assumed)" << + //"; loopCount-" << loopCount << + //"; writeSize-" << writeSize; + //std::cout << oss.str() << std::endl; + if (remWriteSize > 0) + { + if (pFile->write( writeBuf, remWriteSize ) != remWriteSize) + { + return ERR_FILE_WRITE; + } + } + + for (int j = 0; j < loopCount; j++) + { + if (pFile->write( writeBuf, writeSize ) != writeSize) + { + return ERR_FILE_WRITE; + } + } + } + + if (m_compressionType) + updateDctnryExtent(pFile, nBlocks); + + // Synchronize to avoid write buffer pile up too much, which could cause + // controllernode to timeout later when it needs to save a snapshot. 
+ pFile->flush(); #ifdef PROFILE - Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT); if (bExpandExtent) - Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT); + Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT); else - Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT); - + Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT); #endif - - //std::ostringstream oss; - //oss << "initDctnryExtent: width-8(assumed)" << - //"; loopCount-" << loopCount << - //"; writeSize-" << writeSize; - //std::cout << oss.str() << std::endl; - if (remWriteSize > 0) - { - if (pFile->write( writeBuf, remWriteSize ) != remWriteSize) - { - return ERR_FILE_WRITE; - } - } - - for (int j = 0; j < loopCount; j++) - { - if (pFile->write( writeBuf, writeSize ) != writeSize) - { - return ERR_FILE_WRITE; - } - } } - if (m_compressionType) - updateDctnryExtent(pFile, nBlocks); - - // Synchronize to avoid write buffer pile up too much, which could cause - // controllernode to timeout later when it needs to save a snapshot. - pFile->flush(); -#ifdef PROFILE - - if (bExpandExtent) - Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT); - else - Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT); - -#endif } return NO_ERROR; diff --git a/writeengine/shared/we_fileop.h b/writeengine/shared/we_fileop.h index 8a81e9815..67d926910 100644 --- a/writeengine/shared/we_fileop.h +++ b/writeengine/shared/we_fileop.h @@ -324,13 +324,15 @@ public: * @param blockHdrInit(in) - data used to initialize each block header * @param blockHdrInitSize(in) - number of bytes in blockHdrInit * @param bExpandExtent (in) - Expand existing extent, or initialize new one + * @param bOptExtension (in) - use fallocate() to extend the file if it is possible. 
*/ EXPORT int initDctnryExtent( IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, unsigned char* blockHdrInit, int blockHdrInitSize, - bool bExpandExtent ); + bool bExpandExtent, + bool bOptExtension = false ); /** * @brief Check whether it is an directory @@ -500,6 +502,7 @@ private: // bNewFile (in) - Adding extent to new file // bExpandExtent (in) - Expand existing extent, or initialize new one // bAbbrevExtent (in) - If adding new extent, is it abbreviated + // bOptExtension(in) - use fallocate() to extend the file if it is possible. int initColumnExtent( IDBDataFile* pFile, uint16_t dbRoot, int nBlocks,