/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016-2021 MariaDB Corporation

   This program is free software; you can redistribute it and/or modify it under
   the terms of the GNU General Public License as published by the Free Software
   Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful, but WITHOUT
   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
   FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along with
   this program; if not, write to the Free Software Foundation, Inc.,
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */

// $Id: writeengine.cpp 4737 2013-08-14 20:45:46Z bwilkinson $

/** @file writeengine.cpp
 * A wrapper class for the write engine to write information to files
 */

// XXX: a definition to switch off computations for token columns.
#define XXX_WRITEENGINE_TOKENS_RANGES_XXX

#include <cmath>
#include <cstdlib>
#include <string>
#include <map>
#include <sstream>
#include <iostream>
#include <unordered_map>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
using namespace std;

#include "joblisttypes.h"

#define WRITEENGINEWRAPPER_DLLEXPORT
#include "writeengine.h"
#undef WRITEENGINEWRAPPER_DLLEXPORT

#include "we_convertor.h"
#include "we_log.h"
#include "we_simplesyslog.h"
#include "we_config.h"
#include "we_bulkrollbackmgr.h"
#include "brm.h"
#include "stopwatch.h"
#include "we_colop.h"
#include "we_type.h"
#include "we_colopcompress.h"
#include "we_dctnrycompress.h"
#include "cacheutils.h"
#include "calpontsystemcatalog.h"
#include "we_simplesyslog.h"
using namespace cacheutils;
using namespace logging;
using namespace BRM;
using namespace execplan;

#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "MonitorProcMem.h"
using namespace idbdatafile;

#include "dataconvert.h"
#include "string_prefixes.h"
#include "mcs_decimal.h"

namespace WriteEngine
{
//#define PROFILE 1

#define RETURN_ON_ERROR_REPORT(statement)                                                                 \
  do                                                                                                      \
  {                                                                                                       \
    int rc = (statement);                                                                                 \
    if (rc != NO_ERROR)                                                                                   \
    {                                                                                                     \
      cerr << "failed at " << __LINE__ << ", function " << __FUNCTION__ << ", statement '" << #statement  \
           << "', rc " << rc << endl;                                                                     \
      return rc;                                                                                          \
    }                                                                                                     \
  } while (0)

StopWatch timer;

using OidToIdxMap = std::unordered_map<OID, size_t>;

/**@brief WriteEngineWrapper Constructor
 */
WriteEngineWrapper::WriteEngineWrapper() : m_opType(NOOP)
{
  m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
  m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;

  m_colOp[COMPRESSED_OP_1] = new ColumnOpCompress1(/*compressionType=*/1);
  m_dctnry[COMPRESSED_OP_1] = new DctnryCompress1(/*compressionType=*/1);

  m_colOp[COMPRESSED_OP_2] = new ColumnOpCompress1(/*compressionType=*/3);
  m_dctnry[COMPRESSED_OP_2] = new DctnryCompress1(/*compressionType=*/3);
}

WriteEngineWrapper::WriteEngineWrapper(const WriteEngineWrapper& rhs) : m_opType(rhs.m_opType)
{
  m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
  m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;

  m_colOp[COMPRESSED_OP_1] = new ColumnOpCompress1(/*compressionType=*/1);
  m_dctnry[COMPRESSED_OP_1] = new DctnryCompress1(/*compressionType=*/1);

  m_colOp[COMPRESSED_OP_2] = new ColumnOpCompress1(/*compressionType=*/3);
  m_dctnry[COMPRESSED_OP_2] = new DctnryCompress1(/*compressionType=*/3);
}

/**@brief WriteEngineWrapper Destructor
 */
WriteEngineWrapper::~WriteEngineWrapper()
{
  delete m_colOp[UN_COMPRESSED_OP];
  delete m_dctnry[UN_COMPRESSED_OP];
  delete m_colOp[COMPRESSED_OP_1];
  delete m_dctnry[COMPRESSED_OP_1];
  delete m_colOp[COMPRESSED_OP_2];
  delete m_dctnry[COMPRESSED_OP_2];
}
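// Illustrative usage sketch (not part of the original file): how a caller
// could lean on RETURN_ON_ERROR_REPORT to bail out of a multi-step routine
// with a diagnostic on the first failing statement. "someColumnOp" and
// "someCol" are hypothetical placeholders.
//
//   int openThenClear(ColumnOp* someColumnOp, Column& someCol, std::string& segFile)
//   {
//     RETURN_ON_ERROR_REPORT(someColumnOp->openColumnFile(someCol, segFile, false));
//     someColumnOp->clearColumn(someCol);
//     return NO_ERROR;
//   }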
/**@brief Perform upfront initialization
 */
/* static */ void WriteEngineWrapper::init(unsigned subSystemID)
{
  SimpleSysLog::instance()->setLoggingID(logging::LoggingID(subSystemID));
  Config::initConfigCache();
  BRMWrapper::getInstance();

  // Bug 5415 Add HDFS MemBuffer vs. FileBuffer decision logic.
  config::Config* cf = config::Config::makeConfig();

  //--------------------------------------------------------------------------
  // Memory overload protection. This setting will cause the process to die should
  // it, by itself, consume maxPct of total memory. Monitored in MonitorProcMem.
  // Only used at the express direction of Field Support.
  //--------------------------------------------------------------------------
  int maxPct = 0;  // disable by default
  string strMaxPct = cf->getConfig("WriteEngine", "MaxPct");

  if (strMaxPct.length() != 0)
    maxPct = cf->uFromText(strMaxPct);

  //--------------------------------------------------------------------------
  // MemoryCheckPercent. This controls at what percent of total memory may be
  // consumed by all processes before we switch from HdfsRdwrMemBuffer to
  // HdfsRdwrFileBuffer. This is only used in Hdfs installations.
  //--------------------------------------------------------------------------
  int checkPct = 95;
  string strCheckPct = cf->getConfig("SystemConfig", "MemoryCheckPercent");

  if (strCheckPct.length() != 0)
    checkPct = cf->uFromText(strCheckPct);

  //--------------------------------------------------------------------------
  // If we're either HDFS, or maxPct is turned on, start the monitor thread.
  // Otherwise, we don't need it, so don't waste the resources.
  //--------------------------------------------------------------------------
  if (maxPct > 0 || IDBPolicy::useHdfs())
  {
    new boost::thread(utils::MonitorProcMem(maxPct, checkPct, subSystemID));
  }
}

/*@brief checkValid --Check input parameters are valid
 */
/***********************************************************
 * DESCRIPTION:
 *    Check input parameters are valid
 * PARAMETERS:
 *    colStructList - column struct list
 *    colValueList - column value list
 *    ridList - rowid list
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in the checking process
 ***********************************************************/
int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colStructList,
                                   const ColValueList& colValueList, const RIDList& ridList) const
{
  ColTupleList curTupleList;
  ColStructList::size_type structListSize;
  ColValueList::size_type valListSize;
  ColTupleList::size_type totalRow;

  if (colStructList.size() == 0)
    return ERR_STRUCT_EMPTY;

  structListSize = colStructList.size();
  valListSize = colValueList.size();

  if (structListSize != valListSize)
    return ERR_STRUCT_VALUE_NOT_MATCH;

  for (ColValueList::size_type i = 0; i < valListSize; i++)
  {
    curTupleList = static_cast<ColTupleList>(colValueList[i]);
    totalRow = curTupleList.size();

    if (ridList.size() > 0)
    {
      if (totalRow != ridList.size())
        return ERR_ROWID_VALUE_NOT_MATCH;
    }
  }  // end of for (int i = 0;

  return NO_ERROR;
}
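// Illustrative sketch (not part of the original file): the shape invariant
// checkValid() enforces -- one tuple list per column struct, and, when a rowid
// list is supplied, every column's tuple list must match it in size.
//
//   ColStructList structs = ...;   // N column descriptors
//   ColValueList values = ...;     // must also hold N ColTupleLists
//   RIDList rids = ...;            // empty, or sized like each tuple list
//   int rc = wrapper.checkValid(txnid, structs, values, rids);
//   // rc is ERR_STRUCT_VALUE_NOT_MATCH / ERR_ROWID_VALUE_NOT_MATCH on mismatch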
/*@brief findSmallestColumn --Find the smallest column for this table
 */
/***********************************************************
 * DESCRIPTION:
 *    Find the smallest column for this table
 * PARAMETERS:
 *    colId - returns smallest column id
 *    colStructList - column struct list
 * RETURN:
 *    void
 ***********************************************************/
// MCOL-1675: find the smallest column width to calculate the RowID from so
// that all HWMs will be incremented by this operation
void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList)
{
  int32_t lowColLen = 8192;

  for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
  {
    if (colStructList[colIt].colWidth < lowColLen)
    {
      colId = colIt;
      lowColLen = colStructList[colId].colWidth;

      if (lowColLen == 1)
      {
        break;
      }
    }
  }
}

/** @brief Fetch values from arrays into VT-typed references, casting arrays to ET-typed
 *  (element type) arrays.
 *
 * There might be two arrays: one from which we write to buffer and one with old values written before.
 * One of these arrays can be absent because of, well, physical absence. Insertion does not have
 * values written before and deletion does not have values to write to.
 */
template <typename VT, typename ET>
void fetchNewOldValues(VT& value, VT& oldValue, const void* array, const void* oldArray, size_t i,
                       size_t totalNewRow)
{
  const ET* eArray = static_cast<const ET*>(array);
  const ET* oldEArray = static_cast<const ET*>(oldArray);

  if (eArray)
  {
    value = eArray[i < totalNewRow ? i : 0];
  }

  if (oldEArray)
  {
    oldValue = oldEArray[i];
  }
}

static bool updateBigRangeCheckForInvalidity(ExtCPInfo* maxMin, int128_t value, int128_t oldValue,
                                             const void* valArrayVoid, const void* oldValArrayVoid)
{
  if (!oldValArrayVoid)
  {
    // insertion. we can update range directly. range will not become invalid here.
    maxMin->fCPInfo.bigMax =
        std::max(maxMin->fCPInfo.bigMax, value);  // we update big range because int columns can be
                                                  // associated with decimals.
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, value);
  }
  else if (!valArrayVoid)
  {
    // deletion. we need to check old value only, is it on (or outside) range boundary.
    if (oldValue >= maxMin->fCPInfo.bigMax || oldValue <= maxMin->fCPInfo.bigMin)
    {
      maxMin->toInvalid();
      return true;  // no point working further.
    }
  }
  else if (valArrayVoid && oldValArrayVoid)
  {
    // update. we need to check boundaries as in deletion and extend as in insertion.
    // check boundaries as in deletion accounting for possible extension.
    if ((oldValue <= maxMin->fCPInfo.bigMin && value > oldValue) ||
        (oldValue >= maxMin->fCPInfo.bigMax && value < oldValue))
    {
      // we are overwriting value on the boundary with value that does not preserve or extend range.
      maxMin->toInvalid();
      return true;
    }

    maxMin->fCPInfo.bigMax = std::max(maxMin->fCPInfo.bigMax, value);
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, value);
  }

  return false;
}
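// Illustrative summary (not part of the original file) of the casual
// partitioning (CP) rules the two range updaters implement for an extent's
// [min, max] pair:
//
//   insert(v):      min = std::min(min, v); max = std::max(max, v);  // never invalidates
//   delete(old):    if (old <= min || old >= max) the range becomes invalid,
//                   because the true bounds may have shrunk;
//   update(old, v): invalid if a boundary value is overwritten by a value that
//                   neither preserves nor extends the range; otherwise extend
//                   exactly as for insert.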
template <typename InternalType>
bool updateRangeCheckForInvalidity(ExtCPInfo* maxMin, InternalType value, InternalType oldValue,
                                   const void* valArrayVoid, const void* oldValArrayVoid)
{
  if (!oldValArrayVoid)
  {
    // insertion. we can update range directly. range will not become invalid here.
    maxMin->fCPInfo.bigMax = std::max(
        maxMin->fCPInfo.bigMax,
        (int128_t)value);  // we update big range because int columns can be associated with decimals.
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, (int128_t)value);
    maxMin->fCPInfo.max = std::max((InternalType)maxMin->fCPInfo.max, value);
    maxMin->fCPInfo.min = std::min((InternalType)maxMin->fCPInfo.min, value);
  }
  else if (!valArrayVoid)
  {
    // deletion. we need to check old value only, is it on (or outside) range boundary.
    if (oldValue >= (InternalType)maxMin->fCPInfo.max || oldValue <= (InternalType)maxMin->fCPInfo.min)
    {
      maxMin->toInvalid();
      return true;  // no point working further.
    }
  }
  else if (valArrayVoid && oldValArrayVoid)
  {
    // update. we need to check boundaries as in deletion and extend as in insertion.
    // check boundaries as in deletion accounting for possible extension.
    if ((oldValue <= (InternalType)maxMin->fCPInfo.min && value > oldValue) ||
        (oldValue >= (InternalType)maxMin->fCPInfo.max && value < oldValue))
    {
      // we are overwriting value on the boundary with value that does not preserve or extend range.
      maxMin->toInvalid();
      return true;
    }

    maxMin->fCPInfo.bigMax = std::max(maxMin->fCPInfo.bigMax, (int128_t)value);
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, (int128_t)value);
    maxMin->fCPInfo.max = std::max((InternalType)maxMin->fCPInfo.max, value);
    maxMin->fCPInfo.min = std::min((InternalType)maxMin->fCPInfo.min, value);
  }

  return false;
}

/**
 * There can be a case with missing valArray (delete), missing oldValArray (insert) and when
 * both arrays are present (update).
 */
void WriteEngineWrapper::updateMaxMinRange(const size_t totalNewRow, const size_t totalOldRow,
                                           const execplan::CalpontSystemCatalog::ColType& cscColType,
                                           const ColType colType, const void* valArrayVoid,
                                           const void* oldValArrayVoid, ExtCPInfo* maxMin,
                                           bool canStartWithInvalidRange)
{
  if (!maxMin)
  {
    return;
  }

  if (colType == WR_CHAR)
  {
    maxMin->toInvalid();  // simple and wrong solution.
    return;
  }

  bool isUnsigned = false;  // TODO: should change with type.

  switch (colType)
  {
    case WR_UBYTE:
    case WR_USHORT:
    case WR_UINT:
    case WR_ULONGLONG:
    case WR_CHAR:
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
    case WR_TOKEN:
#endif
    {
      isUnsigned = true;
      break;
    }

    default:
      // WR_BINARY is signed.
      break;
  }

  if (!canStartWithInvalidRange)
  {
    // check if range is invalid, we can't update it.
    if (maxMin->isInvalid())
    {
      return;
    }
  }

  if (colType == WR_CHAR)
  {
    idbassert(MAX_COLUMN_BOUNDARY == sizeof(uint64_t));  // have to check that - code below depends on that.

    if (!maxMin->isInvalid())
    {
      maxMin->fromToChars();
    }
  }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
  if (colType == WR_TOKEN)
  {
    oldValArrayVoid = nullptr;  // no old values for tokens, sadly.
    valArrayVoid = (void*)maxMin->stringsPrefixes();
  }
#endif

  size_t i;

  for (i = 0; i < totalOldRow; i++)
  {
    int64_t value = 0, oldValue = 0;
    uint64_t uvalue = 0, oldUValue = 0;
    int128_t bvalue = 0, oldBValue = 0;

    // fetching. May not assign value or oldValue variables when array is not present.
    switch (colType)
    {
      case WR_BYTE:
      {
        fetchNewOldValues<int64_t, int8_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }

      case WR_UBYTE:
      {
        fetchNewOldValues<uint64_t, uint8_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                             totalNewRow);
        break;
      }

      case WR_SHORT:
      {
        fetchNewOldValues<int64_t, int16_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }

      case WR_USHORT:
      {
        fetchNewOldValues<uint64_t, uint16_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }

      case WR_MEDINT:
      case WR_INT:
      {
        fetchNewOldValues<int64_t, int32_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }

      case WR_UMEDINT:
      case WR_UINT:
      {
        fetchNewOldValues<uint64_t, uint32_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }

      case WR_LONGLONG:
      {
        fetchNewOldValues<int64_t, int64_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
      case WR_TOKEN:
#endif
      case WR_ULONGLONG:
      {
        fetchNewOldValues<uint64_t, uint64_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }

      case WR_BINARY:
      {
        fetchNewOldValues<int128_t, int128_t>(bvalue, oldBValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }
      case WR_CHAR:
      {
        fetchNewOldValues<uint64_t, uint64_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        // for characters (strings, actually), we fetched them in LSB order, on x86, at the very least.
        // this means most significant byte of the string, which is first, is now in LSB of uvalue/oldValue.
        // we must perform a conversion.
        uvalue = uint64ToStr(uvalue);
        oldUValue = uint64ToStr(oldUValue);
        break;
      }

      default: idbassert_s(0, "unknown WR type tag"); return;
    }

    if (maxMin->isBinaryColumn())
    {
      // special case of wide decimals. They fit into int128_t range so we do not care about signedness.
      if (updateBigRangeCheckForInvalidity(maxMin, bvalue, oldBValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
    else if (isUnsigned)
    {
      if (updateRangeCheckForInvalidity(maxMin, uvalue, oldUValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
    else
    {
      if (updateRangeCheckForInvalidity(maxMin, value, oldValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
  }

  // the range will be kept.
  // to handle character columns properly we need to convert range values back to strings.
  if (colType == WR_CHAR)
  {
    maxMin->fromToChars();
  }
}

/*@convertValArray -  Convert interface values to internal values
 */
/***********************************************************
 * DESCRIPTION:
 *    Convert interface values to internal values
 * PARAMETERS:
 *    cscColType - CSC ColType
 *    colType - write engine column type
 *    curTupleList - column value list
 * RETURN:
 *    none
 *    valArray - output value array
 ***********************************************************/
void WriteEngineWrapper::convertValArray(const size_t totalRow,
                                         const CalpontSystemCatalog::ColType& cscColType,
                                         const ColType colType, ColTupleList& curTupleList, void* valArray,
                                         bool bFromList)
{
  ColTuple curTuple;
  ColTupleList::size_type i;

  if (bFromList)
  {
    for (i = 0; i < curTupleList.size(); i++)
    {
      curTuple = curTupleList[i];
      convertValue(cscColType, colType, valArray, i, curTuple.data, true);
    }
  }
  else
  {
    for (i = 0; i < totalRow; i++)
    {
      convertValue(cscColType, colType, valArray, i, curTuple.data, false);
      curTupleList.push_back(curTuple);
    }
  }
}
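// Illustrative sketch (not part of the original file): convertValArray() is
// symmetric -- bFromList=true packs boost::any tuples into a raw array via
// convertValue(..., true); bFromList=false reads the array back and appends
// freshly built tuples. A hypothetical round trip for an INT column:
//
//   ColTupleList tuples = ...;                       // one boost::any per row
//   void* raw = calloc(tuples.size(), sizeof(int));
//   wrapper.convertValArray(tuples.size(), cscType, WR_INT, tuples, raw, true);
//   // ... hand raw to the block write path ...
//   free(raw);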
/*
 * @brief Convert column value to its internal representation
 */
void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColType& cscColType,
                                      ColType colType, void* value, boost::any& data)
{
  string curStr;
  int size;

  switch (colType)
  {
    case WriteEngine::WR_INT:
    case WriteEngine::WR_MEDINT:
      if (data.type() == typeid(int))
      {
        int val = boost::any_cast<int>(data);
        size = sizeof(int);
        memcpy(value, &val, size);
      }
      else
      {
        uint32_t val = boost::any_cast<uint32_t>(data);
        size = sizeof(uint32_t);
        memcpy(value, &val, size);
      }

      break;

    case WriteEngine::WR_UINT:
    case WriteEngine::WR_UMEDINT:
    {
      uint32_t val = boost::any_cast<uint32_t>(data);
      size = sizeof(uint32_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_VARBINARY:  // treat same as char for now
    case WriteEngine::WR_CHAR:
    case WriteEngine::WR_BLOB:
    case WriteEngine::WR_TEXT:
      curStr = boost::any_cast<string>(data);

      if ((int)curStr.length() > MAX_COLUMN_BOUNDARY)
        curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);

      memcpy(value, curStr.c_str(), curStr.length());
      break;

    case WriteEngine::WR_FLOAT:
    {
      float val = boost::any_cast<float>(data);

      // N.B. There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan,
      // but not necessarily the same bits that you put in. This only seems to be for float (double seems
      // to work).
      if (isnan(val))
      {
        uint32_t ti = joblist::FLOATNULL;
        float* tfp = (float*)&ti;
        val = *tfp;
      }

      size = sizeof(float);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_DOUBLE:
    {
      double val = boost::any_cast<double>(data);
      size = sizeof(double);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_SHORT:
    {
      short val = boost::any_cast<short>(data);
      size = sizeof(short);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_USHORT:
    {
      uint16_t val = boost::any_cast<uint16_t>(data);
      size = sizeof(uint16_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_BYTE:
    {
      char val = boost::any_cast<char>(data);
      size = sizeof(char);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_UBYTE:
    {
      uint8_t val = boost::any_cast<uint8_t>(data);
      size = sizeof(uint8_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_LONGLONG:
      if (data.type() == typeid(long long))
      {
        long long val = boost::any_cast<long long>(data);
        size = sizeof(long long);
        memcpy(value, &val, size);
      }
      else if (data.type() == typeid(long))
      {
        long val = boost::any_cast<long>(data);
        size = sizeof(long);
        memcpy(value, &val, size);
      }
      else
      {
        uint64_t val = boost::any_cast<uint64_t>(data);
        size = sizeof(uint64_t);
        memcpy(value, &val, size);
      }

      break;

    case WriteEngine::WR_ULONGLONG:
    {
      uint64_t val = boost::any_cast<uint64_t>(data);
      size = sizeof(uint64_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_TOKEN:
    {
      Token val = boost::any_cast<Token>(data);
      size = sizeof(Token);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_BINARY:
    {
      size = cscColType.colWidth;
      int128_t val = boost::any_cast<int128_t>(data);
      memcpy(value, &val, size);
    }
    break;
  }  // end of switch (colType)
}
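// Illustrative note (not part of the original file): the NaN rewrite above is
// needed because a float NaN can come back from boost::any with different bit
// patterns, while the engine's NULL encoding must be bit-exact. A minimal
// reproduction of the normalization step:
//
//   float v = std::nanf("");
//   uint32_t ti = joblist::FLOATNULL;
//   float canonical;
//   memcpy(&canonical, &ti, sizeof(canonical));
//   if (std::isnan(v))
//     v = canonical;               // v now carries the canonical NULL bits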
/*@convertValue -  The base for converting values
 */
/***********************************************************
 * DESCRIPTION:
 *    The base for converting values
 * PARAMETERS:
 *    colType - data type
 *    pos - array position
 *    data - value
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType& cscColType, const ColType colType,
                                      void* valArray, const size_t pos, boost::any& data, bool fromList)
{
  string curStr;

  if (fromList)
  {
    switch (colType)
    {
      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT:
        // here we assign value to an array element and update range.
        if (data.type() == typeid(long))
          ((int*)valArray)[pos] = static_cast<int>(boost::any_cast<long>(data));
        else if (data.type() == typeid(int))
          ((int*)valArray)[pos] = boost::any_cast<int>(data);
        else
        {
          // this interesting part is for magic values like NULL or EMPTY (marks deleted elements).
          // we will not put these into range.
          ((int*)valArray)[pos] = boost::any_cast<uint32_t>(data);
        }

        break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT: ((uint32_t*)valArray)[pos] = boost::any_cast<uint32_t>(data); break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_CHAR:
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
        curStr = boost::any_cast<string>(data);

        if ((int)curStr.length() > MAX_COLUMN_BOUNDARY)
          curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);

        memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length());
        break;

      //      case WriteEngine::WR_LONG: ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
      //        break;
      case WriteEngine::WR_FLOAT:
        ((float*)valArray)[pos] = boost::any_cast<float>(data);

        if (isnan(((float*)valArray)[pos]))
        {
          uint32_t ti = joblist::FLOATNULL;
          float* tfp = (float*)&ti;
          ((float*)valArray)[pos] = *tfp;
        }

        break;

      case WriteEngine::WR_DOUBLE: ((double*)valArray)[pos] = boost::any_cast<double>(data); break;

      case WriteEngine::WR_SHORT: ((short*)valArray)[pos] = boost::any_cast<short>(data); break;

      case WriteEngine::WR_USHORT: ((uint16_t*)valArray)[pos] = boost::any_cast<uint16_t>(data); break;

      //      case WriteEngine::WR_BIT: ((bool*)valArray)[pos] = boost::any_cast<bool>(data);
      //        break;
      case WriteEngine::WR_BYTE: ((char*)valArray)[pos] = boost::any_cast<char>(data); break;

      case WriteEngine::WR_UBYTE: ((uint8_t*)valArray)[pos] = boost::any_cast<uint8_t>(data); break;

      case WriteEngine::WR_LONGLONG:
        if (data.type() == typeid(long long))
          ((long long*)valArray)[pos] = boost::any_cast<long long>(data);
        else if (data.type() == typeid(long))
          ((long long*)valArray)[pos] = (long long)boost::any_cast<long>(data);
        else
          ((long long*)valArray)[pos] = boost::any_cast<uint64_t>(data);

        break;

      case WriteEngine::WR_ULONGLONG: ((uint64_t*)valArray)[pos] = boost::any_cast<uint64_t>(data); break;

      case WriteEngine::WR_TOKEN: ((Token*)valArray)[pos] = boost::any_cast<Token>(data); break;

      case WriteEngine::WR_BINARY:
      {
        size_t size = cscColType.colWidth;
        int128_t val = boost::any_cast<int128_t>(data);
        memcpy((uint8_t*)valArray + pos * size, &val, size);
      }
      break;
    }  // end of switch (colType)
  }
  else
  {
    switch (colType)
    {
      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT: data = ((int*)valArray)[pos]; break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT: data = ((uint64_t*)valArray)[pos]; break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_CHAR:
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
        char tmp[10];
        memcpy(tmp, (char*)valArray + pos * 8, 8);
        tmp[8] = '\0';  // the 8-byte cell is not NUL-terminated; terminate before building the string.
        curStr = tmp;
        data = curStr;
        break;

      //      case WriteEngine::WR_LONG: ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
      //        break;
      case WriteEngine::WR_FLOAT: data = ((float*)valArray)[pos]; break;

      case WriteEngine::WR_DOUBLE: data = ((double*)valArray)[pos]; break;

      case WriteEngine::WR_SHORT: data = ((short*)valArray)[pos]; break;

      case WriteEngine::WR_USHORT: data = ((uint16_t*)valArray)[pos]; break;

      //      case WriteEngine::WR_BIT: data = ((bool*)valArray)[pos];
      //        break;
      case WriteEngine::WR_BYTE: data = ((char*)valArray)[pos]; break;

      case WriteEngine::WR_UBYTE: data = ((uint8_t*)valArray)[pos]; break;

      case WriteEngine::WR_LONGLONG: data = ((long long*)valArray)[pos]; break;

      case WriteEngine::WR_ULONGLONG: data = ((uint64_t*)valArray)[pos]; break;

      case WriteEngine::WR_TOKEN: data = ((Token*)valArray)[pos]; break;

      case WriteEngine::WR_BINARY: data = ((int128_t*)valArray)[pos]; break;
    }  // end of switch (colType)
  }    // end of if
}
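// Illustrative sketch (not part of the original file): char-family values are
// stored in fixed MAX_COLUMN_BOUNDARY-byte cells, so both convertValue()
// overloads silently truncate longer strings on the way in:
//
//   std::string s = "abcdefghij";               // 10 bytes
//   if ((int)s.length() > MAX_COLUMN_BOUNDARY)
//     s = s.substr(0, MAX_COLUMN_BOUNDARY);     // stored as "abcdefgh"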
/*@createColumn -  Create column files, including data and bitmap files
 */
/***********************************************************
 * DESCRIPTION:
 *    Create column files, including data and bitmap files
 * PARAMETERS:
 *    dataOid - column data file id
 *    bitmapOid - column bitmap file id
 *    colWidth - column width
 *    dbRoot - DBRoot where file is to be located
 *    partition - Starting partition number for segment file path
 *    compressionType - compression type
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if something wrong in creating the file
 ***********************************************************/
int WriteEngineWrapper::createColumn(const TxnID& txnid, const OID& dataOid,
                                     const CalpontSystemCatalog::ColDataType dataType, int dataWidth,
                                     uint16_t dbRoot, uint32_t partition, int compressionType)
{
  int rc;
  Column curCol;

  int compress_op = op(compressionType);
  m_colOp[compress_op]->initColumn(curCol);
  m_colOp[compress_op]->findTypeHandler(dataWidth, dataType);
  rc = m_colOp[compress_op]->createColumn(curCol, 0, dataWidth, dataType, WriteEngine::WR_CHAR,
                                          (FID)dataOid, dbRoot, partition);

  // This is optional, however, it's recommended to do so to free heap
  // memory if assigned in the future
  m_colOp[compress_op]->clearColumn(curCol);

  std::map<FID, FID> oids;

  if (rc == NO_ERROR)
    rc = flushDataFiles(NO_ERROR, txnid, oids);

  if (rc != NO_ERROR)
  {
    return rc;
  }

  RETURN_ON_ERROR(BRMWrapper::getInstance()->setLocalHWM(dataOid, partition, 0, 0));

  // @bug 281 : fix for bug 281 - Add flush VM cache to clear all write buffer
  // flushVMCache();
  return rc;
}
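// Illustrative usage sketch (not part of the original file): a DDL path would
// call createColumn() once per new column; the call creates the segment file,
// flushes it, and seeds the HWM of (partition, segment 0) with 0. The argument
// values here are hypothetical.
//
//   WriteEngineWrapper we;
//   int rc = we.createColumn(txnid, /*dataOid=*/3001, CalpontSystemCatalog::INT,
//                            /*dataWidth=*/4, /*dbRoot=*/1, /*partition=*/0,
//                            /*compressionType=*/1);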
// BUG931
/**
 * @brief Fill column with default values
 */
int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
                                   const CalpontSystemCatalog::ColType& colType, ColTuple defaultVal,
                                   const OID& refColOID,
                                   const CalpontSystemCatalog::ColDataType refColDataType, int refColWidth,
                                   int refCompressionType, bool isNULL, int compressionType,
                                   const string& defaultValStr, const OID& dictOid, bool autoincrement)
{
  int rc = NO_ERROR;
  Column newCol;
  Column refCol;
  ColType newColType;
  ColType refColType;
  boost::scoped_array<char> defVal(new char[datatypes::MAXDECIMALWIDTH]);
  ColumnOp* colOpNewCol = m_colOp[op(compressionType)];
  ColumnOp* refColOp = m_colOp[op(refCompressionType)];
  Dctnry* dctnry = m_dctnry[op(compressionType)];
  colOpNewCol->initColumn(newCol);
  refColOp->initColumn(refCol);
  uint16_t dbRoot = 1;  // not to be used
  int newDataWidth = colType.colWidth;
  // Convert HWM of the reference column for the new column
  // Bug 1703,1705
  bool isToken = false;

  if (((colType.colDataType == CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) ||
      ((colType.colDataType == CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) ||
      (colType.colDataType == CalpontSystemCatalog::VARBINARY) ||
      (colType.colDataType == CalpontSystemCatalog::BLOB) ||
      (colType.colDataType == CalpontSystemCatalog::TEXT))
  {
    isToken = true;
  }

  Convertor::convertColType(colType.colDataType, colType.colWidth, newColType, isToken);

  // WIP
  // replace with isDictCol
  if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) ||
      ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) ||
      (refColDataType == CalpontSystemCatalog::VARBINARY) ||
      (colType.colDataType == CalpontSystemCatalog::BLOB) ||
      (colType.colDataType == CalpontSystemCatalog::TEXT))
  {
    isToken = true;
  }

  newDataWidth = colOpNewCol->getCorrectRowWidth(colType.colDataType, colType.colWidth);

  // MCOL-1347 CS doubles the width for ALTER TABLE..ADD COLUMN
  if (colType.colWidth < 4 && colType.colDataType == CalpontSystemCatalog::VARCHAR)
  {
    newDataWidth >>= 1;
  }

  Convertor::convertColType(refColDataType, refColWidth, refColType, isToken);
  refColOp->setColParam(refCol, 0, refColOp->getCorrectRowWidth(refColDataType, refColWidth),
                        refColDataType, refColType, (FID)refColOID, refCompressionType, dbRoot);
  colOpNewCol->setColParam(newCol, 0, newDataWidth, colType.colDataType, newColType, (FID)dataOid,
                           compressionType, dbRoot);

  // refColOp and colOpNewCol are the same ptr.
  colOpNewCol->findTypeHandler(newDataWidth, colType.colDataType);

  int size = sizeof(Token);

  if (newColType == WriteEngine::WR_TOKEN)
  {
    if (isNULL)
    {
      Token nullToken;
      memcpy(defVal.get(), &nullToken, size);
    }

    // Tokenization is done when we create dictionary file
  }
  else
  {
    // WIP
    convertValue(colType, newColType, defVal.get(), defaultVal.data);
  }

  if (rc == NO_ERROR)
    rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid,
                                 colType.colWidth, defaultValStr, autoincrement);

  // flushing files is in colOp->fillColumn()
  return rc;
}

// TODO: Get rid of this
void emptyValueToAny(boost::any* any, const uint8_t* emptyValue, int colWidth)
{
  switch (colWidth)
  {
    case 16: *any = *(uint128_t*)emptyValue; break;

    case 8: *any = *(uint64_t*)emptyValue; break;

    case 4: *any = *(uint32_t*)emptyValue; break;

    case 2: *any = *(uint16_t*)emptyValue; break;

    default: *any = *emptyValue;
  }
}

int WriteEngineWrapper::deleteRow(const TxnID& txnid, const std::vector<CSCTypesList>& colExtentsColType,
                                  std::vector<ColStructList>& colExtentsStruct,
                                  std::vector<void*>& colOldValueList, std::vector<RIDList>& ridLists,
                                  const int32_t tableOid, bool hasAUXCol)
{
  ColTuple curTuple;
  ColStruct curColStruct;
  CalpontSystemCatalog::ColType cscColType;
  DctnryStruct dctnryStruct;
  ColValueList colValueList;
  ColTupleList curTupleList;
  DctnryStructList dctnryStructList;
  DctnryValueList dctnryValueList;
  ColStructList colStructList;
  CSCTypesList cscColTypeList;
  int rc;
  string tmpStr("");
  std::vector<DctnryStructList> dctnryExtentsStruct;

  if (colExtentsStruct.size() == 0 || ridLists.size() == 0)
    return ERR_STRUCT_EMPTY;

  // set transaction id
  setTransId(txnid);

  unsigned numExtents = colExtentsStruct.size();

  for (unsigned extent = 0; extent < numExtents; extent++)
  {
    colStructList = colExtentsStruct[extent];
    cscColTypeList = colExtentsColType[extent];

    for (ColStructList::size_type i = 0; i < colStructList.size(); i++)
    {
      curTupleList.clear();
      curColStruct = colStructList[i];
      cscColType = cscColTypeList[i];
      Convertor::convertColType(&curColStruct);

      const uint8_t* emptyVal = m_colOp[op(curColStruct.fCompressionType)]->getEmptyRowValue(
          curColStruct.colDataType, curColStruct.colWidth);
      emptyValueToAny(&curTuple.data, emptyVal, curColStruct.colWidth);

      curTupleList.push_back(curTuple);
      colValueList.push_back(curTupleList);

      dctnryStruct.dctnryOid = 0;
      dctnryStruct.fColPartition = curColStruct.fColPartition;
      dctnryStruct.fColSegment = curColStruct.fColSegment;
      dctnryStruct.fColDbRoot = curColStruct.fColDbRoot;
      dctnryStruct.columnOid = colStructList[i].dataOid;
      dctnryStructList.push_back(dctnryStruct);

      DctnryTuple dctnryTuple;
      DctColTupleList dctColTuples;
      dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
      dctnryTuple.sigSize = tmpStr.length();
      dctnryTuple.isNull = true;
      dctColTuples.push_back(dctnryTuple);
      dctnryValueList.push_back(dctColTuples);
    }

    dctnryExtentsStruct.push_back(dctnryStructList);
  }

  // unfortunately I don't have a better way to instruct without passing too many parameters
  m_opType = DELETE;
  rc = updateColumnRec(txnid, colExtentsColType, colExtentsStruct, colValueList, colOldValueList, ridLists,
                       dctnryExtentsStruct, dctnryValueList, tableOid, hasAUXCol);
  m_opType = NOOP;

  return rc;
}
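// Illustrative note (not part of the original file): deleteRow() never removes
// bytes from the segment file -- it rewrites each deleted cell with the
// column's "empty row" magic value and then runs the regular update path with
// m_opType == DELETE. Per column, the substitute value is built like this:
//
//   const uint8_t* emptyVal = colOp->getEmptyRowValue(colDataType, colWidth);
//   boost::any cell;
//   emptyValueToAny(&cell, emptyVal, colWidth);   // becomes the "new" value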
inline void allocateValArray(void*& valArray, ColTupleList::size_type totalRow, ColType colType,
                             int colWidth)
{
  // MCS allocates 8 bytes even for CHARs smaller than 8 bytes.
  switch (colType)
  {
    case WriteEngine::WR_VARBINARY:  // treat same as char for now
    case WriteEngine::WR_CHAR:
    case WriteEngine::WR_BLOB:
    case WriteEngine::WR_TEXT: valArray = calloc(sizeof(char), totalRow * MAX_COLUMN_BOUNDARY); break;

    case WriteEngine::WR_TOKEN: valArray = calloc(sizeof(Token), totalRow); break;

    default: valArray = calloc(totalRow, colWidth); break;
  }
}
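// Illustrative usage sketch (not part of the original file): allocateValArray()
// pairs with free(), and char-family types always reserve MAX_COLUMN_BOUNDARY
// bytes per row regardless of the declared width:
//
//   void* valArray = nullptr;
//   allocateValArray(valArray, /*totalRow=*/100, WR_CHAR, /*colWidth=*/3);
//   // valArray now points at 100 * MAX_COLUMN_BOUNDARY zeroed bytes
//   free(valArray);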
int WriteEngineWrapper::deleteBadRows(const TxnID& txnid, ColStructList& colStructs, RIDList& ridList,
                                      DctnryStructList& dctnryStructList)
{
  /* Need to scan all files including dictionary store files to check whether there is any bad chunks
   */
  int rc = 0;
  Column curCol;
  void* valArray = NULL;
  m_opType = DELETE;

  for (unsigned i = 0; i < colStructs.size(); i++)
  {
    ColumnOp* colOp = m_colOp[op(colStructs[i].fCompressionType)];
    unsigned needFixFiles = colStructs[i].tokenFlag ? 2 : 1;
    colOp->initColumn(curCol);

    for (unsigned j = 0; j < needFixFiles; j++)
    {
      if (j == 0)
      {
        colOp->setColParam(curCol, 0, colStructs[i].colWidth, colStructs[i].colDataType,
                           colStructs[i].colType, colStructs[i].dataOid, colStructs[i].fCompressionType,
                           colStructs[i].fColDbRoot, colStructs[i].fColPartition,
                           colStructs[i].fColSegment);
        colOp->findTypeHandler(colStructs[i].colWidth, colStructs[i].colDataType);

        string segFile;
        rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)  // If openFile fails, disk error or header error is assumed.
        {
          // report error and return.
          std::ostringstream oss;
          WErrorCodes ec;
          string err = ec.errorString(rc);
          oss << "Error opening file oid:dbroot:partition:segment = " << colStructs[i].dataOid << ":"
              << colStructs[i].fColDbRoot << ":" << colStructs[i].fColPartition << ":"
              << colStructs[i].fColSegment << " and error code is " << rc << " with message " << err;
          throw std::runtime_error(oss.str());
        }

        allocateValArray(valArray, 1, colStructs[i].colType, colStructs[i].colWidth);

        rc = colOp->writeRows(curCol, ridList.size(), ridList, valArray, 0, true);

        if (rc != NO_ERROR)
        {
          // read error is fixed in place
          if (rc == ERR_COMP_COMPRESS)  // write error
          {
          }
        }

        // flush files will be done in the end of fix.
        colOp->clearColumn(curCol);

        if (valArray != NULL)
          free(valArray);
      }
      else  // dictionary file. How to fix
      {
        // read headers out, uncompress the last chunk, if error, replace it with empty chunk.
        Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
        rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                                dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment, false);
        rc = dctnry->checkFixLastDictChunk();
        rc = dctnry->closeDctnry(true);
      }
    }
  }

  return rc;
}

/*@flushVMCache - Flush VM cache
 */
/***********************************************************
 * DESCRIPTION:
 *    Flush system VM cache
 * PARAMETERS:
 *    none
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::flushVMCache() const
{
  //    int fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
  //    write(fd, "3", 1);
  //    close(fd);
}

#if 0
// XXX temporary for debugging purposes
static void log_this(const char *message, logging::LOG_TYPE log_type, unsigned sid)
{
  // corresponds with dbcon in SubsystemID vector
  // in messagelog.cpp
  unsigned int subSystemId = 24;
  logging::LoggingID logid( subSystemId, sid, 0);
  logging::Message::Args args1;
  logging::Message msg(1);
  args1.add(message);
  msg.format( args1 );
  Logger logger(logid.fSubsysID);
  logger.logMessage(log_type, msg, logid);
}
#endif

/** @brief Determine whether we may update a column's ranges (by type) and return nullptr if we can't */
static ExtCPInfo* getCPInfoToUpdateForUpdatableType(const ColStruct& colStruct, ExtCPInfo* currentCPInfo,
                                                    OpType optype)
{
  if (colStruct.tokenFlag)
  {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
    if (currentCPInfo && currentCPInfo->hasStringsPrefixes() && optype == INSERT)
    {
      return currentCPInfo;
    }
#endif
    return nullptr;
  }

  switch (colStruct.colType)
  {
    // here we enumerate all supported types.
    case WR_BYTE:
    case WR_SHORT:
    case WR_INT:
    case WR_LONGLONG:
    case WR_CHAR:
    case WR_UBYTE:
    case WR_USHORT:
    case WR_UINT:
    case WR_ULONGLONG:
    case WR_MEDINT:
    case WR_UMEDINT:
    case WR_BINARY:
    {
      return currentCPInfo;
    }

    // all other types are not supported.
    default: return nullptr;  // safe choice for everything we can't do.
  }
}
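// Illustrative note (not part of the original file): only the fixed-width
// types enumerated above (plus token columns carrying string prefixes under
// the XXX_WRITEENGINE_TOKENS_RANGES_XXX experiment, and only for INSERT) keep
// their CP min/max maintained inline. For everything else the helper returns
// nullptr and the caller falls back to invalidating the extent's range:
//
//   ExtCPInfo* cp = getCPInfoToUpdateForUpdatableType(colStruct, &info, INSERT);
//   if (!cp)
//   {
//     // the extent range is left to be marked invalid instead of updated
//   }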
/** @brief Let only valid ranges be present.
 *
 * There can be a case where we have computed an invalid range while computing updated ranges.
 *
 * These invalid ranges should not have CP_VALID in the status code. So, they must not
 * come into the final call to setCPInfos or something.
 *
 * To achieve that, we filter these invalid ranges here.
 */
static void setInvalidCPInfosSpecialMarks(std::vector<ExtCPInfo>& cpInfos)
{
  size_t i;

  for (i = 0; i < cpInfos.size(); i++)
  {
    if (cpInfos[i].isInvalid())
    {
      cpInfos[i].fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
    }
  }
}

/*@insertColumnRecs -  Insert value(s) into a column
 */
/***********************************************************
 * DESCRIPTION:
 *    Insert values into columns (batchinsert)
 * PARAMETERS:
 *    colStructList - column struct list
 *    colValueList - column value list
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::insertColumnRecs(
    const TxnID& txnid, const CSCTypesList& cscColTypeList, ColStructList& colStructList,
    ColValueList& colValueList, DctnryStructList& dctnryStructList, DictStrList& dictStrList,
    std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers, RBMetaWriter* fRBMetaWriter,
    bool bFirstExtentOnThisPM, bool insertSelect, bool isAutoCommitOn, OID tableOid, bool isFirstBatchPm)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM oldHwm = 0;
  HWM newHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  ColSplitMaxMinInfoList maxMins;

  // Set tmp file suffix to modify HDFS db file
  bool useTmpSuffix = false;

  m_opType = INSERT;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (!bFirstExtentOnThisPM)
      useTmpSuffix = true;
  }

  unsigned i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }

  // end

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  for (const auto& colStruct : colStructList)
  {
    ColSplitMaxMinInfo tmp(colStruct.colDataType, colStruct.colWidth);
    maxMins.push_back(tmp);
  }

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  //  rc = checkValid(txnid, colStructList, colValueList, ridList);
  //  if (rc != NO_ERROR)
  //    return rc;

  setTransId(txnid);
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  string segFile;
  bool newFile;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  // populate colStructList with file information
  IDBDataFile* pFile = NULL;
  std::vector<DBRootExtentInfo> extentInfo;
  int currentDBrootIdx = 0;
  std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;

  // this structure is valid only when isFirstBatchPm is true.
  std::vector<BRM::LBID_t> newExtentsStartingLbids;  // we keep column-indexed LBIDs here for **new**
                                                     // extents. It is set and valid and used only when
                                                     // rowsLeft > 0.
  //--------------------------------------------------------------------------
  // For first batch on this PM:
  //   o get starting extent from ExtentTracker, and allocate extent if needed
  //   o construct colStructList and dctnryStructList accordingly
  //   o save extent information in tableMetaData for future use
  // If not first batch on this PM:
  //   o construct colStructList and dctnryStructList from tableMetaData
  //--------------------------------------------------------------------------
  if (isFirstBatchPm)
  {
    currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
    extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
    dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
    partitionNum = extentInfo[currentDBrootIdx].fPartition;

    //----------------------------------------------------------------------
    // check whether this extent is the first on this PM
    //----------------------------------------------------------------------
    if (bFirstExtentOnThisPM)
    {
      std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
      BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;

      for (i = 0; i < colStructList.size(); i++)
      {
        createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
        createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
        createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
        cols.push_back(createStripeColumnExtentsArgIn);
      }

      rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum,
                                                               extents);

      if (rc != NO_ERROR)
        return rc;

      // Create column files
      ExtCPInfoList cpinfoList;

      for (i = 0; i < extents.size(); i++)
      {
        ExtCPInfo cpInfo(colStructList[i].colDataType, colStructList[i].colWidth);

        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        colOp->initColumn(curCol);
        colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType,
                           colStructList[i].colType, colStructList[i].dataOid,
                           colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
        colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);
        rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid,
                                 extents[i].allocSize, dbRoot, partitionNum, segmentNum, segFile, pFile,
                                 newFile);

        if (rc != NO_ERROR)
          return rc;

        cpInfo.toInvalid();
        cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;  // mark the extents to invalid
        cpInfo.fCPInfo.firstLbid = extents[i].startLbid;
        cpinfoList.push_back(cpInfo);

        colStructList[i].fColPartition = partitionNum;
        colStructList[i].fColSegment = segmentNum;
        colStructList[i].fColDbRoot = dbRoot;
        dctnryStructList[i].fColPartition = partitionNum;
        dctnryStructList[i].fColSegment = segmentNum;
        dctnryStructList[i].fColDbRoot = dbRoot;
      }

      // mark the extents to invalid
      rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);

      if (rc != NO_ERROR)
        return rc;

      // create corresponding dictionary files
      for (i = 0; i < dctnryStructList.size(); i++)
      {
        if (dctnryStructList[i].dctnryOid > 0)
        {
          rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot,
                            partitionNum, segmentNum, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
            return rc;
        }
      }
    }     // if ( bFirstExtentOnThisPM)
    else  // if (!bFirstExtentOnThisPM)
    {
      std::vector<DBRootExtentInfo> tmpExtentInfo;

      for (i = 0; i < dbRootExtentTrackers.size(); i++)
      {
        tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
        colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
        dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
      }
    }

    //----------------------------------------------------------------------
    // Save the extents info in tableMetaData
    //----------------------------------------------------------------------
    for (i = 0; i < colStructList.size(); i++)
    {
      ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) &&
            (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aExt.isDict = false;

        if (bFirstExtentOnThisPM)
        {
          aExt.hwm = extents[i].startBlkOffset;
          aExt.isNewExt = true;
        }
        else
        {
          std::vector<DBRootExtentInfo> tmpExtentInfo;
          tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
          aExt.isNewExt = false;
          aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
        }

        aExt.current = true;
        aColExtsInfo.push_back(aExt);
      }

      tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    for (i = 0; i < dctnryStructList.size(); i++)
    {
      if (dctnryStructList[i].dctnryOid > 0)
      {
        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
        ColExtsInfo::iterator it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
        }

        tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }
    }
  }     // if (isFirstBatchPm)
  else  // get the extent info from tableMetaData
  {
    ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if (it->current)
        break;

      it++;
    }

    if (it == aColExtsInfo.end())
      return 1;

    for (i = 0; i < colStructList.size(); i++)
    {
      colStructList[i].fColPartition = it->partNum;
      colStructList[i].fColSegment = it->segNum;
      colStructList[i].fColDbRoot = it->dbRoot;
      dctnryStructList[i].fColPartition = it->partNum;
      dctnryStructList[i].fColSegment = it->segNum;
      dctnryStructList[i].fColDbRoot = it->dbRoot;
    }
  }

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------
  curColStruct = colStructList[colId];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<string> fileInfo;
  dbRoot = curColStruct.fColDbRoot;

  // use the first column to calculate row id
  ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
  ColExtsInfo::iterator it = aColExtsInfo.begin();

  while (it != aColExtsInfo.end())
  {
    if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
        (it->partNum == colStructList[colId].fColPartition) &&
        (it->segNum == colStructList[colId].fColSegment) && it->current)
    {
      break;
    }

    it++;
  }

  if (it != aColExtsInfo.end())
  {
    hwm = it->hwm;
  }

  oldHwm = hwm;  // Save this info for rollback
  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                     curColStruct.fColPartition, curColStruct.fColSegment);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));

  Column newCol;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  bool bUseStartExtent = true;

  if (idbdatafile::IDBPolicy::useHdfs())
    insertSelect = true;

  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm,
                         &newExtentsStartingLbids);

  if (rc != NO_ERROR)  // Clean up is already done
    return rc;

#ifdef PROFILE
  timer.stop("allocRowId");
#endif

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (unsigned k = 0; k < colStructList.size(); k++)
    {
      // Skip the selected column
      if (k == colId)
        continue;

      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        return rc;
      }

      colOp->clearColumn(expandCol);  // closes the file (if uncompressed)
    }
  }

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              useTmpSuffix);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
      {
        // cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
        return rc;
      }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
      datatypes::Charset cset(dctnryStructList[i].fCharsetNumber);
#endif

      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
        int64_t strPrefix;
#endif

        if (dctStr_iter->isNull())
        {
          Token nullToken;
          col_iter->data = nullToken;
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          strPrefix = (int64_t)joblist::UBIGINTNULL;  // the string prefixes are signed long ints.
#endif
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          strPrefix = encodeStringPrefix(dctTuple.sigValue, dctTuple.sigSize, cset);
#endif
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          col_iter->data = dctTuple.token;
        }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
        maxMins[i].fSplitMaxMinInfo[0].addStringPrefix(strPrefix);
#endif
        dctStr_iter++;
        col_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry(false);

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        //@Bug 4854 back up hwm chunk for the file to be modified
        if (fRBMetaWriter)
          fRBMetaWriter->backupDctnryHWMChunk(
              newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
              newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);

        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          int64_t strPrefix;
#endif

          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
            strPrefix = joblist::UBIGINTNULL;  // string prefixes are signed long ints.
#endif
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
            strPrefix = encodeStringPrefix_check_null(dctTuple.sigValue, dctTuple.sigSize, cset);
#endif
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          maxMins[i].fSplitMaxMinInfo[1].addStringPrefix(strPrefix);
#endif
          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry(false);

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }

  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  //--------------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //--------------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }
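// Illustrative note (not part of the original file): calculateRowId() below
// decomposes a RID into a block offset (curFbo) and an intra-block offset
// (curBio), given rows-per-block = BYTE_PER_BLOCK / colWidth. Assuming the
// conventional decomposition, for an 8K block and a 4-byte column:
//
//   rowsPerBlock = 8192 / 4;       // 2048
//   curFbo = rid / rowsPerBlock;   // block offset, the HWM candidate
//   curBio = rid % rowsPerBlock;   // position inside the block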
  // if a new extent is created, all the columns in this table should have their own new extent
  // First column already processed

  //@Bug 1701. Close the file (if uncompressed)
  m_colOp[op(curCol.compressionType)]->clearColumn(curCol);

  // Update hwm to set them in the end
  bool succFlag = false;
  unsigned colWidth = 0;
  int curFbo = 0, curBio;

  for (i = 0; i < totalColumns; i++)
  {
    // should be obtained from saved hwm
    aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
    it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
          (it->segNum == colStructList[i].fColSegment) && it->current)
        break;

      it++;
    }

    if (it != aColExtsInfo.end())  // update hwm info
    {
      oldHwm = it->hwm;
    }

    // save hwm for the old extent
    colWidth = colStructList[i].colWidth;
    succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

    if (succFlag)
    {
      if ((HWM)curFbo >= oldHwm)
      {
        it->hwm = (HWM)curFbo;
      }

      //@Bug 4947. set current to false for old extent.
      if (newExtent)
      {
        it->current = false;
      }
    }
    else
      return ERR_INVALID_PARAM;

    // update hwm for the new extent
    if (newExtent)
    {
      it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment) && it->current)
          break;

        it++;
      }

      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if (it != aColExtsInfo.end())
        {
          it->hwm = (HWM)curFbo;
          // cout << "setting hwm to " << (int)curFbo << " for seg " << it->segNum << endl;
          it->current = true;
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
  }

  //--------------------------------------------------------------------------
  // Prepare the valuelist for the new extent
  //--------------------------------------------------------------------------
  ColTupleList colTupleList;
  ColTupleList newColTupleList;
  ColTupleList firstPartTupleList;

  for (unsigned i = 0; i < totalColumns; i++)
  {
    colTupleList = static_cast<ColTupleList>(colValueList[i]);

    for (uint64_t j = rowsLeft; j > 0; j--)
    {
      newColTupleList.push_back(colTupleList[totalRow - j]);
    }

    colNewValueList.push_back(newColTupleList);
    newColTupleList.clear();

    // update the oldvalue list for the old extent
    for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
    {
      firstPartTupleList.push_back(colTupleList[j]);
    }

    colOldValueList.push_back(firstPartTupleList);
    firstPartTupleList.clear();
  }

  // end of allocate row id

#ifdef PROFILE
  timer.start("writeColumnRec");
#endif

  if (rc == NO_ERROR)
  {
    //----------------------------------------------------------------------
    // Mark extents invalid
    //----------------------------------------------------------------------
    bool successFlag = true;
    unsigned width = 0;
    int curFbo = 0, curBio, lastFbo = -1;

    if (isFirstBatchPm && (totalRow == rowsLeft))
    {
      // in this particular case we already marked extents as invalid above.
    }
    else
    {
      int firstHalfCount = totalRow - rowsLeft;

      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        width = colStructList[i].colWidth;

        if (firstHalfCount)
        {
          ExtCPInfo* cpInfoP =
              getCPInfoToUpdateForUpdatableType(colStructList[i], &maxMins[i].fSplitMaxMinInfo[0], m_opType);
          RID thisRid = rowsLeft ? lastRid : lastRidNew;
          successFlag = colOp->calculateRowId(thisRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

          if (successFlag)
          {
            if (curFbo != lastFbo)
            {
              RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo, cpInfoP));
            }
          }

          maxMins[i].fSplitMaxMinInfoPtrs[0] = cpInfoP;
        }

        if (rowsLeft)
        {
          ExtCPInfo* cpInfoP =
              getCPInfoToUpdateForUpdatableType(colStructList[i], &maxMins[i].fSplitMaxMinInfo[1], m_opType);

          if (cpInfoP)
          {
            RETURN_ON_ERROR(GetLBIDRange(newExtentsStartingLbids[i], colStructList[i], *cpInfoP));
          }

          maxMins[i].fSplitMaxMinInfoPtrs[1] = cpInfoP;
        }
      }
    }

    markTxnExtentsAsInvalid(txnid);

    //----------------------------------------------------------------------
    // Write row(s) to database file(s)
    //----------------------------------------------------------------------
    std::vector<ExtCPInfo> cpinfoList;

    for (auto& splitCPInfo : maxMins)
    {
      for (i = 0; i < 2; i++)
      {
        ExtCPInfo* cpInfo = splitCPInfo.fSplitMaxMinInfoPtrs[i];

        if (cpInfo)
        {
          cpinfoList.push_back(*cpInfo);
          cpinfoList[cpinfoList.size() - 1].fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
        }
      }
    }

    rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList,
                        colNewValueList, tableOid, useTmpSuffix, true,
                        &maxMins);  // @bug 5572 HDFS tmp file

    if (rc == NO_ERROR)
    {
      if (dctnryStructList.size() > 0)
      {
        std::vector<BRM::OID_t> oids{static_cast<BRM::OID_t>(tableOid)};

        for (const DctnryStruct& dctnryStruct : dctnryStructList)
        {
          oids.push_back(dctnryStruct.dctnryOid);
        }

        rc = flushOIDsFromCache(oids);

        if (rc != 0)
          rc = ERR_BLKCACHE_FLUSH_LIST;  // translate to WE error
      }

      if (rc == NO_ERROR)
      {
        int index = 0;

        for (auto& splitCPInfo : maxMins)
        {
          for (i = 0; i < 2; i++)
          {
            ExtCPInfo* cpInfo = splitCPInfo.fSplitMaxMinInfoPtrs[i];

            if (cpInfo)
            {
              cpinfoList[index] = *cpInfo;
              cpinfoList[index].fCPInfo.seqNum++;
              index++;
            }
          }
        }

        setInvalidCPInfosSpecialMarks(cpinfoList);
        rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
      }
    }
  }

  return rc;
}
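// Illustrative recap (not part of the original file) of the casual-partitioning
// bookkeeping protocol insertColumnRecs() just performed:
//
//   1. post each touched extent's range with seqNum = SEQNUM_MARK_INVALID_SET_RANGE
//      (setExtentsMaxMin), so readers treat it as invalid during the write;
//   2. write the rows (writeColumnRec) while accumulating fresh min/max;
//   3. re-post the recomputed ranges with seqNum incremented, filtering any
//      still-invalid ranges via setInvalidCPInfosSpecialMarks(), so a stale
//      invalidation cannot clobber the new bounds.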
int WriteEngineWrapper::insertColumnRecsBinary(
    const TxnID& txnid, ColStructList& colStructList, std::vector<uint64_t>& colValueList,
    DctnryStructList& dctnryStructList, DictStrList& dictStrList,
    std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers, RBMetaWriter* fRBMetaWriter,
    bool bFirstExtentOnThisPM, bool insertSelect, bool isAutoCommitOn, OID tableOid, bool isFirstBatchPm)
{
  int rc;
  RID* rowIdArray = NULL;
  Column curCol;
  ColStruct curColStruct;
  ColStructList newColStructList;
  std::vector<uint64_t> colNewValueList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM oldHwm = 0;
  HWM newHwm = 0;
  size_t totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  std::vector<BRM::LBID_t> dictLbids;

  // Set tmp file suffix to modify HDFS db file
  bool useTmpSuffix = false;

  m_opType = INSERT;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (!bFirstExtentOnThisPM)
      useTmpSuffix = true;
  }

  unsigned i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  // rc = checkValid(txnid, colStructList, colValueList, ridList);
  // if (rc != NO_ERROR)
  //   return rc;

  setTransId(txnid);
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  string segFile;
  bool newFile;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);

  // populate colStructList with file information
  IDBDataFile* pFile = NULL;
  std::vector<DBRootExtentInfo> extentInfo;
  int currentDBrootIdx = 0;
  std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;

  //--------------------------------------------------------------------------
  // For first batch on this PM:
  //   o get starting extent from ExtentTracker, and allocate extent if needed
  //   o construct colStructList and dctnryStructList accordingly
  //   o save extent information in tableMetaData for future use
  // If not first batch on this PM:
  //   o construct colStructList and dctnryStructList from tableMetaData
  //--------------------------------------------------------------------------
  if (isFirstBatchPm)
  {
    currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
    extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
    dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
    partitionNum = extentInfo[currentDBrootIdx].fPartition;

    //----------------------------------------------------------------------
    // check whether this extent is the first on this PM
    //----------------------------------------------------------------------
    if (bFirstExtentOnThisPM)
    {
      // cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
      std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
      BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;

      for (i = 0; i < colStructList.size(); i++)
      {
        createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
        createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
        createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
        cols.push_back(createStripeColumnExtentsArgIn);
      }

      rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum,
                                                               extents);

      if (rc != NO_ERROR)
        return rc;

      // Create column files
      ExtCPInfoList cpinfoList;

      for (i = 0; i < extents.size(); i++)
      {
        ExtCPInfo cpInfo(colStructList[i].colDataType, colStructList[i].colWidth);
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        colOp->initColumn(curCol);
        colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                           colStructList[i].colType, colStructList[i].dataOid,
                           colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
        colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);
        rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid,
                                 extents[i].allocSize, dbRoot, partitionNum, segmentNum, segFile, pFile,
                                 newFile);

        if (rc != NO_ERROR)
          return rc;

        cpInfo.toInvalid();
        cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;  // mark the extents to invalid
        cpInfo.fCPInfo.firstLbid = extents[i].startLbid;
        cpinfoList.push_back(cpInfo);
        colStructList[i].fColPartition = partitionNum;
        colStructList[i].fColSegment = segmentNum;
        colStructList[i].fColDbRoot = dbRoot;
        dctnryStructList[i].fColPartition = partitionNum;
        dctnryStructList[i].fColSegment = segmentNum;
        dctnryStructList[i].fColDbRoot = dbRoot;
      }

      // mark the extents to invalid
      rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);

      if (rc != NO_ERROR)
        return rc;

      // create corresponding dictionary files
      for (i = 0; i < dctnryStructList.size(); i++)
      {
        if (dctnryStructList[i].dctnryOid > 0)
        {
          rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot,
                            partitionNum, segmentNum, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
            return rc;
        }
      }
    }     // if ( bFirstExtentOnThisPM)
    else  // if (!bFirstExtentOnThisPM)
    {
      std::vector<DBRootExtentInfo> tmpExtentInfo;

      for (i = 0; i < dbRootExtentTrackers.size(); i++)
      {
        tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
        colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
        // cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " << colStructList[i].dataOid
        //   << ":" << colStructList[i].fColDbRoot << ":" << colStructList[i].fColPartition << ":"
        //   << colStructList[i].fColSegment << endl;
        dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
      }
    }

    // Save the extents info in tableMetaData
    for (i = 0; i < colStructList.size(); i++)
    {
      ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aExt.isDict = false;

        if (bFirstExtentOnThisPM)
        {
          aExt.hwm = extents[i].startBlkOffset;
          aExt.isNewExt = true;
          // cout << "adding a ext to metadata" << endl;
        }
        else
        {
          std::vector<DBRootExtentInfo> tmpExtentInfo;
          tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
          aExt.isNewExt = false;
          aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
          // cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
        }

        aExt.current = true;
        aColExtsInfo.push_back(aExt);
        // cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
      }

      tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    for (i = 0; i < dctnryStructList.size(); i++)
    {
      if (dctnryStructList[i].dctnryOid > 0)
      {
        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
        ColExtsInfo::iterator it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
        }

        tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }
    }
  }     // if (isFirstBatchPm)
  else  // get the extent info from tableMetaData
  {
    ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if (it->current)
        break;

      it++;
    }

    if (it == aColExtsInfo.end())
      return 1;

    for (i = 0; i < colStructList.size(); i++)
    {
      colStructList[i].fColPartition = it->partNum;
      colStructList[i].fColSegment = it->segNum;
      colStructList[i].fColDbRoot = it->dbRoot;
      dctnryStructList[i].fColPartition = it->partNum;
      dctnryStructList[i].fColSegment = it->segNum;
      dctnryStructList[i].fColDbRoot = it->dbRoot;
    }
  }

  totalColumns = colStructList.size();
  totalRow = colValueList.size() / totalColumns;
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));
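  // NOTE on allocRowId() (called below): as used at the call sites in this
  // file, it hands out totalRow consecutive row ids starting at the HWM of
  // the chosen starting extent, and reports back through its out-parameters:
  //   rowIdArray - the allocated RIDs
  //   newExtent  - whether a new extent had to be created for the overflow
  //   rowsLeft   - how many of the rows spill into that new extent
  //   newHwm     - HWM of the new extent
  //   newFile    - whether a brand new segment file was created
  // This is a descriptive reading of the call sites here, not a full
  // specification of ColumnOp::allocRowId().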
  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------
  curColStruct = colStructList[colId];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> fileInfo;
  dbRoot = curColStruct.fColDbRoot;

  // use the smallest column to calculate row id
  ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
  ColExtsInfo::iterator it = aColExtsInfo.begin();

  while (it != aColExtsInfo.end())
  {
    if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
        (it->partNum == colStructList[colId].fColPartition) &&
        (it->segNum == colStructList[colId].fColSegment) && it->current)
      break;

    it++;
  }

  if (it != aColExtsInfo.end())
  {
    hwm = it->hwm;
    // cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm
    //   << " and seg is " << colStructList[colId].fColSegment << endl;
  }

  oldHwm = hwm;  // Save this info for rollback

  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                     curColStruct.fColPartition, curColStruct.fColSegment);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
  Column newCol;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  bool bUseStartExtent = true;

  if (idbdatafile::IDBPolicy::useHdfs())
    insertSelect = true;

  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm, NULL);
  // cout << "after allocrowid, total row = " << totalRow << endl;

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (size_t k = 0; k < colStructList.size(); k++)
    {
      // Skip the selected column
      if (k == (size_t)colId)
        continue;

      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      // Shouldn't we change 0 to colId here?
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        return rc;
      }

      colOp->closeColumnFile(expandCol);
    }
  }

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  dictStr::iterator dctStr_iter;
  uint64_t* colValPtr;
  size_t rowsPerColumn = colValueList.size() / colStructList.size();

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              useTmpSuffix);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
      {
        return rc;
      }

      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
        colValPtr = &colValueList[(i * rowsPerColumn) + rows];

        if (dctStr_iter->isNull())
        {
          Token nullToken;
          memcpy(colValPtr, &nullToken, 8);
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          memcpy(colValPtr, &dctTuple.token, 8);
          dictLbids.push_back(dctTuple.token.fbo);
        }

        dctStr_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry(false);

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        //@Bug 4854 back up hwm chunk for the file to be modified
        if (fRBMetaWriter)
          fRBMetaWriter->backupDctnryHWMChunk(
              newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
              newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);

        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          colValPtr = &colValueList[(i * rowsPerColumn) + rows];

          if (dctStr_iter->isNull())
          {
            Token nullToken;
            memcpy(colValPtr, &nullToken, 8);
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            memcpy(colValPtr, &dctTuple.token, 8);
            dictLbids.push_back(dctTuple.token.fbo);
          }

          dctStr_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry(false);

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }
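  // NOTE: every tokenization loop in this function follows the same shape;
  // a condensed sketch (names taken from the loops above, error handling
  // omitted):
  //
  //   DctnryTuple dctTuple;
  //   dctTuple.sigValue = (unsigned char*)dctStr_iter->str();  // signature bytes
  //   dctTuple.sigSize = dctStr_iter->length();
  //   dctTuple.isNull = false;
  //   tokenize(txnid, dctTuple, compressionType);  // store string in dictionary
  //   memcpy(colValPtr, &dctTuple.token, 8);       // 8-byte Token into column slot
  //
  // A NULL string is represented by a default-constructed Token instead of
  // a dictionary entry.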
  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  //--------------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //--------------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // cout << "rowid allocated is " << lastRid << endl;
  // if a new extent is created, all the columns in this table should have their own new extent
  // First column already processed

  //@Bug 1701. Close the file (if uncompressed)
  m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol);
  // cout << "Saving hwm info for new ext batch" << endl;

  // Update hwm to set them in the end
  bool succFlag = false;
  unsigned colWidth = 0;
  int curFbo = 0, curBio;

  for (i = 0; i < totalColumns; i++)
  {
    // should be obtained from saved hwm
    aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
    it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
          (it->segNum == colStructList[i].fColSegment) && it->current)
        break;

      it++;
    }

    if (it != aColExtsInfo.end())  // update hwm info
    {
      oldHwm = it->hwm;

      // save hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
      // cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid
      //   << ":" << curFbo << ":" << oldHwm << endl;

      if (succFlag)
      {
        if ((HWM)curFbo >= oldHwm)
        {
          it->hwm = (HWM)curFbo;
        }

        //@Bug 4947. set current to false for old extent.
        if (newExtent)
        {
          it->current = false;
        }

        // cout << "updated old ext info for oid " << colStructList[i].dataOid
        //   << " dbroot:part:seg:hwm:current = " << it->dbRoot << ":" << it->partNum << ":" << it->segNum
        //   << ":" << it->hwm << ":" << it->current << " and newExtent is " << newExtent << endl;
      }
      else
        return ERR_INVALID_PARAM;
    }

    // update hwm for the new extent
    if (newExtent)
    {
      it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment) && it->current)
          break;

        it++;
      }

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if (it != aColExtsInfo.end())
        {
          it->hwm = (HWM)curFbo;
          // cout << "setting hwm to " << (int)curFbo << " for seg " << it->segNum << endl;
          it->current = true;
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
  }

  //--------------------------------------------------------------------------
  // Prepare the valuelist for the new extent
  //--------------------------------------------------------------------------
  for (unsigned i = 1; i <= totalColumns; i++)
  {
    // Copy values to second value list
    for (uint64_t j = rowsLeft; j > 0; j--)
    {
      colNewValueList.push_back(colValueList[(totalRow * i) - j]);
    }
  }

  // end of allocate row id

#ifdef PROFILE
  timer.start("writeColumnRec");
#endif
  // cout << "Writing column record" << endl;

  if (rc == NO_ERROR)
  {
    //----------------------------------------------------------------------
    // Mark extents invalid
    //----------------------------------------------------------------------
    bool successFlag = true;
    unsigned width = 0;
    int curFbo = 0, curBio, lastFbo = -1;

    if (isFirstBatchPm && (totalRow == rowsLeft))
    {
    }
    else
    {
      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        width = colStructList[i].colWidth;
        successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

        if (successFlag)
        {
          if (curFbo != lastFbo)
          {
            RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
          }
        }
        else
          return ERR_INVALID_PARAM;
      }
    }

    // If we create a new extent for this batch
    for (unsigned i = 0; i < newColStructList.size(); i++)
    {
      colOp = m_colOp[op(newColStructList[i].fCompressionType)];
      width = newColStructList[i].colWidth;
      successFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR(AddLBIDtoList(txnid, newColStructList[i], curFbo));
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    markTxnExtentsAsInvalid(txnid);

    //----------------------------------------------------------------------
    // Write row(s) to database file(s)
    //----------------------------------------------------------------------
    bool versioning = !(isAutoCommitOn && insertSelect);
    AddDictToList(txnid, dictLbids);
    rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList,
                              colNewValueList, tableOid, useTmpSuffix,
                              versioning);  // @bug 5572 HDFS tmp file
  }

  return rc;
}
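/***********************************************************
 * NOTE:
 *  In the insert paths above, the totalRow incoming rows are split
 *  between the current ("old") extent and a newly allocated one;
 *  rowsLeft is the number that spills into the new extent. With
 *  illustrative numbers totalRow = 100 and rowsLeft = 30:
 *    rows 0..69  -> current extent, lastRid    = rowIdArray[69]
 *    rows 70..99 -> new extent,     lastRidNew = rowIdArray[99]
 *  lastRid and lastRidNew are then fed to calculateRowId() to
 *  derive the HWM of each extent.
 ***********************************************************/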
int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                            ColStructList& colStructList, ColValueList& colValueList,
                                            DctnryStructList& dctnryStructList, DictStrList& dictStrList,
                                            const int32_t tableOid)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM newHwm = 0;
  HWM oldHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  uint32_t i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  m_opType = INSERT;

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }  // end

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  rc = checkValid(txnid, colStructList, colValueList, ridList);

  if (rc != NO_ERROR)
    return rc;

  setTransId(txnid);

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  // allocate row id(s)
  curColStruct = colStructList[0];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> fileInfo;
  ExtentInfo info;

  // Don't search for empty space, always append to the end. May need to revisit this part
  dbRoot = curColStruct.fColDbRoot;
  int extState;
  bool extFound;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(curColStruct.dataOid, dbRoot, partitionNum,
                                                               segmentNum, hwm, extState, extFound));

  for (i = 0; i < colStructList.size(); i++)
  {
    colStructList[i].fColPartition = partitionNum;
    colStructList[i].fColSegment = segmentNum;
    colStructList[i].fColDbRoot = dbRoot;
  }

  oldHwm = hwm;  // Save this info for rollback

  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);

  string segFile;
  rc = colOp->openColumnFile(curCol, segFile, false);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));

  //...Note that we are casting totalRow to int to be in sync with
  //...allocRowId(). So we are assuming that totalRow
  //...(curTupleList.size()) will fit into an int. We already made
  //...that assumption earlier in this method when we used totalRow
  //...in the call to calloc() to allocate rowIdArray.
  Column newCol;
  bool newFile;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
  bool bUseStartExtent = true;
  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, false, false, 0, false, NULL);

  if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
  {
    for (i = 0; i < newColStructList.size(); i++)
    {
      info.oid = newColStructList[i].dataOid;
      info.partitionNum = newColStructList[i].fColPartition;
      info.segmentNum = newColStructList[i].fColSegment;
      info.dbRoot = newColStructList[i].fColDbRoot;

      if (newFile)
        fileInfo.push_back(info);

      colExtentInfo.push_back(info);
    }

    int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

    if ((rc1 == 0) && newFile)
    {
      rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum,
                              fileInfo[0].segmentNum);

      if (rc1 != NO_ERROR)
        return rc;

      FileOp fileOp;

      for (i = 0; i < newDctnryStructList.size(); i++)
      {
        if (newDctnryStructList[i].dctnryOid > 0)
        {
          info.oid = newDctnryStructList[i].dctnryOid;
          info.partitionNum = newDctnryStructList[i].fColPartition;
          info.segmentNum = newDctnryStructList[i].fColSegment;
          info.dbRoot = newDctnryStructList[i].fColDbRoot;
          info.newFile = true;
          fileInfo.push_back(info);
          dictExtentInfo.push_back(info);
        }
      }

      if (dictExtentInfo.size() > 0)
      {
        rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);

        if (rc1 != NO_ERROR)
          return rc;

        for (unsigned j = 0; j < fileInfo.size(); j++)
        {
          rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                  fileInfo[j].segmentNum);
        }
      }
    }
  }

  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
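  // NOTE: a column's first extent (partition 0, segment 0) starts out
  // "abbreviated": only the first INITIAL_EXTENT_ROWS_TO_DISK (256K) rows
  // are materialized on disk. The check below detects when a newly
  // allocated RID crosses that boundary and, if so, expands the remaining
  // column files of the table to the full extent size before writing, so
  // that all columns stay row-aligned.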
  //..Expand initial abbreviated extent if any RID in 1st extent is > 256K
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((partitionNum == 0) && (segmentNum == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (size_t k = 1; k < colStructList.size(); k++)
    {
      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, dbRoot, partitionNum, segmentNum);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, false);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        if (newExtent)
        {
          // Remove the empty extent added to the first column
          int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

          if ((rc1 == 0) && newFile)
          {
            rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum,
                                    fileInfo[0].segmentNum);
          }
        }

        colOp->clearColumn(expandCol);  // closes the file
        return rc;
      }

      colOp->clearColumn(expandCol);  // closes the file
    }
  }

  BRMWrapper::setUseVb(true);

  // Tokenize data if needed
  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      dctnryStructList[i].fColPartition = partitionNum;
      dctnryStructList[i].fColSegment = segmentNum;
      dctnryStructList[i].fColDbRoot = dbRoot;
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              false);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        return rc;

      ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
            (it->partNum == dctnryStructList[i].fColPartition) &&
            (it->segNum == dctnryStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = dctnryStructList[i].fColDbRoot;
        aExt.partNum = dctnryStructList[i].fColPartition;
        aExt.segNum = dctnryStructList[i].fColSegment;
        aExt.compType = dctnryStructList[i].fCompressionType;
        aExt.isDict = true;
        aColExtsInfo.push_back(aExt);
        aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }

      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
        if (dctStr_iter->isNull())
        {
          Token nullToken;
          col_iter->data = nullToken;
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          col_iter->data = dctTuple.token;
        }

        dctStr_iter++;
        col_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry();

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
        it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) &&
              (it->partNum == newDctnryStructList[i].fColPartition) &&
              (it->segNum == newDctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
          aExt.partNum = newDctnryStructList[i].fColPartition;
          aExt.segNum = newDctnryStructList[i].fColSegment;
          aExt.compType = newDctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }

  // Update column info structure @Bug 1862 set hwm
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // cout << "rowid allocated is " << lastRid << endl;
  // if a new extent is created, all the columns in this table should have their own new extent

  //@Bug 1701. Close the file
  m_colOp[op(curCol.compressionType)]->clearColumn(curCol);

  std::vector<BulkSetHWMArg> hwmVecNewext;
  std::vector<BulkSetHWMArg> hwmVecOldext;

  if (newExtent)  // Save all hwms to set them later.
  {
    BulkSetHWMArg aHwmEntryNew;
    BulkSetHWMArg aHwmEntryOld;
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;

    for (i = 0; i < totalColumns; i++)
    {
      Column curColLocal;
      colOp->initColumn(curColLocal);
      colOp = m_colOp[op(newColStructList[i].fCompressionType)];
      colOp->setColParam(curColLocal, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                         newColStructList[i].colType, newColStructList[i].dataOid,
                         newColStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
      colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType);
      rc = BRMWrapper::getInstance()->getLastHWM_DBroot(curColLocal.dataFile.fid, dbRoot, partitionNum,
                                                        segmentNum, oldHwm, extState, extFound);

      info.oid = curColLocal.dataFile.fid;
      info.partitionNum = partitionNum;
      info.segmentNum = segmentNum;
      info.dbRoot = dbRoot;
      info.hwm = oldHwm;
      colExtentInfo.push_back(info);

      // @Bug 2714 need to set hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
      // cout << "insertcolumnrec oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":"
      //   << curFbo << ":" << hwm << endl;

      if (succFlag)
      {
        if ((HWM)curFbo > oldHwm)
        {
          aHwmEntryOld.oid = colStructList[i].dataOid;
          aHwmEntryOld.partNum = partitionNum;
          aHwmEntryOld.segNum = segmentNum;
          aHwmEntryOld.hwm = curFbo;
          hwmVecOldext.push_back(aHwmEntryOld);
        }
      }
      else
        return ERR_INVALID_PARAM;

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        aHwmEntryNew.oid = newColStructList[i].dataOid;
        aHwmEntryNew.partNum = newColStructList[i].fColPartition;
        aHwmEntryNew.segNum = newColStructList[i].fColSegment;
        aHwmEntryNew.hwm = curFbo;
        hwmVecNewext.push_back(aHwmEntryNew);
      }

      m_colOp[op(curColLocal.compressionType)]->clearColumn(curColLocal);
    }

    // Prepare the valuelist for the new extent
    ColTupleList colTupleList;
    ColTupleList newColTupleList;
    ColTupleList firstPartTupleList;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      colTupleList = static_cast<ColTupleList>(colValueList[i]);

      for (uint64_t j = rowsLeft; j > 0; j--)
      {
        newColTupleList.push_back(colTupleList[totalRow - j]);
      }

      colNewValueList.push_back(newColTupleList);
      newColTupleList.clear();

      // update the oldvalue list for the old extent
      for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
      {
        firstPartTupleList.push_back(colTupleList[j]);
      }

      colOldValueList.push_back(firstPartTupleList);
      firstPartTupleList.clear();
    }
  }

  // Mark extents invalid
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;

  if (totalRow - rowsLeft > 0)
  {
    for (unsigned i = 0; i < colStructList.size(); i++)
    {
      colOp = m_colOp[op(colStructList[i].fCompressionType)];
      width = colStructList[i].colWidth;
      successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
        }
      }
    }
  }

  lastRid = rowIdArray[totalRow - 1];

  for (unsigned i = 0; i < newColStructList.size(); i++)
  {
    colOp = m_colOp[op(newColStructList[i].fCompressionType)];
    width = newColStructList[i].colWidth;
    successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        RETURN_ON_ERROR(AddLBIDtoList(txnid, newColStructList[i], curFbo));
      }
    }
  }

  // cout << "lbids size = " << lbids.size() << endl;
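  // NOTE: AddLBIDtoList() above accumulates, per transaction, the LBIDs of
  // the blocks this write will touch. markTxnExtentsAsInvalid() below asks
  // the BRM to mark the casual-partitioning (CP) min/max of those extents
  // invalid before the rows land, so queries cannot prune extents using
  // stale min/max values; the marks are refreshed after the write through
  // the bulkSetHWMAndCP()/setExtentsMaxMin() calls in these functions.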
  markTxnExtentsAsInvalid(txnid);

  if (rc == NO_ERROR)
  {
    // MCOL-66 The DBRM can't handle concurrent transactions to sys tables
    static boost::mutex dbrmMutex;
    boost::mutex::scoped_lock lk(dbrmMutex);

    if (newExtent)
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray,
                          newColStructList, colNewValueList, tableOid, false);  // @bug 5572 HDFS tmp file
    }
    else
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid, false);  // @bug 5572 HDFS tmp file
    }
  }

#ifdef PROFILE
  timer.stop("writeColumnRec");
#endif

  // for (ColTupleList::size_type i = 0; i < totalRow; i++)
  //   ridList.push_back((RID) rowIdArray[i]);

  // if (rc == NO_ERROR)
  //   rc = flushDataFiles(NO_ERROR);

  if (!newExtent)
  {
    // flushVMCache();
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;
    std::vector<BulkSetHWMArg> hwmVec;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      // colOp = m_colOp[op(colStructList[i].fCompressionType)];
      // Set all columns hwm together
      BulkSetHWMArg aHwmEntry;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
          colStructList[i].dataOid, dbRoot, partitionNum, segmentNum, hwm, extState, extFound));
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
      // cout << "insertcolumnrec oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":"
      //   << curFbo << ":" << hwm << endl;

      if (succFlag)
      {
        if ((HWM)curFbo > hwm)
        {
          aHwmEntry.oid = colStructList[i].dataOid;
          aHwmEntry.partNum = partitionNum;
          aHwmEntry.segNum = segmentNum;
          aHwmEntry.hwm = curFbo;
          hwmVec.push_back(aHwmEntry);
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    if (hwmVec.size() > 0)
    {
      std::vector<CPInfoMerge> mergeCPDataArgs;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVec, mergeCPDataArgs));
    }
  }

  if (newExtent)
  {
#ifdef PROFILE
    timer.start("flushVMCache");
#endif
    std::vector<CPInfoMerge> mergeCPDataArgs;
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecNewext, mergeCPDataArgs));
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecOldext, mergeCPDataArgs));
    // flushVMCache();
#ifdef PROFILE
    timer.stop("flushVMCache");
#endif
  }

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}
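/***********************************************************
 * NOTE:
 *  HWMs for all columns of a table are pushed to the BRM in one
 *  call. A condensed sketch of the pattern used above and below
 *  (illustrative, error handling omitted):
 *
 *    BulkSetHWMArg arg;
 *    arg.oid     = colStruct.dataOid;        // column file OID
 *    arg.partNum = colStruct.fColPartition;
 *    arg.segNum  = colStruct.fColSegment;
 *    arg.hwm     = curFbo;                   // highest block written
 *    hwmVec.push_back(arg);
 *    std::vector<CPInfoMerge> mergeCPDataArgs;  // empty in these paths
 *    BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVec, mergeCPDataArgs);
 *
 *  Updating them in one BRM operation keeps a table's columns
 *  consistent with one another.
 ***********************************************************/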
int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                               ColStructList& colStructList, ColValueList& colValueList,
                                               DctnryStructList& dctnryStructList, DictStrList& dictStrList,
                                               const int32_t tableOid)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM newHwm = 0;
  HWM oldHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  uint32_t i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  m_opType = INSERT;

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  rc = checkValid(txnid, colStructList, colValueList, ridList);

  if (rc != NO_ERROR)
    return rc;

  setTransId(txnid);

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------
  curColStruct = colStructList[colId];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  uint16_t dbRoot;
  uint16_t segmentNum = 0;
  uint32_t partitionNum = 0;

  // Don't search for empty space, always append to the end. May revisit later
  dbRoot = curColStruct.fColDbRoot;
  int extState;
  bool bStartExtFound;
  bool bUseStartExtent = false;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(curColStruct.dataOid, dbRoot, partitionNum,
                                                               segmentNum, hwm, extState, bStartExtFound));

  if ((bStartExtFound) && (extState == BRM::EXTENTAVAILABLE))
    bUseStartExtent = true;

  for (i = 0; i < colStructList.size(); i++)
  {
    colStructList[i].fColPartition = partitionNum;
    colStructList[i].fColSegment = segmentNum;
    colStructList[i].fColDbRoot = dbRoot;
  }

  for (i = 0; i < dctnryStructList.size(); i++)
  {
    dctnryStructList[i].fColPartition = partitionNum;
    dctnryStructList[i].fColSegment = segmentNum;
    dctnryStructList[i].fColDbRoot = dbRoot;
  }

  oldHwm = hwm;  // Save this info for rollback

  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);

  string segFile;

  if (bUseStartExtent)
  {
    rc = colOp->openColumnFile(curCol, segFile, true);  // @bug 5572 HDFS tmp file

    if (rc != NO_ERROR)
    {
      return rc;
    }
  }

  bool newFile;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, false, false, 0, false, NULL);

  //--------------------------------------------------------------------------
  // Handle case where we ran out of disk space allocating a new extent.
  // Rollback extentmap and delete any db files that were created.
  //--------------------------------------------------------------------------
  if (rc != NO_ERROR)
  {
    if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
    {
      vector<ExtentInfo> colExtentInfo;
      vector<ExtentInfo> dictExtentInfo;
      vector<ExtentInfo> fileInfo;
      ExtentInfo info;

      for (i = 0; i < newColStructList.size(); i++)
      {
        info.oid = newColStructList[i].dataOid;
        info.partitionNum = newColStructList[i].fColPartition;
        info.segmentNum = newColStructList[i].fColSegment;
        info.dbRoot = newColStructList[i].fColDbRoot;

        if (newFile)
          fileInfo.push_back(info);

        colExtentInfo.push_back(info);
      }

      int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

      // Only rollback dictionary extents "if" store file is new
      if ((rc1 == 0) && newFile)
      {
        for (unsigned int j = 0; j < fileInfo.size(); j++)
        {
          // ignore return code and delete what we can
          rc1 = colOp->deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                  fileInfo[j].segmentNum);
        }

        fileInfo.clear();

        for (i = 0; i < newDctnryStructList.size(); i++)
        {
          if (newDctnryStructList[i].dctnryOid > 0)
          {
            info.oid = newDctnryStructList[i].dctnryOid;
            info.partitionNum = newDctnryStructList[i].fColPartition;
            info.segmentNum = newDctnryStructList[i].fColSegment;
            info.dbRoot = newDctnryStructList[i].fColDbRoot;
            info.newFile = true;
            fileInfo.push_back(info);
            dictExtentInfo.push_back(info);
          }
        }

        if (dictExtentInfo.size() > 0)
        {
          FileOp fileOp;
          rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);

          if (rc1 != NO_ERROR)
            return rc;

          for (unsigned j = 0; j < fileInfo.size(); j++)
          {
            rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                    fileInfo[j].segmentNum);
          }
        }
      }
    }  // disk space error allocating new extent

    return rc;
  }  // rc != NO_ERROR from call to allocRowID()

#ifdef PROFILE
  timer.stop("allocRowId");
#endif

  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((colStructList[colId].fColPartition == 0) && (colStructList[colId].fColSegment == 0) &&
      ((totalRow - rowsLeft) > 0) && (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (unsigned k = 0; k < colStructList.size(); k++)
    {
      if (k == colId)
        continue;

      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      colOp->clearColumn(expandCol);  // closes the file

      if (rc != NO_ERROR)
      {
        return rc;
      }
    }  // loop through columns
  }    // if starting extent needs to be expanded

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (unsigned i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      if (bUseStartExtent)
      {
        rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                                dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                                true);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }  // tokenize dictionary rows in 1st extent

      if (newExtent)
      {
        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
        it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) &&
              (it->partNum == newDctnryStructList[i].fColPartition) &&
              (it->segNum == newDctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
          aExt.partNum = newDctnryStructList[i].fColPartition;
          aExt.segNum = newDctnryStructList[i].fColSegment;
          aExt.compType = newDctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }  // tokenize dictionary rows in second extent
    }    // tokenize dictionary columns
  }      // loop through columns to see which ones need tokenizing

  //----------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //----------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // if a new extent is created, all the columns in this table should
  // have their own new extent

  //@Bug 1701. Close the file
  if (bUseStartExtent)
  {
    m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
  }

  std::vector<BulkSetHWMArg> hwmVecNewext;
  std::vector<BulkSetHWMArg> hwmVecOldext;

  if (newExtent)  // Save all hwms to set them later.
  {
    BulkSetHWMArg aHwmEntryNew;
    BulkSetHWMArg aHwmEntryOld;
    bool succFlag = false;
    unsigned colWidth = 0;
    int curFbo = 0, curBio;

    for (i = 0; i < totalColumns; i++)
    {
      colOp = m_colOp[op(newColStructList[i].fCompressionType)];

      // @Bug 2714 need to set hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if ((HWM)curFbo > oldHwm)
        {
          aHwmEntryOld.oid = colStructList[i].dataOid;
          aHwmEntryOld.partNum = colStructList[i].fColPartition;
          aHwmEntryOld.segNum = colStructList[i].fColSegment;
          aHwmEntryOld.hwm = curFbo;
          hwmVecOldext.push_back(aHwmEntryOld);
        }
      }
      else
        return ERR_INVALID_PARAM;

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        aHwmEntryNew.oid = newColStructList[i].dataOid;
        aHwmEntryNew.partNum = newColStructList[i].fColPartition;
        aHwmEntryNew.segNum = newColStructList[i].fColSegment;
        aHwmEntryNew.hwm = curFbo;
        hwmVecNewext.push_back(aHwmEntryNew);
      }
    }

    //----------------------------------------------------------------------
    // Prepare the valuelist for the new extent
    //----------------------------------------------------------------------
    ColTupleList colTupleList;
    ColTupleList newColTupleList;
    ColTupleList firstPartTupleList;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      colTupleList = static_cast<ColTupleList>(colValueList[i]);

      for (uint64_t j = rowsLeft; j > 0; j--)
      {
        newColTupleList.push_back(colTupleList[totalRow - j]);
      }

      colNewValueList.push_back(newColTupleList);
      newColTupleList.clear();

      // update the oldvalue list for the old extent
      for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
      {
        firstPartTupleList.push_back(colTupleList[j]);
      }

      colOldValueList.push_back(firstPartTupleList);
      firstPartTupleList.clear();
    }
  }

  //--------------------------------------------------------------------------
  // Mark extents invalid
  //--------------------------------------------------------------------------
  // WIP
  // Set min/max in DMLProc processor if applicable
  vector<LBID_t> lbids;
  vector<CalpontSystemCatalog::ColDataType> colDataTypes;
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;
  lastRid = rowIdArray[totalRow - 1];

  for (unsigned i = 0; i < colStructList.size(); i++)
  {
    colOp = m_colOp[op(colStructList[i].fCompressionType)];
    width = colStructList[i].colWidth;
    successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        colDataTypes.push_back(colStructList[i].colDataType);
        RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
      }
    }
  }

  markTxnExtentsAsInvalid(txnid);

  //--------------------------------------------------------------------------
  // Write row(s) to database file(s)
  //--------------------------------------------------------------------------
#ifdef PROFILE
  timer.start("writeColumnRec");
#endif

  if (rc == NO_ERROR)
  {
    if (newExtent)
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray,
                          newColStructList, colNewValueList, tableOid, false);  // @bug 5572 HDFS tmp file
    }
    else
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid, true);  // @bug 5572 HDFS tmp file
    }
  }

#ifdef PROFILE
  timer.stop("writeColumnRec");
#endif

  //--------------------------------------------------------------------------
  // Update BRM
  //--------------------------------------------------------------------------
  if (!newExtent)
  {
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;
    std::vector<BulkSetHWMArg> hwmVec;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      // Set all columns hwm together
      BulkSetHWMArg aHwmEntry;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
          colStructList[i].dataOid, colStructList[i].fColDbRoot, colStructList[i].fColPartition,
          colStructList[i].fColSegment, hwm, extState, extFound));
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
      // cout << "insertcolumnrec oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":"
      //   << curFbo << ":" << hwm << endl;

      if (succFlag)
      {
        if ((HWM)curFbo > hwm)
        {
          aHwmEntry.oid = colStructList[i].dataOid;
          aHwmEntry.partNum = colStructList[i].fColPartition;
          aHwmEntry.segNum = colStructList[i].fColSegment;
          aHwmEntry.hwm = curFbo;
          hwmVec.push_back(aHwmEntry);
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    std::vector<CPInfoMerge> mergeCPDataArgs;
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVec, mergeCPDataArgs));
  }
  else  // if (newExtent)
  {
#ifdef PROFILE
    timer.start("flushVMCache");
#endif
    std::vector<CPInfoMerge> mergeCPDataArgs;

    if (hwmVecNewext.size() > 0)
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecNewext, mergeCPDataArgs));

    if (hwmVecOldext.size() > 0)
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecOldext, mergeCPDataArgs));

#ifdef PROFILE
    timer.stop("flushVMCache");
#endif
  }

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

/*@brief printInputValue - Print input value */
/***********************************************************
 * DESCRIPTION:
 *    Print input value
 * PARAMETERS:
 *    tableOid - table object id
 *    colStructList - column struct list
 *    colValueList - column value list
 *    ridList - RID list
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::printInputValue(const ColStructList& colStructList,
                                         const ColValueList& colValueList, const RIDList& ridList,
                                         const DctnryStructList& dctnryStructList,
                                         const DictStrList& dictStrList) const
{
  ColTupleList curTupleList;
  ColStruct curColStruct;
  ColTuple curTuple;
  string curStr;
  ColStructList::size_type i;
  size_t j;
  OidToIdxMap oidToIdxMap;

  std::cerr << std::endl << "=========================" << std::endl;
  std::cerr << "Total RIDs: " << ridList.size() << std::endl;

  for (i = 0; i < ridList.size(); i++)
    std::cerr << "RID[" << i << "] : " << ridList[i] << std::endl;

  std::cerr << "Total Columns: " << colStructList.size() << std::endl;

  for (i = 0; i < colStructList.size(); i++)
  {
    curColStruct = colStructList[i];
    curTupleList = colValueList[i];

    if (curColStruct.tokenFlag)
    {
      oidToIdxMap.insert({curColStruct.dataOid, i});
      continue;
    }

    std::cerr << "Column[" << i << "]";
    std::cerr << "Data file OID : " << curColStruct.dataOid << "\t";
    std::cerr << "Width : " << curColStruct.colWidth << "\t"
              << " Type: " << curColStruct.colDataType << std::endl;
    std::cerr << "Total values : " << curTupleList.size() << std::endl;

    for (j = 0; j < curTupleList.size(); j++)
    {
      curTuple = curTupleList[j];

      try
      {
        if (curTuple.data.type() == typeid(int))
          curStr = boost::lexical_cast<string>(boost::any_cast<int>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned int))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned int>(curTuple.data));
        else if (curTuple.data.type() == typeid(float))
          curStr = boost::lexical_cast<string>(boost::any_cast<float>(curTuple.data));
        else if (curTuple.data.type() == typeid(long long))
          curStr = boost::lexical_cast<string>(boost::any_cast<long long>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned long long))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned long long>(curTuple.data));
        else if (curTuple.data.type() == typeid(int128_t))
          curStr = datatypes::TSInt128(boost::any_cast<int128_t>(curTuple.data)).toString();
        else if (curTuple.data.type() == typeid(double))
          curStr = boost::lexical_cast<string>(boost::any_cast<double>(curTuple.data));
        else if (curTuple.data.type() == typeid(short))
          curStr = boost::lexical_cast<string>(boost::any_cast<short>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned short))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned short>(curTuple.data));
        else if (curTuple.data.type() == typeid(char))
          curStr = boost::lexical_cast<string>(boost::any_cast<char>(curTuple.data));
        else
          curStr = boost::any_cast<string>(curTuple.data);
      }
      catch (...)
      {
      }

      if (isDebug(DEBUG_3))
        std::cerr << "Value[" << j << "]: " << curStr.c_str() << std::endl;
    }
  }

  for (i = 0; i < dctnryStructList.size(); ++i)
  {
    if (dctnryStructList[i].dctnryOid == 0)
      continue;

    std::cerr << "Dict[" << i << "]";
    std::cerr << " file OID : " << dctnryStructList[i].dctnryOid
              << " Token file OID: " << dctnryStructList[i].columnOid << "\t";
    std::cerr << "Width : " << dctnryStructList[i].colWidth << "\t"
              << " Type: " << dctnryStructList[i].fCompressionType << std::endl;
    std::cerr << "Total values : " << dictStrList.size() << std::endl;

    if (isDebug(DEBUG_3))
    {
      for (j = 0; j < dictStrList[i].size(); ++j)
      {
        // We presume there will be a value.
        auto tokenOidIdx = oidToIdxMap[dctnryStructList[i].columnOid];
        std::cerr << "string [" << dictStrList[i][j].safeString("<>") << "]" << std::endl;
        bool isToken = colStructList[tokenOidIdx].colType == WriteEngine::WR_TOKEN &&
                       colStructList[tokenOidIdx].tokenFlag;

        if (isToken && !colValueList[tokenOidIdx][j].data.empty())
        {
          Token t = boost::any_cast<Token>(colValueList[tokenOidIdx][j].data);
          std::cerr << "Token: block pos:[" << t.op << "] fbo: [" << t.fbo << "] bc: [" << t.bc << "]"
                    << std::endl;
        }
      }
    }
  }

  std::cerr << "=========================" << std::endl;
}
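/***********************************************************
 * NOTE:
 *  The processVersionBuffer* functions below implement the
 *  copy-on-write side of transactional updates: before a block is
 *  overwritten, its current image is copied into the version buffer
 *  (writeVB / beginVBCopy), keyed by transaction id, so it can be
 *  restored on rollback. Each function first translates the affected
 *  RIDs into distinct block FBOs/LBIDs so every block is versioned
 *  only once. Versioning is skipped on HDFS installations (the
 *  useHdfs() early returns).
 ***********************************************************/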
std::vector freeList; rc = BRMWrapper::getInstance()->writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot); return rc; } int WriteEngineWrapper::processVersionBuffers(IDBDataFile* pFile, const TxnID& txnid, const ColStruct& colStruct, int width, int totalRow, const RIDList& ridList, vector& rangeList) { if (idbdatafile::IDBPolicy::useHdfs()) return 0; RID curRowId; int rc = NO_ERROR; int curFbo = 0, curBio, lastFbo = -1; bool successFlag; BRM::LBID_t lbid; BRM::VER_t verId = (BRM::VER_t)txnid; LBIDRange range; vector fboList; // vector rangeList; ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)]; for (int i = 0; i < totalRow; i++) { curRowId = ridList[i]; // cout << "processVersionBuffer got rid " << curRowId << endl; successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio); if (successFlag) { if (curFbo != lastFbo) { // cout << "processVersionBuffer is processing lbid " << lbid << endl; RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(colStruct.dataOid, colStruct.fColPartition, colStruct.fColSegment, curFbo, lbid)); // cout << "processVersionBuffer is processing lbid " << lbid << endl; fboList.push_back((uint32_t)curFbo); range.start = lbid; range.size = 1; rangeList.push_back(range); } lastFbo = curFbo; } } // cout << "calling writeVB with blocks " << rangeList.size() << endl; std::vector freeList; rc = BRMWrapper::getInstance()->writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot); return rc; } int WriteEngineWrapper::processBeginVBCopy(const TxnID& txnid, const vector& colStructList, const RIDList& ridList, std::vector& freeList, vector >& fboLists, vector >& rangeLists, vector& rangeListTot) { if (idbdatafile::IDBPolicy::useHdfs()) return 0; RID curRowId; int rc = NO_ERROR; int curFbo = 0, curBio, lastFbo = -1; bool successFlag; BRM::LBID_t lbid; LBIDRange range; // StopWatch timer; // timer.start("calculation"); for (uint32_t j = 0; j < colStructList.size(); j++) { vector fboList; vector rangeList; lastFbo = -1; ColumnOp* colOp = m_colOp[op(colStructList[j].fCompressionType)]; ColStruct curColStruct = colStructList[j]; Convertor::convertColType(&curColStruct); for (uint32_t i = 0; i < ridList.size(); i++) { curRowId = ridList[i]; // cout << "processVersionBuffer got rid " << curRowId << endl; successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / curColStruct.colWidth, curColStruct.colWidth, curFbo, curBio); if (successFlag) { if (curFbo != lastFbo) { // cout << "processVersionBuffer is processing curFbo " << curFbo << endl; RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(colStructList[j].dataOid, colStructList[j].fColPartition, colStructList[j].fColSegment, curFbo, lbid)); // cout << "beginVBCopy is processing lbid:transaction " << lbid <<":"<pruneLBIDList(txnid, &rangeList, &fboList); rangeLists.push_back(rangeList); fboLists.push_back(fboList); rangeListTot.insert(rangeListTot.end(), rangeList.begin(), rangeList.end()); } if (rangeListTot.size() > 0) rc = BRMWrapper::getInstance()->getDbrmObject()->beginVBCopy(txnid, colStructList[0].fColDbRoot, rangeListTot, freeList); // timer.stop("beginVBCopy"); // timer.finish(); return rc; } /** * @brief Process versioning for batch insert - only version the hwm block. 
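 *
 * Note: the implementation that follows is compiled out (#if 0) and retained
 * for reference only; live code paths version blocks through
 * processVersionBuffer(), processVersionBuffers() and processBeginVBCopy()
 * above.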
*/ #if 0 int WriteEngineWrapper::processBatchVersions(const TxnID& txnid, std::vector columns, std::vector& rangeList) { int rc = 0; std::vector fileOps; //open the column files for ( unsigned i = 0; i < columns.size(); i++) { ColumnOp* colOp = m_colOp[op(columns[i].compressionType)]; Column curCol; // set params colOp->initColumn(curCol); ColType colType; Convertor::convertColType(columns[i].colDataType, colType); colOp->setColParam(curCol, 0, columns[i].colWidth, columns[i].colDataType, colType, columns[i].dataFile.oid, columns[i].compressionType, columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment); string segFile; rc = colOp->openColumnFile(curCol, segFile, IO_BUFF_SIZE); if (rc != NO_ERROR) break; columns[i].dataFile.pFile = curCol.dataFile.pFile; fileOps.push_back(colOp); } if ( rc == 0) { BRM::VER_t verId = (BRM::VER_t) txnid; rc = BRMWrapper::getInstance()->writeBatchVBs(verId, columns, rangeList, fileOps); } //close files for ( unsigned i = 0; i < columns.size(); i++) { ColumnOp* colOp = dynamic_cast (fileOps[i]); Column curCol; // set params colOp->initColumn(curCol); ColType colType; Convertor::convertColType(columns[i].colDataType, colType); colOp->setColParam(curCol, 0, columns[i].colWidth, columns[i].colDataType, colType, columns[i].dataFile.oid, columns[i].compressionType, columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment); curCol.dataFile.pFile = columns[i].dataFile.pFile; colOp->clearColumn(curCol); } return rc; } #endif void WriteEngineWrapper::writeVBEnd(const TxnID& txnid, std::vector& rangeList) { if (idbdatafile::IDBPolicy::useHdfs()) return; BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); } int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const vector& colExtentsColType, vector& colExtentsStruct, ColValueList& colValueList, vector& colOldValueList, vector& ridLists, vector& dctnryExtentsStruct, DctnryValueList& dctnryValueList, const int32_t tableOid, bool hasAUXCol) { int rc = 0; unsigned numExtents = colExtentsStruct.size(); ColStructList colStructList; DctnryStructList dctnryStructList; WriteEngine::CSCTypesList cscColTypeList; ColumnOp* colOp = NULL; ExtCPInfoList infosToUpdate; for (unsigned extent = 0; extent < numExtents; extent++) { colStructList = colExtentsStruct[extent]; dctnryStructList = dctnryExtentsStruct[extent]; cscColTypeList = colExtentsColType[extent]; if (m_opType != DELETE) { // Tokenize data if needed vector tokenList; DctColTupleList::iterator dctCol_iter; ColTupleList::iterator col_iter; for (unsigned i = 0; i < colStructList.size(); i++) { if (colStructList[i].tokenFlag) { // only need to tokenize once dctCol_iter = dctnryValueList[i].begin(); Token token; if (!dctCol_iter->isNull) { RETURN_ON_ERROR_REPORT( tokenize(txnid, dctnryStructList[i], *dctCol_iter, true)); // @bug 5572 HDFS tmp file token = dctCol_iter->token; #ifdef PROFILE // timer.stop("tokenize"); #endif } tokenList.push_back(token); } } int dicPos = 0; for (unsigned i = 0; i < colStructList.size(); i++) { if (colStructList[i].tokenFlag) { // only need to tokenize once col_iter = colValueList[i].begin(); while (col_iter != colValueList[i].end()) { col_iter->data = tokenList[dicPos]; col_iter++; } dicPos++; } } } RIDList::iterator rid_iter; // Mark extents invalid bool successFlag = true; unsigned width = 0; int curFbo = 0, curBio, lastFbo = -1; rid_iter = ridLists[extent].begin(); RID aRid = *rid_iter; ExtCPInfoList currentExtentRanges; for (const auto& colStruct : colStructList) { 
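      /*
       * ExtCPInfo carries the casual-partitioning (CP) min/max range of one
       * extent. Each column gets an entry seeded with its data type and
       * width; getCPInfoToUpdateForUpdatableType() further down returns NULL
       * for types whose range cannot be maintained through this operation,
       * and such columns are then skipped when ranges are written back to
       * BRM.
       */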
currentExtentRanges.push_back(ExtCPInfo(colStruct.colDataType, colStruct.colWidth));  // temporary for each extent.
    }

    std::vector<ExtCPInfo*> currentExtentRangesPtrs(colStructList.size(), NULL);  // pointers for each extent.

    if (m_opType != DELETE)
      m_opType = UPDATE;

    for (unsigned j = 0; j < colStructList.size(); j++)
    {
      colOp = m_colOp[op(colStructList[j].fCompressionType)];
      ExtCPInfo* cpInfoP = &(currentExtentRanges[j]);
      cpInfoP = getCPInfoToUpdateForUpdatableType(colStructList[j], cpInfoP, m_opType);
      currentExtentRangesPtrs[j] = cpInfoP;
      // XXX: highly dubious.
      // if (!colStructList[j].tokenFlag)
      //   continue;
      width = colOp->getCorrectRowWidth(colStructList[j].colDataType, colStructList[j].colWidth);
      successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR_REPORT(AddLBIDtoList(txnid, colStructList[j], curFbo, cpInfoP));
        }
      }
    }

    //#ifdef PROFILE
    // timer.start("markExtentsInvalid");
    //#endif
    bool hasFastDelete = false;

    if (m_opType == DELETE && hasAUXCol)
    {
      hasFastDelete = Config::getFastDelete();
    }

    if (hasFastDelete)
    {
      ColStructList colStructListAUX(1, colStructList.back());
      WriteEngine::CSCTypesList cscColTypeListAUX(1, cscColTypeList.back());
      ColValueList colValueListAUX(1, colValueList.back());
      std::vector<ExtCPInfo*> currentExtentRangesPtrsAUX(1, currentExtentRangesPtrs.back());

      rc = writeColumnRecUpdate(txnid, cscColTypeListAUX, colStructListAUX, colValueListAUX,
                                colOldValueList, ridLists[extent], tableOid, true, ridLists[extent].size(),
                                &currentExtentRangesPtrsAUX, hasAUXCol);

      for (auto& cpInfoPtr : currentExtentRangesPtrs)
      {
        if (cpInfoPtr)
        {
          cpInfoPtr->toInvalid();
        }
      }
    }
    else
    {
      rc = writeColumnRecUpdate(txnid, cscColTypeList, colStructList, colValueList, colOldValueList,
                                ridLists[extent], tableOid, true, ridLists[extent].size(),
                                &currentExtentRangesPtrs, hasAUXCol);
    }

    if (rc != NO_ERROR)
      break;

    // copy updated ranges into bulk update vector.
    for (auto cpInfoPtr : currentExtentRangesPtrs)
    {
      if (cpInfoPtr)
      {
        cpInfoPtr->fCPInfo.seqNum++;
        infosToUpdate.push_back(*cpInfoPtr);
      }
    }
  }

  markTxnExtentsAsInvalid(txnid);

  if (rc == NO_ERROR)
  {
    ExtCPInfoList infosToDrop = infosToUpdate;

    for (auto& cpInfo : infosToDrop)
    {
      cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
    }

    // ZZZZ rc = BRMWrapper::getInstance()->setExtentsMaxMin(infosToDrop);
    setInvalidCPInfosSpecialMarks(infosToUpdate);
    rc = BRMWrapper::getInstance()->setExtentsMaxMin(infosToUpdate);
  }

  return rc;
}

int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                         vector<ColStruct>& colExtentsStruct, ColValueList& colValueList,
                                         const RIDList& ridLists, const int32_t tableOid)
{
  // Mark extents invalid
  ColumnOp* colOp = NULL;
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;
  RID aRid = ridLists[0];
  int rc = 0;
  ExtCPInfoList infosToUpdate;

  for (const auto& colStruct : colExtentsStruct)
  {
    infosToUpdate.push_back(ExtCPInfo(colStruct.colDataType, colStruct.colWidth));
  }

  ExtCPInfoList bulkUpdateInfos;
  std::vector<ExtCPInfo*> pointersToInfos;  // pointersToInfos[i] points to infosToUpdate[i] and may be NULL.
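  /*
   * The CP bookkeeping below mirrors updateColumnRec() above: first the
   * affected extents are marked invalid in BRM under the special sequence
   * number SEQNUM_MARK_INVALID_SET_RANGE, then, once the rows are written,
   * each surviving ExtCPInfo has its seqNum bumped and the refreshed min/max
   * ranges are published through setExtentsMaxMin().
   */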
m_opType = UPDATE; for (unsigned j = 0; j < colExtentsStruct.size(); j++) { colOp = m_colOp[op(colExtentsStruct[j].fCompressionType)]; ExtCPInfo* cpInfoP = &(infosToUpdate[j]); cpInfoP = getCPInfoToUpdateForUpdatableType(colExtentsStruct[j], cpInfoP, m_opType); pointersToInfos.push_back(cpInfoP); width = colOp->getCorrectRowWidth(colExtentsStruct[j].colDataType, colExtentsStruct[j].colWidth); successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio); if (successFlag) { if (curFbo != lastFbo) { RETURN_ON_ERROR(AddLBIDtoList(txnid, colExtentsStruct[j], curFbo, cpInfoP)); } } } markTxnExtentsAsInvalid(txnid); if (m_opType != DELETE) m_opType = UPDATE; for (auto cpInfoP : pointersToInfos) { if (cpInfoP) { auto tmp = *cpInfoP; tmp.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE; bulkUpdateInfos.push_back(tmp); } } if (!bulkUpdateInfos.empty()) { rc = BRMWrapper::getInstance()->setExtentsMaxMin(bulkUpdateInfos); } rc = writeColumnRecords(txnid, cscColTypeList, colExtentsStruct, colValueList, ridLists, tableOid, true, &pointersToInfos); if (rc == NO_ERROR) { bulkUpdateInfos.clear(); for (auto cpInfoP : pointersToInfos) { if (cpInfoP) { cpInfoP->fCPInfo.seqNum++; bulkUpdateInfos.push_back(*cpInfoP); } } if (!bulkUpdateInfos.empty()) { setInvalidCPInfosSpecialMarks(bulkUpdateInfos); rc = BRMWrapper::getInstance()->setExtentsMaxMin(bulkUpdateInfos); } } m_opType = NOOP; return rc; } int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, const CSCTypesList& cscColTypeList, vector& colStructList, ColValueList& colValueList, const RIDList& ridLists, const int32_t tableOid, bool versioning, std::vector* cpInfos) { bool bExcp; int rc = 0; void* valArray = NULL; void* oldValArray = NULL; Column curCol; ColStruct curColStruct; CalpontSystemCatalog::ColType curColType; ColTupleList curTupleList; ColStructList::size_type totalColumn; ColStructList::size_type i; ColTupleList::size_type totalRow; setTransId(txnid); totalColumn = colStructList.size(); totalRow = ridLists.size(); TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); for (i = 0; i < totalColumn; i++) { ExtCPInfo* cpInfo = NULL; if (cpInfos) { cpInfo = (*cpInfos)[i]; } valArray = NULL; oldValArray = NULL; curColStruct = colStructList[i]; curColType = cscColTypeList[i]; curTupleList = colValueList[i]; ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)]; Convertor::convertColType(&curColStruct); // set params colOp->initColumn(curCol); colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment); colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType); ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = curColStruct.fColDbRoot; aExt.partNum = curColStruct.fColPartition; aExt.segNum = curColStruct.fColSegment; aExt.compType = curColStruct.fCompressionType; aExt.isDict = false; aColExtsInfo.push_back(aExt); aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); } string segFile; rc = colOp->openColumnFile(curCol, segFile, true); // 
@bug 5572 HDFS tmp file if (rc != NO_ERROR) break; vector rangeList; if (versioning) { rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridLists, rangeList); } if (rc != NO_ERROR) { if (curColStruct.fCompressionType == 0) { curCol.dataFile.pFile->flush(); } BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); break; } allocateValArray(valArray, totalRow, curColStruct.colType, curColStruct.colWidth); if (m_opType != INSERT && cpInfo) { allocateValArray(oldValArray, totalRow, curColStruct.colType, curColStruct.colWidth); } // convert values to valArray bExcp = false; try { convertValArray(totalRow, cscColTypeList[i], curColStruct.colType, curTupleList, valArray); } catch (...) { bExcp = true; } if (bExcp) { BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); return ERR_PARSING; } #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRowsValues(curCol, totalRow, ridLists, valArray, oldValArray); #ifdef PROFILE timer.stop("writeRow "); #endif colOp->clearColumn(curCol); updateMaxMinRange(totalRow, totalRow, cscColTypeList[i], curColStruct.colType, valArray, oldValArray, cpInfo, false); if (curColStruct.fCompressionType == 0) { std::vector files; BRM::FileInfo aFile; aFile.partitionNum = curColStruct.fColPartition; aFile.dbRoot = curColStruct.fColDbRoot; ; aFile.segmentNum = curColStruct.fColSegment; aFile.compType = curColStruct.fCompressionType; files.push_back(aFile); if (idbdatafile::IDBPolicy::useHdfs()) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); } BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); if (valArray != NULL) free(valArray); if (oldValArray != NULL) { free(oldValArray); } // check error if (rc != NO_ERROR) break; } return rc; } /*@brief writeColumnRec - Write values to a column */ /*********************************************************** * DESCRIPTION: * Write values to a column * PARAMETERS: * tableOid - table object id * cscColTypesList - CSC ColType list * colStructList - column struct list * colValueList - column value list * colNewStructList - the new extent struct list * colNewValueList - column value list for the new extent * rowIdArray - row id list * useTmpSuffix - use temp suffix for db output file * RETURN: * NO_ERROR if success * others if something wrong in inserting the value ***********************************************************/ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, const CSCTypesList& cscColTypeList, const ColStructList& colStructList, ColValueList& colValueList, RID* rowIdArray, const ColStructList& newColStructList, ColValueList& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning, ColSplitMaxMinInfoList* maxMins) { bool bExcp; int rc = 0; void* valArray; void* oldValArray; string segFile; Column curCol; ColTupleList oldTupleList; ColStructList::size_type totalColumn; ColStructList::size_type i; ColTupleList::size_type totalRow1, totalRow2; setTransId(txnid); totalColumn = colStructList.size(); #ifdef PROFILE StopWatch timer; #endif totalRow1 = colValueList[0].size(); totalRow2 = 0; if (newColValueList.size() > 0) totalRow2 = newColValueList[0].size(); TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); for (i = 0; i < totalColumn; i++) { if (totalRow2 > 0) { RID* secondPart = rowIdArray + totalRow1; //@Bug 2205 Check if all rows go to the new extent if (totalRow1 > 0) { // Write the first batch valArray = NULL; oldValArray = NULL; RID* firstPart = rowIdArray; ColumnOp* colOp = 
m_colOp[op(colStructList[i].fCompressionType)]; // set params colOp->initColumn(curCol); // need to pass real dbRoot, partition, and segment to setColParam colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType, colStructList[i].fColDbRoot, colStructList[i].fColPartition, colStructList[i].fColSegment); colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType); ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = colStructList[i].fColDbRoot; aExt.partNum = colStructList[i].fColPartition; aExt.segNum = colStructList[i].fColSegment; aExt.compType = colStructList[i].fCompressionType; aColExtsInfo.push_back(aExt); aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); } rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file if (rc != NO_ERROR) break; // handling versioning vector rangeList; if (versioning) { rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], colStructList[i].colWidth, totalRow1, firstPart, rangeList); if (rc != NO_ERROR) { if (colStructList[i].fCompressionType == 0) { curCol.dataFile.pFile->flush(); } BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); break; } } // WIP We can allocate based on column size and not colType // have to init the size here allocateValArray(valArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth); ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType( colStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[0] : NULL, m_opType); if (m_opType != INSERT && cpInfo != NULL) // we allocate space for old values only when we need them. { allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth); } else { oldValArray = NULL; } // convert values to valArray // WIP Is m_opType ever set to DELETE? if (m_opType != DELETE) { bExcp = false; try { // WIP We convert values twice!? // dmlcommandproc converts strings to boost::any and this converts // into actual type value masked by *void // It is not clear why we need to convert to boost::any b/c we can convert from the original // string here convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i], valArray); } catch (...) 
{ bExcp = true; } if (bExcp) { if (versioning) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); return ERR_PARSING; } #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray, oldValArray); #ifdef PROFILE timer.stop("writeRow "); #endif } else { #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray, true); #ifdef PROFILE timer.stop("writeRow "); #endif } colOp->clearColumn(curCol); updateMaxMinRange(totalRow1, totalRow1, cscColTypeList[i], colStructList[i].colType, valArray, oldValArray, cpInfo, rowIdArray[0] == 0 && m_opType == INSERT); if (versioning) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); if (valArray != NULL) free(valArray); if (oldValArray != NULL) free(oldValArray); // check error if (rc != NO_ERROR) break; } // Process the second batch valArray = NULL; oldValArray = NULL; ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)]; // set params colOp->initColumn(curCol); colOp->setColParam(curCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid, newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot, newColStructList[i].fColPartition, newColStructList[i].fColSegment); colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType); ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == newColStructList[i].fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = newColStructList[i].fColDbRoot; aExt.partNum = newColStructList[i].fColPartition; aExt.segNum = newColStructList[i].fColSegment; aExt.compType = newColStructList[i].fCompressionType; aColExtsInfo.push_back(aExt); aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo); } // Pass "false" for hdfs tmp file flag. Since we only allow 1 // extent per segment file (with HDFS), we can assume a second // extent is going to a new file (and won't need tmp file). rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file if (rc != NO_ERROR) break; // handling versioning vector rangeList; if (versioning) { rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i], newColStructList[i].colWidth, totalRow2, secondPart, rangeList); if (rc != NO_ERROR) { if (newColStructList[i].fCompressionType == 0) { curCol.dataFile.pFile->flush(); } BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); break; } } ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType( newColStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[1] : NULL, m_opType); allocateValArray(valArray, totalRow2, newColStructList[i].colType, newColStructList[i].colWidth); if (m_opType != INSERT && cpInfo != NULL) // we allocate space for old values only when we need them. { allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth); } else { oldValArray = NULL; } // convert values to valArray if (m_opType != DELETE) { bExcp = false; try { convertValArray(totalRow2, cscColTypeList[i], newColStructList[i].colType, newColValueList[i], valArray); } catch (...) 
      {
        bExcp = true;
      }

      if (bExcp)
      {
        if (versioning)
          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

        return ERR_PARSING;
      }

#ifdef PROFILE
      timer.start("writeRow ");
#endif
      rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray, oldValArray);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
    }
    else
    {
#ifdef PROFILE
      timer.start("writeRow ");
#endif
      // This branch also writes the second batch, so it must start at
      // secondPart, consistent with the non-DELETE branch above.
      rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray, oldValArray, true);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
    }

    colOp->clearColumn(curCol);
    updateMaxMinRange(totalRow2, totalRow2, cscColTypeList[i], newColStructList[i].colType, valArray,
                      oldValArray, cpInfo, secondPart[0] == 0 && m_opType == INSERT);

    if (versioning)
      BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

    if (valArray != NULL)
      free(valArray);

    if (oldValArray != NULL)
      free(oldValArray);

    // check error
    if (rc != NO_ERROR)
      break;
  }
  else
  {
    valArray = NULL;
    oldValArray = NULL;
    ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
    ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType(
        colStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[0] : NULL, m_opType);
    // set params
    colOp->initColumn(curCol);
    colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                       colStructList[i].colType, colStructList[i].dataOid,
                       colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
                       colStructList[i].fColPartition, colStructList[i].fColSegment);
    colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);

    rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file
    // cout << " Opened file oid " << curCol.dataFile.pFile << endl;

    if (rc != NO_ERROR)
      break;

    ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
          (it->segNum == colStructList[i].fColSegment))
        break;

      it++;
    }

    if (it == aColExtsInfo.end())  // add this one to the list
    {
      ColExtInfo aExt;
      aExt.dbRoot = colStructList[i].fColDbRoot;
      aExt.partNum = colStructList[i].fColPartition;
      aExt.segNum = colStructList[i].fColSegment;
      aExt.compType = colStructList[i].fCompressionType;
      aColExtsInfo.push_back(aExt);
      aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    // handling versioning
    vector<LBIDRange> rangeList;

    if (versioning)
    {
      rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], colStructList[i].colWidth,
                                totalRow1, rowIdArray, rangeList);

      if (rc != NO_ERROR)
      {
        if (colStructList[i].fCompressionType == 0)
        {
          curCol.dataFile.pFile->flush();
        }

        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
        break;
      }
    }

    allocateValArray(valArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);

    if (m_opType != INSERT && cpInfo != NULL)  // we allocate space for old values only when we need them.
    {
      allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);
    }
    else
    {
      oldValArray = NULL;
    }

    // convert values to valArray
    if (m_opType != DELETE)
    {
      bExcp = false;

      try
      {
        convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i], valArray,
                        true);
      }
      catch (...)
{ bExcp = true; } if (bExcp) { if (versioning) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); return ERR_PARSING; } #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray); #ifdef PROFILE timer.stop("writeRow "); #endif } else { #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray, true); #ifdef PROFILE timer.stop("writeRow "); #endif } colOp->clearColumn(curCol); updateMaxMinRange(totalRow1, totalRow1, cscColTypeList[i], colStructList[i].colType, valArray, oldValArray, cpInfo, rowIdArray[0] == 0 && m_opType == INSERT); if (versioning) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); if (valArray != NULL) free(valArray); if (oldValArray != NULL) free(oldValArray); // check error if (rc != NO_ERROR) break; } } // end of for (i = 0 #ifdef PROFILE timer.finish(); #endif return rc; } int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, RID* rowIdArray, const ColStructList& newColStructList, std::vector& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning) { int rc = 0; void* valArray = NULL; string segFile; Column curCol; ColStructList::size_type totalColumn; ColStructList::size_type i; size_t totalRow1, totalRow2; setTransId(txnid); totalColumn = colStructList.size(); #ifdef PROFILE StopWatch timer; #endif totalRow1 = colValueList.size() / totalColumn; if (newColValueList.size() > 0) { totalRow2 = newColValueList.size() / newColStructList.size(); totalRow1 -= totalRow2; } else { totalRow2 = 0; } // It is possible totalRow1 is zero but totalRow2 has values if ((totalRow1 == 0) && (totalRow2 == 0)) return rc; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); if (totalRow1) { valArray = malloc(sizeof(uint64_t) * totalRow1); for (i = 0; i < totalColumn; i++) { //@Bug 2205 Check if all rows go to the new extent // Write the first batch RID* firstPart = rowIdArray; ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; // set params colOp->initColumn(curCol); // need to pass real dbRoot, partition, and segment to setColParam colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType, colStructList[i].fColDbRoot, colStructList[i].fColPartition, colStructList[i].fColSegment); colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType); ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = colStructList[i].fColDbRoot; aExt.partNum = colStructList[i].fColPartition; aExt.segNum = colStructList[i].fColSegment; aExt.compType = colStructList[i].fCompressionType; aColExtsInfo.push_back(aExt); aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); } rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file if (rc != NO_ERROR) break; // handling versioning vector rangeList; if (versioning) { rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], 
colStructList[i].colWidth, totalRow1, firstPart, rangeList);

        if (rc != NO_ERROR)
        {
          if (colStructList[i].fCompressionType == 0)
          {
            curCol.dataFile.pFile->flush();
          }

          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
          break;
        }
      }

      // totalRow1 -= totalRow2;
      // have to init the size here
      // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
      uint8_t tmp8;
      uint16_t tmp16;
      uint32_t tmp32;

      for (size_t j = 0; j < totalRow1; j++)
      {
        uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j];

        switch (colStructList[i].colType)
        {
          case WriteEngine::WR_VARBINARY:  // treat same as char for now
          case WriteEngine::WR_CHAR:
          case WriteEngine::WR_BLOB:
          case WriteEngine::WR_TEXT: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_INT:
          case WriteEngine::WR_UINT:
          case WriteEngine::WR_MEDINT:
          case WriteEngine::WR_UMEDINT:
          case WriteEngine::WR_FLOAT:
            tmp32 = curValue;
            ((uint32_t*)valArray)[j] = tmp32;
            break;

          case WriteEngine::WR_ULONGLONG:
          case WriteEngine::WR_LONGLONG:
          case WriteEngine::WR_DOUBLE:
          case WriteEngine::WR_TOKEN: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_BYTE:
          case WriteEngine::WR_UBYTE:
            tmp8 = curValue;
            ((uint8_t*)valArray)[j] = tmp8;
            break;

          case WriteEngine::WR_SHORT:
          case WriteEngine::WR_USHORT:
            tmp16 = curValue;
            ((uint16_t*)valArray)[j] = tmp16;
            break;

          case WriteEngine::WR_BINARY: ((uint64_t*)valArray)[j] = curValue; break;
        }
      }

#ifdef PROFILE
      timer.start("writeRow ");
#endif
      rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
      colOp->closeColumnFile(curCol);

      if (versioning)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

      // check error
      if (rc != NO_ERROR)
        break;
    }  // end of for (i = 0

    if (valArray != NULL)
    {
      free(valArray);
      valArray = NULL;
    }
  }

  // MCOL-1176 - Write second extent
  if (totalRow2)
  {
    valArray = malloc(sizeof(uint64_t) * totalRow2);

    for (i = 0; i < newColStructList.size(); i++)
    {
      //@Bug 2205 Check if all rows go to the new extent
      // Write the second batch
      RID* secondPart = rowIdArray + totalRow1;
      ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];

      // set params
      colOp->initColumn(curCol);
      // need to pass real dbRoot, partition, and segment to setColParam
      colOp->setColParam(curCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                         newColStructList[i].colType, newColStructList[i].dataOid,
                         newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
                         newColStructList[i].fColPartition, newColStructList[i].fColSegment);
      colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType);
      ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = newColStructList[i].fColDbRoot;
        aExt.partNum = newColStructList[i].fColPartition;
        aExt.segNum = newColStructList[i].fColSegment;
        aExt.compType = newColStructList[i].fCompressionType;
        aColExtsInfo.push_back(aExt);
        aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
      }

      rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        break;

      // handling versioning
      vector<LBIDRange> rangeList;

      if (versioning)
      {
        rc =
processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i], newColStructList[i].colWidth, totalRow2, secondPart, rangeList); if (rc != NO_ERROR) { if (newColStructList[i].fCompressionType == 0) { curCol.dataFile.pFile->flush(); } BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); break; } } // totalRow1 -= totalRow2; // have to init the size here // nullArray = (bool*) malloc(sizeof(bool) * totalRow); uint8_t tmp8; uint16_t tmp16; uint32_t tmp32; for (size_t j = 0; j < totalRow2; j++) { uint64_t curValue = newColValueList[(totalRow2 * i) + j]; switch (newColStructList[i].colType) { case WriteEngine::WR_VARBINARY: // treat same as char for now case WriteEngine::WR_CHAR: case WriteEngine::WR_BLOB: case WriteEngine::WR_TEXT: ((uint64_t*)valArray)[j] = curValue; break; case WriteEngine::WR_INT: case WriteEngine::WR_UINT: case WriteEngine::WR_MEDINT: case WriteEngine::WR_UMEDINT: case WriteEngine::WR_FLOAT: tmp32 = curValue; ((uint32_t*)valArray)[j] = tmp32; break; case WriteEngine::WR_ULONGLONG: case WriteEngine::WR_LONGLONG: case WriteEngine::WR_DOUBLE: case WriteEngine::WR_TOKEN: ((uint64_t*)valArray)[j] = curValue; break; case WriteEngine::WR_BYTE: case WriteEngine::WR_UBYTE: tmp8 = curValue; ((uint8_t*)valArray)[j] = tmp8; break; case WriteEngine::WR_SHORT: case WriteEngine::WR_USHORT: tmp16 = curValue; ((uint16_t*)valArray)[j] = tmp16; break; case WriteEngine::WR_BINARY: ((uint64_t*)valArray)[j] = curValue; break; } } #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray); #ifdef PROFILE timer.stop("writeRow "); #endif colOp->closeColumnFile(curCol); if (versioning) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); // check error if (rc != NO_ERROR) break; } // end of for (i = 0 } if (valArray != NULL) free(valArray); #ifdef PROFILE timer.finish(); #endif return rc; } int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesList& cscColTypeList, const ColStructList& colStructList, const ColValueList& colValueList, vector& colOldValueList, const RIDList& ridList, const int32_t tableOid, bool convertStructFlag, ColTupleList::size_type nRows, std::vector* cpInfos, bool hasAUXCol) { bool bExcp; int rc = 0; void* valArray = NULL; void* oldValArray = NULL; Column curCol; ColStruct curColStruct; ColTupleList curTupleList, oldTupleList; ColStructList::size_type totalColumn; ColStructList::size_type i; ColTupleList::size_type totalRow; setTransId(txnid); colOldValueList.clear(); totalColumn = colStructList.size(); totalRow = nRows; #ifdef PROFILE StopWatch timer; #endif vector rangeListTot; std::vector freeList; vector > fboLists; vector > rangeLists; rc = processBeginVBCopy(txnid, ((m_opType == DELETE && hasAUXCol) ? 
ColStructList(1, colStructList.back()) : colStructList), ridList, freeList, fboLists, rangeLists, rangeListTot); if (rc != NO_ERROR) { if (rangeListTot.size() > 0) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); switch (rc) { case BRM::ERR_DEADLOCK: return ERR_BRM_DEAD_LOCK; case BRM::ERR_VBBM_OVERFLOW: return ERR_BRM_VB_OVERFLOW; case BRM::ERR_NETWORK: return ERR_BRM_NETWORK; case BRM::ERR_READONLY: return ERR_BRM_READONLY; default: return ERR_BRM_BEGIN_COPY; } } VBRange aRange; uint32_t blocksProcessedThisOid = 0; uint32_t blocksProcessed = 0; std::vector files; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); for (i = 0; i < totalColumn; i++) { valArray = NULL; oldValArray = NULL; curColStruct = colStructList[i]; curTupleList = colValueList[i]; // same value for all rows ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)]; // convert column data type if (convertStructFlag) Convertor::convertColType(&curColStruct); // set params colOp->initColumn(curCol); colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment); colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType); ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = curColStruct.fColDbRoot; aExt.partNum = curColStruct.fColPartition; aExt.segNum = curColStruct.fColSegment; aExt.compType = curColStruct.fCompressionType; aColExtsInfo.push_back(aExt); aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); } string segFile; bool isReadOnly = (m_opType == DELETE && hasAUXCol && (i != colStructList.size() - 1)); rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE, isReadOnly); // @bug 5572 HDFS tmp file if (rc != NO_ERROR) break; if (curColStruct.fCompressionType == 0) { BRM::FileInfo aFile; aFile.oid = curColStruct.dataOid; aFile.partitionNum = curColStruct.fColPartition; aFile.dbRoot = curColStruct.fColDbRoot; aFile.segmentNum = curColStruct.fColSegment; aFile.compType = curColStruct.fCompressionType; files.push_back(aFile); } // handling versioning std::vector curFreeList; uint32_t blockUsed = 0; if (!idbdatafile::IDBPolicy::useHdfs()) { if (rangeListTot.size() > 0 && (m_opType != DELETE || !hasAUXCol || (i == colStructList.size() - 1))) { ColStructList::size_type j = i; if (m_opType == DELETE && hasAUXCol && (i == colStructList.size() - 1)) j = 0; if (freeList[0].size >= (blocksProcessed + rangeLists[j].size())) { aRange.vbOID = freeList[0].vbOID; aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; aRange.size = rangeLists[j].size(); curFreeList.push_back(aRange); } else { aRange.vbOID = freeList[0].vbOID; aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; aRange.size = freeList[0].size - blocksProcessed; blockUsed = aRange.size; curFreeList.push_back(aRange); if (freeList.size() > 1) { aRange.vbOID = freeList[1].vbOID; aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid; aRange.size = rangeLists[j].size() - blockUsed; curFreeList.push_back(aRange); blocksProcessedThisOid += aRange.size; } else { rc = 1; break; } } 
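          /*
           * A hypothetical walk-through of the free-list carving above,
           * assuming freeList[0] = { vbOID 100, vbFBO 0, size 10 } and three
           * columns needing 4 blocks each: columns 0 and 1 fit entirely in
           * freeList[0] (blocks 0-3 and 4-7); column 2 takes the remaining 2
           * blocks and the other 2 must come from freeList[1]. If no second
           * range exists, rc is set to 1 and the update is aborted.
           */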
blocksProcessed += rangeLists[j].size(); rc = BRMWrapper::getInstance()->writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid, curColStruct.dataOid, fboLists[j], rangeLists[j], colOp, curFreeList, curColStruct.fColDbRoot, true); } } if (rc != NO_ERROR) { if (curColStruct.fCompressionType == 0) { curCol.dataFile.pFile->flush(); } if (rangeListTot.size() > 0) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); break; } allocateValArray(valArray, 1, curColStruct.colType, curColStruct.colWidth); ExtCPInfo* cpInfo; cpInfo = cpInfos ? ((*cpInfos)[i]) : NULL; if (cpInfo) { allocateValArray(oldValArray, totalRow, curColStruct.colType, curColStruct.colWidth); } // convert values to valArray if (m_opType != DELETE) { bExcp = false; ColTuple curTuple; curTuple = curTupleList[0]; try { convertValue(cscColTypeList[i], curColStruct.colType, valArray, curTuple.data); } catch (...) { bExcp = true; } if (bExcp) { if (rangeListTot.size() > 0) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); return ERR_PARSING; } #ifdef PROFILE timer.start("writeRow "); #endif rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray); #ifdef PROFILE timer.stop("writeRow "); #endif } else { #ifdef PROFILE timer.start("writeRows "); #endif if (!isReadOnly) rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray, true); else rc = colOp->writeRowsReadOnly(curCol, totalRow, ridList, oldValArray); #ifdef PROFILE timer.stop("writeRows "); #endif } updateMaxMinRange(1, totalRow, cscColTypeList[i], curColStruct.colType, m_opType == DELETE ? NULL : valArray, oldValArray, cpInfo, false); colOp->clearColumn(curCol, !isReadOnly); if (valArray != NULL) { free(valArray); valArray = NULL; } if (oldValArray != NULL) { free(oldValArray); oldValArray = NULL; } // check error if (rc != NO_ERROR) break; } // end of for (i = 0) if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0)) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); if (rangeListTot.size() > 0) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); // timer.stop("Delete:writecolrec"); #ifdef PROFILE timer.finish(); #endif return rc; } /*@brief tokenize - return a token for a given signature and size */ /*********************************************************** * DESCRIPTION: * return a token for a given signature and size * If it is not in the dictionary, the signature * will be added to the dictionary and the index tree * If it is already in the dictionary, then * the token will be returned * This function does not open and close files. 
 *    callers need to use openDctnry and closeDctnry
 * PARAMETERS:
 *    DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryTuple& dctnryTuple, int ct)
{
  int cop = op(ct);
  m_dctnry[cop]->setTransId(txnid);
  return m_dctnry[cop]->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
}

/*@brief tokenize - return a token for a given signature and size
 * accept OIDs as input
 */
/***********************************************************
 * DESCRIPTION:
 *    Token for a given signature and size
 *    If it is not in the dictionary, the signature
 *    will be added to the dictionary and the index tree
 *    If it is already in the dictionary, then
 *    the token will be returned
 * PARAMETERS:
 *    DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
 *    DctnryStruct dctnryStruct - contains the 3 OIDs for dictionary,
 *                                tree and list.
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryStruct& dctnryStruct, DctnryTuple& dctnryTuple,
                                 bool useTmpSuffix)  // @bug 5572 HDFS tmp file
{
  // find the corresponding column segment file the token is going to be inserted.
  Dctnry* dctnry = m_dctnry[op(dctnryStruct.fCompressionType)];
  int rc = dctnry->openDctnry(dctnryStruct.dctnryOid, dctnryStruct.fColDbRoot, dctnryStruct.fColPartition,
                              dctnryStruct.fColSegment, useTmpSuffix);  // @bug 5572 TBD

  if (rc != NO_ERROR)
    return rc;

  rc = tokenize(txnid, dctnryTuple, dctnryStruct.fCompressionType);

  int rc2 = dctnry->closeDctnry(true);  // close file, even if tokenize() fails

  if ((rc == NO_ERROR) && (rc2 != NO_ERROR))
    rc = rc2;

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Create a dictionary store file
 * PARAMETERS:
 *    dctnryOid - dictionary store file OID
 *    colWidth - column width
 *    dbRoot - DBRoot where file is to be located
 *    partition - starting partition number for segment file path
 *    segment - segment number
 *    compressionType - compression type
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if something wrong in creating the file
 ***********************************************************/
int WriteEngineWrapper::createDctnry(const TxnID& txnid, const OID& dctnryOid, int colWidth, uint16_t dbRoot,
                                     uint32_t partition, uint16_t segment, int compressionType)
{
  BRM::LBID_t startLbid;
  return m_dctnry[op(compressionType)]->createDctnry(dctnryOid, colWidth, dbRoot, partition, segment,
                                                     startLbid);
}

int WriteEngineWrapper::convertRidToColumn(RID& rid, uint16_t& dbRoot, uint32_t& partition,
                                           uint16_t& segment, RID filesPerColumnPartition,
                                           RID extentsPerSegmentFile, RID extentRows, uint16_t startDBRoot,
                                           unsigned dbrootCnt)
{
  int rc = 0;
  partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);
  segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) %
            filesPerColumnPartition;
  dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;

  // Calculate the relative rid for this segment file
  RID relRidInPartition = rid - ((RID)partition * (RID)filesPerColumnPartition *
                                 (RID)extentsPerSegmentFile * (RID)extentRows);
  assert(relRidInPartition <= (RID)filesPerColumnPartition *
(RID)extentsPerSegmentFile * (RID)extentRows); uint32_t numExtentsInThisPart = relRidInPartition / extentRows; unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition; RID relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows; rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows; return rc; } /*********************************************************** * DESCRIPTION: * Clears table lock for the specified table lock ID. * PARAMETERS: * lockID - table lock to be released * errMsg - if error occurs, this is the return error message * RETURN: * NO_ERROR if operation is successful ***********************************************************/ int WriteEngineWrapper::clearTableLockOnly(uint64_t lockID, std::string& errMsg) { bool bReleased; int rc = BRMWrapper::getInstance()->releaseTableLock(lockID, bReleased, errMsg); return rc; } /*********************************************************** * DESCRIPTION: * Rolls back the state of the extentmap and database files for the * specified table OID, using the metadata previously saved to disk. * Also clears the table lock for the specified table OID. * PARAMETERS: * tableOid - table OID to be rolled back * lockID - table lock corresponding to tableOid * tableName - table name associated with tableOid * applName - application that is driving this bulk rollback * debugConsole - enable debug logging to the console * errorMsg - error message explaining any rollback failure * RETURN: * NO_ERROR if rollback completed succesfully ***********************************************************/ int WriteEngineWrapper::bulkRollback(OID tableOid, uint64_t lockID, const std::string& tableName, const std::string& applName, bool debugConsole, string& errorMsg) { errorMsg.clear(); BulkRollbackMgr rollbackMgr(tableOid, lockID, tableName, applName); if (debugConsole) rollbackMgr.setDebugConsole(true); // We used to pass "false" to not keep (delete) the metafiles at the end of // the rollback. But after the transition to sharedNothing, we pass "true" // to initially keep these files. The metafiles are deleted later, only // after all the distributed bulk rollbacks are successfully completed. int rc = rollbackMgr.rollback(true); if (rc != NO_ERROR) errorMsg = rollbackMgr.getErrorMsg(); // Ignore the return code for now; more important to base rc on the // success or failure of the previous work BRMWrapper::getInstance()->takeSnapshot(); return rc; } int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId) { // Remove the unwanted tmp files and recover compressed chunks. 
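  /*
   * The DML log consumed below is a plain-text file named
   * DMLLog_<txnid>_<moduleId> in the DBRMRoot directory. Each record is four
   * whitespace-separated fields:
   *
   *   <backUpFileType> <filename> <size> <offset>
   *
   * where backUpFileType is "rlc" (chunk-shifting helper to delete), "tmp"
   * (compressed data file to restore from its .orig backup) or "chk"/"hdr"
   * (a region of <size> bytes to copy back into <filename> at <offset>).
   */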
  string prefix;

  // BUG 4312
  RemoveTxnFromLBIDMap(txnid);
  RemoveTxnFromDictMap(txnid);

  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    cerr << "Need a valid DBRMRoot entry in Calpont configuration file";
    return ERR_INVALID_PARAM;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string aDMLLogFileName;

  if (pos != string::npos)
  {
    aDMLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    logging::Message::Args args;
    args.add("RollbackTran cannot find the dbrm directory for the DML log file");
    SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0007);
    return ERR_OPEN_DML_LOG;
  }

  std::ostringstream oss;
  oss << txnid << "_" << Config::getLocalModuleID();
  aDMLLogFileName += "DMLLog_" + oss.str();

  if (IDBPolicy::exists(aDMLLogFileName.c_str()))
  {
    // TODO-for now the DML log file will always be in a local
    // filesystem since IDBDataFile doesn't have any support for
    // a cpp iostream interface. need to decide if this is ok.
    boost::scoped_ptr<IDBDataFile> aDMLLogFile(
        IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
                          aDMLLogFileName.c_str(), "r", 0));

    if (aDMLLogFile)  // needs recovery
    {
      ssize_t fileSize = aDMLLogFile->size();
      boost::scoped_array<char> buf(new char[fileSize]);

      if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
        return ERR_FILE_READ;

      std::istringstream strstream(string(buf.get(), fileSize));
      std::string backUpFileType;
      std::string filename;
      int64_t size;
      int64_t offset;

      while (strstream >> backUpFileType >> filename >> size >> offset)
      {
        std::ostringstream oss;
        oss << "RollbackTran found " << backUpFileType << " name " << filename << " size: " << size
            << " offset: " << offset;
        logging::Message::Args args;
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, logging::M0007);

        if (backUpFileType.compare("rlc") == 0)
        {
          // remove the rlc file
          filename += ".rlc";
          IDBPolicy::remove(filename.c_str());
          logging::Message::Args args1;
          args1.add(filename);
          args1.add(" is removed.");
          SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
        }
        else if (backUpFileType.compare("tmp") == 0)
        {
          int rc = NO_ERROR;
          string orig(filename + ".orig");

          // restore the orig file
          if (IDBPolicy::exists(orig.c_str()))
          {
            // not likely both cdf and tmp exist
            if (IDBPolicy::exists(filename.c_str()) && IDBPolicy::remove(filename.c_str()) != 0)
              rc = ERR_COMP_REMOVE_FILE;

            if (rc == NO_ERROR && IDBPolicy::rename(orig.c_str(), filename.c_str()) != 0)
              rc = ERR_COMP_RENAME_FILE;
          }

          // remove the tmp file
          string tmp(filename + ".tmp");

          if (rc == NO_ERROR && IDBPolicy::exists(tmp.c_str()) && IDBPolicy::remove(tmp.c_str()) != 0)
            rc = ERR_COMP_REMOVE_FILE;

          // remove the chunk shifting helper
          string rlc(filename + ".rlc");

          if (rc == NO_ERROR && IDBPolicy::exists(rlc.c_str()) && IDBPolicy::remove(rlc.c_str()) != 0)
            rc = ERR_COMP_REMOVE_FILE;

          logging::Message::Args args1;
          args1.add(filename);

          if (rc == NO_ERROR)
          {
            args1.add(" is restored.");
            SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
          }
          else
          {
            args1.add(" may not be restored: ");
            args1.add(rc);
            SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_CRITICAL, logging::M0007);
            return rc;
          }
        }
        else
        {
          // copy back to the data file
          std::string backFileName(filename);

          if
(backUpFileType.compare("chk") == 0) backFileName += ".chk"; else backFileName += ".hdr"; // cout << "Rollback found file " << backFileName << endl; IDBDataFile* sourceFile = IDBDataFile::open( IDBPolicy::getType(backFileName.c_str(), IDBPolicy::WRITEENG), backFileName.c_str(), "r", 0); IDBDataFile* targetFile = IDBDataFile::open( IDBPolicy::getType(filename.c_str(), IDBPolicy::WRITEENG), filename.c_str(), "r+", 0); size_t byteRead; unsigned char* readBuf = new unsigned char[size]; boost::scoped_array readBufPtr(readBuf); if (sourceFile != NULL) { int rc = sourceFile->seek(0, 0); if (rc) return ERR_FILE_SEEK; byteRead = sourceFile->read(readBuf, size); if ((int)byteRead != size) { logging::Message::Args args6; args6.add("Rollback cannot read backup file "); args6.add(backFileName); SimpleSysLog::instance()->logMsg(args6, logging::LOG_TYPE_ERROR, logging::M0007); return ERR_FILE_READ; } } else { logging::Message::Args args5; args5.add("Rollback cannot open backup file "); args5.add(backFileName); SimpleSysLog::instance()->logMsg(args5, logging::LOG_TYPE_ERROR, logging::M0007); return ERR_FILE_NULL; } size_t byteWrite; if (targetFile != NULL) { int rc = targetFile->seek(offset, 0); if (rc) return ERR_FILE_SEEK; byteWrite = targetFile->write(readBuf, size); if ((int)byteWrite != size) { logging::Message::Args args3; args3.add("Rollback cannot copy to file "); args3.add(filename); args3.add("from file "); args3.add(backFileName); SimpleSysLog::instance()->logMsg(args3, logging::LOG_TYPE_ERROR, logging::M0007); return ERR_FILE_WRITE; } } else { logging::Message::Args args4; args4.add("Rollback cannot open target file "); args4.add(filename); SimpleSysLog::instance()->logMsg(args4, logging::LOG_TYPE_ERROR, logging::M0007); return ERR_FILE_NULL; } // cout << "Rollback copied to file " << filename << " from file " << backFileName << endl; delete targetFile; delete sourceFile; IDBPolicy::remove(backFileName.c_str()); logging::Message::Args arg1; arg1.add("Rollback copied to file "); arg1.add(filename); arg1.add("from file "); arg1.add(backFileName); SimpleSysLog::instance()->logMsg(arg1, logging::LOG_TYPE_INFO, logging::M0007); } } } IDBPolicy::remove(aDMLLogFileName.c_str()); } return 0; } int WriteEngineWrapper::rollbackTran(const TxnID& txnid, int sessionId) { if (rollbackCommon(txnid, sessionId) != 0) return -1; return BRMWrapper::getInstance()->rollBack(txnid, sessionId); } int WriteEngineWrapper::rollbackBlocks(const TxnID& txnid, int sessionId) { if (rollbackCommon(txnid, sessionId) != 0) return -1; return BRMWrapper::getInstance()->rollBackBlocks(txnid, sessionId); } int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId) { // BUG 4312 RemoveTxnFromLBIDMap(txnid); RemoveTxnFromDictMap(txnid); return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId); } int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, const uint64_t nextVal, const uint32_t sessionID, const uint16_t dbRoot) { int rc = NO_ERROR; boost::shared_ptr systemCatalogPtr; RIDList ridList; ColValueList colValueList; WriteEngine::ColTupleList colTuples; ColStructList colStructList; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColStruct colStruct; CalpontSystemCatalog::ColType colType; colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE; colType.colWidth = colStruct.colWidth = 8; colStruct.tokenFlag = false; colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::UBIGINT; colStruct.fColDbRoot = dbRoot; m_opType = UPDATE; if 
(idbdatafile::IDBPolicy::useHdfs()) colStruct.fCompressionType = 2; colStructList.push_back(colStruct); cscColTypeList.push_back(colType); ColTuple colTuple; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair ropair; try { ropair = systemCatalogPtr->nextAutoIncrRid(columnoid); } catch (...) { rc = ERR_AUTOINC_RID; } if (rc != NO_ERROR) return rc; ridList.push_back(ropair.rid); colTuple.data = nextVal; colTuples.push_back(colTuple); colValueList.push_back(colTuples); rc = writeColumnRecords(txnId, cscColTypeList, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false); if (rc != NO_ERROR) return rc; // flush PrimProc cache vector blockList; BRM::LBIDRange_v lbidRanges; rc = BRMWrapper::getInstance()->lookupLbidRanges(OID_SYSCOLUMN_NEXTVALUE, lbidRanges); if (rc != NO_ERROR) return rc; LBIDRange_v::iterator it; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { for (LBID_t lbid = it->start; lbid < (it->start + it->size); lbid++) { blockList.push_back(lbid); } } // Bug 5459 Flush FD cache std::vector files; BRM::FileInfo aFile; aFile.oid = colStruct.dataOid; aFile.partitionNum = colStruct.fColPartition; aFile.dbRoot = colStruct.fColDbRoot; ; aFile.segmentNum = colStruct.fColSegment; aFile.compType = colStruct.fCompressionType; files.push_back(aFile); if (idbdatafile::IDBPolicy::useHdfs()) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); rc = cacheutils::flushPrimProcAllverBlocks(blockList); if (rc != 0) rc = ERR_BLKCACHE_FLUSH_LIST; // translate to WE error return rc; } /*********************************************************** * DESCRIPTION: * Flush compressed files in chunk manager * PARAMETERS: * none * RETURN: * none ***********************************************************/ int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map& columnOids) { RemoveTxnFromLBIDMap(txnId); RemoveTxnFromDictMap(txnId); for (int i = 0; i < TOTAL_COMPRESS_OP; i++) { int rc1 = m_colOp[i]->flushFile(rc, columnOids); int rc2 = m_dctnry[i]->flushFile(rc, columnOids); if (rc == NO_ERROR) { rc = (rc1 != NO_ERROR) ? rc1 : rc2; } } return rc; } void WriteEngineWrapper::AddDictToList(const TxnID txnid, std::vector& lbids) { std::tr1::unordered_map::iterator mapIter; mapIter = m_dictLBIDMap.find(txnid); if (mapIter == m_dictLBIDMap.end()) { dictLBIDRec_t tempRecord; tempRecord.insert(lbids.begin(), lbids.end()); m_dictLBIDMap[txnid] = tempRecord; return; } else { dictLBIDRec_t& txnRecord = mapIter->second; txnRecord.insert(lbids.begin(), lbids.end()); } } // Get CPInfo for given starting LBID and column description structure. int WriteEngineWrapper::GetLBIDRange(const BRM::LBID_t startingLBID, const ColStruct& colStruct, ExtCPInfo& cpInfo) { int rtn; BRM::CPMaxMin maxMin; rtn = BRMWrapper::getInstance()->getExtentCPMaxMin(startingLBID, maxMin); bool isBinary = cpInfo.isBinaryColumn(); maxMin.isBinaryColumn = isBinary; cpInfo.fCPInfo.firstLbid = startingLBID; if (rtn) { cpInfo.toInvalid(); return rtn; } // if we are provided with CPInfo pointer to update, we record current range there. // please note that we may fail here for unknown extents - e.g., newly allocated ones. // for these we mark CPInfo as invalid (above) and proceed as usual. // XXX With this logic we may end with invalid ranges for extents that were // allocated and recorded, yet not in our copy of extentmap. 
// Get CPInfo for given starting LBID and column description structure.
int WriteEngineWrapper::GetLBIDRange(const BRM::LBID_t startingLBID, const ColStruct& colStruct,
                                     ExtCPInfo& cpInfo)
{
  int rtn;
  BRM::CPMaxMin maxMin;
  rtn = BRMWrapper::getInstance()->getExtentCPMaxMin(startingLBID, maxMin);
  bool isBinary = cpInfo.isBinaryColumn();
  maxMin.isBinaryColumn = isBinary;
  cpInfo.fCPInfo.firstLbid = startingLBID;

  if (rtn)
  {
    cpInfo.toInvalid();
    return rtn;
  }

  // If we are provided with a CPInfo pointer to update, we record the current range there.
  // Please note that we may fail here for unknown extents - e.g., newly allocated ones.
  // For these we mark the CPInfo as invalid (above) and proceed as usual.
  // XXX With this logic we may end up with invalid ranges for extents that were
  // allocated and recorded, yet are not in our copy of the extent map.
  // As we update only part of that extent, the recorded range will be for
  // that part of the extent.
  // It should be investigated whether such a situation is possible.
  // XXX Please note that most if not all calls to AddLBIDtoList are enclosed in the
  // RETURN_ON_ERROR() macro.
  // If we have failed to obtain information above, we will abort execution of the
  // function that called us.
  // This is a potential source of bugs.
  cpInfo.fCPInfo.bigMax = maxMin.bigMax;
  cpInfo.fCPInfo.bigMin = maxMin.bigMin;
  cpInfo.fCPInfo.max = maxMin.max;
  cpInfo.fCPInfo.min = maxMin.min;
  cpInfo.fCPInfo.seqNum = maxMin.seqNum;
  cpInfo.fCPInfo.isBinaryColumn = maxMin.isBinaryColumn;
  return rtn;
}

/***********************************************************
 * DESCRIPTION:
 *    Add an lbid to a list of lbids for sending to markExtentsInvalid.
 *    However, rather than storing each lbid, store only unique first
 *    lbids. This is an optimization to prevent invalidating the same
 *    extents over and over.
 * PARAMETERS:
 *    txnid - the lbid list is per txn. We use this to keep transactions
 *            separated.
 *    These next are needed for dbrm to get the lbid:
 *    colStruct - reference to the ColStruct structure of the column we record
 *                (provides segment and type).
 *    fbo - file block offset
 *    cpInfo - a CPInfo to collect, if we need that.
 * RETURN: 0 => OK. -1 => error
 ***********************************************************/
int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid, const ColStruct& colStruct, const int fbo,
                                      ExtCPInfo* cpInfo)
{
  int rtn = 0;
  BRM::LBID_t startingLBID;
  SP_TxnLBIDRec_t spTxnLBIDRec;
  std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;

  // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
  mapIter = m_txnLBIDMap.find(txnid);

  if (mapIter == m_txnLBIDMap.end())
  {
    // This is a new transaction.
    SP_TxnLBIDRec_t sptemp(new TxnLBIDRec);
    spTxnLBIDRec = sptemp;
    m_txnLBIDMap[txnid] = spTxnLBIDRec;
    // cout << "New transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
  }
  else
  {
    spTxnLBIDRec = (*mapIter).second;
  }

  // Get the extent starting lbid given all these values (startingLBID is an out parameter).
  rtn = BRMWrapper::getInstance()->getStartLbid(colStruct.dataOid, colStruct.fColPartition,
                                                colStruct.fColSegment, fbo, startingLBID);

  if (rtn != 0)
    return -1;

  // if we are given the cpInfo (column's ranges can be traced), we should update it.
  if (cpInfo)
  {
    rtn = GetLBIDRange(startingLBID, colStruct, *cpInfo);
  }
  else
  {
    spTxnLBIDRec->AddLBID(startingLBID, colStruct.colDataType);
  }

  return rtn;
}

void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
{
  std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;

  mapIter = m_dictLBIDMap.find(txnid);

  if (mapIter != m_dictLBIDMap.end())
  {
    m_dictLBIDMap.erase(txnid);
  }
}

int WriteEngineWrapper::markTxnExtentsAsInvalid(const TxnID txnid, bool erase)
{
  int rc = 0;
  std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter = m_txnLBIDMap.find(txnid);

  if (mapIter != m_txnLBIDMap.end())
  {
    SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;

    if (!spTxnLBIDRec->m_LBIDs.empty())
    {
      rc = BRMWrapper::getInstance()->markExtentsInvalid(spTxnLBIDRec->m_LBIDs,
                                                         spTxnLBIDRec->m_ColDataTypes);
    }

    if (erase)
    {
      m_txnLBIDMap.erase(txnid);  // spTxnLBIDRec is auto-destroyed
    }
  }

  return rc;
}
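// Illustrative lifecycle sketch (the loop bounds and `colStruct` setup are
// hypothetical; only the calls mirror the functions above). During an update,
// every touched block registers its extent's starting LBID, deduplicated inside
// TxnLBIDRec, and at transaction end one BRM call invalidates the whole set:
//
//   for (int fbo = firstFbo; fbo <= lastFbo; fbo++)
//     RETURN_ON_ERROR(AddLBIDtoList(txnid, colStruct, fbo, NULL));
//   ...
//   markTxnExtentsAsInvalid(txnid, true);  // one markExtentsInvalid() per txn, then erase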
/***********************************************************
 * DESCRIPTION:
 *    Remove a transaction LBID list from the LBID map.
 *    Called when a transaction ends, either commit or rollback.
 * PARAMETERS:
 *    txnid - the transaction to remove.
 * RETURN:
 *    error code
 ***********************************************************/
int WriteEngineWrapper::RemoveTxnFromLBIDMap(const TxnID txnid)
{
  return markTxnExtentsAsInvalid(txnid, true);
}

}  // end of namespace