You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			501 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			501 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /***********************************************************************
 | |
|  *   $Id: sm.cpp 9254 2013-02-04 19:40:31Z rdempsey $
 | |
|  *
 | |
|  ***********************************************************************/
 | |
| 
 | |
| #define PREFER_MY_CONFIG_H
 | |
| #include <mariadb.h>
 | |
| #include <mysql.h>
 | |
| #include <my_sys.h>
 | |
| #include <errmsg.h>
 | |
| #include <sql_common.h>
 | |
| #include <unistd.h>
 | |
| #include <iostream>
 | |
| #include <stdexcept>
 | |
| #include <cstring>
 | |
| #include <sstream>
 | |
| #include <cstdlib>
 | |
| #include <signal.h>
 | |
| #include <cstdio>
 | |
| #if __FreeBSD__
 | |
| typedef sig_t sighandler_t;
 | |
| #endif
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/thread.hpp>
 | |
| 
 | |
| #include "calpontsystemcatalog.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "bytestream.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "errorcodes.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "querystats.h"
 | |
| using namespace querystats;
 | |
| 
 | |
| #include "sm.h"
 | |
| 
 | |
| #include "installdir.h"
 | |
| 
 | |
| namespace
 | |
| {
 | |
| using namespace sm;
 | |
| 
 | |
| // @bug 159 fix. clean up routine when error happened
 | |
| void cleanup(cpsm_conhdl_t* hndl)
 | |
| {
 | |
|   // remove system catalog instance for this statement.
 | |
|   CalpontSystemCatalog::removeCalpontSystemCatalog(hndl->sessionID);
 | |
|   hndl->queryState = NO_QUERY;
 | |
|   hndl->resultSet.erase(hndl->resultSet.begin(), hndl->resultSet.end());
 | |
| }
 | |
| 
 | |
| status_t tpl_scan_fetch_getband(cpsm_conhdl_t* hndl, sp_cpsm_tplsch_t& ntplsch, int* killed)
 | |
| {
 | |
|   // @bug 649 check keybandmap first
 | |
|   map<int, int>::iterator keyBandMapIter = hndl->keyBandMap.find(ntplsch->key);
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     if (keyBandMapIter != hndl->keyBandMap.end())
 | |
|     {
 | |
|       ByteStream bs;
 | |
|       ostringstream oss;
 | |
|       oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->key << '_' << ntplsch->bandID
 | |
|           << ".band";
 | |
|       ifstream bandFile(oss.str().c_str(), ios::in);
 | |
|       bandFile >> bs;
 | |
|       unlink(oss.str().c_str());
 | |
| 
 | |
|       // not possible for vtable
 | |
|       ntplsch->deserializeTable(bs);
 | |
|       ntplsch->bandID++;
 | |
| 
 | |
|       // end of result set
 | |
|       if (ntplsch->bandID == keyBandMapIter->second)
 | |
|       {
 | |
|         hndl->keyBandMap.erase(keyBandMapIter);
 | |
|         return SQL_NOT_FOUND;
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       ByteStream bs;
 | |
| 
 | |
|       // @bug 626. check saveFlag. If SAVING, read band from socket and save to disk
 | |
|       if (ntplsch->saveFlag == SAVING)
 | |
|       {
 | |
|         ByteStream bs;
 | |
|         // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries
 | |
|         //			to reestablish a connection with ExeMgr which ends up causing mysql
 | |
|         //			session to hang.
 | |
|         bs = hndl->exeMgr->read();
 | |
|         ostringstream oss;
 | |
|         oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->tableid << '_'
 | |
|             << ntplsch->bandsReturned << ".band";
 | |
|         ofstream saveFile(oss.str().c_str(), ios::out);
 | |
|         saveFile << bs;
 | |
|         saveFile.close();
 | |
|         ntplsch->bandsReturned++;
 | |
|         // not possible for vtable
 | |
|         ntplsch->deserializeTable(bs);
 | |
|       }
 | |
|       // if SAVED, read from saved file. not possible for vtable
 | |
|       else if (ntplsch->saveFlag == SAVED)
 | |
|       {
 | |
|         ostringstream oss;
 | |
|         oss << DEFAULT_SAVE_PATH << '/' << hndl->sessionID << '/' << ntplsch->tableid << '_'
 | |
|             << ntplsch->bandsReturned << ".band";
 | |
|         ifstream saveFile(oss.str().c_str(), ios::in);
 | |
|         saveFile >> bs;
 | |
|         saveFile.close();
 | |
|         ntplsch->bandsReturned++;
 | |
|         ntplsch->deserializeTable(bs);
 | |
|       }
 | |
|       // most normal path. also the path for vtable
 | |
|       else
 | |
|       {
 | |
|         ntplsch->bs.restart();
 | |
|         // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries
 | |
|         //			to reestablish a connection with ExeMgr which ends up causing mysql
 | |
|         //			session to hang.
 | |
|         // @bug 3386. need to abort the query when user does ctrl+c
 | |
|         timespec t;
 | |
|         t.tv_sec = 5L;
 | |
|         t.tv_nsec = 0L;
 | |
| 
 | |
|         if (killed && *killed)
 | |
|           return SQL_KILLED;
 | |
| 
 | |
|         ntplsch->bs = hndl->exeMgr->read();
 | |
| 
 | |
|         if (ntplsch->bs.length() != 0)
 | |
|         {
 | |
|           ntplsch->deserializeTable(ntplsch->bs);
 | |
| 
 | |
|           if (ntplsch->rowGroup && ntplsch->rowGroup->getRGData() == NULL)
 | |
|           {
 | |
|             ntplsch->bs.restart();
 | |
|             // @bug 2244. Bypass ClientRotator::read() because if I/O error occurs, it tries
 | |
|             //			to reestablish a connection with ExeMgr which ends up causing mysql
 | |
|             //			session to hang.
 | |
|             bool timeout = true;
 | |
| 
 | |
|             while (timeout)
 | |
|             {
 | |
|               timeout = false;
 | |
|               ntplsch->bs = hndl->exeMgr->getClient()->read(&t, &timeout);
 | |
| 
 | |
|               if (killed && *killed)
 | |
|                 return SQL_KILLED;
 | |
|             }
 | |
| 
 | |
|             if (ntplsch->bs.length() == 0)
 | |
|             {
 | |
|               hndl->curFetchTb = 0;
 | |
|               return logging::ERR_LOST_CONN_EXEMGR;
 | |
|             }
 | |
| 
 | |
|             ntplsch->deserializeTable(ntplsch->bs);
 | |
|           }
 | |
| 
 | |
|           uint16_t error = ntplsch->getStatus();
 | |
| 
 | |
|           if (0 != error)
 | |
|           {
 | |
|             ntplsch->setErrMsg();
 | |
|             return error;
 | |
|           }
 | |
|         }
 | |
|         else  // @todo error handling
 | |
|         {
 | |
|           hndl->curFetchTb = 0;
 | |
| 
 | |
|           if (ntplsch->saveFlag == NO_SAVE)
 | |
|             hndl->tidScanMap[ntplsch->tableid] = ntplsch;
 | |
| 
 | |
|           ntplsch->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR);
 | |
|           return logging::ERR_LOST_CONN_EXEMGR;
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       // All done with this table. reset curFetchTb when finish SOCKET reading
 | |
|       if (ntplsch->getRowCount() == 0)
 | |
|       {
 | |
|         hndl->curFetchTb = 0;
 | |
| 
 | |
|         if (ntplsch->saveFlag == NO_SAVE)
 | |
|           hndl->tidScanMap[ntplsch->tableid] = ntplsch;
 | |
| 
 | |
|         return SQL_NOT_FOUND;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     hndl->curFetchTb = 0;
 | |
| 
 | |
|     if (ntplsch->saveFlag == NO_SAVE)
 | |
|       hndl->tidScanMap[ntplsch->tableid] = ntplsch;
 | |
| 
 | |
|     ntplsch->errMsg = e.what();
 | |
|     return logging::ERR_LOST_CONN_EXEMGR;
 | |
|   }
 | |
| 
 | |
|   ntplsch->rowsreturned = 0;
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| void end_query(cpsm_conhdl_t* hndl)
 | |
| {
 | |
|   // remove system catalog instance for this statement.
 | |
|   // @bug 695. turn on system catalog session cache for FE
 | |
|   // CalpontSystemCatalog::removeCalpontSystemCatalog(hndl->sessionID);
 | |
|   hndl->queryState = NO_QUERY;
 | |
|   // reset at the end of query
 | |
|   hndl->curFetchTb = 0;
 | |
|   // @bug 626 clear up
 | |
|   hndl->tidMap.clear();
 | |
|   hndl->tidScanMap.clear();
 | |
|   hndl->keyBandMap.clear();
 | |
| 
 | |
|   // Tell ExeMgr we are done with this query
 | |
|   try
 | |
|   {
 | |
|     ByteStream bs;
 | |
|     ByteStream::quadbyte qb = 0;
 | |
|     bs << qb;
 | |
|     hndl->write(bs);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     throw;
 | |
|   }
 | |
| }
 | |
| 
 | |
| // @bug 1054, 863 - SIGPIPE handler
 | |
| bool sigFlag = false;
 | |
| void sighandler(int sig_num)
 | |
| {
 | |
|   FILE* p;
 | |
|   char buf[1024];
 | |
| 
 | |
|   string tmpDir = startup::StartUp::tmpDir() + "/f1.dat";
 | |
|   const char* cstr = tmpDir.c_str();
 | |
| 
 | |
|   if ((p = fopen(cstr, "a")) != NULL)
 | |
|   {
 | |
|     snprintf(buf, 1024, "sighandler() hit with %d\n", sig_num);
 | |
|     fwrite(buf, 1, strlen(buf), p);
 | |
|     fclose(p);
 | |
|   }
 | |
| 
 | |
|   sigFlag = true;
 | |
|   throw runtime_error("zerror");
 | |
| }
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| namespace sm
 | |
| {
 | |
// Root directory for band files; paths in this file are built as
// DEFAULT_SAVE_PATH/<sessionID>/<id>_<n>.band.
const std::string DEFAULT_SAVE_PATH("/var/tmp");
 | |
| 
 | |
| status_t tpl_open(tableid_t tableid, sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t* conn_hdl)
 | |
| {
 | |
|   SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl;
 | |
| 
 | |
|   // if first time enter this function for a statement, set
 | |
|   // queryState to QUERY_IN_PROCESS and get execution plan.
 | |
|   if (conn_hdl->queryState == NO_QUERY)
 | |
|   {
 | |
|     conn_hdl->queryState = QUERY_IN_PROCESS;
 | |
|   }
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     // @bug 626. check saveFlag, if SAVED, do not project
 | |
|     if (ntplh->saveFlag != SAVED)
 | |
|     {
 | |
|       // Request ExeMgr to start projecting table tableid
 | |
|       CalpontSystemCatalog::OID tableOID = static_cast<CalpontSystemCatalog::OID>(tableid);
 | |
|       ByteStream::quadbyte qb = static_cast<ByteStream::quadbyte>(tableOID);
 | |
|       ByteStream bs;
 | |
|       bs << qb;
 | |
|       conn_hdl->write(bs);
 | |
|     }
 | |
|   }
 | |
|   catch (std::exception& ex)
 | |
|   {
 | |
|     SMDEBUGLOG << "Exception caught in tpl_open: " << ex.what() << endl;
 | |
|     cleanup(conn_hdl);
 | |
|     return CALPONT_INTERNAL_ERROR;
 | |
|   }
 | |
| 
 | |
|   ntplh->tableid = tableid;
 | |
| 
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| status_t tpl_scan_open(tableid_t tableid, sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* /*conn_hdl*/)
 | |
| {
 | |
| #if IDB_SM_DEBUG
 | |
|   SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl;
 | |
| #endif
 | |
| 
 | |
|   // @bug 649. No initialization here. take passed in reference
 | |
|   ntplsch->tableid = tableid;
 | |
| 
 | |
|   ntplsch->rowsreturned = 0;
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| status_t tpl_scan_fetch(sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* conn_hdl, int* killed)
 | |
| {
 | |
|   // @770. force end of result set when this is not the first table to be fetched.
 | |
|   if (ntplsch->traceFlags & CalpontSelectExecutionPlan::TRACE_NO_ROWS2)
 | |
|     if (conn_hdl->tidScanMap.size() >= 1)
 | |
|       return SQL_NOT_FOUND;
 | |
| 
 | |
|   // need another band
 | |
|   status_t status = STATUS_OK;
 | |
| 
 | |
|   if (ntplsch->rowsreturned == ntplsch->getRowCount())
 | |
|     status = tpl_scan_fetch_getband(conn_hdl, ntplsch, killed);
 | |
| 
 | |
|   return status;
 | |
| }
 | |
| 
 | |
| status_t tpl_scan_close(sp_cpsm_tplsch_t& ntplsch)
 | |
| {
 | |
| #if IDB_SM_DEBUG
 | |
|   SMDEBUGLOG << "tpl_scan_close: ";
 | |
| 
 | |
|   if (ntplsch)
 | |
|     SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch;
 | |
|   SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl;
 | |
| #endif
 | |
|   ntplsch.reset();
 | |
| 
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| status_t tpl_close(sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats,
 | |
|                    bool clear_scan_ctx)
 | |
| {
 | |
|   cpsm_conhdl_t* hndl = *conn_hdl;
 | |
|   SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh;
 | |
| 
 | |
|   if (ntplh)
 | |
|     SMDEBUGLOG << " tableid: " << ntplh->tableid;
 | |
| 
 | |
|   SMDEBUGLOG << endl;
 | |
|   ntplh.reset();
 | |
| 
 | |
|   // determine end of result set and end of statement execution
 | |
|   if (hndl->queryState == QUERY_IN_PROCESS)
 | |
|   {
 | |
|     // Get the query stats
 | |
|     ByteStream bs;
 | |
|     // Ask for a stats only if a user explicitly asks
 | |
|     if (ask_4_stats)
 | |
|     {
 | |
|       ByteStream::quadbyte qb = 3;
 | |
|       bs << qb;
 | |
|       hndl->write(bs);
 | |
|     }
 | |
|     // MCOL-1601 Dispose of unused empty RowGroup
 | |
|     if (clear_scan_ctx)
 | |
|     {
 | |
|       SMDEBUGLOG << "tpl_close() clear_scan_ctx read" << std::endl;
 | |
|       bs = hndl->exeMgr->read();
 | |
|     }
 | |
| 
 | |
|     SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl;
 | |
|     // keep reading until we get a string
 | |
|     // Ask for a stats only if a user explicitly asks
 | |
|     if (ask_4_stats)
 | |
|     {
 | |
|       for (int tries = 0; tries < 10; tries++)
 | |
|       {
 | |
|         bs = hndl->exeMgr->read();
 | |
| 
 | |
|         if (bs.length() == 0)
 | |
|           break;
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           bs >> hndl->queryStats;
 | |
|           bs >> hndl->extendedStats;
 | |
|           bs >> hndl->miniStats;
 | |
|           stats.unserialize(bs);
 | |
|           stats.setEndTime();
 | |
|           stats.insert();
 | |
|           break;
 | |
|         }
 | |
|         catch (IDBExcept&)
 | |
|         {
 | |
|           // @bug4732
 | |
|           end_query(hndl);
 | |
|           throw;
 | |
|         }
 | |
|         catch (...)
 | |
|         {
 | |
|           // querystats messed up. close connection.
 | |
|           // no need to throw for querystats protocol error, like for tablemode.
 | |
|           SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
 | |
|           end_query(hndl);
 | |
|           sm_cleanup(hndl);
 | |
|           *conn_hdl = 0;
 | |
|           return STATUS_OK;
 | |
|           // throw runtime_error(string("tbl_close catch exception: ") + e.what());
 | |
|         }
 | |
|       }
 | |
|     }  // stats
 | |
| 
 | |
|     end_query(hndl);
 | |
|   }
 | |
| 
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| status_t sm_init(uint32_t sid, cpsm_conhdl_t** conn_hdl, uint32_t columnstore_local_query)
 | |
| {
 | |
|   // clear file content
 | |
| #if IDB_SM_DEBUG
 | |
|   // smlog.close();
 | |
|   // smlog.open("/tmp/sm.log");
 | |
|   SMDEBUGLOG << "sm_init: " << endl;
 | |
| #endif
 | |
| 
 | |
|   // @bug5660 Connection changes related to the local pm setting
 | |
|   /**
 | |
|    * when local PM is detected, or columnstore_local_query is set:
 | |
|    * 1. SELECT query connect to local ExeMgr 127.0.0.1:8601;
 | |
|    * 2. DML/DDL is disallowed.
 | |
|    * once local connection is determined, no need to check
 | |
|    * again because it will not switch back.
 | |
|    **/
 | |
|   if (*conn_hdl)
 | |
|   {
 | |
|     // existing connection is local, ok.
 | |
|     if ((*conn_hdl)->exeMgr->localQuery() || !columnstore_local_query)
 | |
|       return STATUS_OK;
 | |
|     // if session variable changes to local, re-establish the connection to loopback.
 | |
|     else
 | |
|       sm_cleanup(*conn_hdl);
 | |
|   }
 | |
| 
 | |
|   cpsm_conhdl_t* hndl = new cpsm_conhdl_t(time(0), sid, columnstore_local_query);
 | |
|   *conn_hdl = hndl;
 | |
|   hndl->sessionID = sid;
 | |
| 
 | |
|   // profiling statistics
 | |
|   GET_PF_TIME(hndl->pf.login);
 | |
| 
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| status_t sm_cleanup(cpsm_conhdl_t* conn_hdl)
 | |
| {
 | |
| #if IDB_SM_DEBUG
 | |
|   SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl;
 | |
| #endif
 | |
| 
 | |
|   delete conn_hdl;
 | |
| 
 | |
|   return STATUS_OK;
 | |
| }
 | |
| 
 | |
| void cpsm_conhdl_t::write(ByteStream bs)
 | |
| {
 | |
|   sighandler_t old_handler = signal(SIGPIPE, sighandler);
 | |
|   sigFlag = false;
 | |
|   exeMgr->write(bs);
 | |
|   signal(SIGPIPE, old_handler);
 | |
| 
 | |
|   if (sigFlag)
 | |
|     throw runtime_error("Broken Pipe Error");
 | |
| }
 | |
| 
 | |
| }  // namespace sm
 |