diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h index c4aecdbc2..4c1adab09 100644 --- a/writeengine/bulk/we_bulkload.h +++ b/writeengine/bulk/we_bulkload.h @@ -64,7 +64,7 @@ class BulkLoad : public FileOp public: /** - * @brief BulkLoad onstructor + * @brief BulkLoad constructor */ EXPORT BulkLoad(); diff --git a/writeengine/bulk/we_colbuf.cpp b/writeengine/bulk/we_colbuf.cpp index 1d164858c..4ade584ed 100644 --- a/writeengine/bulk/we_colbuf.cpp +++ b/writeengine/bulk/we_colbuf.cpp @@ -29,6 +29,7 @@ #include "we_columninfo.h" #include "we_log.h" #include "we_stats.h" +#include "we_blockop.h" #include #include "IDBDataFile.h" using namespace idbdatafile; @@ -101,15 +102,33 @@ void ColumnBuffer::resizeAndCopy(int newSize, int startOffset, int endOffset) //------------------------------------------------------------------------------ // Write data stored up in the output buffer to the segment column file. //------------------------------------------------------------------------------ -int ColumnBuffer::writeToFile(int startOffset, int writeSize) +int ColumnBuffer::writeToFile(int startOffset, int writeSize, bool fillUpWNulls) { if (writeSize == 0) // skip unnecessary write, if 0 bytes given return NO_ERROR; + unsigned char *newBuf = NULL; + + if ( fillUpWNulls ) + { + BlockOp blockOp; + //TO DO Use scoped_ptr here + newBuf = new unsigned char[BYTE_PER_BLOCK]; + uint64_t EmptyValue = blockOp.getEmptyRowValue(fColInfo->column.dataType, + fColInfo->column.width); + ::memcpy(static_cast(newBuf), + static_cast(fBuffer + startOffset), writeSize); + blockOp.setEmptyBuf(newBuf + writeSize, BYTE_PER_BLOCK - writeSize, + EmptyValue, fColInfo->column.width); + } #ifdef PROFILE Stats::startParseEvent(WE_STATS_WRITE_COL); #endif - size_t nitems = fFile->write(fBuffer + startOffset, writeSize) / writeSize; + size_t nitems; + if ( fillUpWNulls ) + nitems = fFile->write(newBuf, BYTE_PER_BLOCK) / BYTE_PER_BLOCK; + else + nitems = fFile->write(fBuffer + startOffset, writeSize) / writeSize; if (nitems != 1) return ERR_FILE_WRITE; @@ -118,6 +137,8 @@ int ColumnBuffer::writeToFile(int startOffset, int writeSize) Stats::stopParseEvent(WE_STATS_WRITE_COL); #endif + //TO DO Use scoped_ptr here + delete newBuf; return NO_ERROR; } diff --git a/writeengine/bulk/we_colbuf.h b/writeengine/bulk/we_colbuf.h index d5f3af570..baf11b6bb 100644 --- a/writeengine/bulk/we_colbuf.h +++ b/writeengine/bulk/we_colbuf.h @@ -107,8 +107,11 @@ public: * * @param startOffset The buffer offset from where the write should begin * @param writeSize The number of bytes to be written to the file + * @param fillUpWNulls The flag to fill the buffer with NULLs up to + * the block boundary. */ - virtual int writeToFile(int startOffset, int writeSize); + virtual int writeToFile(int startOffset, int writeSize, + bool fillUpWNulls = false); protected: diff --git a/writeengine/bulk/we_colbufcompressed.cpp b/writeengine/bulk/we_colbufcompressed.cpp index 9abc0038a..019fe6e46 100644 --- a/writeengine/bulk/we_colbufcompressed.cpp +++ b/writeengine/bulk/we_colbufcompressed.cpp @@ -167,7 +167,8 @@ int ColumnBufferCompressed::resetToBeCompressedColBuf( // file, and instead buffer up the data to be compressed in 4M chunks before // writing it out. //------------------------------------------------------------------------------ -int ColumnBufferCompressed::writeToFile(int startOffset, int writeSize) +int ColumnBufferCompressed::writeToFile(int startOffset, int writeSize, + bool fillUpWNulls) { if (writeSize == 0) // skip unnecessary write, if 0 bytes given return NO_ERROR; diff --git a/writeengine/bulk/we_colbufcompressed.h b/writeengine/bulk/we_colbufcompressed.h index 6ea70339c..335a8f2ba 100644 --- a/writeengine/bulk/we_colbufcompressed.h +++ b/writeengine/bulk/we_colbufcompressed.h @@ -83,8 +83,11 @@ public: * * @param startOffset The buffer offset from where the write should begin * @param writeSize The number of bytes to be written to the file + * @param fillUpWNulls The flag to fill the buffer with NULLs up to + * the block boundary. */ - virtual int writeToFile(int startOffset, int writeSize); + virtual int writeToFile(int startOffset, int writeSize, + bool fillUpWNulls = false); private: diff --git a/writeengine/bulk/we_colbufmgr.cpp b/writeengine/bulk/we_colbufmgr.cpp index 123847260..14ba19402 100644 --- a/writeengine/bulk/we_colbufmgr.cpp +++ b/writeengine/bulk/we_colbufmgr.cpp @@ -529,7 +529,7 @@ int ColumnBufferManager::writeToFile(int endOffset) // internal buffer, or if an abbreviated extent is expanded. //------------------------------------------------------------------------------ int ColumnBufferManager::writeToFileExtentCheck( - uint32_t startOffset, uint32_t writeSize) + uint32_t startOffset, uint32_t writeSize, bool fillUpWNulls) { if (fLog->isDebug( DEBUG_3 )) @@ -571,7 +571,7 @@ int ColumnBufferManager::writeToFileExtentCheck( if (availableFileSize >= writeSize) { - int rc = fCBuf->writeToFile(startOffset, writeSize); + int rc = fCBuf->writeToFile(startOffset, writeSize, fillUpWNulls); if (rc != NO_ERROR) { @@ -583,6 +583,10 @@ int ColumnBufferManager::writeToFileExtentCheck( return rc; } + // MCOL-498 Fill it up to the block size boundary. + if ( fillUpWNulls ) + writeSize = BLOCK_SIZE; + fColInfo->updateBytesWrittenCounts( writeSize ); } else @@ -624,7 +628,7 @@ int ColumnBufferManager::writeToFileExtentCheck( } int writeSize2 = writeSize - writeSize1; - fCBuf->writeToFile(startOffset + writeSize1, writeSize2); + rc = fCBuf->writeToFile(startOffset + writeSize1, writeSize2, fillUpWNulls); if (rc != NO_ERROR) { @@ -636,6 +640,10 @@ int ColumnBufferManager::writeToFileExtentCheck( return rc; } + // MCOL-498 Fill it up to the block size boundary. + if ( fillUpWNulls ) + writeSize2 = BLOCK_SIZE; + fColInfo->updateBytesWrittenCounts( writeSize2 ); } @@ -668,17 +676,21 @@ int ColumnBufferManager::flush( ) int bufferSize = fCBuf->getSize(); + // MCOL-498 There are less the BLOCK_SIZE bytes in the buffer left, so // Account for circular buffer by making 2 calls to write the data, // if we are wrapping around at the end of the buffer. if (fBufFreeOffset < fBufWriteOffset) { - RETURN_ON_ERROR( writeToFileExtentCheck( - fBufWriteOffset, bufferSize - fBufWriteOffset) ); + // The check could be redundant. + bool fillUpWEmpty = ( static_cast(bufferSize - fBufWriteOffset) >= BLOCK_SIZE ) + ? false : true; + RETURN_ON_ERROR( writeToFileExtentCheck( fBufWriteOffset, + bufferSize - fBufWriteOffset, fillUpWEmpty) ); fBufWriteOffset = 0; } - + // fill the buffer up with NULLs. RETURN_ON_ERROR( writeToFileExtentCheck( - fBufWriteOffset, fBufFreeOffset - fBufWriteOffset) ); + fBufWriteOffset, fBufFreeOffset - fBufWriteOffset, true) ); fBufWriteOffset = fBufFreeOffset; return NO_ERROR; diff --git a/writeengine/bulk/we_colbufmgr.h b/writeengine/bulk/we_colbufmgr.h index 8ec86fabe..35e837ec1 100644 --- a/writeengine/bulk/we_colbufmgr.h +++ b/writeengine/bulk/we_colbufmgr.h @@ -193,9 +193,12 @@ protected: * write out the buffer. * @param startOffset The buffer offset where the write should begin * @param writeSize The number of bytes to be written to the file + * @param fillUpWNulls The flag to fill the buffer with NULLs up to + * the block boundary. * @return success or fail status */ - virtual int writeToFileExtentCheck(uint32_t startOffset, uint32_t writeSize); + virtual int writeToFileExtentCheck(uint32_t startOffset, uint32_t writeSize, + bool fillUpWNulls = false); //------------------------------------------------------------------------- // Protected Data Members