/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id$ * *******************************************************************************/ /* * we_filereadthread.cpp * * Created on: Oct 25, 2011 * Author: bpaul */ #include "we_messages.h" #include "we_sdhandler.h" #include #include #include using namespace boost; #include "messagequeue.h" #include "bytestream.h" using namespace messageqcpp; #include #include #include #ifdef _MSC_VER #include #endif using namespace std; #include "we_filereadthread.h" namespace WriteEngine { void WEReadThreadRunner::operator ()() { try { fRef.feedData(); } catch(std::exception& ex) { throw runtime_error(ex.what()); } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh):fSdh(aSdh), fpThread(0), fFileMutex(), fContinue(true), fInFileName(), fInFile(std::cin.rdbuf()), //@BUG 4326 fTgtPmId(0), fBatchQty(0), fEnclEsc(false), fEncl('\0'), fEsc('\\'), fDelim('|') { //TODO batch qty to get from config fBatchQty = 10000; } //WEFileReadThread::WEFileReadThread(const WEFileReadThread& rhs):fSdh(rhs.fSdh) //{ // // TODO copy constructor //} WEFileReadThread::~WEFileReadThread() { //if(fInFile.is_open()) fInFile.close(); //@BUG 4326 if(fIfFile.is_open()) fIfFile.close(); setTgtPmId(0); if (fpThread) { delete fpThread; } fpThread=0; //cout << "WEFileReadThread destructor called" << endl; } //------------------------------------------------------------------------------ void WEFileReadThread::reset() { //if(fInFile.is_open()) fInFile.close(); //@BUG 4326 if(fIfFile.is_open()) fIfFile.close(); setTgtPmId(0); if (fpThread) { delete fpThread; } fpThread=0; //cout << "WEFileReadThread destructor called" << endl; this->setContinue(true); } //------------------------------------------------------------------------------ void WEFileReadThread::setup(std::string FileName) { if(fSdh.getDebugLvl()) cout << "WEFileReadThread::setup : *** Input files = " << FileName <2) cout << "Next InFileName = " << aStrName << endl; setInFileName(aStrName); //setInFileName(FileName); openInFile(); //set the target PM fpThread = new boost::thread(WEReadThreadRunner(*this)); } catch(std::exception& ex) { //cout << ex.what() << endl; //throw runtime_error("Exception occured in WEFileReadThread\n"); throw runtime_error(ex.what()); } if(fpThread) { // Need to send a all clear?? } } //------------------------------------------------------------------------------ bool WEFileReadThread::chkForListOfFiles(std::string& FileName) { //cout << "Inside chkForListOfFiles("<< FileName << ")" << endl; std::string aFileName = FileName; istringstream iss(aFileName); size_t start = 0, end = 0; const char* sep = " ,|"; end = aFileName.find_first_of(sep); do { if(end != string::npos) { std::string aFile = aFileName.substr(start, end-start); if(fSdh.getDebugLvl()>2) cout << "File: " << aFileName.substr(start, end-start) << endl; start = end + 1; fInfileList.push_back(aFile); } else { std::string aFile = aFileName.substr(start, end-start); if(fSdh.getDebugLvl()>1) cout << "Next Input File " << aFileName.substr(start, end-start) << endl; fInfileList.push_back(aFile); break; } end = aFileName.find_first_of(sep, start); } while(start != end); //cout << "Going out chkForListOfFiles("<< FileName << ")" << endl; return false; } //------------------------------------------------------------------------------ std::string WEFileReadThread::getNextInputDataFile() { std::string aNextFile; if(fInfileList.size()>0) { aNextFile = fInfileList.front(); fInfileList.pop_front(); } //cout << "Next Input DataFile = " << aNextFile << endl; return aNextFile; } //------------------------------------------------------------------------------ void WEFileReadThread::add2InputDataFileList(std::string& FileName) { fInfileList.push_front(FileName); } //------------------------------------------------------------------------------ void WEFileReadThread::shutdown() { this->setContinue(false); mutex::scoped_lock aLock(fFileMutex); //wait till readDataFile() finish //if(fInFile.is_open()) fInFile.close(); //@BUG 4326 if(fIfFile.is_open()) fIfFile.close(); } //------------------------------------------------------------------------------ void WEFileReadThread::feedData() { unsigned int aRowCnt = 0; const unsigned int c10mSec = 10000; while(isContinue()) { unsigned int TgtPmId = getTgtPmId(); if(TgtPmId == 0) { setTgtPmId(fSdh.getNextPm2Feed()); TgtPmId = getTgtPmId(); } if((TgtPmId>0)&&(fInFile.good())) { try { messageqcpp::SBS aSbs(new messageqcpp::ByteStream); if (fSdh.getImportDataMode() == IMPORT_DATA_TEXT) aRowCnt = readDataFile(aSbs); else aRowCnt = readBinaryDataFile(aSbs, fSdh.getTableRecLen() ); //cout << "Length " << aSbs->length() <0)) { fBuff[aLen-1] = '\n'; fBuff[aLen]=0; if(fSdh.getDebugLvl()>3) cout << "Data Read " << fBuff <(fBuff), aLen); aIdx++; if(fSdh.getDebugLvl()>2) cout << "File data line = " << aIdx <=sizeof(fBuff)-2) //Didn't hit delim; BIG ROW { cout <<"Bad Row data " << endl; cout << fBuff << endl; throw runtime_error("Data Row too BIG to handle!!"); } //for debug //if(fSdh.getDebugLvl()>3) cout << aIdx << endl; }// while return aIdx; }// if return 0; } //------------------------------------------------------------------------------ // Read input data as binary data //------------------------------------------------------------------------------ unsigned int WEFileReadThread::readBinaryDataFile(messageqcpp::SBS& Sbs, unsigned int recLen) { mutex::scoped_lock aLock(fFileMutex); if((fInFile.good()) && (!fInFile.eof())) { unsigned int aIdx=0; unsigned int aLen=0; *Sbs << (ByteStream::byte)(WE_CLT_SRV_DATA); while( (!fInFile.eof()) && (aIdx 0) { (*Sbs).append(reinterpret_cast(fBuff), aLen); aIdx++; if(fSdh.getDebugLvl()>2) cout << "Binary input data line = " << aIdx << endl; if (aLen != recLen) { cout << "Binary input data does not end on record boundary;" " Last record is " << aLen << " bytes long." << " Expected record length is: " << recLen << endl; } } } // while return aIdx; } // if return 0; } //------------------------------------------------------------------------------ void WEFileReadThread::openInFile() { try { if(fSdh.getDebugLvl()) cout << "Input FileName: " << fInFileName <