/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: ddlindexpopulator.cpp 9210 2013-01-21 14:10:42Z rdempsey $ * * ***********************************************************************/ #include #include #include "ddlindexpopulator.h" #include "messagelog.h" #include "dataconvert.h" #include "joblist.h" #include "calpontselectexecutionplan.h" #include "distributedenginecomm.h" #include "simplecolumn.h" #include "resourcemanager.h" #include "columnresult.h" #include using namespace boost; #include using namespace boost::algorithm; using namespace WriteEngine; using namespace logging; using namespace resultset; using namespace joblist; using namespace std; using namespace execplan; using namespace ddlpackage; using namespace messageqcpp; namespace ddlpackageprocessor { bool DDLIndexPopulator::populateIndex(DDLPackageProcessor::DDLResult& result) { if (makeIndexStructs()) insertIndex(); result = fResult; return NO_ERROR != fResult.result; } bool DDLIndexPopulator::makeIndexStructs() { CalpontSelectExecutionPlan csep; makeCsep(csep); ResourceManager* rm; if (!fEC) { fEC = DistributedEngineComm::instance(rm); fEC->Open(); } SJLP jbl = joblist::JobListFactory::makeJobList(&csep, rm); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); csc->identity(CalpontSystemCatalog::EC); jbl->putEngineComm(fEC); /* ResultManager * result = jbl->GetResultManager(); result->setRunning(1); jbl->Execute(); */ jbl->doQuery(); CalpontSystemCatalog::TableName tableName; tableName.schema = fTable.fSchema; tableName.table = fTable.fName; CalpontSystemCatalog::OID tableOid = (csc->tableRID(tableName)).objnum; CalpontSystemCatalog::NJLSysDataList sysDataList; for (;;) { TableBand band; band = jbl->projectTable(tableOid); if (band.getRowCount() == 0) { // No more bands, table is done break; } band.convertToSysDataList(sysDataList, csc); break; } // size_t cnt = fColNames.size(); size_t i = 0; vector::const_iterator it; vector::const_iterator oid_iter; for (it = sysDataList.begin(); it != sysDataList.end(); it++) { if (isUnique()) fUniqueColResultList.push_back(*it); for (oid_iter = fOidList.begin(); oid_iter != fOidList.end(); oid_iter++) { if ((*it)->ColumnOID() == *oid_iter) { CalpontSystemCatalog::ColType coltype = makeIdxStruct(*it, fColNames.size(), csc); addColumnData(*it, coltype, i); } } i++; } return (fIdxValueList.size() && NO_ERROR == fResult.result); } void DDLIndexPopulator::makeCsep(CalpontSelectExecutionPlan& csep) { csep.sessionID(fSessionID); csep.txnID(fTxnID); csep.verID(fSessionManager->verID()); CalpontSelectExecutionPlan::ReturnedColumnList colList; CalpontSelectExecutionPlan::ColumnMap colMap; CalpontSystemCatalog::TableColName tableColName; CalpontSystemCatalog::OID oid; tableColName.schema = fTable.fSchema; tableColName.table = fTable.fName; boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); string tableName(fTable.fSchema + "." + fTable.fName + "."); ColumnNameList::const_iterator cend = fColNames.end(); for (ColumnNameList::const_iterator cname = fColNames.begin(); cname != cend; ++cname) { string fullColName(tableName + *cname); SRCP srcp(new SimpleColumn(fullColName, fSessionID)); colList.push_back(srcp); tableColName.column = *cname; oid = csc->lookupOID(tableColName); fOidList.push_back(oid); colMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(fullColName, srcp)); } csep.columnMap(colMap); csep.returnedCols(colList); } CalpontSystemCatalog::ColType DDLIndexPopulator::makeIdxStruct(const ColumnResult* cr, size_t cols, boost::shared_ptr csc) { IdxStruct idx; idx.treeOid = fIdxOID.treeOID; idx.listOid = fIdxOID.listOID; idx.multiColFlag = cols > 1; CalpontSystemCatalog::ColType coltype = csc->colType(cr->ColumnOID()); idx.idxDataType = static_cast(coltype.colDataType); if (isDictionaryType(coltype)) { idx.idxWidth = fTOKENSIZE; idx.idxType = WR_CHAR; } //@bug 410: index sizes are either 1, 4 or 8 else if (exeplan::isCharType(coltype)) { if (1 == coltype.colWidth) idx.idxWidth = 1; else idx.idxWidth = (coltype.colWidth > 4) ? 8 : 4; idx.idxType = WR_CHAR; } else idx.idxWidth = coltype.colWidth; fIdxStructList.push_back(idx); return coltype; } void DDLIndexPopulator::addColumnData(const execplan::ColumnResult* cr, const CalpontSystemCatalog::ColType colType, int added) { WriteEngine::IdxTupleList tupleList; WriteEngine::IdxTuple tuple; for (int i = 0; i < cr->dataCount(); ++i) { WriteEngine::IdxTuple tuple; convertColData(cr, i, colType, tuple); if (checkConstraints(tuple, colType, i, added)) { tupleList.push_back(tuple); if (!added) fRidList.push_back(cr->GetRid(i)); } else break; } if (tupleList.size()) fIdxValueList.push_back(tupleList); } void DDLIndexPopulator::convertColData(const execplan::ColumnResult* cr, int idx, const CalpontSystemCatalog::ColType& colType, WriteEngine::IdxTuple& tuple) { if (isDictionaryType(colType)) { /* tuple.data = tokenizeData ( colType, cr->GetStringData(idx) );*/ /* tuple.data = tokenizeData ( cr->GetRid(idx) );*/ tuple.data = convertTokenData(cr->GetStringData(idx)); } else tuple.data = convertData(colType, cr, idx); } boost::any DDLIndexPopulator::convertTokenData(const std::string& data) { string strData((size_t)fTOKENSIZE < data.length() ? data.substr(0, fTOKENSIZE) : data); return strData; } #if 0 // Disabled this function as it is currently not used. // If we decide to use, we should check on the usage of fileop.getFileName(). // With iteration 17, the more common version of this getFileName() takes a // partition and segment number in addition to an OID. openColumnFile // should perhaps be changed to use this updated version of getFileName(). bool DDLIndexPopulator::openColumnFile(WriteEngine::OID oid) { FileOp fileOp; char fileName[WriteEngine::FILE_NAME_SIZE]; if (WriteEngine::NO_ERROR == fileOp.getFileName(oid, fileName) ) { fColumnFile.open(fileName); return true; } else { logError("Could not get column file name for data"); return false; } } #endif // Workaround to get original column token and not "retokenize" the string value boost::any DDLIndexPopulator::tokenizeData(WriteEngine::RID rid) { int64_t byteOffset = rid * fTOKENSIZE; ByteStream::byte inbuf[fTOKENSIZE]; fColumnFile.seekg(byteOffset, ios::beg); fColumnFile.read(reinterpret_cast(inbuf), fTOKENSIZE); WriteEngine::Token token; memcpy(&token, inbuf, fTOKENSIZE); return token; } boost::any DDLIndexPopulator::tokenizeData(const execplan::CalpontSystemCatalog::ColType& colType, const std::string& data) { WriteEngine::DctnryTuple dictTuple; if (data.length() > (unsigned int)colType.colWidth) { logError("Insert value is too large for column"); } else { WriteEngine::DctnryStruct dictStruct; dictStruct.treeOid = colType.ddn.treeOID; dictStruct.listOid = colType.ddn.listOID; dictStruct.dctnryOid = colType.ddn.dictOID; dictTuple.sigValue = data.c_str(); dictTuple.sigSize = data.length(); int error = NO_ERROR; if (NO_ERROR != (error = fWriteEngine->tokenize(fTxnID, dictStruct, dictTuple))) { logError("Tokenization failed", error); } } return dictTuple.token; } boost::any DDLIndexPopulator::convertData(const CalpontSystemCatalog::ColType& colType, const execplan::ColumnResult* cr, int idx) { uint64_t data = cr->GetData(idx); switch (colType.colDataType) { case CalpontSystemCatalog::BIT: case execplan::CalpontSystemCatalog::TINYINT: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::SMALLINT: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::DATE: // @bug 375 case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::DATETIME: // @bug 375 case execplan::CalpontSystemCatalog::TIME: case execplan::CalpontSystemCatalog::TIMESTAMP: case execplan::CalpontSystemCatalog::BIGINT: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::DECIMAL: { if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE) return *reinterpret_cast(&data); else if (colType.colWidth <= 9) return *reinterpret_cast(&data); else return *reinterpret_cast(&data); } case execplan::CalpontSystemCatalog::FLOAT: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::DOUBLE: return *reinterpret_cast(&data); case execplan::CalpontSystemCatalog::CHAR: case execplan::CalpontSystemCatalog::VARCHAR: { string strData(cr->GetStringData(idx)); return *reinterpret_cast(&strData); } default: break; } logError("Invalid column type"); throw std::runtime_error("Invalid data"); return *reinterpret_cast(&data); } void DDLIndexPopulator::insertIndex() { // @bug 359 use bulk load build int rc = (1 < fIdxStructList.size()) ? (void)0 : (void)0; if (rc) logError("Error inserting index values", rc); } bool DDLIndexPopulator::isDictionaryType(const CalpontSystemCatalog::ColType& colType) { return ((CalpontSystemCatalog::CHAR == colType.colDataType && 8 < colType.colWidth) || (CalpontSystemCatalog::VARCHAR == colType.colDataType && 7 < colType.colWidth) || (CalpontSystemCatalog::DECIMAL == colType.colDataType && 18 < colType.precision)); } bool DDLIndexPopulator::checkConstraints(const IdxTuple& data, const CalpontSystemCatalog::ColType& ctype, int i, int column) { switch (fConstraint) { case DDL_INVALID_CONSTRAINT: return true; case DDL_UNIQUE: case DDL_PRIMARY_KEY: if ((size_t)column + 1 < fColNames.size()) return true; return checkUnique(i, ctype); case DDL_NOT_NULL: return checkNotNull(data, ctype); case DDL_CHECK: return checkCheck(data, ctype); default: return true; //? } } // Check if the row of data at idx is already in fUniqueColResultList bool DDLIndexPopulator::checkUnique(int idx, const CalpontSystemCatalog::ColType& colType) { if (0 == idx) return true; // Get row of data as each column result data at idx size_t indexSize = fColNames.size(); vector rowIntData(indexSize); vector rowStrData(indexSize); for (size_t i = 0; i < indexSize; ++i) { // if ( isStringType(fUniqueColResultList[i]->columnType()) ) if (isStringType(colType.colDataType)) rowStrData[i] = fUniqueColResultList[i]->GetStringData(idx); else rowIntData[i] = fUniqueColResultList[i]->GetData(idx); } // check if each value in the idx row is equal to each value in a previous row // i is the row; j is the column. bool unique = true; for (int i = 0; i < idx && unique; ++i) { bool equal = true; for (size_t j = 0; j < indexSize && equal; ++j) { if (isStringType(colType.colDataType)) { equal = fUniqueColResultList[j]->GetStringData(i) == rowStrData[j]; } else { equal = (static_cast(fUniqueColResultList[j]->GetData(i)) == rowIntData[j]); } } unique = !equal; } if (!unique) { stringstream ss; ss << idx; logError("Unique Constraint violated on row: " + ss.str()); } return unique; } bool DDLIndexPopulator::checkNotNull(const IdxTuple& data, const CalpontSystemCatalog::ColType& colType) { any nullvalue = DDLNullValueForType(colType); bool isNull = false; switch (colType.colDataType) { case CalpontSystemCatalog::BIT: break; case execplan::CalpontSystemCatalog::TINYINT: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::SMALLINT: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::BIGINT: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::DECIMAL: { if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else if (colType.colWidth <= 9) isNull = any_cast(data.data) == any_cast(nullvalue); else if (colType.colWidth <= 18) isNull = any_cast(data.data) == any_cast(nullvalue); else isNull = compareToken(any_cast(data.data), any_cast(nullvalue)); break; } case execplan::CalpontSystemCatalog::FLOAT: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::DOUBLE: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::DATE: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::DATETIME: case execplan::CalpontSystemCatalog::TIME: case execplan::CalpontSystemCatalog::TIMESTAMP: isNull = any_cast(data.data) == any_cast(nullvalue); break; case execplan::CalpontSystemCatalog::CHAR: { if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else if (colType.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else if (colType.colWidth <= execplan::CalpontSystemCatalog::FOUR_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else isNull = compareToken(any_cast(data.data), any_cast(nullvalue)); break; } case execplan::CalpontSystemCatalog::VARCHAR: { if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else if (colType.colWidth < execplan::CalpontSystemCatalog::FOUR_BYTE) isNull = any_cast(data.data) == any_cast(nullvalue); else isNull = compareToken(any_cast(data.data), any_cast(nullvalue)); break; } default: throw std::runtime_error("getNullValueForType: unkown column data type"); } if (isNull) logError("Null value not allowed in index"); return !isNull; } void DDLIndexPopulator::logError(const string& msg, int error) { Message::Args args; Message message(9); args.add((string)__FILE__ + ": "); args.add(msg); if (error) { args.add("Error number: "); args.add(error); } message.format(args); fResult.result = DDLPackageProcessor::CREATE_ERROR; fResult.message = message; } } // namespace ddlpackageprocessor