/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id: we_dctnry.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 *
 ******************************************************************************/
/** @file we_dctnry.cpp
 *  When a signature is given, the value will be stored in the dictionary and
 *  a token will be issued. Given a token, the signature in the dictionary
 *  can be deleted.
 *  The whole file contains only one class, Dctnry.
 */
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <set>
#include <sstream>
#include <inttypes.h>
#include <iostream>
using namespace std;

#include "bytestream.h"
#include "brmtypes.h"
#include "extentmap.h"  // for DICT_COL_WIDTH
#include "we_stats.h"
#include "we_log.h"
#include "we_dctnry.h"
using namespace messageqcpp;
using namespace WriteEngine;
using namespace BRM;
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "cacheutils.h"
using namespace idbdatafile;
#include "checks.h"
#include "utils_utf8.h"  // for utf8_truncate_point()

namespace
{
// These used to be member variables, hence the "m_" prefix.  But they are
// all constants, so I removed them as member variables.  May change the
// variable name later (to remove the m_ prefix) as time allows.
const uint16_t m_endHeader = DCTNRY_END_HEADER;  // end of header flag (0xffff)
const uint16_t m_offSetZero = BYTE_PER_BLOCK;    // value for 0 offset (8192)
const int m_totalHdrBytes =                      // # bytes in header
    HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE;
const int START_HDR1 =  // start loc of 2nd offset (HDR1)
    HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
const int PSEUDO_COL_WIDTH = DICT_COL_WIDTH;  // used to convert row count to block count
const int MAX_BLOB_SIZE = 16777215;           // MCOL-4758 limit TEXT and BLOB to 16MB
}  // namespace
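
// Layout sketch (for illustration; assumes HDR_UNIT_SIZE == 2 and
// NEXT_PTR_BYTES == 8, matching the "total 14 bytes" breakdown in the DML
// insertDctnry() header comment below).  A freshly formatted 8192-byte block
// begins with:
//
//   bytes  0- 1 : free space         (8178 = 8192 - 14 when empty)
//   bytes  2- 9 : next block pointer (NOT_USED_PTR)
//   bytes 10-11 : offset 0           (m_offSetZero = 8192; strings grow down
//                                     from the end of the block)
//   bytes 12-13 : end-of-header flag (m_endHeader = 0xffff)
//
// Each stored string adds one more 2-byte offset in front of the end-of-header
// flag, while the string bytes themselves are written at the tail of the block.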

namespace WriteEngine
{
// We will make this a constant for now.  If we ever decide to make
// INITIAL_EXTENT_ROWS_TO_DISK configurable, we will need to move this
// statement, and use Config class to get INITIAL_EXTENT_ROWS_TO_DISK.
int NUM_BLOCKS_PER_INITIAL_EXTENT = ((INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * PSEUDO_COL_WIDTH);
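
// Rough arithmetic (values assumed for illustration): with the abbreviated
// 256K-row initial extent mentioned in createDctnry() below, 8192-byte blocks,
// and an 8-byte DICT_COL_WIDTH, this evaluates to
//   (262144 / 8192) * 8 = 256 blocks
// written to disk for the first, abbreviated extent of each store file.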

/*******************************************************************************
 * Description:
 * Dctnry constructor
 ******************************************************************************/
Dctnry::Dctnry()
 : m_nextPtr(NOT_USED_PTR)
 , m_partition(0)
 , m_segment(0)
 , m_dbRoot(1)
 , m_numBlocks(0)
 , m_lastFbo(0)
 , m_hwm(0)
 , m_newStartOffset(0)
 , m_freeSpace(0)
 , m_curOp(0)
 , m_colWidth(0)
 , m_importDataMode(IMPORT_DATA_TEXT)
{
  memset(m_dctnryHeader, 0, sizeof(m_dctnryHeader));
  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curBlock.lbid = INVALID_LBID;
  // add all initial header sizes for an empty block
  m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;

  memcpy(m_dctnryHeader2, &m_freeSpace, HDR_UNIT_SIZE);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE, &m_nextPtr, NEXT_PTR_BYTES);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES, &m_offSetZero, HDR_UNIT_SIZE);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE, &m_endHeader, HDR_UNIT_SIZE);
  m_curFbo = INVALID_NUM;
  m_curLbid = INVALID_LBID;
  m_arraySize = 0;

  clear();  // files
}

/*******************************************************************************
 * Description:
 * Dctnry destructor
 ******************************************************************************/
Dctnry::~Dctnry()
{
  // clear string cache here!
  freeStringCache();
}

/*******************************************************************************
 * Description:
 * Free memory consumed by dictionary string cache
 ******************************************************************************/
void Dctnry::freeStringCache()
{
  std::set<Signature, sig_compare>::iterator it;
  for (it = m_sigArray.begin(); it != m_sigArray.end(); it++)
  {
    Signature sig = *it;
    delete[] sig.signature;
    sig.signature = 0;
  }

  m_arraySize = 0;
  m_sigArray.clear();
}

/*******************************************************************************
 * Description:
 * Create a dictionary file and initialize the header
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    success    - successfully wrote the header to block
 *    failure    - failed to write the header to block
 ******************************************************************************/
int Dctnry::init()
{
  // cout <<"Init called! m_dctnryOID ="  << m_dctnryOID << endl;
  m_lastFbo = 0;
  m_hwm = 0;
  m_newStartOffset = 0;
  m_freeSpace = 0;
  m_curOp = 0;
  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curBlock.lbid = INVALID_LBID;
  m_arraySize = 0;

  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 * Create a dictionary file and initialize the header, or can be used to
 * just add an extent to an already open dictionary store file.
 *
 * PARAMETERS:
 *    input
 *        dctnryOID - dictionary OID
 *        colWidth  - dictionary string width (not the token width)
 *        dbRoot    - DBRoot where file is located
 *        partition - partition number associated with the file
 *        segment   - segment number associated with the file
 *        startLbid - (out) starting LBID of the newly allocated extent
 *        flag      - "true" indicates we are adding the first block and the
 *                    file needs to be created with an abbreviated extent.
 *                    "false" indicates we just want to add an extent to
 *                    an existing file, and the file has already been opened.
 *
 * RETURN:
 *    success    - successfully created file and/or extent
 *    failure    - failed to create file and/or extent
 ******************************************************************************/
int Dctnry::createDctnry(const OID& dctnryOID, int colWidth, const uint16_t dbRoot, const uint32_t partition,
                         const uint16_t segment, LBID_t& startLbid, bool flag)
{
  int allocSize = 0;
  char fileName[FILE_NAME_SIZE];
  int rc;
  std::map<FID, FID> oids;

#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_ALLOC_DCT_EXTENT);
#endif

  if (flag)
  {
    // Allocate extent before file creation.
    // If we got an error while allocating dictionary store extent,
    // we do not need to create/close the file, because it was not created
    // yet. This logic is the same as column segment file creation - at
    // first we allocate an extent, then we create a segment file.
    rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)dctnryOID, dbRoot, partition, segment,
                                                            startLbid, allocSize);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    m_dctnryOID = dctnryOID;
    m_partition = partition;
    m_segment = segment;
    m_dbRoot = dbRoot;

    RETURN_ON_ERROR((rc = oid2FileName(m_dctnryOID, fileName, true, m_dbRoot, m_partition, m_segment)));
    m_segFileName = fileName;

    // if obsolete file exists, "w+b" will truncate and write over
    m_dFile = createDctnryFile(fileName, colWidth, "w+b", DEFAULT_BUFSIZ, startLbid);

    {
      // We presume the path will contain /
      std::string filePath(fileName);
      if (chownDataPath(filePath))
      {
        return ERR_FILE_CHOWN;
      }
    }
  }
  else
  {
    rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)m_dctnryOID, m_dbRoot, m_partition,
                                                            m_segment, startLbid, allocSize);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END));
  }

  // We allocate a full extent from BRM, but only write an abbreviated 256K
  // rows to disk for 1st extent in each store file, to conserve disk usage.
  int totalSize = allocSize;

  if (flag)
  {
    totalSize = NUM_BLOCKS_PER_INITIAL_EXTENT;
  }

  if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), totalSize))
  {
    if (flag)
    {
      closeDctnryFile(false, oids);
    }

    return ERR_FILE_DISK_SPACE;
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_ALLOC_DCT_EXTENT);
#endif

  if (m_dFile != NULL)
  {
    // MCOL-498 CS optimizes abbreviated extent
    // creation.
    rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, totalSize, m_dctnryHeader2, m_totalHdrBytes, false,
                                  true,  // explicitly optimize
                                  startLbid);
    if (rc != NO_ERROR)
    {
      if (flag)
      {
        closeDctnryFile(false, oids);
      }

      return rc;
    }
  }
  else
    return ERR_FILE_CREATE;

  if (flag)
  {
    closeDctnryFile(true, oids);
    m_numBlocks = totalSize;
    m_hwm = 0;
    rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm);
  }
  else
  {
    m_numBlocks = m_numBlocks + totalSize;
  }

  return rc;
}
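
// Hypothetical caller sketch (variable names are illustrative only):
//
//   Dctnry dctnry;
//   LBID_t startLbid;
//   // create a brand-new store file whose first extent is abbreviated
//   int rc = dctnry.createDctnry(dctnryOID, colWidth, dbRoot, partition,
//                                segment, startLbid, true /*flag*/);
//   ...
//   // later, with the same store file already open, append another extent
//   rc = dctnry.createDctnry(dctnryOID, colWidth, dbRoot, partition,
//                            segment, startLbid, false /*flag*/);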

/*******************************************************************************
 * Description:
 * This function should be called to expand an abbreviated dictionary extent
 * into a full extent on disk.
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    success    - successfully expanded extent
 *    failure    - failed to expand extent
 ******************************************************************************/
int Dctnry::expandDctnryExtent()
{
  RETURN_ON_NULL(m_dFile, ERR_FILE_SEEK);

  off64_t oldOffset = m_dFile->tell();

  RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END));

  // Based on extent size, see how many blocks to add to fill the extent
  int blksToAdd =
      (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) *
      PSEUDO_COL_WIDTH;

  if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), blksToAdd))
  {
    return ERR_FILE_DISK_SPACE;
  }

  int rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, blksToAdd, m_dctnryHeader2, m_totalHdrBytes, true,
                                    true);  // explicitly optimize

  if (rc != NO_ERROR)
    return rc;

  // Restore offset back to where we were before expanding the extent
  RETURN_ON_ERROR(setFileOffset(m_dFile, oldOffset, SEEK_SET));

  // Update block count to reflect disk space added by expanding the extent.
  m_numBlocks = m_numBlocks + blksToAdd;

  return rc;
}
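
// Worked example (default-sized extents assumed, for illustration only): with
// 8M rows per extent, the abbreviated 256K-row initial extent, 8192-byte
// blocks and an 8-byte PSEUDO_COL_WIDTH, the expansion appends
//   ((8388608 - 262144) / 8192) * 8 = 7936 blocks,
// bringing the file up to the full 8192 blocks covered by the first extent.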

/*******************************************************************************
 * DESCRIPTION:
 *    Close dictionary files
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    none
 ******************************************************************************/
int Dctnry::closeDctnry(bool realClose)
{
  if (!m_dFile)
    return NO_ERROR;

  int rc;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
  std::map<FID, FID> oids;

  if (m_curBlock.state == BLK_WRITE)
  {
    rc = writeDBFile(cb, &m_curBlock, m_curBlock.lbid);

    if (rc != NO_ERROR)
    {
      closeDctnryFile(false, oids);
      return rc;
    }

    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    // m_curBlock.state== BLK_INIT;
  }

  //@Bug 5572. always close file for uncompressed file.
  if (FileOp::compressionType() == 0)
    realClose = true;

  if (realClose)
  {
    //@Bug 5689. Need pass oid to write to the right file.
    oids[m_dctnryOID] = m_dctnryOID;
    // dmc-error handling (should detect/report error in closing file)
    closeDctnryFile(true, oids);
  }

  m_hwm = (HWM)m_lastFbo;
  idbassert(utils::is_nonnegative(m_dctnryOID));

  if (idbdatafile::IDBPolicy::useHdfs() && realClose)
  {
    BRM::FileInfo aFile;
    std::vector<BRM::OID_t> oidsToFlush;
    oidsToFlush.push_back(m_dctnryOID);
    aFile.oid = m_dctnryOID;
    aFile.partitionNum = m_partition;
    aFile.segmentNum = m_segment;
    aFile.dbRoot = m_dbRoot;
    aFile.compType = FileOp::compressionType();
    std::vector<BRM::FileInfo> aFileInfo;
    aFileInfo.push_back(aFile);
    cacheutils::purgePrimProcFdCache(aFileInfo, Config::getLocalModuleID());
    cacheutils::flushOIDsFromCache(oidsToFlush);
  }

  rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm);

  if (rc != NO_ERROR)
    return rc;

  // cout <<"Init called! m_dctnryOID ="  << m_dctnryOID << endl;
  freeStringCache();

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 *    Close dictionary file without flushing block buffer or updating
 *    BRM with HWM.
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    none
 ******************************************************************************/
int Dctnry::closeDctnryOnly()
{
  if (!m_dFile)
    return NO_ERROR;

  // dmc-error handling (should detect/report error in closing file)
  std::map<FID, FID> oids;
  closeDctnryFile(false, oids);

  freeStringCache();

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 *    drop/delete dictionary file
 *
 * PARAMETERS:
 *    dctnryOID -- file number to drop
 *
 * RETURN:
 *    none
 ******************************************************************************/
int Dctnry::dropDctnry(const OID& dctnryOID)
{
  m_dctnryOID = dctnryOID;

  if (m_dFile)
  {
    RETURN_ON_ERROR(closeDctnry());
  }

  return deleteFile(dctnryOID);
}

/*******************************************************************************
 * DESCRIPTION:
 *    open dictionary file
 *
 * PARAMETERS:
 *    dctnryOID-- for open dictionary file
 *    dbRoot   -- DBRoot for dictionary store segment file
 *    partition-- partition for dictionary store segment file
 *    segment  -- segment for dictionary store segment file
 *    useTmpSuffix - for Bulk HDFS usage: use or not use *.tmp file suffix
 *
 * RETURN:
 *    successful- NO_ERROR
 *    Fail      - Error Code
 ******************************************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
int Dctnry::openDctnry(const OID& dctnryOID, const uint16_t dbRoot, const uint32_t partition,
                       const uint16_t segment, const bool useTmpSuffix)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  int rc = NO_ERROR;
  m_dctnryOID = dctnryOID;
  m_dbRoot = dbRoot;
  m_partition = partition;
  m_segment = segment;

  m_dFile = openDctnryFile(useTmpSuffix);

  if (m_dFile == NULL)
  {
    ostringstream oss;
    oss << "oid:partition:segment " << dctnryOID << ":" << partition << ":" << segment;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Error opening dictionary file ");
    args.add(oss.str());
    args.add("");
    args.add("");
    message.format(args);
    logging::LoggingID lid(21);
    logging::MessageLog ml(lid);

    ml.logErrorMessage(message);
    return ERR_FILE_OPEN;
  }

  m_numBlocks = numOfBlocksInFile();
  std::map<FID, FID> oids;

  // Initialize other misc member variables
  init();

  int extState;
  rc = BRMWrapper::getInstance()->getLocalHWM(dctnryOID, m_partition, m_segment, m_hwm, extState);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  m_lastFbo = (int)m_hwm;

  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curFbo = m_lastFbo;
  rc = BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
#ifdef PROFILE
  // We omit the call to readDBFile from OPEN_DCT_FILE stats, because com-
  // pressed files have separate stats that readDBFile() will capture thru
  // ChunkManager::fetchChunkFromFile().
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  rc = readDBFile(cb, m_curBlock.data, m_curLbid);
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  //@Bug 5567  Don't seek for compressed file.
  if (m_compressionType == 0)
  {
    // Position file to the start of the current block;
    // Determine file byte offset based on the current block offset (m_curFbo)
    long long byteOffset = ((long long)m_curFbo) * (long)BYTE_PER_BLOCK;
    rc = setFileOffset(m_dFile, byteOffset);

    if (rc != NO_ERROR)
    {
      closeDctnryFile(false, oids);
      return rc;
    }
  }

  m_curBlock.lbid = m_curLbid;
  m_curBlock.state = BLK_READ;
  int opCnt = 0;
  // Get new free space (m_freeSpace) from header too! Here!!!!!!!!!!!!!!!
  getBlockOpCount(m_curBlock, opCnt);
  m_curOp = opCnt;

  // "If" this store file contains no more than 1 block, then we preload
  // the string cache used to recognize duplicates during row insertion.
  if (m_hwm == 0)
  {
    preLoadStringCache(m_curBlock);
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  return rc;
}
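
// Typical lifecycle, sketched with hypothetical caller-side names:
//
//   Dctnry dctnry;
//   if (dctnry.openDctnry(dctnryOID, dbRoot, partition, segment,
//                         false /*useTmpSuffix*/) == NO_ERROR)
//   {
//     Token token;
//     int rc = dctnry.updateDctnry(sigValue, sigSize, token);  // cache-aware insert
//     ...
//     dctnry.closeDctnry();  // flushes the pending block and sets the HWM in BRM
//   }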

/*******************************************************************************
 * Description:
 * Determine if the specified signature is present in the string cache.
 *
 * PARAMETERS:
 *    input
 *       sig - signature to search for
 *
 * RETURN:
 *    true  - if signature is found
 *    false - if signature is not found
 ******************************************************************************/
bool Dctnry::getTokenFromArray(Signature& sig)
{
  std::set<Signature, sig_compare>::iterator it;
  it = m_sigArray.find(sig);
  if (it == m_sigArray.end())
  {
    return false;
  }
  else
  {
    Signature sigfound = *it;
    sig.token = sigfound.token;
    return true;
  }

  return false;
}

/*******************************************************************************
 * Description:
 * Used by bulk import to insert a signature into m_curBlock, and update
 * the m_curBlock header accordingly.
 *
 * PARAMETERS:
 *    input
 *       sig   - signature to be inserted
 *    output
 *       token - token that was assigned to the inserted signature
 *
 * RETURN:
 *    success    - successfully wrote the signature to the block
 *    failure    - failed to extend/create an extent for the block
 ******************************************************************************/
int Dctnry::insertDctnry2(Signature& sig)
{
  int rc = 0;
  int write_size;
  bool lbid_in_token = false;
  size_t origSigSize = sig.size;
  unsigned char* origSig = sig.signature;

  sig.token.bc = 0;

  while (sig.size > 0 || !lbid_in_token)
  {
    if (sig.size > (m_freeSpace - HDR_UNIT_SIZE))
    {
      write_size = (m_freeSpace - HDR_UNIT_SIZE);
    }
    else
    {
      write_size = sig.size;
    }

    insertDctnryHdr(m_curBlock.data, write_size);
    insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature);

    sig.size -= write_size;
    sig.signature += write_size;
    m_curFbo = m_lastFbo;

    if (!lbid_in_token)
    {
      sig.token.fbo = m_curLbid;
      sig.token.op = m_curOp;
      lbid_in_token = true;
    }

    if (sig.size > 0)
    {
      CommBlock cb;
      cb.file.oid = m_dctnryOID;
      cb.file.pFile = m_dFile;
      sig.token.bc++;

      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
      memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
      m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
      m_curBlock.state = BLK_WRITE;
      m_curOp = 0;
      m_lastFbo++;
      m_curFbo = m_lastFbo;

      //...Expand current extent if it is an abbreviated initial extent
      if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
      {
        RETURN_ON_ERROR(expandDctnryExtent());
      }

      //...Allocate a new extent if we have reached the last block in the
      //   current extent.
      if (m_curFbo == m_numBlocks)
      {
        // last block
        // for roll back the extent to use
        // Save those empty extents in case of failure to rollback
        std::vector<ExtentInfo> dictExtentInfo;
        ExtentInfo info;
        info.oid = m_dctnryOID;
        info.partitionNum = m_partition;
        info.segmentNum = m_segment;
        info.dbRoot = m_dbRoot;
        info.hwm = m_hwm;
        info.newFile = false;
        dictExtentInfo.push_back(info);
        LBID_t startLbid;
        // Add an extent.
        rc = createDctnry(m_dctnryOID,
                          0,  // dummy column width
                          m_dbRoot, m_partition, m_segment, startLbid, false);

        if (rc != NO_ERROR)
        {
          // roll back the extent
          BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
          return rc;
        }
      }

      RETURN_ON_ERROR(
          BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid));
      m_curBlock.lbid = m_curLbid;
    }
  }

  sig.size = origSigSize;
  sig.signature = origSig;
  return NO_ERROR;
}
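
// Illustration (sizes assumed): a 20,000-byte value inserted into a fresh block
// is split by the loop above into chunks of at most 8176 bytes (the usable
// space of an empty block).  It occupies three consecutive blocks, and the
// returned token only records where the value starts plus the number of
// continuation blocks:
//
//   token.fbo = LBID of the first block written
//   token.op  = ordinal position of the value within that block
//   token.bc  = 2   (two additional blocks were needed)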

/*******************************************************************************
 * Description:
 * Used by bulk import to insert a collection of strings into this store file.
 * Function assumes that the file is already positioned to the current block.
 *
 * PARAMETERS:
 *    input
 *       buf - character buffer containing input strings
 *       pos - meta data describing data in "buf"
 *       totalRow - number of rows in "buf"
 *       col - column of strings to be parsed from "buf"
 *    output
 *       tokenBuf  - tokens assigned to inserted strings
 *
 * RETURN:
 *    success    - successfully wrote the strings to the store file
 *    failure    - failed to write the strings to the store file
 ******************************************************************************/
int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf,
                         long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
  int startPos = 0;
  int totalUseSize = 0;

  int outOffset = 0;
  const char* pIn;
  char* pOut = tokenBuf;
  Signature curSig;
  bool found = false;
  bool next = false;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
  WriteEngine::Token nullToken;

  //...Loop through all the rows for the specified column
  while (startPos < totalRow)
  {
    found = false;
    void* curSigPtr = static_cast<void*>(&curSig);
    memset(curSigPtr, 0, sizeof(curSig));
    curSig.size = pos[startPos][col].offset;

    // Strip trailing null bytes '\0' (by adjusting curSig.size) if import-
    // ing in binary mode.  If entire string is binary zeros, then we treat
    // as a NULL value.
    if (m_importDataMode != IMPORT_DATA_TEXT)
    {
      if ((curSig.size > 0) && (curSig.size != COLPOSPAIR_NULL_TOKEN_OFFSET))
      {
        char* fld = (char*)buf + pos[startPos][col].start;
        int kk = curSig.size - 1;

        for (; kk >= 0; kk--)
        {
          if (fld[kk] != '\0')
            break;
        }

        curSig.size = kk + 1;
      }
    }

    // Read thread should validate against max size so that the entire row
    // can be rejected up front.  Once we get here in the parsing thread,
    // it is too late to reject the row.  However, as a precaution, we
    // still check against max size & set to null token if needed.
    if ((curSig.size == 0) || (curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) || (curSig.size > MAX_BLOB_SIZE))
    {
      if (m_defVal.length() > 0)  // use default string if available
      {
        pIn = m_defVal.str();
        curSig.signature = (unsigned char*)pIn;
        curSig.size = m_defVal.length();
      }
      else
      {
        memcpy(pOut + outOffset, &nullToken, 8);
        outOffset += 8;
        startPos++;
        continue;
      }
    }
    else
    {
      pIn = (char*)buf + pos[startPos][col].start;
      curSig.signature = (unsigned char*)pIn;
    }

    if (cs->mbmaxlen > 1)
    {
      // For TEXT columns, we truncate based on the number of bytes,
      // and not based on the number of characters, as for CHAR/VARCHAR
      // columns in the else block.
      if (weType == WriteEngine::WR_TEXT)
      {
        if (curSig.size > m_colWidth)
        {
          uint8_t truncate_point = utf8::utf8_truncate_point((const char*)curSig.signature, m_colWidth);
          curSig.size = m_colWidth - truncate_point;
          truncCount++;
        }
      }
      else
      {
        const char* start = (const char*)curSig.signature;
        const char* end = (const char*)(curSig.signature + curSig.size);
        size_t numChars = cs->numchars(start, end);
        size_t maxCharLength = m_colWidth / cs->mbmaxlen;

        if (numChars > maxCharLength)
        {
          MY_STRCOPY_STATUS status;
          cs->well_formed_char_length(start, end, maxCharLength, &status);
          curSig.size = status.m_source_end_pos - start;
          truncCount++;
        }
      }
    }
    else  // cs->mbmaxlen == 1
    {
      if (curSig.size > m_colWidth)
      {
        curSig.size = m_colWidth;
        truncCount++;
      }
    }

    //...Search for the string in our string cache
    // if it fits into one block (< 8KB)
    if (curSig.size <= MAX_SIGNATURE_SIZE)
    {
      // Stats::startParseEvent("getTokenFromArray");
      found = getTokenFromArray(curSig);

      if (found)
      {
        memcpy(pOut + outOffset, &curSig.token, 8);
        outOffset += 8;
        startPos++;
        // Stats::stopParseEvent("getTokenFromArray");
        continue;
      }

      // Stats::stopParseEvent("getTokenFromArray");
    }

    totalUseSize = m_totalHdrBytes + curSig.size;

    //...String not found in cache, so proceed.
    //   If room is available in current block then insert into block.
    // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
    if (((totalUseSize <= m_freeSpace - HDR_UNIT_SIZE) ||
         ((curSig.size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
        (m_curOp < (MAX_OP_COUNT - 1)))
    {
      RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
      m_curBlock.state = BLK_WRITE;
      memcpy(pOut + outOffset, &curSig.token, 8);
      outOffset += 8;
      startPos++;
      found = true;

      //...If we have reached limit for the number of strings allowed in
      //   a block, then we write the current block so that we can start
      //   another block.
      if (m_curOp >= MAX_OP_COUNT - 1)
      {
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
        RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
        m_curBlock.state = BLK_READ;
        next = true;
      }

      //...Add string to cache, if we have not exceeded cache limit
      // Don't cache big blobs
      if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
      {
        addToStringCache(curSig);
      }
    }
    else  //...No room for this string in current block, so we write
          //   out the current block, so we can start another block
    {
#ifdef PROFILE
      Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      m_curBlock.state = BLK_READ;
      next = true;
      found = false;
    }  // if m_freeSpace

    //..."next" flag is used to indicate that we need to advance to the
    //   next block in the store file.
    if (next)
    {
      memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
      memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
      m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
      m_curBlock.state = BLK_WRITE;
      m_curOp = 0;
      next = false;
      m_lastFbo++;
      m_curFbo = m_lastFbo;

      //...Expand current extent if it is an abbreviated initial extent
      if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
      {
        RETURN_ON_ERROR(expandDctnryExtent());
      }

      //...Allocate a new extent if we have reached the last block in the
      //   current extent.
      if (m_curFbo == m_numBlocks)
      {
        // last block
        LBID_t startLbid;

        // Add an extent.
        RETURN_ON_ERROR(
            createDctnry(m_dctnryOID, m_colWidth, m_dbRoot, m_partition, m_segment, startLbid, false));

        if (m_logger)
        {
          std::ostringstream oss;
          oss << "Add dictionary extent OID-" << m_dctnryOID << "; DBRoot-" << m_dbRoot << "; part-"
              << m_partition << "; seg-" << m_segment << "; hwm-" << m_curFbo << "; LBID-" << startLbid
              << "; file-" << m_segFileName;
          m_logger->logMsg(oss.str(), MSGLVL_INFO2);
        }

        m_curLbid = startLbid;

        // now seek back to the curFbo, after adding an extent
        // @bug5769 For uncompressed only;
        // ChunkManager manages the file offset for the compression case
        if (m_compressionType == 0)
        {
#ifdef PROFILE
          Stats::startParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
          long long byteOffset = m_curFbo;
          byteOffset *= BYTE_PER_BLOCK;
          RETURN_ON_ERROR(setFileOffset(m_dFile, byteOffset));
#ifdef PROFILE
          Stats::stopParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
        }
      }
      else
      {
        // LBIDs are numbered collectively and consecutively within an
        // extent, so within an extent we can derive the LBID by simply
        // incrementing it rather than having to go back to BRM to look
        // up the LBID for each FBO.
        m_curLbid++;
      }

#ifdef PROFILE
      Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
      m_curBlock.lbid = m_curLbid;

      //..."found" flag indicates whether the string was already found
      //   "or" added to the end of the previous block.  If false, then
      //   we need to add the string to the new block.
      if (!found)
      {
        RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
        m_curBlock.state = BLK_WRITE;
        memcpy(pOut + outOffset, &curSig.token, 8);
        outOffset += 8;
        startPos++;

        //...Add string to cache, if we have not exceeded cache limit
        if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
        {
          addToStringCache(curSig);
        }
      }
    }  // if next
  }  // end while

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
  // Done
  // If any data leftover and not written by subsequent call to
  // insertDctnry(), then it will be written by closeDctnry().

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 * Used by DML to insert a single string into this store file.
 * (1) Insert a signature value into the block
 * (2) The header information is inserted at the front of the block
 * (3) The signature is inserted from the back of the block
 * (4) Total minimum header size -- free space 2 bytes, next pointer 8 bytes,
 *     zero offset 2 bytes, end of header 2 bytes, total 14 bytes,
 *     plus 2 bytes for the new value's starting offset,
 *     for a total of 16 bytes
 * (5) Values of size <= 8176 (= 8192 - 16) will not be split into two blocks
 * (6) A smaller value (<= 8176) has to fit into one block, or the insert
 *     fails
 * (7) For a large value (> 8176), it may fill a smaller space first and then
 *     take up a whole block, or take up a whole block first and leave the
 *     remainder in another block
 * (8) limited to 8000 bytes for this release
 *
 * PARAMETERS:
 *    input dFile
 *        -- File handle
 *    Input  sgnature_size
 *        -- how many bytes the signature occupies
 *    Input  sgnature_value
 *        -- the value of the signature
 *    output token
 *        -- token structure carrying the assigned fbo and op
 *
 * RETURN:
 *    success    - successfully inserted the signature
 *    failure    - failed to insert the signature
 ******************************************************************************/
int Dctnry::insertDctnry(const int& sgnature_size, const unsigned char* sgnature_value, Token& token)
{
  int rc = 0;
  int i;
  unsigned char* value = NULL;
  int size;
  int write_size;
  bool lbid_in_token = false;

  // Round down for safety. In theory we can take 262143 * 8176 bytes
  if (sgnature_size > MAX_BLOB_SIZE)
  {
    return ERR_DICT_SIZE_GT_2G;
  }

  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;

  size = sgnature_size;
  value = (unsigned char*)sgnature_value;
  token.bc = 0;

  for (i = m_lastFbo; i < m_numBlocks; i++)
  {
    // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
    if (((m_freeSpace - HDR_UNIT_SIZE >= size) || ((size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
        (m_curOp < (MAX_OP_COUNT - 1)))
    {
      // found the perfect block; signature size fit in this block
      if (size > (m_freeSpace - HDR_UNIT_SIZE))
      {
        write_size = (m_freeSpace - HDR_UNIT_SIZE);
      }
      else
      {
        write_size = size;
      }

      insertDctnryHdr(m_curBlock.data, write_size);
      insertSgnture(m_curBlock.data, write_size, value);
      size -= write_size;
      value += write_size;
      m_curBlock.state = BLK_WRITE;

      // We only want the start LBID for a multi-block dict in the token
      if (!lbid_in_token)
      {
        token.fbo = m_curLbid;
        token.op = m_curOp;
        lbid_in_token = true;
      }

      if (size > 0)
        token.bc++;

      m_lastFbo = i;
      m_curFbo = m_lastFbo;

      if ((m_curOp < (MAX_OP_COUNT - 1)) && (size <= 0))
        return NO_ERROR;
    }  // end Found

    //@bug 3832. check error code
    RETURN_ON_ERROR(writeDBFile(cb, &m_curBlock, m_curLbid));
    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
    m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
    m_curBlock.state = BLK_WRITE;
    m_curOp = 0;
    m_lastFbo++;
    m_curFbo = m_lastFbo;

    //...Expand current extent if it is an abbreviated initial extent
    if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
    {
      RETURN_ON_ERROR(expandDctnryExtent());
    }

    //...Allocate a new extent if we have reached the last block in the
    //   current extent.
    if (m_curFbo == m_numBlocks)
    {
      // last block
      // for roll back the extent to use
      // Save those empty extents in case of failure to rollback
      std::vector<ExtentInfo> dictExtentInfo;
      ExtentInfo info;
      info.oid = m_dctnryOID;
      info.partitionNum = m_partition;
      info.segmentNum = m_segment;
      info.dbRoot = m_dbRoot;
      info.hwm = m_hwm;
      info.newFile = false;
      dictExtentInfo.push_back(info);
      LBID_t startLbid;
      // Add an extent.
      rc = createDctnry(m_dctnryOID,
                        0,  // dummy column width
                        m_dbRoot, m_partition, m_segment, startLbid, false);

      if (rc != NO_ERROR)
      {
        // roll back the extent
        BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
        return rc;
      }
    }

    RETURN_ON_ERROR(
        BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid));
    m_curBlock.lbid = m_curLbid;
  }  // end for loop for all of the blocks

  return ERR_DICT_NO_SPACE_INSERT;
}

/*******************************************************************************
 * Description
 * Update the block header (and data members like m_freeSpace,
 * m_newStartOffset, etc), to reflect the insertion of string of size "size"
 *
 * PARAMETERS:
 *    input
 *        blockBuf
 *        --the block buffer
 *    input
 *        size
 *        --Size of the signature value
 *
 * RETURN:
 *    none
 ******************************************************************************/
void Dctnry::insertDctnryHdr(unsigned char* blockBuf, const int& size)
{
  int endHdrLoc = START_HDR1 + (m_curOp + 1) * HDR_UNIT_SIZE;
  int nextOffsetLoc = START_HDR1 + m_curOp * HDR_UNIT_SIZE;
  int lastOffsetLoc = START_HDR1 + (m_curOp - 1) * HDR_UNIT_SIZE;

  m_freeSpace -= (size + HDR_UNIT_SIZE);
  memcpy(&blockBuf[endHdrLoc], &m_endHeader, HDR_UNIT_SIZE);
  uint16_t lastOffset = *(uint16_t*)&blockBuf[lastOffsetLoc];
  uint16_t nextOffset = lastOffset - size;

  memcpy(&blockBuf[0], &m_freeSpace, HDR_UNIT_SIZE);
  memcpy(&blockBuf[nextOffsetLoc], &nextOffset, HDR_UNIT_SIZE);
  m_newStartOffset = nextOffset;
  m_curOp++;
}
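
// Worked example (for illustration): inserting a 100-byte string into a fresh
// block with m_curOp == 0.  The slot just before START_HDR1 is the "offset 0"
// entry holding 8192, so:
//   lastOffset  = 8192
//   nextOffset  = 8192 - 100       = 8092  (stored in the next 2-byte slot)
//   m_freeSpace = 8178 - (100 + 2) = 8076  (rewritten into bytes 0-1)
// The 0xffff end-of-header flag moves one slot further out, m_newStartOffset
// becomes 8092 (where insertSgnture() copies the bytes), and m_curOp becomes 1.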

/*******************************************************************************
 * DESCRIPTION:
 * Insert the specified string into the block buffer.
 *
 * PARAMETERS:
 *    Input blockBuf
 *        --block buffer
 *    Input size
 *        -- size of the signature value
 *    Input value
 *        -- value of the signature
 *
 * RETURN:
 *    none
 ******************************************************************************/
void Dctnry::insertSgnture(unsigned char* blockBuf, const int& size, unsigned char* value)
{
  // m_newStartOffset is calculated by the header insertion code
  memcpy(&blockBuf[m_newStartOffset], value, size);
}

/*******************************************************************************
 * Description:
 * get the op count for a block
 * input
 *      DataBlock& fileBlock -- the file block
 * output
 *      op_count - total op count
 ******************************************************************************/
void Dctnry::getBlockOpCount(const DataBlock& fileBlock, int& op_count)
{
  ByteStream bs;
  ByteStream::byte inbuf[BYTE_PER_BLOCK];
  memcpy(inbuf, fileBlock.data, BYTE_PER_BLOCK);
  bs.load(inbuf, BYTE_PER_BLOCK);

  ByteStream::doublebyte offset;
  ByteStream::doublebyte dbyte;
  bs >> m_freeSpace;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  idbassert(dbyte == BYTE_PER_BLOCK);
  bs >> offset;

  while (offset < 0xffff)
  {
    op_count++;
    bs >> offset;
  }
}
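
// Illustration (assumed block contents): if the header words read
//   [freeSpace][next ptr: 4 x 2-byte words][8192][8092][7990][0xffff]...
// the stream above consumes the free-space word, the 8-byte next pointer and
// the 8192 "offset 0" sentinel, then counts the offsets 8092 and 7990 before
// hitting the 0xffff end-of-header flag, so op_count comes back as 2.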

/*******************************************************************************
 * Description:
 * Loads the string cache from the specified DataBlock, which should be
 * the first block in the applicable dictionary store file.
 * input
 *      DataBlock& fileBlock -- the file block
 ******************************************************************************/
void Dctnry::preLoadStringCache(const DataBlock& fileBlock)
{
  int hdrOffsetBeg = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
  int hdrOffsetEnd = HDR_UNIT_SIZE + NEXT_PTR_BYTES;
  uint16_t offBeg = 0;
  uint16_t offEnd = 0;
  memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE);
  memcpy(&offEnd, &fileBlock.data[hdrOffsetEnd], HDR_UNIT_SIZE);

  int op = 1;  // ordinal position of the string within the block
  Signature aSig;
  void* aSigPtr = static_cast<void*>(&aSig);
  memset(aSigPtr, 0, sizeof(aSig));

  while ((offBeg != DCTNRY_END_HEADER) && (op <= MAX_STRING_CACHE_SIZE))
  {
    unsigned int len = offEnd - offBeg;
    aSig.size = len;
    aSig.signature = new unsigned char[len];
    memcpy(aSig.signature, &fileBlock.data[offBeg], len);
    aSig.token.op = op;
    aSig.token.fbo = m_curLbid;
    m_sigArray.insert(aSig);

    offEnd = offBeg;
    hdrOffsetBeg += HDR_UNIT_SIZE;
    memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE);
    op++;
  }

  m_arraySize = op - 1;

  // std::cout << "Preloading strings..." << std::endl;
  // char strSig[1000];
  // uint64_t tokenVal;
  // for (int i=0; i<m_arraySize; i++)
  //{
  //  memcpy(strSig, m_sigArray[i].signature, m_sigArray[i].size );
  //  memcpy(&tokenVal, &m_sigArray[i].token, sizeof(uint64_t));
  //  strSig[m_sigArray[i].size] = '\0';
  //  std::cout << "op-"      << m_sigArray[i].token.op  <<
  //               "; fbo-"   << m_sigArray[i].token.fbo <<
  //               "; sig-"   << strSig   <<
  //               "; token-" << tokenVal << std::endl;
  //}
}
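
// Example (assumed block contents): if the first block's header offsets are
// 8192 (offset 0), 8092 and 7990, the loop above recovers two cached strings:
//   string 1: bytes [8092, 8192) -> length 100, token.op = 1
//   string 2: bytes [7990, 8092) -> length 102, token.op = 2
// Each string is copied out of the block into a freshly allocated buffer, and
// m_arraySize ends up as 2.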

/*******************************************************************************
 * Description:
 * Add the specified signature (string) to the string cache.
 * input
 *      newSig -- Signature string to be added to the string cache.
 ******************************************************************************/
void Dctnry::addToStringCache(const Signature& newSig)
{
  // We should add a constructor that sets everything to 0.
  Signature asig;
  void* aSigPtr = static_cast<void*>(&asig);
  memset(aSigPtr, 0, sizeof(asig));
  asig.signature = new unsigned char[newSig.size];
  memcpy(asig.signature, newSig.signature, newSig.size);
  asig.size = newSig.size;
  asig.token = newSig.token;
  m_sigArray.insert(asig);
  m_arraySize++;
}

/*******************************************************************************
 * Description:
 * get the location of the end of header
 * input
 *      dFile - file handle
 *      lbid  - block of interest
 * output
 *      endOp - ordinal position of the end of header for "lbid"
 *
 * return value
 *        Success -- found and deleted
 *        Fail    -- ERR_DICT_INVALID_DELETE
 ******************************************************************************/
int Dctnry::getEndOp(IDBDataFile* dFile, int lbid, int& endOp)
{
  DataBlock fileBlock;
  Offset newOffset;
  int rc;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = dFile;
  memset(fileBlock.data, 0, sizeof(fileBlock.data));
  m_dFile = dFile;
  rc = readSubBlockEntry(cb, &fileBlock, lbid, 0, 0,
                         HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE, &m_dctnryHeader);

  memcpy(&m_freeSpace, &fileBlock.data[0], HDR_UNIT_SIZE);
  memcpy(&m_nextPtr, &fileBlock.data[HDR_UNIT_SIZE], NEXT_PTR_BYTES);

  newOffset.hdrLoc = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
  memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE);
  endOp = 1;  // should be zero counting the end of header then

  while (newOffset.offset != DCTNRY_END_HEADER)
  {
    newOffset.hdrLoc += HDR_UNIT_SIZE;
    memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE);
    endOp++;
  }

  return rc;
}

/*******************************************************************************
 * Add a signature value to the dictionary store.
 * Function first checks to see if the signature is already
 * in our string cache, and returns the corresponding token
 * if it is found in the cache.
 ******************************************************************************/
int Dctnry::updateDctnry(unsigned char* sigValue, int& sigSize, Token& token)
{
  int rc = NO_ERROR;
  Signature sig;
  sig.signature = sigValue;
  sig.size = sigSize;

  // Look for string in cache
  // As long as the string <= 8000 bytes
  if (sigSize <= MAX_SIGNATURE_SIZE)
  {
    bool found = false;
    found = getTokenFromArray(sig);

    if (found)
    {
      token = sig.token;
      return NO_ERROR;
    }
  }

  // Insert into Dictionary
  rc = insertDctnry(sigSize, sigValue, token);

  // Add the new signature and token into cache
  // As long as the string is <= 8000 bytes
  if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (sigSize <= MAX_SIGNATURE_SIZE))
  {
    Signature sig;
    sig.size = sigSize;
    sig.signature = new unsigned char[sigSize];
    memcpy(sig.signature, sigValue, sigSize);
    sig.token = token;
    m_sigArray.insert(sig);
    m_arraySize++;
  }

  return rc;
}

/*******************************************************************************
 * create dictionary file
 ******************************************************************************/
IDBDataFile* Dctnry::createDctnryFile(const char* name, int, const char* mode, int ioBuffSize, LBID_t lbid)
{
  (void)lbid;
  return openFile(name, mode, ioBuffSize, false);
}

/*******************************************************************************
 * open dictionary file
 ******************************************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
IDBDataFile* Dctnry::openDctnryFile(bool useTmpSuffix)
{
  return openFile(m_dctnryOID, m_dbRoot, m_partition, m_segment, m_segFileName, "r+b", DEFAULT_COLSIZ,
                  useTmpSuffix);
}

/*******************************************************************************
 * close dictionary file
 ******************************************************************************/
void Dctnry::closeDctnryFile(bool /*doFlush*/, std::map<FID, FID>& /*oids*/)
{
  closeFile(m_dFile);
  m_dFile = NULL;
}

int Dctnry::numOfBlocksInFile()
{
  long long fileSizeBytes = 0;
  getFileSize(m_dFile, fileSizeBytes);  // dmc-error handling (ignoring rc)
  return fileSizeBytes / BYTE_PER_BLOCK;
}

void Dctnry::copyDctnryHeader(void* buf)
{
  memcpy(buf, m_dctnryHeader2, m_totalHdrBytes);
}

}  // namespace WriteEngine