/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id: we_readthread.cpp 4609 2013-04-19 15:32:02Z chao $ * *******************************************************************************/ #include #include "messagequeue.h" #include "bytestream.h" using namespace messageqcpp; #include "threadpool.h" using namespace threadpool; #include "we_dataloader.h" #include "we_readthread.h" #include "we_messages.h" #include "we_message_handlers.h" #include "we_ddlcommandproc.h" #include "we_dmlcommandproc.h" #include "../redistribute/we_redistribute.h" #include "we_config.h" #include "stopwatch.h" using namespace logging; using namespace WriteEngine; // StopWatch timer; namespace WriteEngine { ReadThread::ReadThread(const IOSocket& ios) : fIos(ios) { } ReadThread::~ReadThread() { } void ReadThread::operator()() { // We should never come here } //----------------------------------------------------------------------------- // ctor DmlReadThread::DmlReadThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs) : ReadThread(ios), fWeDMLprocessor(new WE_DMLCommandProc), fWeDDLprocessor(new WE_DDLCommandProc) { fIbs = Ibs; } // dtor DmlReadThread::~DmlReadThread() { // cout << "in DmlReadThread destructor" << endl; } void DmlReadThread::operator()() { // DCH Since fIbs is a class member, there's no reason to make a copy here. // Why waste the CPU? Note that Splitter thread doesn't make a copy. // The only reason I can think of to make such a copy is to guarantee a // strong exception, but that doesn't appear to be in play here. ByteStream ibs = fIbs; ByteStream obs; ByteStream::byte msgId; ByteStream::octbyte uniqueID; ByteStream::quadbyte PMId; ByteStream::byte rc = 0; std::string errMsg; // cout << "DmlReadThread created ..." << endl; // queryStats.blocksChanged for delete/update uint64_t blocksChanged = 0; while (ibs.length() > 0) { try { errMsg.clear(); // do work here... ibs >> msgId; if (msgId != WE_SVR_CLOSE_CONNECTION) ibs >> uniqueID; // cout << "DmlReadThread " << pthread_self () << " received message id " << (uint32_t)msgId << " and // bytestream length " << ibs.length() << endl; switch (msgId) { case WE_SVR_SINGLE_INSERT: { rc = fWeDMLprocessor->processSingleInsert(ibs, errMsg); break; } case WE_SVR_COMMIT_VERSION: { rc = fWeDMLprocessor->commitVersion(ibs, errMsg); break; } case WE_SVR_ROLLBACK_BLOCKS: { rc = fWeDMLprocessor->rollbackBlocks(ibs, errMsg); break; } case WE_SVR_ROLLBACK_VERSION: { rc = fWeDMLprocessor->rollbackVersion(ibs, errMsg); break; } case WE_SVR_COMMIT_BATCH_AUTO_ON: { rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg); break; } case WE_SVR_ROLLBACK_BATCH_AUTO_ON: { rc = fWeDMLprocessor->rollbackBatchAutoOn(ibs, errMsg); break; } case WE_SVR_COMMIT_BATCH_AUTO_OFF: { rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg); break; } case WE_SVR_ROLLBACK_BATCH_AUTO_OFF: { rc = fWeDMLprocessor->rollbackBatchAutoOff(ibs, errMsg); break; } case WE_SVR_BATCH_INSERT: { rc = fWeDMLprocessor->processBatchInsert(ibs, errMsg, PMId); break; } case WE_SVR_BATCH_INSERT_BINARY: { rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); break; } case WE_SVR_GET_WRITTEN_LBIDS: { rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId); break; } case WE_SVR_BATCH_INSERT_END: { rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg); // timer.finish(); break; } case WE_SVR_UPDATE: { rc = fWeDMLprocessor->processUpdate(ibs, errMsg, PMId, blocksChanged); break; } case WE_SVR_FLUSH_FILES: { rc = fWeDMLprocessor->processFlushFiles(ibs, errMsg); break; } case WE_SVR_DELETE: { rc = fWeDMLprocessor->processDelete(ibs, errMsg, PMId, blocksChanged); break; } case WE_SVR_BATCH_AUTOON_REMOVE_META: { rc = fWeDMLprocessor->processRemoveMeta(ibs, errMsg); break; } case WE_SVR_DML_BULKROLLBACK: { rc = fWeDMLprocessor->processBulkRollback(ibs, errMsg); break; } case WE_SVR_DML_BULKROLLBACK_CLEANUP: { rc = fWeDMLprocessor->processBulkRollbackCleanup(ibs, errMsg); break; } case WE_UPDATE_NEXTVAL: { rc = fWeDMLprocessor->updateSyscolumnNextval(ibs, errMsg); break; } case WE_SVR_WRITE_SYSTABLE: { rc = fWeDDLprocessor->writeSystable(ibs, errMsg); break; } case WE_SVR_WRITE_SYSCOLUMN: { rc = fWeDDLprocessor->writeSyscolumn(ibs, errMsg); break; } case WE_SVR_WRITE_CREATE_SYSCOLUMN: { rc = fWeDDLprocessor->writeCreateSyscolumn(ibs, errMsg); break; } case WE_SVR_WRITE_CREATETABLEFILES: { rc = fWeDDLprocessor->createtablefiles(ibs, errMsg); break; } case WE_SVR_DELETE_SYSCOLUMN: { rc = fWeDDLprocessor->deleteSyscolumn(ibs, errMsg); break; } case WE_SVR_DELETE_SYSCOLUMN_ROW: { rc = fWeDDLprocessor->deleteSyscolumnRow(ibs, errMsg); break; } case WE_SVR_DELETE_SYSTABLE: { rc = fWeDDLprocessor->deleteSystable(ibs, errMsg); break; } case WE_SVR_DELETE_SYSTABLES: { rc = fWeDDLprocessor->deleteSystables(ibs, errMsg); break; } case WE_SVR_WRITE_DROPFILES: { rc = fWeDDLprocessor->dropFiles(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_AUTO: { rc = fWeDDLprocessor->updateSyscolumnAuto(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_NEXTVAL: { rc = fWeDDLprocessor->updateSyscolumnNextvalCol(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_AUTOVAL: { rc = fWeDDLprocessor->updateSyscolumnNextval(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL: { rc = fWeDDLprocessor->updateSyscolumnSetDefault(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_TABLENAME: { rc = fWeDDLprocessor->updateSyscolumnTablename(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN: { rc = fWeDDLprocessor->updateSyscolumnRenameColumn(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSCOLUMN_COLPOS: { rc = fWeDDLprocessor->updateSyscolumnColumnposCol(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSTABLE_AUTO: { rc = fWeDDLprocessor->updateSystableAuto(ibs, errMsg); break; } case WE_SVR_UPDATE_SYSTABLE_TABLENAME: { rc = fWeDDLprocessor->updateSystableTablename(ibs, errMsg); break; } case WE_SVR_FILL_COLUMN: { rc = fWeDDLprocessor->fillNewColumn(ibs, errMsg); break; } case WE_SVR_DROP_PARTITIONS: { rc = fWeDDLprocessor->dropPartitions(ibs, errMsg); break; } case WE_SVR_WRITE_TRUNCATE: { rc = fWeDDLprocessor->writeTruncateLog(ibs, errMsg); break; } case WE_SVR_WRITE_DROPPARTITION: { rc = fWeDDLprocessor->writeDropPartitionLog(ibs, errMsg); break; } case WE_SVR_WRITE_DROPTABLE: { rc = fWeDDLprocessor->writeDropTableLog(ibs, errMsg); break; } case WE_SVR_DELETE_DDLLOG: { rc = fWeDDLprocessor->deleteDDLLog(ibs, errMsg); break; } case WE_SVR_FETCH_DDL_LOGS: { rc = fWeDDLprocessor->fetchDDLLog(ibs, errMsg); break; } case WE_SVR_PURGEFD: { rc = fWeDMLprocessor->processPurgeFDCache(ibs, errMsg); break; } case WE_END_TRANSACTION: { rc = fWeDMLprocessor->processEndTransaction(ibs, errMsg); break; } case WE_SRV_FIX_ROWS: { rc = fWeDMLprocessor->processFixRows(ibs, errMsg, PMId); break; } case WE_SVR_CLOSE_CONNECTION: { break; } default: break; } } catch (std::exception& ex) { logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("we_readthread caught exception "); args.add(ex.what()); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); rc = 1; errMsg = msg.msg(); } catch (...) { logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("we_readthread caught ... exception "); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); rc = 1; errMsg = msg.msg(); } if (msgId != WE_SVR_CLOSE_CONNECTION) { // send response obs.restart(); obs << uniqueID; obs << rc; obs << errMsg; } if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId == WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS)) { obs += ibs; // cout << " sending back hwm info with ibs length " << endl; } else if ((msgId == WE_SVR_BATCH_INSERT) || (msgId == WE_SVR_UPDATE) || (msgId == WE_SVR_DELETE)) { obs << PMId; } else if ((msgId == WE_SVR_DML_BULKROLLBACK) || (msgId == WE_SVR_DML_BULKROLLBACK_CLEANUP)) { obs << Config::getLocalModuleID(); } if (msgId == WE_SVR_UPDATE || msgId == WE_SVR_DELETE) obs << blocksChanged; // send stats back to DMLProc blocksChanged = 0; // reset if (msgId == WE_SVR_CLOSE_CONNECTION) { // cout << "received request. closing connection ..." << endl; break; } else { try { fIos.write(obs); // cout << "dmlthread sent back response for msgid " << (uint32_t)msgId << " with uniqueID:rc= " //<< (uint32_t)uniqueID<<":"<< (uint32_t)rc<<" and error message is " << errMsg < 0) { fWeDataLoader.updateRxBytes(fIbs.length()); // do work here... fIbs >> msgId; // cout << (int)msgId << endl; switch (msgId) { case WE_CLT_SRV_KEEPALIVE: { fWeDataLoader.onReceiveKeepAlive(fIbs); break; } case WE_CLT_SRV_DATA: { fWeDataLoader.onReceiveData(fIbs); break; } case WE_CLT_SRV_EOD: { fWeDataLoader.onReceiveEod(fIbs); break; } case WE_CLT_SRV_MODE: { fWeDataLoader.onReceiveMode(fIbs); break; } case WE_CLT_SRV_IMPFILENAME: { fWeDataLoader.onReceiveImportFileName(fIbs); break; } case WE_CLT_SRV_CMDLINEARGS: { fWeDataLoader.onReceiveCmdLineArgs(fIbs); break; } case WE_CLT_SRV_CMD: { fWeDataLoader.onReceiveCmd(fIbs); // fig out share_ptr on BS& is better break; } case WE_CLT_SRV_ACK: { fWeDataLoader.onReceiveAck(fIbs); break; } case WE_CLT_SRV_NAK: { fWeDataLoader.onReceiveNak(fIbs); break; } case WE_CLT_SRV_PM_ERROR: { fWeDataLoader.onReceiveError(fIbs); break; } case WE_CLT_SRV_STARTCPI: { fWeDataLoader.onReceiveStartCpimport(); break; } case WE_CLT_SRV_BRMRPT: { fWeDataLoader.onReceiveBrmRptFileName(fIbs); break; } case WE_CLT_SRV_CLEANUP: { fWeDataLoader.onReceiveCleanup(fIbs); break; } case WE_CLT_SRV_ROLLBACK: { fWeDataLoader.onReceiveRollback(fIbs); break; } case WE_CLT_SRV_JOBID: { fWeDataLoader.onReceiveJobId(fIbs); break; } case WE_CLT_SRV_JOBDATA: { fWeDataLoader.onReceiveJobData(fIbs); break; } case WE_CLT_SRV_ERRLOG: { fWeDataLoader.onReceiveErrFileRqst(fIbs); break; } case WE_CLT_SRV_BADLOG: { fWeDataLoader.onReceiveBadFileRqst(fIbs); break; } default: break; } fIbs.restart(); try { // get next message fIbs = fIos.read(); } catch (...) { fIbs.restart(); // setting length=0, get out of loop std::cout << "Broken Pipe" << std::endl; logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("SplitterReadThread::operator: Broken Pipe "); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); } } fIos.close(); } //------------------------------------------------------------------------------ // ClearTableLockReadThread constructor. //------------------------------------------------------------------------------ ClearTableLockReadThread::ClearTableLockReadThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs) : ReadThread(ios), fWeClearTableLockCmd(new WE_ClearTableLockCmd("ClearTableLockTool")) { fIbs = Ibs; } //------------------------------------------------------------------------------ // ClearTableLockReadThread destructor. //------------------------------------------------------------------------------ ClearTableLockReadThread::~ClearTableLockReadThread() { } //------------------------------------------------------------------------------ // Thread entry point to ClearTableLockReadThread object used to receive msgs // from a cleartablelock tool client. //------------------------------------------------------------------------------ void ClearTableLockReadThread::operator()() { ByteStream::byte msgId; ByteStream obs; ByteStream::byte rc = 0; std::string errMsg; // Read msgid from ByteStream and forward to applicable processing function while (fIbs.length() > 0) { fIbs >> msgId; switch (msgId) { case WE_CLT_SRV_CLEAR_TABLE_LOCK: { rc = fWeClearTableLockCmd->processRollback(fIbs, errMsg); break; } case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP: { rc = fWeClearTableLockCmd->processCleanup(fIbs, errMsg); break; } default: { break; } } // Send response obs.restart(); obs << rc; obs << errMsg; try { fIos.write(obs); // Get next message fIbs = fIos.read(); } catch (...) { logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("ClearTableLockReadThread::operator: Broken Pipe "); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); break; } } fIos.close(); } //------------------------------------------------------------------------------ // RedistributeReadThread constructor. //------------------------------------------------------------------------------ RedistributeReadThread::RedistributeReadThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs) : ReadThread(ios) { fIbs = Ibs; } //------------------------------------------------------------------------------ // RedistributeReadThread destructor. //------------------------------------------------------------------------------ RedistributeReadThread::~RedistributeReadThread() { } //------------------------------------------------------------------------------ // Thread entry point to RedistributeReadThread object used to receive msgs // from a cleartablelock tool client. //------------------------------------------------------------------------------ void RedistributeReadThread::operator()() { try { redistribute::Redistribute::handleRedistributeMessage(fIbs, fIos); } catch (...) { logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("RedistributeReadThread::operator exception handled "); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); } fIos.close(); } //------------------------------------------------------------------------------ // GetFileSizeThread constructor. //------------------------------------------------------------------------------ GetFileSizeThread::GetFileSizeThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs, BRM::DBRM& dbrm) : ReadThread(ios), fWeGetFileSizes(new WE_GetFileSizes()) { fIbs = Ibs; key = dbrm.getUnique32(); } //------------------------------------------------------------------------------ // GetFileSizeThread destructor. //------------------------------------------------------------------------------ GetFileSizeThread::~GetFileSizeThread() { } //------------------------------------------------------------------------------ // Thread entry point to GetFileSizeThread object used to receive msgs //------------------------------------------------------------------------------ void GetFileSizeThread::operator()() { ByteStream::byte msgId; ByteStream obs; ByteStream::byte rc = 0; std::string errMsg; // Read msgid from ByteStream and forward to applicable processing function while (fIbs.length() > 0) { fIbs >> msgId; switch (msgId) { case WE_SVR_GET_FILESIZES: { rc = fWeGetFileSizes->processTable(fIbs, errMsg, key); break; } case WE_SVR_GET_FILESIZE: { rc = fWeGetFileSizes->processFileName(fIbs, errMsg, key); break; } default: { break; } } // Send response obs.restart(); obs << rc; obs << errMsg; obs += fIbs; try { fIos.write(obs); // Get next message fIbs = fIos.read(); } catch (...) { logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("GetFileSizeThread::operator: Broken Pipe "); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); break; } } fIos.close(); } //----------------------------------------------------------------------------- void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBRM& dbrm) { struct timespec rm_ts; int sleepTime = 20000; // wait for 20 seconds rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; bool isTimeOut = false; ByteStream::byte msgId; ByteStream aBs; try { aBs = Ios.read(&rm_ts, &isTimeOut); } catch (std::exception& ex) { std::cout << "Handled : " << ex.what() << std::endl; logging::LoggingID logid(19, 0, 0); logging::Message::Args args; logging::Message msg(1); args.add("ReadThreadFactory::CreateReadThread: read() error"); args.add(ex.what()); msg.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); } if ((aBs.length() <= 0) || (isTimeOut)) { Ios.close(); return; } aBs.peek(msgId); switch (msgId) { case WE_SVR_DDL_KEEPALIVE: case WE_SVR_DML_KEEPALIVE: { DmlReadThread dmlReadThread(Ios, aBs); boost::thread t(dmlReadThread); // cout << "starting DML thread id " << t.get_id() << endl; } break; case WE_CLT_SRV_KEEPALIVE: case WE_CLT_SRV_MODE: case WE_CLT_SRV_DATA: case WE_CLT_SRV_CMD: case WE_CLT_SRV_ACK: case WE_CLT_SRV_NAK: case WE_CLT_SRV_PM_ERROR: case WE_CLT_SRV_CMDLINEARGS: { // SplitterReadThread aSpReadThread(Ios, aBs); // fOwner.attach(reinterpret_cast(&(aSpReadThread.fWeDataLoader))); // Tp.invoke(aSpReadThread); Tp.invoke(SplitterReadThread(Ios, aBs)); } break; case WE_CLT_SRV_CLEAR_TABLE_LOCK: case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP: { ClearTableLockReadThread clearTableLockThread(Ios, aBs); Tp.invoke(clearTableLockThread); } break; case WE_SVR_REDISTRIBUTE: { RedistributeReadThread RedistributeReadThread(Ios, aBs); Tp.invoke(RedistributeReadThread); } break; case WE_SVR_GET_FILESIZES: case WE_SVR_GET_FILESIZE: { GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm); Tp.invoke(getFileSizeThread); } break; default: { Ios.close(); // don't know who is this } break; } } } // namespace WriteEngine