/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: updatedmlpackage.cpp 9210 2013-01-21 14:10:42Z rdempsey $ * * ***********************************************************************/ #include #include #include #include #include using namespace std; #define UPDATEDMLPKG_DLLEXPORT #include "updatedmlpackage.h" #undef UPDATEDMLPKG_DLLEXPORT namespace dmlpackage { UpdateDMLPackage::UpdateDMLPackage() { } UpdateDMLPackage::UpdateDMLPackage(std::string schemaName, std::string tableName, std::string dmlStatement, int sessionID) : CalpontDMLPackage(schemaName, tableName, dmlStatement, sessionID) { } UpdateDMLPackage::~UpdateDMLPackage() { } int UpdateDMLPackage::write(messageqcpp::ByteStream& bytestream) { int retval = 1; messageqcpp::ByteStream::byte package_type = DML_UPDATE; bytestream << package_type; messageqcpp::ByteStream::quadbyte session_id = fSessionID; bytestream << session_id; /* if(fPlan != 0) fHasFilter = true; else fHasFilter = false; */ messageqcpp::ByteStream::quadbyte hasFilter = fHasFilter; bytestream << hasFilter; bytestream << fUuid; bytestream << fDMLStatement; bytestream << fSQLStatement; bytestream << fSchemaName; messageqcpp::ByteStream::octbyte timeZone = fTimeZone; bytestream << timeZone; bytestream << (uint8_t)fIsFromCol; if (fTable != 0) { retval = fTable->write(bytestream); } if (fHasFilter) { bytestream += *(fPlan.get()); } return retval; } /** * */ int UpdateDMLPackage::read(messageqcpp::ByteStream& bytestream) { int retval = 1; messageqcpp::ByteStream::quadbyte session_id; messageqcpp::ByteStream::quadbyte hasFilter; bytestream >> session_id; fSessionID = session_id; bytestream >> hasFilter; fHasFilter = (hasFilter != 0); bytestream >> fUuid; std::string dmlStatement; bytestream >> fDMLStatement; bytestream >> fSQLStatement; bytestream >> fSchemaName; messageqcpp::ByteStream::octbyte timeZone; bytestream >> timeZone; fTimeZone = timeZone; uint8_t isFromCol; bytestream >> isFromCol; fIsFromCol = (isFromCol != 0); fTable = new DMLTable(); retval = fTable->read(bytestream); if (fHasFilter) { fPlan.reset(new messageqcpp::ByteStream(bytestream)); } return retval; } int UpdateDMLPackage::buildFromSqlStatement(SqlStatement& sqlStatement) { int retval = 1; UpdateSqlStatement& updateStmt = dynamic_cast(sqlStatement); if (!updateStmt.fColAssignmentListPtr) throw runtime_error("updateStmt.fColAssignmentPtr == NULL"); initializeTable(); // Since there is no filter, all rows are updated // Push one row always and let the filter happen on the proc side. Row* rowPtr = new Row(); ColumnAssignmentList::const_iterator iter = updateStmt.fColAssignmentListPtr->begin(); while (iter != updateStmt.fColAssignmentListPtr->end()) { ColumnAssignment* colaPtr = *iter; NullString expr(colaPtr->fScalarExpression); DMLColumn* colPtr = new DMLColumn(colaPtr->fColumn, expr); rowPtr->get_ColumnList().push_back(colPtr); ++iter; } fTable->get_RowList().push_back(rowPtr); if (0 != updateStmt.fWhereClausePtr) { // We need to filter the rows...get row ids fHasFilter = true; fQueryString = updateStmt.getQueryString(); } return retval; } /** * */ int UpdateDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows) { #ifdef DML_PACKAGE_DEBUG // cout << "The data buffer received: " << buffer << endl; #endif int retval = 1; initializeTable(); std::vector dataList; typedef boost::tokenizer > tokenizer; boost::char_separator sep(":,"); tokenizer tokens(buffer, sep); for (tokenizer::iterator tok_iter = tokens.begin(); tok_iter != tokens.end(); ++tok_iter) { dataList.push_back(StripLeadingWhitespace(*tok_iter)); } int n = 0; for (int i = 0; i < rows; i++) { // get a new row Row* aRowPtr = new Row(); std::string colName; std::string colValue; // get row ID from the buffer std::string rowid = dataList[n++]; aRowPtr->set_RowID(atoll(rowid.c_str())); #ifdef DML_PACKAGE_DEBUG // cout << "The row ID is " << rowid << endl; #endif for (int j = 0; j < columns; j++) { // Build a column list colName = dataList[n++]; colValue = dataList[n++]; NullString val(colValue); #ifdef DML_PACKAGE_DEBUG // cout << "The column data: " << colName << " " << colValue << endl; #endif DMLColumn* aColumn = new DMLColumn(colName, val); (aRowPtr->get_ColumnList()).push_back(aColumn); } // build a row list for a table fTable->get_RowList().push_back(aRowPtr); } return retval; } int UpdateDMLPackage::buildFromMysqlBuffer(ColNameList& colNameList, TableValuesMap& tableValuesMap, int columns, int rows, NullValuesBitset& nullValues) { int retval = 1; initializeTable(); Row* aRowPtr = new Row(); std::string colName; ColValuesList colValList; for (int j = 0; j < columns; j++) { // Build a column list colName = colNameList[j]; colValList = tableValuesMap[j]; DMLColumn* aColumn = new DMLColumn(colName, colValList, false, 0, nullValues[j]); (aRowPtr->get_ColumnList()).push_back(aColumn); } // build a row list for a table fTable->get_RowList().push_back(aRowPtr); return retval; } void UpdateDMLPackage::buildUpdateFromMysqlBuffer(UpdateSqlStatement& updateStmt) { if (!updateStmt.fColAssignmentListPtr) throw runtime_error("updateStmt.fColAssignmentPtr == NULL"); initializeTable(); // Since there is no filter, all rows are updated // Push one row always and let the filter happen on the proc side. Row* rowPtr = new Row(); ColumnAssignmentList::const_iterator iter = updateStmt.fColAssignmentListPtr->begin(); while (iter != updateStmt.fColAssignmentListPtr->end()) { ColumnAssignment* colaPtr = *iter; NullString scalarExpression(colaPtr->fScalarExpression); DMLColumn* colPtr = new DMLColumn(colaPtr->fColumn, scalarExpression, colaPtr->fFromCol, colaPtr->fFuncScale, colaPtr->fIsNull); rowPtr->get_ColumnList().push_back(colPtr); ++iter; } fTable->get_RowList().push_back(rowPtr); } } // namespace dmlpackage