/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: sm.h 9254 2013-02-04 19:40:31Z rdempsey $ * ***********************************************************************/ /** @file */ #pragma once #include #include #include #include #include #include #include "calpontsystemcatalog.h" #include "clientrotator.h" #include "rowgroup.h" #include "calpontselectexecutionplan.h" #include "querystats.h" #define IDB_SM_DEBUG 0 #define IDB_SM_PROFILE 0 #define EXPORT #if IDB_SM_PROFILE #include #define GET_PF_TIME(n) \ { \ gettimeofday(&n, NULL); \ } #else #define GET_PF_TIME(n) \ { \ } #endif namespace sm { const int STATUS_OK = 0; const int SQL_NOT_FOUND = -1000; const int SQL_KILLED = -1001; const int CALPONT_INTERNAL_ERROR = -1007; //#if IDB_SM_DEBUG // extern std::ofstream smlog; //#define SMDEBUGLOG smlog //#else #define SMDEBUGLOG \ if (false) \ std::cout //#endif extern const std::string DEFAULT_SAVE_PATH; typedef uint64_t tableid_t; typedef int32_t status_t; enum QueryState { NO_QUERY = 0, QUERY_IN_PROCESS }; typedef struct Column { Column() : tableID(-1) { } ~Column() { } int tableID; int colPos; int dataType; std::vector data; } Column; typedef std::map ResultMap; struct Profiler { struct timeval login; struct timeval beforePlan; struct timeval afterPlan; struct timeval resultArrival; struct timeval resultReady; struct timeval endProcess; long prePlan() { return (beforePlan.tv_sec - login.tv_sec) * 1000 + (beforePlan.tv_usec - login.tv_usec) / 1000; } long buildPlan() { return (afterPlan.tv_sec - beforePlan.tv_sec) * 1000 + (afterPlan.tv_usec - beforePlan.tv_usec) / 1000; } long jobProcess() { return (resultArrival.tv_sec - afterPlan.tv_sec) * 1000 + (resultArrival.tv_usec - afterPlan.tv_usec) / 1000; } long buildResult() { return (resultReady.tv_sec - resultArrival.tv_sec) * 1000 + (resultReady.tv_usec - resultArrival.tv_usec) / 1000; } long tableFetch() { return (endProcess.tv_sec - resultReady.tv_sec) * 1000 + (endProcess.tv_usec - resultReady.tv_usec) / 1000; } }; /** @brief Calpont table scan handle */ struct cpsm_tplsch_t { cpsm_tplsch_t() : tableid(0), rowsreturned(0), rowGroup(0), traceFlags(0), bandID(0), saveFlag(0), bandsReturned(0), ctp(0) { } ~cpsm_tplsch_t() { } tableid_t tableid; uint64_t rowsreturned; std::shared_ptr rowGroup; messageqcpp::ByteStream bs; // rowgroup bytestream. need to stay with the life span of rowgroup uint32_t traceFlags; // @bug 649 int bandID; // the band that being read from the disk int key; // unique key for the table's scan context // @bug 626 uint16_t saveFlag; uint32_t bandsReturned; std::vector ctp; std::string errMsg; rowgroup::RGData rgData; void deserializeTable(messageqcpp::ByteStream& bs) { if (!rowGroup) { rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(bs); } else { // XXXST: the 'true' is to ease the transition to RGDatas. Take it out when the // transition is done. rgData.deserialize(bs, true); rowGroup->setData(&rgData); // rowGroup->setData(const_cast(bs.buf())); } } uint16_t getStatus() { idbassert(rowGroup != 0); return rowGroup->getStatus(); } uint64_t getRowCount() { if (rowGroup) return rowGroup->getRowCount(); else return 0; } void setErrMsg() { if (rowGroup && getStatus()) { // bs.advance(rowGroup->getDataSize()); bs >> errMsg; } else { errMsg = "NOERROR"; } } }; typedef boost::shared_ptr sp_cpsm_tplsch_t; /** @brief Calpont connection handle structure */ class cpsm_conhdl_t { public: cpsm_conhdl_t(time_t v, const uint32_t sid, bool columnstore_local_query) : value(v) , sessionID(sid) , queryState(NO_QUERY) , exeMgr(new execplan::ClientRotator(sid, "ExeMgr", columnstore_local_query)) , tblinfo_idx(0) , idxinfo_idx(0) , curFetchTb(0) { } /** @brief connnect ExeMgr * * Try connecting to ExeMgr. If no connection, try ExeMgr1, * ExeMgr2... until timeout lapses. Then throw exception. */ void connect(double timeout = 0.005) { exeMgr->connect(timeout); } EXPORT void write(messageqcpp::ByteStream bs); ~cpsm_conhdl_t() { delete exeMgr; } EXPORT const std::string toString() const; time_t value; uint32_t sessionID; short queryState; // 0 -- NO_QUERY; 1 -- QUERY_IN_PROCESS execplan::ClientRotator* exeMgr; ResultMap resultSet; Profiler pf; int tblinfo_idx; int idxinfo_idx; std::string schemaname; std::string tablename; int tboid; short requestType; // 0 -- ID2NAME; 1 -- NAME2ID boost::shared_ptr csc; // @bug 649; @bug 626 std::map tidMap; // tableid-tableStartCount map std::map tidScanMap; std::map keyBandMap; // key-savedBandCount map int curFetchTb; // current fetching table key std::string queryStats; std::string extendedStats; std::string miniStats; private: }; std::ostream& operator<<(std::ostream& output, const cpsm_conhdl_t& rhs); // @bug 626 save table bands to avoid sending plan too many times enum SavingFlag { NO_SAVE = 0, SAVING, SAVED }; /** @brief Calpont table handle */ struct cpsm_tplh_t { cpsm_tplh_t() : tableid(0), rowsintable(0), bandID(0), saveFlag(NO_SAVE), bandsInTable(0) { } tableid_t tableid; int rowsintable; // @bug 649 int bandID; // the band that being read from the disk int key; // unique key for the table's scan context // @bug 626 uint16_t saveFlag; int bandsInTable; }; typedef std::shared_ptr sp_cpsm_tplh_t; struct cpsm_tid_t { cpsm_tid_t() : valid(false), value(0) { } bool valid; int value; }; extern status_t sm_init(uint32_t, cpsm_conhdl_t**, uint32_t columnstore_local_query = false); extern status_t sm_cleanup(cpsm_conhdl_t*); extern status_t tpl_open(tableid_t, sp_cpsm_tplh_t&, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); extern status_t tpl_close(sp_cpsm_tplh_t&, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false); } // namespace sm #undef EXPORT