/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: sm.cpp 9254 2013-02-04 19:40:31Z rdempsey $ * ***********************************************************************/ #define PREFER_MY_CONFIG_H #include #include #include #include #include #include #include #include #include #include #include #include #include #if __FreeBSD__ typedef sig_t sighandler_t; #endif using namespace std; #include #include "calpontsystemcatalog.h" using namespace execplan; #include "bytestream.h" using namespace messageqcpp; #include "errorcodes.h" using namespace logging; #include "querystats.h" using namespace querystats; #include "sm.h" #include "installdir.h" namespace { using namespace sm; // @bug 159 fix. clean up routine when error happened void cleanup(cpsm_conhdl_t* hndl) { // remove system catalog instance for this statement. CalpontSystemCatalog::removeCalpontSystemCatalog(hndl->sessionID); hndl->queryState = NO_QUERY; hndl->resultSet.erase(hndl->resultSet.begin(), hndl->resultSet.end()); } status_t tpl_scan_fetch_getband(cpsm_conhdl_t* hndl, sp_cpsm_tplsch_t& ntplsch, int* killed) { // @bug 649 check keybandmap first map::iterator keyBandMapIter = hndl->keyBandMap.find(ntplsch->key); try { if (keyBandMapIter != hndl->keyBandMap.end()) { ByteStream bs; ostringstream oss; oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->key << '_' << ntplsch->bandID << ".band"; ifstream bandFile(oss.str().c_str(), ios::in); bandFile >> bs; unlink(oss.str().c_str()); // not possible for vtable ntplsch->deserializeTable(bs); ntplsch->bandID++; // end of result set if (ntplsch->bandID == keyBandMapIter->second) { hndl->keyBandMap.erase(keyBandMapIter); return SQL_NOT_FOUND; } } else { ByteStream bs; // @bug 626. check saveFlag. If SAVING, read band from socket and save to disk if (ntplsch->saveFlag == SAVING) { ByteStream bs; // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries // to reestablish a connection with ExeMgr which ends up causing mysql // session to hang. bs = hndl->exeMgr->read(); ostringstream oss; oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->tableid << '_' << ntplsch->bandsReturned << ".band"; ofstream saveFile(oss.str().c_str(), ios::out); saveFile << bs; saveFile.close(); ntplsch->bandsReturned++; // not possible for vtable ntplsch->deserializeTable(bs); } // if SAVED, read from saved file. not possible for vtable else if (ntplsch->saveFlag == SAVED) { ostringstream oss; oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->tableid << '_' << ntplsch->bandsReturned << ".band"; ifstream saveFile(oss.str().c_str(), ios::in); saveFile >> bs; saveFile.close(); ntplsch->bandsReturned++; ntplsch->deserializeTable(bs); } // most normal path. also the path for vtable else { ntplsch->bs.restart(); // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries // to reestablish a connection with ExeMgr which ends up causing mysql // session to hang. // @bug 3386. need to abort the query when user does ctrl+c timespec t; t.tv_sec = 5L; t.tv_nsec = 0L; if (killed && *killed) return SQL_KILLED; ntplsch->bs = hndl->exeMgr->read(); if (ntplsch->bs.length() != 0) { ntplsch->deserializeTable(ntplsch->bs); if (ntplsch->rowGroup && ntplsch->rowGroup->getRGData() == NULL) { ntplsch->bs.restart(); // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries // to reestablish a connection with ExeMgr which ends up causing mysql // session to hang. bool timeout = true; while (timeout) { timeout = false; ntplsch->bs = hndl->exeMgr->getClient()->read(&t, &timeout); if (killed && *killed) return SQL_KILLED; } if (ntplsch->bs.length() == 0) { hndl->curFetchTb = 0; return logging::ERR_LOST_CONN_EXEMGR; } ntplsch->deserializeTable(ntplsch->bs); } uint16_t error = ntplsch->getStatus(); if (0 != error) { ntplsch->setErrMsg(); return error; } } else // @todo error handling { hndl->curFetchTb = 0; if (ntplsch->saveFlag == NO_SAVE) hndl->tidScanMap[ntplsch->tableid] = ntplsch; ntplsch->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR); return logging::ERR_LOST_CONN_EXEMGR; } } // All done with this table. reset curFetchTb when finish SOCKET reading if (ntplsch->getRowCount() == 0) { hndl->curFetchTb = 0; if (ntplsch->saveFlag == NO_SAVE) hndl->tidScanMap[ntplsch->tableid] = ntplsch; return SQL_NOT_FOUND; } } } catch (std::exception& e) { hndl->curFetchTb = 0; if (ntplsch->saveFlag == NO_SAVE) hndl->tidScanMap[ntplsch->tableid] = ntplsch; ntplsch->errMsg = e.what(); return logging::ERR_LOST_CONN_EXEMGR; } ntplsch->rowsreturned = 0; return STATUS_OK; } void end_query(cpsm_conhdl_t* hndl) { // remove system catalog instance for this statement. // @bug 695. turn on system catalog session cache for FE // CalpontSystemCatalog::removeCalpontSystemCatalog(hndl->sessionID); hndl->queryState = NO_QUERY; // reset at the end of query hndl->curFetchTb = 0; // @bug 626 clear up hndl->tidMap.clear(); hndl->tidScanMap.clear(); hndl->keyBandMap.clear(); // Tell ExeMgr we are done with this query try { ByteStream bs; ByteStream::quadbyte qb = 0; bs << qb; hndl->write(bs); } catch (...) { throw; } } // @bug 1054, 863 - SIGPIPE handler bool sigFlag = false; void sighandler(int sig_num) { FILE* p; char buf[1024]; string tmpDir = startup::StartUp::tmpDir() + "/f1.dat"; const char* cstr = tmpDir.c_str(); if ((p = fopen(cstr, "a")) != NULL) { snprintf(buf, 1024, "sighandler() hit with %d\n", sig_num); fwrite(buf, 1, strlen(buf), p); fclose(p); } sigFlag = true; throw runtime_error("zerror"); } } // namespace namespace sm { const std::string DEFAULT_SAVE_PATH = "/var/tmp"; status_t tpl_open(tableid_t tableid, sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t* conn_hdl) { SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; // if first time enter this function for a statement, set // queryState to QUERY_IN_PROCESS and get execution plan. if (conn_hdl->queryState == NO_QUERY) { conn_hdl->queryState = QUERY_IN_PROCESS; } try { // @bug 626. check saveFlag, if SAVED, do not project if (ntplh->saveFlag != SAVED) { // Request ExeMgr to start projecting table tableid CalpontSystemCatalog::OID tableOID = static_cast(tableid); ByteStream::quadbyte qb = static_cast(tableOID); ByteStream bs; bs << qb; conn_hdl->write(bs); } } catch (std::exception& ex) { SMDEBUGLOG << "Exception caught in tpl_open: " << ex.what() << endl; cleanup(conn_hdl); return CALPONT_INTERNAL_ERROR; } ntplh->tableid = tableid; return STATUS_OK; } status_t tpl_scan_open(tableid_t tableid, sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* conn_hdl) { #if IDB_SM_DEBUG SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl; #endif // @bug 649. No initialization here. take passed in reference ntplsch->tableid = tableid; ntplsch->rowsreturned = 0; return STATUS_OK; } status_t tpl_scan_fetch(sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* conn_hdl, int* killed) { // @770. force end of result set when this is not the first table to be fetched. if (ntplsch->traceFlags & CalpontSelectExecutionPlan::TRACE_NO_ROWS2) if (conn_hdl->tidScanMap.size() >= 1) return SQL_NOT_FOUND; // need another band status_t status = STATUS_OK; if (ntplsch->rowsreturned == ntplsch->getRowCount()) status = tpl_scan_fetch_getband(conn_hdl, ntplsch, killed); return status; } status_t tpl_scan_close(sp_cpsm_tplsch_t& ntplsch) { #if IDB_SM_DEBUG SMDEBUGLOG << "tpl_scan_close: "; if (ntplsch) SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch; SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl; #endif ntplsch.reset(); return STATUS_OK; } status_t tpl_close(sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx) { cpsm_conhdl_t* hndl = *conn_hdl; SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh; if (ntplh) SMDEBUGLOG << " tableid: " << ntplh->tableid; SMDEBUGLOG << endl; ntplh.reset(); // determine end of result set and end of statement execution if (hndl->queryState == QUERY_IN_PROCESS) { // Get the query stats ByteStream bs; // Ask for a stats only if a user explicitly asks if (ask_4_stats) { ByteStream::quadbyte qb = 3; bs << qb; hndl->write(bs); } // MCOL-1601 Dispose of unused empty RowGroup if (clear_scan_ctx) { SMDEBUGLOG << "tpl_close() clear_scan_ctx read" << std::endl; bs = hndl->exeMgr->read(); } SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl; // keep reading until we get a string // Ask for a stats only if a user explicitly asks if (ask_4_stats) { for (int tries = 0; tries < 10; tries++) { bs = hndl->exeMgr->read(); if (bs.length() == 0) break; try { bs >> hndl->queryStats; bs >> hndl->extendedStats; bs >> hndl->miniStats; stats.unserialize(bs); stats.setEndTime(); stats.insert(); break; } catch (IDBExcept&) { // @bug4732 end_query(hndl); throw; } catch (...) { // querystats messed up. close connection. // no need to throw for querystats protocol error, like for tablemode. SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl; end_query(hndl); sm_cleanup(hndl); *conn_hdl = 0; return STATUS_OK; // throw runtime_error(string("tbl_close catch exception: ") + e.what()); } } } // stats end_query(hndl); } return STATUS_OK; } status_t sm_init(uint32_t sid, cpsm_conhdl_t** conn_hdl, uint32_t columnstore_local_query) { // clear file content #if IDB_SM_DEBUG // smlog.close(); // smlog.open("/tmp/sm.log"); SMDEBUGLOG << "sm_init: " << endl; #endif // @bug5660 Connection changes related to the local pm setting /** * when local PM is detected, or columnstore_local_query is set: * 1. SELECT query connect to local ExeMgr 127.0.0.1:8601; * 2. DML/DDL is disallowed. * once local connection is determined, no need to check * again because it will not switch back. **/ if (*conn_hdl) { // existing connection is local, ok. if ((*conn_hdl)->exeMgr->localQuery() || !columnstore_local_query) return STATUS_OK; // if session variable changes to local, re-establish the connection to loopback. else sm_cleanup(*conn_hdl); } cpsm_conhdl_t* hndl = new cpsm_conhdl_t(time(0), sid, columnstore_local_query); *conn_hdl = hndl; hndl->sessionID = sid; // profiling statistics GET_PF_TIME(hndl->pf.login); return STATUS_OK; } status_t sm_cleanup(cpsm_conhdl_t* conn_hdl) { #if IDB_SM_DEBUG SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl; #endif delete conn_hdl; return STATUS_OK; } void cpsm_conhdl_t::write(ByteStream bs) { sighandler_t old_handler = signal(SIGPIPE, sighandler); sigFlag = false; exeMgr->write(bs); signal(SIGPIPE, old_handler); if (sigFlag) throw runtime_error("Broken Pipe Error"); } }