diff --git a/tests/shared_components_tests.cpp b/tests/shared_components_tests.cpp index c84105a47..d747ee569 100644 --- a/tests/shared_components_tests.cpp +++ b/tests/shared_components_tests.cpp @@ -404,6 +404,7 @@ public: nBlocks, // number of blocks emptyVal, // NULL value width, // width + execplan::CalpontSystemCatalog::BIGINT, 1 ); // dbroot CPPUNIT_ASSERT( rc == NO_ERROR ); @@ -987,6 +988,7 @@ public: nBlocks, // number of blocks emptyVal, // NULL value width, // width + execplan::CalpontSystemCatalog::BIGINT, dbRoot ); // dbroot CPPUNIT_ASSERT( rc == NO_ERROR ); @@ -1011,6 +1013,7 @@ public: BYTE_PER_BLOCK, // number of blocks emptyVal, width, + execplan::CalpontSystemCatalog::BIGINT, false, // use existing file true, // expand the extent false, // add full (not abbreviated) extent @@ -1031,6 +1034,7 @@ public: nBlocks, // number of blocks emptyVal, // NULL value width, // width + execplan::CalpontSystemCatalog::BIGINT, dbRoot ); // dbroot CPPUNIT_ASSERT( rc == NO_ERROR ); @@ -1054,6 +1058,7 @@ public: BYTE_PER_BLOCK, // number of blocks emptyVal, width, + execplan::CalpontSystemCatalog::BIGINT, false, // use existing file true, // expand the extent false, // add full (not abbreviated) extent diff --git a/utils/compress/idbcompress.cpp b/utils/compress/idbcompress.cpp index c5be0f09e..ae8e04b4d 100644 --- a/utils/compress/idbcompress.cpp +++ b/utils/compress/idbcompress.cpp @@ -69,6 +69,8 @@ struct CompressedDBFileHeader uint64_t fCompressionType; uint64_t fHeaderSize; uint64_t fBlockCount; + uint64_t fColumnWidth; + execplan::CalpontSystemCatalog::ColDataType fColDataType; }; // Make the header to be 4K, regardless number of fields being defined/used in header. @@ -86,6 +88,23 @@ void initCompressedDBFileHeader(void* hdrBuf, int compressionType, int hdrSize) hdr->fHeader.fCompressionType = compressionType; hdr->fHeader.fBlockCount = 0; hdr->fHeader.fHeaderSize = hdrSize; + hdr->fHeader.fColumnWidth = 0; + hdr->fHeader.fColDataType = execplan::CalpontSystemCatalog::ColDataType::UNDEFINED; +} + +void initCompressedDBFileHeader( + void* hdrBuf, uint32_t columnWidth, + execplan::CalpontSystemCatalog::ColDataType colDataType, + int compressionType, int hdrSize) +{ + CompressedDBFileHeaderBlock* hdr = reinterpret_cast(hdrBuf); + hdr->fHeader.fMagicNumber = MAGIC_NUMBER; + hdr->fHeader.fVersionNum = VERSION_NUM2; + hdr->fHeader.fCompressionType = compressionType; + hdr->fHeader.fBlockCount = 0; + hdr->fHeader.fHeaderSize = hdrSize; + hdr->fHeader.fColumnWidth = columnWidth; + hdr->fHeader.fColDataType = colDataType; } } // namespace @@ -351,6 +370,19 @@ void IDBCompressInterface::initHdr(void* hdrBuf, void* ptrBuf, int compressionTy initCompressedDBFileHeader(hdrBuf, compressionType, hdrSize); } +//------------------------------------------------------------------------------ +// Initialize the header blocks to be written at the start of a column file. +//------------------------------------------------------------------------------ +void IDBCompressInterface::initHdr( + void* hdrBuf, uint32_t columnWidth, + execplan::CalpontSystemCatalog::ColDataType columnType, + int compressionType) const +{ + memset(hdrBuf, 0, HDR_BUF_LEN * 2); + initCompressedDBFileHeader(hdrBuf, columnWidth, columnType, + compressionType, HDR_BUF_LEN * 2); +} + //------------------------------------------------------------------------------ // Set the file's block count //------------------------------------------------------------------------------ diff --git a/utils/compress/idbcompress.h b/utils/compress/idbcompress.h index e81151611..523ca0a68 100644 --- a/utils/compress/idbcompress.h +++ b/utils/compress/idbcompress.h @@ -27,6 +27,8 @@ #include #include +#include "calpontsystemcatalog.h" + #if defined(_MSC_VER) && defined(xxxIDBCOMP_DLLEXPORT) #define EXPORT __declspec(dllexport) #else @@ -114,6 +116,15 @@ public: */ EXPORT void initHdr(void* hdrBuf, void* ptrBuf, int compressionType, int hdrSize) const; + /** + * Initialize header buffer at start of compressed db file. + * + * @warning hdrBuf must be at least HDR_BUF_LEN*2 bytes + */ + EXPORT void initHdr(void* hdrBuf, uint32_t columnWidth, + execplan::CalpontSystemCatalog::ColDataType columnType, + int compressionType) const; + /** * Verify the passed in buffer contains a compressed db file header. */ @@ -277,6 +288,7 @@ inline int IDBCompressInterface::uncompress(const char* in, size_t inLen, char* } inline void IDBCompressInterface::initHdr(void*, int) const {} inline void IDBCompressInterface::initHdr(void*, void*, int, int) const {} +inline void initHdr(void*, uint32_t, execplan::CalpontSystemCatalog::ColDataType, int) const {} inline int IDBCompressInterface::verifyHdr(const void*) const { return -1; diff --git a/writeengine/bulk/we_colbufcompressed.cpp b/writeengine/bulk/we_colbufcompressed.cpp index 43534dc75..9e1a4f949 100644 --- a/writeengine/bulk/we_colbufcompressed.cpp +++ b/writeengine/bulk/we_colbufcompressed.cpp @@ -582,7 +582,9 @@ int ColumnBufferCompressed::saveCompressionHeaders( ) { // Construct the header records char hdrBuf[IDBCompressInterface::HDR_BUF_LEN * 2]; - fCompressor->initHdr( hdrBuf, fColInfo->column.compressionType ); + fCompressor->initHdr(hdrBuf, fColInfo->column.width, + fColInfo->column.dataType, + fColInfo->column.compressionType); fCompressor->setBlockCount(hdrBuf, (fColInfo->getFileSize() / BYTE_PER_BLOCK) ); diff --git a/writeengine/bulk/we_columninfo.cpp b/writeengine/bulk/we_columninfo.cpp index 7b2cd5663..f4a157a76 100644 --- a/writeengine/bulk/we_columninfo.cpp +++ b/writeengine/bulk/we_columninfo.cpp @@ -897,7 +897,8 @@ int ColumnInfo::extendColumnOldExtent( } rc = colOp->expandAbbrevColumnExtent( pFile, dbRootNext, - column.emptyVal, column.width); + column.emptyVal, column.width, + column.dataType ); if (rc != NO_ERROR) { diff --git a/writeengine/bulk/we_columninfocompressed.cpp b/writeengine/bulk/we_columninfocompressed.cpp index 25af8bc22..e30e74382 100644 --- a/writeengine/bulk/we_columninfocompressed.cpp +++ b/writeengine/bulk/we_columninfocompressed.cpp @@ -544,6 +544,7 @@ int ColumnInfoCompressed::extendColumnOldExtent( curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment, + curCol.colDataType, curCol.dataFile.hwm, segFileName, errTask); diff --git a/writeengine/shared/we_fileop.cpp b/writeengine/shared/we_fileop.cpp index 7bfe9a32e..26acc7612 100644 --- a/writeengine/shared/we_fileop.cpp +++ b/writeengine/shared/we_fileop.cpp @@ -162,6 +162,7 @@ int FileOp::createDir( const char* dirName, mode_t mode ) const ***********************************************************/ int FileOp::createFile( const char* fileName, int numOfBlock, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, uint16_t dbRoot ) { IDBDataFile* pFile = @@ -183,7 +184,8 @@ int FileOp::createFile( const char* fileName, int numOfBlock, dbRoot, numOfBlock, emptyVal, - width ); + width, + colDataType ); } else { @@ -192,6 +194,7 @@ int FileOp::createFile( const char* fileName, int numOfBlock, numOfBlock, emptyVal, width, + colDataType, true, // new file false, // don't expand; add new extent true ); // add abbreviated extent @@ -281,7 +284,7 @@ int FileOp::createFile(FID fid, //timer.stop( "allocateColExtent" ); - return createFile( fileName, totalSize, emptyVal, width, dbRoot ); + return createFile( fileName, totalSize, emptyVal, width, colDataType, dbRoot ); } /*********************************************************** @@ -571,6 +574,7 @@ int FileOp::extendFile( OID oid, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, BRM::LBID_t startLbid, int allocSize, @@ -687,15 +691,9 @@ int FileOp::extendFile( pFile = NULL; string failedTask; // For return error message, if any. - rc = FileOp::fillCompColumnExtentEmptyChunks(oid, - width, - emptyVal, - dbRoot, - partition, - segment, - hwm, - segFile, - failedTask); + rc = FileOp::fillCompColumnExtentEmptyChunks( + oid, width, emptyVal, dbRoot, partition, segment, + colDataType, hwm, segFile, failedTask); if (rc != NO_ERROR) { @@ -756,7 +754,8 @@ int FileOp::extendFile( // This generally won't ever happen, as uncompressed files // are created with full extents. rc = FileOp::expandAbbrevColumnExtent( pFile, dbRoot, - emptyVal, width); + emptyVal, width, + colDataType ); if (rc != NO_ERROR) { @@ -815,7 +814,7 @@ int FileOp::extendFile( if ((m_compressionType) && (hdrs)) { IDBCompressInterface compressor; - compressor.initHdr(hdrs, m_compressionType); + compressor.initHdr(hdrs, width, colDataType, m_compressionType); } } @@ -846,6 +845,7 @@ int FileOp::extendFile( allocSize, emptyVal, width, + colDataType, newFile, // new or existing file false, // don't expand; new extent false, // add full (not abbreviated) extent @@ -972,7 +972,7 @@ int FileOp::addExtentExactFile( if ((m_compressionType) && (hdrs)) { IDBCompressInterface compressor; - compressor.initHdr(hdrs, m_compressionType); + compressor.initHdr(hdrs, width, colDataType, m_compressionType); } } @@ -1004,6 +1004,7 @@ int FileOp::addExtentExactFile( allocSize, emptyVal, width, + colDataType, newFile, // new or existing file false, // don't expand; new extent false ); // add full (not abbreviated) extent @@ -1047,6 +1048,7 @@ int FileOp::initColumnExtent( int nBlocks, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, bool bNewFile, bool bExpandExtent, bool bAbbrevExtent, @@ -1056,7 +1058,7 @@ int FileOp::initColumnExtent( { char hdrs[IDBCompressInterface::HDR_BUF_LEN * 2]; IDBCompressInterface compressor; - compressor.initHdr(hdrs, m_compressionType); + compressor.initHdr(hdrs, width, colDataType, m_compressionType); if (bAbbrevExtent) compressor.setBlockCount(hdrs, nBlocks); @@ -1226,7 +1228,8 @@ int FileOp::initAbbrevCompColumnExtent( uint16_t dbRoot, int nBlocks, const uint8_t* emptyVal, - int width) + int width, + execplan::CalpontSystemCatalog::ColDataType colDataType) { // Reserve disk space for optimized abbreviated extent int rc = initColumnExtent( pFile, @@ -1234,6 +1237,7 @@ int FileOp::initAbbrevCompColumnExtent( nBlocks, emptyVal, width, + colDataType, true, // new file false, // don't expand; add new extent true, // add abbreviated extent @@ -1253,6 +1257,7 @@ int FileOp::initAbbrevCompColumnExtent( INITIAL_EXTENT_ROWS_TO_DISK, emptyVal, width, + colDataType, hdrs ); if (rc != NO_ERROR) @@ -1287,6 +1292,7 @@ int FileOp::writeInitialCompColumnChunk( int nRows, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs) { const int INPUT_BUFFER_SIZE = nRows * width; @@ -1328,7 +1334,7 @@ int FileOp::writeInitialCompColumnChunk( // "; blkAllocCnt: " << nBlocksAllocated << // "; compressedByteCnt: " << outputLen << std::endl; - compressor.initHdr(hdrs, m_compressionType); + compressor.initHdr(hdrs, width, colDataType, m_compressionType); compressor.setBlockCount(hdrs, nBlocksAllocated); // Store compression pointers in the header @@ -1352,15 +1358,16 @@ int FileOp::writeInitialCompColumnChunk( * DESCRIPTION: * Fill specified compressed extent with empty value chunks. * PARAMETERS: - * oid - OID for relevant column - * colWidth - width in bytes of this column - * emptyVal - empty value to be used in filling empty chunks - * dbRoot - DBRoot of extent to be filled - * partition - partition of extent to be filled - * segment - segment file number of extent to be filled - * hwm - proposed new HWM of filled in extent - * segFile - (out) name of updated segment file - * failedTask - (out) if error occurs, this is the task that failed + * oid - OID for relevant column + * colWidth - width in bytes of this column + * emptyVal - empty value to be used in filling empty chunks + * dbRoot - DBRoot of extent to be filled + * partition - partition of extent to be filled + * segment - segment file number of extent to be filled + * colDataType - Column data type + * hwm - proposed new HWM of filled in extent + * segFile - (out) name of updated segment file + * failedTask - (out) if error occurs, this is the task that failed * RETURN: * returns NO_ERROR if success. ***********************************************************/ @@ -1370,6 +1377,7 @@ int FileOp::fillCompColumnExtentEmptyChunks(OID oid, uint16_t dbRoot, uint32_t partition, uint16_t segment, + execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, std::string& segFile, std::string& failedTask) @@ -1459,7 +1467,8 @@ int FileOp::fillCompColumnExtentEmptyChunks(OID oid, } off64_t endHdrsOffset = pFile->tell(); - rc = expandAbbrevColumnExtent( pFile, dbRoot, emptyVal, colWidth ); + rc = expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, colWidth, + colDataType); if (rc != NO_ERROR) { @@ -2846,7 +2855,8 @@ int FileOp::expandAbbrevColumnExtent( IDBDataFile* pFile, // FILE ptr to file where abbrev extent is to be expanded uint16_t dbRoot, // The DBRoot of the file with the abbreviated extent const uint8_t* emptyVal,// Empty value to be used in expanding the extent - int width ) // Width of the column (in bytes) + int width, // Width of the column (in bytes) + execplan::CalpontSystemCatalog::ColDataType colDataType) // Column data type. { // Based on extent size, see how many blocks to add to fill the extent int blksToAdd = ( ((int)BRMWrapper::getInstance()->getExtentRows() - @@ -2862,11 +2872,12 @@ int FileOp::expandAbbrevColumnExtent( } // Add blocks to turn the abbreviated extent into a full extent. - int rc = FileOp::initColumnExtent(pFile, dbRoot, blksToAdd, emptyVal, width, - false, // existing file - true, // expand existing extent - false, // n/a since not adding new extent - true); // optimize segment file extension + int rc = FileOp::initColumnExtent(pFile, dbRoot, blksToAdd, emptyVal, + width, colDataType, + false, // existing file + true, // expand existing extent + false, // n/a since not adding new extent + true); // optimize segment file extension return rc; } diff --git a/writeengine/shared/we_fileop.h b/writeengine/shared/we_fileop.h index b7f908a14..9557a5949 100644 --- a/writeengine/shared/we_fileop.h +++ b/writeengine/shared/we_fileop.h @@ -43,6 +43,7 @@ #include "we_config.h" #include "we_stats.h" #include "idbcompress.h" +#include "calpontsystemcatalog.h" #if defined(_MSC_VER) && defined(WRITEENGINE_DLLEXPORT) #define EXPORT __declspec(dllexport) @@ -101,6 +102,7 @@ public: */ int createFile( const char* fileName, int fileSize, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, uint16_t dbRoot ); /** @@ -159,12 +161,14 @@ public: * @param dbRoot DBRoot of the file being updated. * @param emptyVal Empty value used in initializing extents for this column * @param width Width of this column (in bytes) + * @param colDataType Column data type. */ EXPORT virtual int expandAbbrevColumnExtent( IDBDataFile* pFile, uint16_t dbRoot, const uint8_t* emptyVal, - int width ); + int width, + execplan::CalpontSystemCatalog::ColDataType colDataType); /** * @brief Add an extent to the specified Column OID and DBRoot. @@ -200,6 +204,7 @@ public: */ EXPORT int extendFile(OID oid, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, BRM::LBID_t startLbid, int allocSize, @@ -246,6 +251,7 @@ public: * @param dbRoot DBRoot of the extent to be filled * @param partition Partition of the extent to be filled * @param segment Segment file number of the extent to be filled + * @param colDataType Column data type * @param hwm New HWM blk setting for the segment file after extent is padded * @param segFile (out) Name of updated segment file * @param errTask (out) Task that failed if error occurs @@ -257,6 +263,7 @@ public: uint16_t dbRoot, uint32_t partition, uint16_t segment, + execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, std::string& segFile, std::string& errTask); @@ -499,6 +506,7 @@ public: int nBlocks, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, bool bNewFile, bool bExpandExtent, bool bAbbrevExtent, @@ -524,21 +532,18 @@ private: const compress::CompChunkPtr& chunkInPtr, compress::CompChunkPtr& chunkOutPt); - int initAbbrevCompColumnExtent( IDBDataFile* pFile, - uint16_t dbRoot, - int nBlocks, - const uint8_t* emptyVal, - int width); + int initAbbrevCompColumnExtent( + IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, + const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType); static void initDbRootExtentMutexes(); static void removeDbRootExtentMutexes(); - int writeInitialCompColumnChunk( IDBDataFile* pFile, - int nBlocksAllocated, - int nRows, - const uint8_t* emptyVal, - int width, - char* hdrs); + int writeInitialCompColumnChunk( + IDBDataFile* pFile, int nBlocksAllocated, int nRows, + const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs); TxnID m_transId; bool m_isBulk; diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp index 9ddb0d795..13511582d 100644 --- a/writeengine/wrapper/we_colop.cpp +++ b/writeengine/wrapper/we_colop.cpp @@ -286,8 +286,12 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, if (newColStructList[i].fCompressionType > 0) { string errorInfo; - rc = fileOp.fillCompColumnExtentEmptyChunks(newColStructList[i].dataOid, newColStructList[i].colWidth, - emptyVal, dbRoot, partition, segment, newHwm, segFile, errorInfo); + rc = fileOp.fillCompColumnExtentEmptyChunks( + newColStructList[i].dataOid, + newColStructList[i].colWidth, emptyVal, dbRoot, + partition, segment, + newColStructList[i].colDataType, newHwm, + segFile, errorInfo); if (rc != NO_ERROR) return rc; @@ -310,7 +314,11 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, return rc; } - rc = fileOp.expandAbbrevColumnExtent( pFile, dbRoot, emptyVal, newColStructList[i].colWidth); + rc = fileOp.expandAbbrevColumnExtent( + pFile, dbRoot, emptyVal, + newColStructList[i].colWidth, + newColStructList[i].colDataType); + //set hwm for this extent. fileOp.closeFile(pFile); @@ -1330,6 +1338,7 @@ int ColumnOp::extendColumn( int rc = extendFile(column.dataFile.fid, emptyVal, column.colWidth, + column.colDataType, hwm, startLbid, allocSize, @@ -1401,7 +1410,8 @@ int ColumnOp::expandAbbrevExtent(const Column& column) int rc = expandAbbrevColumnExtent(column.dataFile.pFile, column.dataFile.fDbRoot, emptyVal, - column.colWidth); + column.colWidth, + column.colDataType); return rc; } diff --git a/writeengine/wrapper/we_colopcompress.cpp b/writeengine/wrapper/we_colopcompress.cpp index 4337c6026..dc4a2d473 100644 --- a/writeengine/wrapper/we_colopcompress.cpp +++ b/writeengine/wrapper/we_colopcompress.cpp @@ -191,7 +191,8 @@ int ColumnOpCompress1::flushFile(int rc, std::map& columnOids) int ColumnOpCompress1::expandAbbrevColumnExtent( - IDBDataFile* pFile, uint16_t dbRoot, const uint8_t* emptyVal, int width) + IDBDataFile* pFile, uint16_t dbRoot, const uint8_t* emptyVal, int width, + execplan::CalpontSystemCatalog::ColDataType colDataType ) { // update the uncompressed initial chunk to full chunk int rc = m_chunkManager->expandAbbrevColumnExtent(pFile, emptyVal, width); @@ -204,7 +205,8 @@ int ColumnOpCompress1::expandAbbrevColumnExtent( } // let the base to physically expand extent. - return FileOp::expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, width); + return FileOp::expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, width, + colDataType); } diff --git a/writeengine/wrapper/we_colopcompress.h b/writeengine/wrapper/we_colopcompress.h index 0b865fb1b..b611a2f58 100644 --- a/writeengine/wrapper/we_colopcompress.h +++ b/writeengine/wrapper/we_colopcompress.h @@ -27,6 +27,7 @@ #include "we_colop.h" #include "we_chunkmanager.h" +#include "calpontsystemcatalog.h" #if defined(_MSC_VER) && defined(WRITEENGINE_DLLEXPORT) #define EXPORT __declspec(dllexport) @@ -111,7 +112,9 @@ public: /** * @brief virtual method in FileOp */ - int expandAbbrevColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, const uint8_t* emptyVal, int width); + int expandAbbrevColumnExtent( + IDBDataFile* pFile, uint16_t dbRoot, const uint8_t* emptyVal, + int width, execplan::CalpontSystemCatalog::ColDataType colDataType); /** * @brief virtual method in ColumnOp