/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id$ * *******************************************************************************/ /* * we_brmupdater.cpp * * Created on: Dec 13, 2011 * Author: bpaul */ #include #include using namespace std; #include "we_define.h" #include using namespace boost; // #include "we_simplesyslog.h" #include "we_sdhandler.h" #include "brm.h" #include "brmtypes.h" using namespace BRM; #include "we_brmupdater.h" namespace WriteEngine { //----------------------------------------------------------------------------- bool WEBrmUpdater::updateCasualPartitionAndHighWaterMarkInBRM() { try { bool aGood = prepareCasualPartitionInfo(); if (!aGood) cout << "prepareCasualPartitionInfo Failed" << endl; } catch (std::exception& ex) { std::string aStr = "Exception in prepareCasualPartitionInfo(); Error Ignored"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); errMsgArgs.add(ex.what()); fRef.sysLog(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); cout << aStr << endl; } try { bool aSuccess = prepareHighWaterMarkInfo(); if (!aSuccess) throw(std::runtime_error("prepareHWMInfo Failed")); } catch (std::exception& ex) { std::string aStr = "prepareHWMInfo() failed... Bailing out!!"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); aStr = "Need to rollback bulk upload"; errMsgArgs.add(aStr); errMsgArgs.add(ex.what()); fRef.sysLog(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); cout << aStr << endl; return false; } // If we are here, we packaged informations properly // Lets connect to BRM if (!createBrmConnection()) { cout << "Brm Connection FAILED" << endl; return false; } int aRc = updateCPAndHWMInBRM(); if (aRc != 0) { cout << "Updating High Water Mark Failed" << endl; } else { if (fRef.getDebugLvl()) cout << "Updating High Water Mark Successful!!" << endl << endl; } /* int cpRc = updateCasualPartitionInBRM(); if(cpRc != 0) cout << "Updating Casual Partition Failed" << endl; else { if(fRef.getDebugLvl()) cout << "Updating Casual Partition Successful" << endl; } int hwmRc = updateHighWaterMarkInBRM(); if(hwmRc != 0) cout << "Updating High Water Mark Failed" << endl; else { if(fRef.getDebugLvl()) cout << "Updating High Water Mark Successful!!" << endl << endl; } */ releaseBrmConnection(); // return(!hwmRc)? true: false; return (!aRc) ? true : false; } //------------------------------------------------------------------------------ // Update Casual Partition information in BRM //------------------------------------------------------------------------------ int WEBrmUpdater::updateCasualPartitionInBRM() { int rc = 0; if (fCPInfo.size() > 0) { // std::ostringstream oss; // oss << "Committing " << fCPInfo.size() << " CP updates for table " << // fRef.getTableName() << " to BRM"; // cout << endl << oss.str() << endl; // TODO - NOTE. later make this Objection creation once for both CP & HWM if (fpBrm) { rc = fpBrm->mergeExtentsMaxMin(fCPInfo); if (rc != BRM::ERR_OK) { std::string errStr; BRM::errString(rc, errStr); cout << "BRM ERROR is ***** " << errStr << endl; std::ostringstream oss; oss << "Error updating BRM with CP data for table " << fRef.getTableName() << " Error: " << errStr << endl; cout << endl << oss.str() << endl; } } } return rc; } //------------------------------------------------------------------------------ // Send HWM update information to BRM //------------------------------------------------------------------------------ int WEBrmUpdater::updateHighWaterMarkInBRM() { int rc = 0; if (fHWMInfo.size() > 0) { // std::ostringstream oss; // oss << "Committing " << fHWMInfo.size() << " HWM update(s) for table "<< // fRef.getTableName() << " to BRM"; // cout << endl << oss.str() << endl; if (fpBrm) { rc = fpBrm->bulkSetHWM(fHWMInfo, 0); if (rc != BRM::ERR_OK) { std::string errStr; BRM::errString(rc, errStr); cout << "BRM ERROR is ***** " << errStr << endl; std::ostringstream oss; oss << "Error updating BRM with HWM data for table " << fRef.getTableName() << "error: " << errStr << endl; cout << endl << oss.str() << endl; return rc; } } } return rc; } //----------------------------------------------------------------------------- int WEBrmUpdater::updateCPAndHWMInBRM() { int rc = 0; size_t i; // BUG 4232. some imports may not contain CP but HWM if ((fCPInfo.size() > 0) || (fHWMInfo.size() > 0)) { // TODO - NOTE. later make this Objection creation once for both CP & HWM if (fpBrm) { /* rc = bulkSetHWMAndCP(const std::vector &, const std::vector & setCPDataArgs, const std::vector & mergeCPDataArgs, VER_t transID = 0) DBRM_THROW; */ for (i = 0; i < fCPInfo.size(); i++) { if (fCPInfo[i].newExtent) { fCPInfo[i].seqNum = 0; // to be in sync with DBRM. } } rc = fpBrm->bulkSetHWMAndCP(fHWMInfo, fCPInfoData, fCPInfo, 0); // rc = fpBrm->mergeExtentsMaxMin(fCPInfo); // rc = fpBrm->bulkSetHWM(fHWMInfo, 0); if (rc != BRM::ERR_OK) { std::string errStr; BRM::errString(rc, errStr); cout << "BRM ERROR is ***** " << errStr << endl; std::ostringstream oss; oss << "Error updating BRM with HWM data for table " << fRef.getTableName() << "error: " << errStr << endl; cout << endl << oss.str() << endl; cout << "ERROR: HWM and CP set failed!!" << endl; // throw runtime_error("ERROR: bulSetHWMAndCp Failed!!"); return rc; } } else return rc; } return rc; } //------------------------------------------------------------------------------ bool WEBrmUpdater::prepareCasualPartitionInfo() { // cout << "Started prepareCasualPartitionInfo()!!" << endl; // CP: 275456 6000000 4776193 -1 0 1 WESDHandler::StrVec::iterator aIt = fRef.fBrmRptVec.begin(); while (aIt != fRef.fBrmRptVec.end()) { std::string aEntry = *aIt; if ((!aEntry.empty()) && (aEntry.at(0) == 'C')) { BRM::CPInfoMerge cpInfoMerge; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, aEntry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad Body in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.startLbid = boost::lexical_cast(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad startLbid in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.seqNum = atoi(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad seqNUM in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.type = (execplan::CalpontSystemCatalog::ColDataType)atoi(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad type in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.colWidth = boost::lexical_cast(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad column width in CP entry string")); } if (datatypes::isWideDecimalType(cpInfoMerge.type, cpInfoMerge.colWidth)) { datatypes::TypeAttributesStd tyAttr(cpInfoMerge.colWidth, 0, datatypes::INT128MAXPRECISION); pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.bigMax = tyAttr.decimal128FromString(std::string(pTok), NULL); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad MAX in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.bigMin = tyAttr.decimal128FromString(std::string(pTok), NULL); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad MIN in CP entry string")); } } else { pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.max = boost::lexical_cast(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad MAX in CP entry string")); } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.min = boost::lexical_cast(pTok); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad MIN in CP entry string")); } } pTok = strtok(NULL, " "); if (pTok) cpInfoMerge.newExtent = (atoi(pTok) != 0); else { // cout << "CP Entry : " << aEntry << endl; throw(runtime_error("Bad newExtent in CP entry string")); } fCPInfo.push_back(cpInfoMerge); } ++aIt; } if (fRef.getDebugLvl()) cout << "Finished prepareCasualPartitionInfo()!!" << endl; return true; } //----------------------------------------------------------------------------- bool WEBrmUpdater::prepareHighWaterMarkInfo() { // HWM: 3056 0 0 8191 WESDHandler::StrVec::iterator aIt = fRef.fBrmRptVec.begin(); while (aIt != fRef.fBrmRptVec.end()) { std::string aEntry = *aIt; if ((!aEntry.empty()) && (aEntry.at(0) == 'H')) { BRM::BulkSetHWMArg hwmArg; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, aEntry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Body in HWM entry string")); } pTok = strtok(NULL, " "); if (pTok) hwmArg.oid = atoi(pTok); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad OID in HWM entry string")); } pTok = strtok(NULL, " "); if (pTok) hwmArg.partNum = atoi(pTok); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad partNum in HWM entry string")); } pTok = strtok(NULL, " "); if (pTok) hwmArg.segNum = atoi(pTok); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad partNum in HWM entry string")); } pTok = strtok(NULL, " "); if (pTok) hwmArg.hwm = atoi(pTok); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad partNum in HWM entry string")); } fHWMInfo.push_back(hwmArg); } ++aIt; } if (fRef.getDebugLvl()) cout << "prepareHighWaterMarkInfo() finished" << endl; return true; } //------------------------------------------------------------------------------ // #ROWS: numRowsRead numRowsInserted bool WEBrmUpdater::prepareRowsInsertedInfo(std::string Entry, int64_t& TotRows, int64_t& InsRows) { bool aFound = false; // ROWS: 3 1 if ((!Entry.empty()) && (Entry.at(0) == 'R')) { aFound = true; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, Entry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "ROWS Entry : " << aEntry << endl; throw(runtime_error("Bad Body in entry string")); } pTok = strtok(NULL, " "); if (pTok) TotRows = strtol(pTok, NULL, 10); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Tot ROWS entry string")); } pTok = strtok(NULL, " "); if (pTok) InsRows = strtol(pTok, NULL, 10); else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad inserted ROWS in entry string")); } } return aFound; } //------------------------------------------------------------------------------ // #DATA: columnNumber columnType columnName numOutOfRangeValues bool WEBrmUpdater::prepareColumnOutOfRangeInfo(std::string Entry, int& ColNum, execplan::CalpontSystemCatalog::ColDataType& ColType, std::string& ColName, int& OorValues) { bool aFound = false; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(); // DATA: 3 1 if ((!Entry.empty()) && (Entry.at(0) == 'D')) { aFound = true; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, Entry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "ROWS Entry : " << aEntry << endl; throw(runtime_error("Bad OOR entry info")); } pTok = strtok(NULL, " "); if (pTok) { ColNum = atoi(pTok); } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Oor Column Number entry string")); } pTok = strtok(NULL, " "); if (pTok) { ColType = (execplan::CalpontSystemCatalog::ColDataType)atoi(pTok); } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Oor Column Type entry string")); } pTok = strtok(NULL, " "); if (pTok) { uint64_t columnOid = strtol(pTok, NULL, 10); execplan::CalpontSystemCatalog::TableColName colname = systemCatalogPtr->colName(columnOid); ColName = colname.schema + "." + colname.table + "." + colname.column; } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Column Name entry string")); } pTok = strtok(NULL, " "); if (pTok) { OorValues = atoi(pTok); } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad OorValues entry string")); } } return aFound; } //------------------------------------------------------------------------------ // #ERR: error message file bool WEBrmUpdater::prepareErrorFileInfo(std::string Entry, std::string& ErrFileName) { bool aFound = false; if ((!Entry.empty()) && (Entry.at(0) == 'E')) { aFound = true; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, Entry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "ROWS Entry : " << aEntry << endl; throw(runtime_error("Bad ERR File entry info")); } pTok = strtok(NULL, " "); if (pTok) { ErrFileName = pTok; // int aLen = ErrFileName.length(); // if(aLen>0) ErrFileName.insert(aLen-1, 1, 0); } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad Error Filename entry string")); } } return aFound; } //------------------------------------------------------------------------------ // #BAD: bad data file, with rejected rows bool WEBrmUpdater::prepareBadDataFileInfo(std::string Entry, std::string& BadFileName) { bool aFound = false; if ((!Entry.empty()) && (Entry.at(0) == 'B')) { aFound = true; const int BUFLEN = 128; char aBuff[BUFLEN]; strncpy(aBuff, Entry.c_str(), BUFLEN); aBuff[BUFLEN - 1] = 0; char* pTok = strtok(aBuff, " "); if (!pTok) // ignore the Msg Body { // cout << "ROWS Entry : " << aEntry << endl; throw(runtime_error("Bad BAD Filename entry ")); } pTok = strtok(NULL, " "); if (pTok) { BadFileName = pTok; // int aLen = BadFileName.length(); // if(aLen>0) BadFileName.insert(aLen-1, 1, 0); } else { // cout << "HWM Entry : " << aEntry << endl; throw(runtime_error("Bad BAD Filename entry string")); } } return aFound; } //------------------------------------------------------------------------------ } /* namespace WriteEngine */