/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id: we_dctnry.cpp 4737 2013-08-14 20:45:46Z bwilkinson $ * *******************************************************************************/ /** @we_dctnry.cpp * When a signature is given, the value will be stored in dictionary and * a token will be issued. Given a token, the signature in the dictionary * can be deleted. * The whole file contains only one class Dctnry */ #include #include #include #include #include #include #include #include using namespace std; #include "bytestream.h" #include "brmtypes.h" #include "extentmap.h" // for DICT_COL_WIDTH #include "we_stats.h" #include "we_log.h" #include "we_dctnry.h" using namespace messageqcpp; using namespace WriteEngine; using namespace BRM; #include "IDBDataFile.h" #include "IDBPolicy.h" #include "cacheutils.h" #include using namespace idbdatafile; #include "checks.h" #include "utils_utf8.h" // for utf8_truncate_point() namespace { using BinaryArraySharedPtr = std::shared_ptr; using FixedSizeBinaryArraySharedPtr = std::shared_ptr; // These used to be member variables, hence the "m_" prefix. But they are // all constants, so I removed them as member variables. May change the // variable name later (to remove the m_ prefix) as time allows. 
const uint16_t m_endHeader = DCTNRY_END_HEADER; // end of header flag (0xffff) const uint16_t m_offSetZero = BYTE_PER_BLOCK; // value for 0 offset (8192) const int m_totalHdrBytes = // # bytes in header HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE; const int START_HDR1 = // start loc of 2nd offset (HDR1) HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE; const int PSEUDO_COL_WIDTH = DICT_COL_WIDTH; // used to convert row count to block count const int MAX_BLOB_SIZE = 16777215; // MCOL-4758 limit TEXT and BLOB to 16MB } // namespace namespace WriteEngine { // We will make this a constant for now. If we ever decide to make // INITIAL_EXTENT_ROWS_TO_DISK configurable, we will need to move this // statement, and use Config class to get INITIAL_EXTENT_ROWS_TO_DISK. int NUM_BLOCKS_PER_INITIAL_EXTENT = ((INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * PSEUDO_COL_WIDTH); /******************************************************************************* * Description: * Dctnry constructor ******************************************************************************/ Dctnry::Dctnry() : m_nextPtr(NOT_USED_PTR) , m_partition(0) , m_segment(0) , m_dbRoot(1) , m_numBlocks(0) , m_lastFbo(0) , m_hwm(0) , m_newStartOffset(0) , m_freeSpace(0) , m_curOp(0) , m_colWidth(0) , m_importDataMode(IMPORT_DATA_TEXT) { memset(m_dctnryHeader, 0, sizeof(m_dctnryHeader)); memset(m_curBlock.data, 0, sizeof(m_curBlock.data)); m_curBlock.lbid = INVALID_LBID; // add all initial header sizes for an empty block m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes; memcpy(m_dctnryHeader2, &m_freeSpace, HDR_UNIT_SIZE); memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE, &m_nextPtr, NEXT_PTR_BYTES); memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES, &m_offSetZero, HDR_UNIT_SIZE); memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE, &m_endHeader, HDR_UNIT_SIZE); m_curFbo = INVALID_NUM; m_curLbid = INVALID_LBID; m_arraySize = 0; clear(); // files } 
/******************************************************************************* * Description: * Dctnry destructor ******************************************************************************/ Dctnry::~Dctnry() { // clear string cache here! freeStringCache(); } /******************************************************************************* * Description: * Free memory consumed by dictionary string cache ******************************************************************************/ void Dctnry::freeStringCache() { std::set::iterator it; for (it = m_sigArray.begin(); it != m_sigArray.end(); it++) { Signature sig = *it; delete[] sig.signature; sig.signature = 0; } m_arraySize = 0; m_sigArray.clear(); } /******************************************************************************* * Description: * Create a dictionary file and initialize the header * * PARAMETERS: * none * * RETURN: * success - successfully write the header to block * failure - it did not write the header to block ******************************************************************************/ int Dctnry::init() { // cout <<"Init called! m_dctnryOID =" << m_dctnryOID << endl; m_lastFbo = 0; m_hwm = 0; m_newStartOffset = 0; m_freeSpace = 0; m_curOp = 0; memset(m_curBlock.data, 0, sizeof(m_curBlock.data)); m_curBlock.lbid = INVALID_LBID; m_arraySize = 0; return NO_ERROR; } /******************************************************************************* * Description: * Create a dictionary file and initialize the header, or can be used to * just add an extent to an already open dictionary store file. 
* * PARAMETERS: * input * dctnryOID - dictionary OID * colWidth - dictionary string width (not the token width) * dbRoot - DBRoot where file is located * partition - partition number associated with the file * segment - segment number associated with the file * startLbid - (out) starting LBID of the newly allocated extent * flag - "true" indicates we are adding the first block and the * file needs to be created with an abbreviated extent. * "false" indicates we just want to add an extent to * an existing file, and the file has already been opened. * * RETURN: * success - successfully created file and/or extent * failure - failed to create file and/or extent ******************************************************************************/ int Dctnry::createDctnry(const OID& dctnryOID, int colWidth, const uint16_t dbRoot, const uint32_t partition, const uint16_t segment, LBID_t& startLbid, bool flag) { int allocSize = 0; char fileName[FILE_NAME_SIZE]; int rc; std::map oids; #ifdef PROFILE Stats::startParseEvent(WE_STATS_ALLOC_DCT_EXTENT); #endif if (flag) { // Allocate extent before file creation. // If we got an error while allocating dictionary store extent, // we do not need to create/close the file, because it was not created // yet. This logic is the same as column segment file creation - at // first we allocate an extent, then we create a segment file. 
rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)dctnryOID, dbRoot, partition, segment, startLbid, allocSize); if (rc != NO_ERROR) { return rc; } m_dctnryOID = dctnryOID; m_partition = partition; m_segment = segment; m_dbRoot = dbRoot; RETURN_ON_ERROR((rc = oid2FileName(m_dctnryOID, fileName, true, m_dbRoot, m_partition, m_segment))); m_segFileName = fileName; // if obsolete file exists, "w+b" will truncate and write over m_dFile = createDctnryFile(fileName, colWidth, "w+b", DEFAULT_BUFSIZ, startLbid); { // We presume the path will contain / std::string filePath(fileName); if (chownDataPath(filePath)) { return ERR_FILE_CHOWN; } } } else { rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)m_dctnryOID, m_dbRoot, m_partition, m_segment, startLbid, allocSize); if (rc != NO_ERROR) { return rc; } RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END)); } // We allocate a full extent from BRM, but only write an abbreviated 256K // rows to disk for 1st extent in each store file, to conserve disk usage. int totalSize = allocSize; if (flag) { totalSize = NUM_BLOCKS_PER_INITIAL_EXTENT; } if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), totalSize)) { if (flag) { closeDctnryFile(false, oids); } return ERR_FILE_DISK_SPACE; } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_ALLOC_DCT_EXTENT); #endif if (m_dFile != NULL) { // MCOL-498 CS optimizes abbreviated extent // creation. 
rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, totalSize, m_dctnryHeader2, m_totalHdrBytes, false, true, // explicitly optimize startLbid); if (rc != NO_ERROR) { if (flag) { closeDctnryFile(false, oids); } return rc; } } else return ERR_FILE_CREATE; if (flag) { closeDctnryFile(true, oids); m_numBlocks = totalSize; m_hwm = 0; rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm); } else { m_numBlocks = m_numBlocks + totalSize; } return rc; } /******************************************************************************* * Description: * This function should be called to expand an abbreviated dictionary extent * into a full extent on disk. * * PARAMETERS: * none * * RETURN: * success - successfully expanded extent * failure - failed to expand extent ******************************************************************************/ int Dctnry::expandDctnryExtent() { RETURN_ON_NULL(m_dFile, ERR_FILE_SEEK); off64_t oldOffset = m_dFile->tell(); RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END)); // Based on extent size, see how many blocks to add to fill the extent int blksToAdd = (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) * PSEUDO_COL_WIDTH; if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), blksToAdd)) { return ERR_FILE_DISK_SPACE; } int rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, blksToAdd, m_dctnryHeader2, m_totalHdrBytes, true, true); // explicitly optimize if (rc != NO_ERROR) return rc; // Restore offset back to where we were before expanding the extent RETURN_ON_ERROR(setFileOffset(m_dFile, oldOffset, SEEK_SET)); // Update block count to reflect disk space added by expanding the extent. 
m_numBlocks = m_numBlocks + blksToAdd; return rc; } /******************************************************************************* * DESCRIPTION: * Close dictionary files * * PARAMETERS: * none * * RETURN: * none ******************************************************************************/ int Dctnry::closeDctnry(bool realClose) { if (!m_dFile) return NO_ERROR; int rc; CommBlock cb; cb.file.oid = m_dctnryOID; cb.file.pFile = m_dFile; std::map oids; if (m_curBlock.state == BLK_WRITE) { rc = writeDBFile(cb, &m_curBlock, m_curBlock.lbid); if (rc != NO_ERROR) { closeDctnryFile(false, oids); return rc; } memset(m_curBlock.data, 0, sizeof(m_curBlock.data)); // m_curBlock.state== BLK_INIT; } //@Bug 5572. always close file for uncompressed file. if (FileOp::compressionType() == 0) realClose = true; if (realClose) { //@Bug 5689. Need pass oid to write to the right file. oids[m_dctnryOID] = m_dctnryOID; // dmc-error handling (should detect/report error in closing file) closeDctnryFile(true, oids); } m_hwm = (HWM)m_lastFbo; idbassert(utils::is_nonnegative(m_dctnryOID)); if (idbdatafile::IDBPolicy::useHdfs() && realClose) { BRM::FileInfo aFile; std::vector oidsToFlush; oidsToFlush.push_back(m_dctnryOID); aFile.oid = m_dctnryOID; aFile.partitionNum = m_partition; aFile.segmentNum = m_segment; aFile.dbRoot = m_dbRoot; aFile.compType = FileOp::compressionType(); std::vector aFileInfo; aFileInfo.push_back(aFile); cacheutils::purgePrimProcFdCache(aFileInfo, Config::getLocalModuleID()); cacheutils::flushOIDsFromCache(oidsToFlush); } rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm); if (rc != NO_ERROR) return rc; // cout <<"Init called! m_dctnryOID =" << m_dctnryOID << endl; freeStringCache(); return NO_ERROR; } /******************************************************************************* * DESCRIPTION: * Close dictionary file without flushing block buffer or updating * BRM with HWM. 
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    NO_ERROR (always; errors from closing the file are not reported)
 ******************************************************************************/
int Dctnry::closeDctnryOnly()
{
  if (!m_dFile)
    return NO_ERROR;

  // dmc-error handling (should detect/report error in closing file)
  std::map<FID, FID> oids;
  closeDctnryFile(false, oids);
  freeStringCache();
  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 *    drop/delete dictionary file. Any open file is closed first via
 *    closeDctnry(), which flushes a dirty block and updates the HWM.
 *
 * PARAMETERS:
 *    dctnryOID -- file number to drop
 *
 * RETURN:
 *    error from closeDctnry() if closing fails, otherwise the result of
 *    deleteFile(dctnryOID)
 ******************************************************************************/
int Dctnry::dropDctnry(const OID& dctnryOID)
{
  m_dctnryOID = dctnryOID;

  if (m_dFile)
  {
    RETURN_ON_ERROR(closeDctnry());
  }

  return deleteFile(dctnryOID);
}

/*******************************************************************************
 * DESCRIPTION:
 *    open dictionary file, look up its HWM in BRM, read the HWM block into
 *    m_curBlock and (for uncompressed files) position the file at that
 *    block, so that subsequent inserts append to it.
 *
 * PARAMETERS:
 *    dctnryOID-- for open dictionary file
 *    dbRoot   -- DBRoot for dictionary store segment file
 *    partition-- partition for dictionary store segment file
 *    segment  -- segment for dictionary store segment file
 *    useTmpSuffix - for Bulk HDFS usage: use or not use *.tmp file suffix
 *
 * RETURN:
 *    successful- NO_ERROR
 *    Fail      - Error Code (ERR_FILE_OPEN if the file cannot be opened;
 *                on any later failure the file is closed again)
 ******************************************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
int Dctnry::openDctnry(const OID& dctnryOID, const uint16_t dbRoot, const uint32_t partition,
                       const uint16_t segment, const bool useTmpSuffix)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  int rc = NO_ERROR;
  m_dctnryOID = dctnryOID;
  m_dbRoot = dbRoot;
  m_partition = partition;
  m_segment = segment;

  m_dFile = openDctnryFile(useTmpSuffix);

  if (m_dFile == NULL)
  {
    ostringstream oss;
    oss << "oid:partition:segment " << dctnryOID << ":" << partition << ":" << segment;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Error opening dictionary file ");
    args.add(oss.str());
    args.add("");
    args.add("");
    message.format(args);
    logging::LoggingID lid(21);
    logging::MessageLog ml(lid);
    ml.logErrorMessage(message);
    return ERR_FILE_OPEN;
  }

  m_numBlocks = numOfBlocksInFile();
  std::map<FID, FID> oids;

  // Initialize other misc member variables
  init();

  int extState;
  rc = BRMWrapper::getInstance()->getLocalHWM(dctnryOID, m_partition, m_segment, m_hwm, extState);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  // Inserts resume at the HWM block.
  m_lastFbo = (int)m_hwm;

  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curFbo = m_lastFbo;

  // Translate the HWM block (FBO) into its LBID.
  rc = BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
#ifdef PROFILE
  // We omit the call to readDBFile from OPEN_DCT_FILE stats, because com-
  // pressed files have separate stats that readDBFile() will capture thru
  // ChunkManager::fetchChunkFromFile().
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  rc = readDBFile(cb, m_curBlock.data, m_curLbid);
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  //@Bug 5567 Don't seek for compressed file.
  if (m_compressionType == 0)
  {
    // Position file to the start of the current block;
    // Determine file byte offset based on the current block offset (m_curFbo)
    long long byteOffset = ((long long)m_curFbo) * (long)BYTE_PER_BLOCK;
    rc = setFileOffset(m_dFile, byteOffset);

    if (rc != NO_ERROR)
    {
      closeDctnryFile(false, oids);
      return rc;
    }
  }

  m_curBlock.lbid = m_curLbid;
  m_curBlock.state = BLK_READ;
  int opCnt = 0;
  // Read the number of ops already stored in the HWM block so new inserts
  // continue after them. NOTE(review): the original comment says this also
  // refreshes m_freeSpace from the header — confirm in getBlockOpCount().
  getBlockOpCount(m_curBlock, opCnt);
  m_curOp = opCnt;

  // "If" this store file contains no more than 1 block, then we preload
  // the string cache used to recognize duplicates during row insertion.
  if (m_hwm == 0)
  {
    preLoadStringCache(m_curBlock);
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  return rc;
}

/*******************************************************************************
 * Description:
 *    Determine if the specified signature is present in the string cache.
 *
 * PARAMETERS:
 *    input
 *       sig - signature to search for
 *    output
 *       sig.token - set to the cached token when the signature is found
 *
 * RETURN:
 *    true  - if signature is found
 *    false - if signature is not found
 ******************************************************************************/
bool Dctnry::getTokenFromArray(Signature& sig)
{
  std::set<Signature, sig_compare>::iterator it;
  it = m_sigArray.find(sig);

  if (it == m_sigArray.end())
  {
    return false;
  }
  else
  {
    Signature sigfound = *it;
    sig.token = sigfound.token;
    return true;
  }

  return false;  // unreachable; both branches above return
}

/*******************************************************************************
 * Description:
 *    Used by bulk import to insert a signature into m_curBlock, and update
 *    the m_curBlock header accordingly.
 *    A value that does not fit in the remaining free space is split: each
 *    filled block is written with writeDBFileNoVBCache() and a fresh block
 *    is started (expanding the abbreviated extent or adding an extent when
 *    needed). The final, partially filled block stays in m_curBlock and is
 *    not written here.
 *
 * PARAMETERS:
 *    input
 *       sig - signature to be inserted
 *    output
 *       sig.token - fbo = LBID of the first block used, op = op number in
 *                   that block, bc = number of additional blocks spanned
 *
 * RETURN:
 *    success - NO_ERROR; sig.size/sig.signature are restored to their
 *              original values
 *    failure - error from writing, extending or BRM lookup. NOTE: on these
 *              paths sig.size/sig.signature are left advanced.
 ******************************************************************************/
int Dctnry::insertDctnry2(Signature& sig)
{
  int rc = 0;
  int write_size;
  bool lbid_in_token = false;
  size_t origSigSize = sig.size;
  unsigned char* origSig = sig.signature;

  sig.token.bc = 0;

  // Loop at least once (so the token is always filled) and until every
  // byte of the signature has been placed.
  while (sig.size > 0 || !lbid_in_token)
  {
    // Write as much as the current block can hold (one HDR_UNIT_SIZE of the
    // free space is reserved for the new offset entry).
    if (sig.size > (m_freeSpace - HDR_UNIT_SIZE))
    {
      write_size = (m_freeSpace - HDR_UNIT_SIZE);
    }
    else
    {
      write_size = sig.size;
    }

    insertDctnryHdr(m_curBlock.data, write_size);
    insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature);

    sig.size -= write_size;
    sig.signature += write_size;
    m_curFbo = m_lastFbo;

    // The token always refers to the first block/op of the value.
    if (!lbid_in_token)
    {
      sig.token.fbo = m_curLbid;
      sig.token.op = m_curOp;
      lbid_in_token = true;
    }

    if (sig.size > 0)
    {
      // Value continues: flush this full block and start a new empty one.
      CommBlock cb;
      cb.file.oid = m_dctnryOID;
      cb.file.pFile = m_dFile;
      sig.token.bc++;

      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
      memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
      m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
      m_curBlock.state = BLK_WRITE;
      m_curOp = 0;
      m_lastFbo++;
      m_curFbo = m_lastFbo;

      //...Expand current extent if it is an abbreviated initial extent
      if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
      {
        RETURN_ON_ERROR(expandDctnryExtent());
      }

      //...Allocate a new extent if we have reached the last block in the
      //   current extent.
      if (m_curFbo == m_numBlocks)
      {
        // last block
        // for roll back the extent to use
        // Save those empty extents in case of failure to rollback
        std::vector<ExtentInfo> dictExtentInfo;
        ExtentInfo info;
        info.oid = m_dctnryOID;
        info.partitionNum = m_partition;
        info.segmentNum = m_segment;
        info.dbRoot = m_dbRoot;
        info.hwm = m_hwm;
        info.newFile = false;
        dictExtentInfo.push_back(info);
        LBID_t startLbid;
        // Add an extent.
        rc = createDctnry(m_dctnryOID,
                          0,  // dummy column width
                          m_dbRoot, m_partition, m_segment, startLbid, false);

        if (rc != NO_ERROR)
        {
          // roll back the extent
          BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
          return rc;
        }
      }

      // Look up the LBID of the new current block in BRM.
      RETURN_ON_ERROR(
          BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid));
      m_curBlock.lbid = m_curLbid;
    }
  }

  sig.size = origSigSize;
  sig.signature = origSig;
  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 *    Bulk-import helper shared by insertDctnry() and insertDctnryParquet():
 *    processes one input value (curSig).
 *    1. Truncates it to the column width. Multi-byte charsets: TEXT by bytes
 *       at a UTF-8 boundary, other types by characters. Each truncation
 *       increments truncCount.
 *    2. Reuses a cached token if an identical value was already stored.
 *    3. Otherwise stores it in the current block, or in a new block when it
 *       does not fit / the op limit is reached.
 *    For every value handled, the 8-byte token is copied to
 *    pOut + outOffset, outOffset advances by 8 and startPos by 1.
 *
 * PARAMETERS:
 *    curSig       - (in/out) value to insert; size may be truncated, token set
 *    found        - passed BY VALUE: assignments below only steer the local
 *                   control flow and are not seen by the caller
 *    pOut         - token output buffer
 *    outOffset    - (in/out) byte offset into pOut
 *    startPos     - (in/out) current row index of the caller's loop
 *    totalUseSize - (out) header bytes + value size, scratch value
 *    cb           - file descriptor block used to write full blocks
 *    next         - (in/out) "advance to next block" flag, reset to false
 *                   once the new block is set up
 *    truncCount   - (in/out) count of truncated values
 *    cs, weType   - character set and column type used for truncation
 *
 * RETURN:
 *    NO_ERROR, or the error from writing/extending the store file.
 ******************************************************************************/
int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOffset, int& startPos,
                          int& totalUseSize, CommBlock& cb, bool& next, long long& truncCount,
                          const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
{
  if (cs->mbmaxlen > 1)
  {
    // For TEXT columns, we truncate based on the number of bytes,
    // and not based on the number of characters, as for CHAR/VARCHAR
    // columns in the else block.
    if (weType == WriteEngine::WR_TEXT)
    {
      if (curSig.size > m_colWidth)
      {
        uint8_t truncate_point = utf8::utf8_truncate_point((const char*)curSig.signature, m_colWidth);
        curSig.size = m_colWidth - truncate_point;
        truncCount++;
      }
    }
    else
    {
      const char* start = (const char*)curSig.signature;
      const char* end = (const char*)(curSig.signature + curSig.size);
      size_t numChars = cs->numchars(start, end);
      size_t maxCharLength = m_colWidth / cs->mbmaxlen;

      if (numChars > maxCharLength)
      {
        // Keep only the first maxCharLength well-formed characters.
        MY_STRCOPY_STATUS status;
        cs->well_formed_char_length(start, end, maxCharLength, &status);
        curSig.size = status.m_source_end_pos - start;
        truncCount++;
      }
    }
  }
  else  // cs->mbmaxlen == 1
  {
    if (curSig.size > m_colWidth)
    {
      curSig.size = m_colWidth;
      truncCount++;
    }
  }

  //...Search for the string in our string cache
  //   if it fits into one block (< 8KB)
  if (curSig.size <= MAX_SIGNATURE_SIZE)
  {
    // Stats::startParseEvent("getTokenFromArray");
    found = getTokenFromArray(curSig);

    if (found)
    {
      memcpy(pOut + outOffset, &curSig.token, 8);
      outOffset += 8;
      startPos++;
      // Stats::stopParseEvent("getTokenFromArray");
      return NO_ERROR;
    }

    // Stats::stopParseEvent("getTokenFromArray");
  }

  totalUseSize = m_totalHdrBytes + curSig.size;

  //...String not found in cache, so proceed.
  //   If room is available in current block then insert into block.
  //   Values larger than 8176 bytes may start in any block with at least
  //   one byte of payload space left; insertDctnry2() spills the rest.
  // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
  if (((totalUseSize <= m_freeSpace - HDR_UNIT_SIZE) ||
       ((curSig.size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
      (m_curOp < (MAX_OP_COUNT - 1)))
  {
    RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
    m_curBlock.state = BLK_WRITE;
    memcpy(pOut + outOffset, &curSig.token, 8);
    outOffset += 8;
    startPos++;
    found = true;

    //...If we have reached limit for the number of strings allowed in
    //   a block, then we write the current block so that we can start
    //   another block.
    if (m_curOp >= MAX_OP_COUNT - 1)
    {
#ifdef PROFILE
      Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      m_curBlock.state = BLK_READ;
      next = true;
    }

    //...Add string to cache, if we have not exceeded cache limit
    //   Don't cache big blobs
    if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
    {
      addToStringCache(curSig);
    }
  }
  else  //...No room for this string in current block, so we write
        //   out the current block, so we can start another block
  {
#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
    RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
    m_curBlock.state = BLK_READ;
    next = true;
    found = false;
  }  // if m_freeSpace

  //..."next" flag is used to indicate that we need to advance to the
  //   next block in the store file.
  if (next)
  {
    // Start a new, empty block from the header template.
    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
    m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
    m_curBlock.state = BLK_WRITE;
    m_curOp = 0;
    next = false;
    m_lastFbo++;
    m_curFbo = m_lastFbo;

    //...Expand current extent if it is an abbreviated initial extent
    if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
    {
      RETURN_ON_ERROR(expandDctnryExtent());
    }

    //...Allocate a new extent if we have reached the last block in the
    //   current extent.
    if (m_curFbo == m_numBlocks)
    {
      // last block
      LBID_t startLbid;

      // Add an extent.
      RETURN_ON_ERROR(
          createDctnry(m_dctnryOID, m_colWidth, m_dbRoot, m_partition, m_segment, startLbid, false));

      if (m_logger)
      {
        std::ostringstream oss;
        oss << "Add dictionary extent OID-" << m_dctnryOID << "; DBRoot-" << m_dbRoot << "; part-"
            << m_partition << "; seg-" << m_segment << "; hwm-" << m_curFbo << "; LBID-" << startLbid
            << "; file-" << m_segFileName;
        m_logger->logMsg(oss.str(), MSGLVL_INFO2);
      }

      m_curLbid = startLbid;

      // now seek back to the curFbo, after adding an extent
      // @bug5769 For uncompressed only;
      // ChunkManager manages the file offset for the compression case
      if (m_compressionType == 0)
      {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
        long long byteOffset = m_curFbo;
        byteOffset *= BYTE_PER_BLOCK;
        RETURN_ON_ERROR(setFileOffset(m_dFile, byteOffset));
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
      }
    }
    else
    {
      // LBIDs are numbered collectively and consecutively within an
      // extent, so within an extent we can derive the LBID by simply
      // incrementing it rather than having to go back to BRM to look
      // up the LBID for each FBO.
      m_curLbid++;
    }

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
    m_curBlock.lbid = m_curLbid;

    //..."found" flag indicates whether the string was already found
    //   "or" added to the end of the previous block.  If false, then
    //   we need to add the string to the new block.
    if (!found)
    {
      RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
      m_curBlock.state = BLK_WRITE;
      memcpy(pOut + outOffset, &curSig.token, 8);
      outOffset += 8;
      startPos++;

      //...Add string to cache, if we have not exceeded cache limit
      if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
      {
        addToStringCache(curSig);
      }
    }
  }  // if next

  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 *    Used by bulk import to insert batch of parquet strings into this store file.
* Function assumes that the file is already positioned to the current block. * * PARAMETERS: * input * columnData - arrow array containing input strings * startRowIdx - start position for current batch parquet data * totalRow - number of rows in "buf" * col - column of strings to be parsed from "buf" * output * tokenBuf - tokens assigned to inserted strings * * RETURN: * success - successfully write the header to block * failure - it did not write the header to block ******************************************************************************/ int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int startRowIdx, const int totalRow, const int col, char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_PARSE_DCT); #endif int startPos = 0; int totalUseSize = 0; int outOffset = 0; const char* pIn; char* pOut = tokenBuf; Signature curSig; bool found = false; bool next = false; CommBlock cb; cb.file.oid = m_dctnryOID; cb.file.pFile = m_dFile; WriteEngine::Token nullToken; BinaryArraySharedPtr binaryArray; FixedSizeBinaryArraySharedPtr fixedSizeBinaryArray; if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY) binaryArray = std::static_pointer_cast(columnData); else fixedSizeBinaryArray = std::static_pointer_cast(columnData); // check if this column data imported is NULL array or not bool isNonNullArray = columnData->type_id() == arrow::Type::type::NA ? 
false : true; //...Loop through all the rows for the specified column while (startPos < totalRow) { found = false; void* curSigPtr = static_cast(&curSig); memset(curSigPtr, 0, sizeof(curSig)); // if this column is not null data if (isNonNullArray) { const uint8_t* data; if (binaryArray != nullptr) { data = binaryArray->GetValue(startPos + startRowIdx, &curSig.size); } else { data = fixedSizeBinaryArray->GetValue(startPos + startRowIdx); std::shared_ptr tType = fixedSizeBinaryArray->type(); curSig.size = tType->byte_width(); } const char* dataPtr = reinterpret_cast(data); // Strip trailing null bytes '\0' (by adjusting curSig.size) if import- // ing in binary mode. If entire string is binary zeros, then we treat // as a NULL value. if (curSig.size > 0) { const char* fld = dataPtr; int kk = curSig.size - 1; for (; kk >= 0; kk--) { if (fld[kk] != '\0') break; } curSig.size = kk + 1; } // Read thread should validate against max size so that the entire row // can be rejected up front. Once we get here in the parsing thread, // it is too late to reject the row. However, as a precaution, we // still check against max size & set to null token if needed. 
if ((curSig.size == 0) || (curSig.size > MAX_BLOB_SIZE)) { if (m_defVal.length() > 0) // use default string if available { pIn = m_defVal.str(); curSig.signature = (unsigned char*)pIn; curSig.size = m_defVal.length(); } else { memcpy(pOut + outOffset, &nullToken, 8); outOffset += 8; startPos++; continue; } } else { pIn = dataPtr; curSig.signature = (unsigned char*)pIn; } } else { curSig.size = 0; if (m_defVal.length() > 0) // use default string if available { pIn = m_defVal.str(); curSig.signature = (unsigned char*)pIn; curSig.size = m_defVal.length(); } else { memcpy(pOut + outOffset, &nullToken, 8); outOffset += 8; startPos++; continue; } } RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, cs, weType)); } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_PARSE_DCT); #endif // Done // If any data leftover and not written by subsequent call to // insertDctnry(), then it will be written by closeDctnry(). return NO_ERROR; } /******************************************************************************* * Description: * Used by bulk import to insert collection of strings into this store file. * Function assumes that the file is already positioned to the current block. 
* * PARAMETERS: * input * buf - character buffer containing input strings * pos - meta data describing data in "buf" * totalRow - number of rows in "buf" * col - column of strings to be parsed from "buf" * output * tokenBuf - tokens assigned to inserted strings * * RETURN: * success - successfully write the header to block * failure - it did not write the header to block ******************************************************************************/ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_PARSE_DCT); #endif int startPos = 0; int totalUseSize = 0; int outOffset = 0; const char* pIn; char* pOut = tokenBuf; Signature curSig; bool found = false; bool next = false; CommBlock cb; cb.file.oid = m_dctnryOID; cb.file.pFile = m_dFile; WriteEngine::Token nullToken; //...Loop through all the rows for the specified column while (startPos < totalRow) { found = false; void* curSigPtr = static_cast(&curSig); memset(curSigPtr, 0, sizeof(curSig)); curSig.size = pos[startPos][col].offset; // Strip trailing null bytes '\0' (by adjusting curSig.size) if import- // ing in binary mode. If entire string is binary zeros, then we treat // as a NULL value. if (m_importDataMode != IMPORT_DATA_TEXT) { if ((curSig.size > 0) && (curSig.size != COLPOSPAIR_NULL_TOKEN_OFFSET)) { char* fld = (char*)buf + pos[startPos][col].start; int kk = curSig.size - 1; for (; kk >= 0; kk--) { if (fld[kk] != '\0') break; } curSig.size = kk + 1; } } // Read thread should validate against max size so that the entire row // can be rejected up front. Once we get here in the parsing thread, // it is too late to reject the row. However, as a precaution, we // still check against max size & set to null token if needed. 
if ((curSig.size == 0) || (curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) || (curSig.size > MAX_BLOB_SIZE)) { if (m_defVal.length() > 0) // use default string if available { pIn = m_defVal.str(); curSig.signature = (unsigned char*)pIn; curSig.size = m_defVal.length(); } else { memcpy(pOut + outOffset, &nullToken, 8); outOffset += 8; startPos++; continue; } } else { pIn = (char*)buf + pos[startPos][col].start; curSig.signature = (unsigned char*)pIn; } RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, cs, weType)); } // end while #ifdef PROFILE Stats::stopParseEvent(WE_STATS_PARSE_DCT); #endif // Done // If any data leftover and not written by subsequent call to // insertDctnry(), then it will be written by closeDctnry(). return NO_ERROR; } /******************************************************************************* * DESCRIPTION: * Used by DML to insert a single string into this store file. * (1) Insert a signature value into the block * (2) The header information inserted at front * (3) The signature inserted from back * (4) Total minimum header size-- free space 2bytes, next pointer 8 bytes * zero offset 2 bytes, end of header 2 bytes, total 14 bytes * plus 2 bytes for new values' starting offset value storage * total 14 bytes * (5) Values size <=8176 =(8192-16) will not be split into two blocks * (6) For smaller value <=8176, it has to fit into one block or * unsuccessfully to insert * (7) For large value > 8176, * smaller space first then take up a whole block * or a whole block first then some left over space in another * block * (8) limit to 8000 byte for this release size * * PARAMETERS: * input dFile * -- File handle * Input sgnature_size * -- how many bytes the signature occupies * Input sgnature_value * -- the value of the signature * output token * -- token structure carrying the assigned fbo and op * * RETURN: * success - successfully insert the signature * failure - it did not insert the 
signature ******************************************************************************/ int Dctnry::insertDctnry(const int& sgnature_size, const unsigned char* sgnature_value, Token& token) { int rc = 0; int i; unsigned char* value = NULL; int size; int write_size; bool lbid_in_token = false; // Round down for safety. In theory we can take 262143 * 8176 bytes if (sgnature_size > MAX_BLOB_SIZE) { return ERR_DICT_SIZE_GT_2G; } CommBlock cb; cb.file.oid = m_dctnryOID; cb.file.pFile = m_dFile; size = sgnature_size; value = (unsigned char*)sgnature_value; token.bc = 0; for (i = m_lastFbo; i < m_numBlocks; i++) { // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback if (((m_freeSpace - HDR_UNIT_SIZE >= size) || ((size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) && (m_curOp < (MAX_OP_COUNT - 1))) { // found the perfect block; signature size fit in this block if (size > (m_freeSpace - HDR_UNIT_SIZE)) { write_size = (m_freeSpace - HDR_UNIT_SIZE); } else { write_size = size; } insertDctnryHdr(m_curBlock.data, write_size); insertSgnture(m_curBlock.data, write_size, value); size -= write_size; value += write_size; m_curBlock.state = BLK_WRITE; // We only want the start LBID for a multi-block dict in the token if (!lbid_in_token) { token.fbo = m_curLbid; token.op = m_curOp; lbid_in_token = true; } if (size > 0) token.bc++; m_lastFbo = i; m_curFbo = m_lastFbo; if ((m_curOp < (MAX_OP_COUNT - 1)) && (size <= 0)) return NO_ERROR; } // end Found //@bug 3832. 
check error code RETURN_ON_ERROR(writeDBFile(cb, &m_curBlock, m_curLbid)); memset(m_curBlock.data, 0, sizeof(m_curBlock.data)); memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes); m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes; m_curBlock.state = BLK_WRITE; m_curOp = 0; m_lastFbo++; m_curFbo = m_lastFbo; //...Expand current extent if it is an abbreviated initial extent if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT)) { RETURN_ON_ERROR(expandDctnryExtent()); } //...Allocate a new extent if we have reached the last block in the // current extent. if (m_curFbo == m_numBlocks) { // last block // for roll back the extent to use // Save those empty extents in case of failure to rollback std::vector dictExtentInfo; ExtentInfo info; info.oid = m_dctnryOID; info.partitionNum = m_partition; info.segmentNum = m_segment; info.dbRoot = m_dbRoot; info.hwm = m_hwm; info.newFile = false; dictExtentInfo.push_back(info); LBID_t startLbid; // Add an extent. rc = createDctnry(m_dctnryOID, 0, // dummy column width m_dbRoot, m_partition, m_segment, startLbid, false); if (rc != NO_ERROR) { // roll back the extent BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo); return rc; } } RETURN_ON_ERROR( BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid)); m_curBlock.lbid = m_curLbid; } // end for loop for all of the blocks return ERR_DICT_NO_SPACE_INSERT; } /******************************************************************************* * Description * Update the block header (and data members like m_freeSpace, * m_newStartOffset, etc), to reflect the insertion of string of size "size" * * PARAMETERS: * input * blockBuf * --the block buffer * input * size * --Size of the signature value * * RETURN: * none ******************************************************************************/ void Dctnry::insertDctnryHdr(unsigned char* blockBuf, const int& size) { int endHdrLoc = START_HDR1 + (m_curOp 
+ 1) * HDR_UNIT_SIZE; int nextOffsetLoc = START_HDR1 + m_curOp * HDR_UNIT_SIZE; int lastOffsetLoc = START_HDR1 + (m_curOp - 1) * HDR_UNIT_SIZE; m_freeSpace -= (size + HDR_UNIT_SIZE); memcpy(&blockBuf[endHdrLoc], &m_endHeader, HDR_UNIT_SIZE); uint16_t lastOffset = *(uint16_t*)&blockBuf[lastOffsetLoc]; uint16_t nextOffset = lastOffset - size; memcpy(&blockBuf[0], &m_freeSpace, HDR_UNIT_SIZE); memcpy(&blockBuf[nextOffsetLoc], &nextOffset, HDR_UNIT_SIZE); m_newStartOffset = nextOffset; m_curOp++; } /******************************************************************************* * DESCRIPTION: * Insert the specified string into the block buffer. * * PARAMETERS: * Input blockBuf * --block buffer * Input size * -- size of the signature value * Input value * -- value of the signature * * RETURN: * none ******************************************************************************/ void Dctnry::insertSgnture(unsigned char* blockBuf, const int& size, unsigned char* value) { // m_newStartLoc is calculated from the header insertion code memcpy(&blockBuf[m_newStartOffset], value, size); } /******************************************************************************* * Description: * get the op count for a block * input * DataBlock& fileBlock -- the file block * output * op_count - total op count ******************************************************************************/ void Dctnry::getBlockOpCount(const DataBlock& fileBlock, int& op_count) { ByteStream bs; ByteStream::byte inbuf[BYTE_PER_BLOCK]; memcpy(inbuf, fileBlock.data, BYTE_PER_BLOCK); bs.load(inbuf, BYTE_PER_BLOCK); ByteStream::doublebyte offset; ByteStream::doublebyte dbyte; bs >> m_freeSpace; bs >> dbyte; bs >> dbyte; bs >> dbyte; bs >> dbyte; bs >> dbyte; idbassert(dbyte == BYTE_PER_BLOCK); bs >> offset; while (offset < 0xffff) { op_count++; bs >> offset; } } /******************************************************************************* * Description: * Loads the string cache from the specified DataBlock, 
which should be * the first block in the applicable dictionary store file. * input * DataBlock& fileBlock -- the file block ******************************************************************************/ void Dctnry::preLoadStringCache(const DataBlock& fileBlock) { int hdrOffsetBeg = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE; int hdrOffsetEnd = HDR_UNIT_SIZE + NEXT_PTR_BYTES; uint16_t offBeg = 0; uint16_t offEnd = 0; memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE); memcpy(&offEnd, &fileBlock.data[hdrOffsetEnd], HDR_UNIT_SIZE); int op = 1; // ordinal position of the string within the block Signature aSig; void* aSigPtr = static_cast(&aSig); memset(aSigPtr, 0, sizeof(aSig)); while ((offBeg != DCTNRY_END_HEADER) && (op <= MAX_STRING_CACHE_SIZE)) { unsigned int len = offEnd - offBeg; aSig.size = len; aSig.signature = new unsigned char[len]; memcpy(aSig.signature, &fileBlock.data[offBeg], len); aSig.token.op = op; aSig.token.fbo = m_curLbid; m_sigArray.insert(aSig); offEnd = offBeg; hdrOffsetBeg += HDR_UNIT_SIZE; memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE); op++; } m_arraySize = op - 1; // std::cout << "Preloading strings..." 
<< std::endl; // char strSig[1000]; // uint64_t tokenVal; // for (int i=0; i(&asig); memset(aSigPtr, 0, sizeof(asig)); asig.signature = new unsigned char[newSig.size]; memcpy(asig.signature, newSig.signature, newSig.size); asig.size = newSig.size; asig.token = newSig.token; m_sigArray.insert(asig); m_arraySize++; } /******************************************************************************* * Description: * get the location of the end of header * input * dFile - file handle * lbid - block of interest * output * endOp - ordinal position of the end of header for "lbid" * * return value * Success -- found and deleted * Fail -- ERR_DICT_INVALID_DELETE ******************************************************************************/ int Dctnry::getEndOp(IDBDataFile* dFile, int lbid, int& endOp) { DataBlock fileBlock; Offset newOffset; int rc; CommBlock cb; cb.file.oid = m_dctnryOID; cb.file.pFile = dFile; memset(fileBlock.data, 0, sizeof(fileBlock.data)); m_dFile = dFile; rc = readSubBlockEntry(cb, &fileBlock, lbid, 0, 0, HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE, &m_dctnryHeader); memcpy(&m_freeSpace, &fileBlock.data[0], HDR_UNIT_SIZE); memcpy(&m_nextPtr, &fileBlock.data[HDR_UNIT_SIZE], NEXT_PTR_BYTES); newOffset.hdrLoc = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE; memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE); endOp = 1; // should be zero counting the end of header then while (newOffset.offset != DCTNRY_END_HEADER) { newOffset.hdrLoc += HDR_UNIT_SIZE; memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE); endOp++; } return rc; } /******************************************************************************* * Add a signature value to the dictionary store. * Function first checks to see if the signature is already * in our string cache, and returns the corresponding token * if it is found in the cache. 
******************************************************************************/ int Dctnry::updateDctnry(unsigned char* sigValue, int& sigSize, Token& token) { int rc = NO_ERROR; Signature sig; sig.signature = sigValue; sig.size = sigSize; // Look for string in cache // As long as the string <= 8000 bytes if (sigSize <= MAX_SIGNATURE_SIZE) { bool found = false; found = getTokenFromArray(sig); if (found) { token = sig.token; return NO_ERROR; } } // Insert into Dictionary rc = insertDctnry(sigSize, sigValue, token); // Add the new signature and token into cache // As long as the string is <= 8000 bytes if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (sigSize <= MAX_SIGNATURE_SIZE)) { Signature sig; sig.size = sigSize; sig.signature = new unsigned char[sigSize]; memcpy(sig.signature, sigValue, sigSize); sig.token = token; m_sigArray.insert(sig); m_arraySize++; } return rc; } /******************************************************************************* * open dictionary file ******************************************************************************/ IDBDataFile* Dctnry::createDctnryFile(const char* name, int, const char* mode, int ioBuffSize, LBID_t lbid) { (void)lbid; return openFile(name, mode, ioBuffSize, false); } /******************************************************************************* * open dictionary file ******************************************************************************/ // @bug 5572 - HDFS usage: add *.tmp file backup flag IDBDataFile* Dctnry::openDctnryFile(bool useTmpSuffix) { return openFile(m_dctnryOID, m_dbRoot, m_partition, m_segment, m_segFileName, "r+b", DEFAULT_COLSIZ, useTmpSuffix); } /******************************************************************************* * close dictionary file ******************************************************************************/ void Dctnry::closeDctnryFile(bool doFlush, std::map& oids) { closeFile(m_dFile); m_dFile = NULL; } int Dctnry::numOfBlocksInFile() { long long fileSizeBytes = 0; 
getFileSize(m_dFile, fileSizeBytes); // dmc-error handling (ignoring rc) return fileSizeBytes / BYTE_PER_BLOCK; } void Dctnry::copyDctnryHeader(void* buf) { memcpy(buf, m_dctnryHeader2, m_totalHdrBytes); } } // namespace WriteEngine