You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-27 21:01:50 +03:00
MCOL-769 Add new binary bulk insert command
For use with mcsapi and maybe INSERT...SELECT and LDI
This commit is contained in:
@ -1122,12 +1122,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
|||||||
|
|
||||||
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
|
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
|
||||||
{
|
{
|
||||||
if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (indata =="0000-00-00")) ||
|
|
||||||
((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) && (indata =="0000-00-00 00:00:00")))
|
|
||||||
{
|
|
||||||
isNULL = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isNULL && colType.defaultValue.empty()) //error out
|
if (isNULL && colType.defaultValue.empty()) //error out
|
||||||
{
|
{
|
||||||
Message::Args args;
|
Message::Args args;
|
||||||
@ -1265,6 +1259,721 @@ End-Disable use of MetaFile for bulk rollback support
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId)
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
//cout << "processBatchInsert received bytestream length " << bs.length() << endl;
|
||||||
|
|
||||||
|
ByteStream::quadbyte tmp32;
|
||||||
|
ByteStream::byte tmp8;
|
||||||
|
bs >> tmp32;
|
||||||
|
//cout << "processBatchInsert got transaction id " << tmp32 << endl;
|
||||||
|
bs >> PMId;
|
||||||
|
//cout << "processBatchInsert gor PMId " << PMId << endl;
|
||||||
|
uint32_t sessionId;
|
||||||
|
bs >> sessionId;
|
||||||
|
//cout << " processBatchInsert for session " << sessionId << endl;
|
||||||
|
bool isAutocommitOn;
|
||||||
|
bs >> tmp8;
|
||||||
|
isAutocommitOn = tmp8;
|
||||||
|
if (idbdatafile::IDBPolicy::useHdfs())
|
||||||
|
isAutocommitOn = true;
|
||||||
|
//cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
|
||||||
|
BRM::TxnID txnid;
|
||||||
|
txnid.id = tmp32;
|
||||||
|
txnid.valid = true;
|
||||||
|
bool isInsertSelect;
|
||||||
|
bs >> tmp8;
|
||||||
|
// For insert select, skip the hwm block and start inserting from the next block
|
||||||
|
// to avoid self insert issue.
|
||||||
|
//For batch insert: if not first batch, use the saved last rid to start adding rows.
|
||||||
|
isInsertSelect = tmp8;
|
||||||
|
|
||||||
|
WriteEngine::ColStructList colStructs;
|
||||||
|
WriteEngine::DctnryStructList dctnryStructList;
|
||||||
|
WriteEngine::DctnryValueList dctnryValueList;
|
||||||
|
WriteEngine::ColValueList colValuesList;
|
||||||
|
WriteEngine::DictStrList dicStringList ;
|
||||||
|
CalpontSystemCatalog::TableName tableName;
|
||||||
|
CalpontSystemCatalog::TableColName tableColName;
|
||||||
|
bs >> tableColName.table;
|
||||||
|
bs >> tableColName.schema;
|
||||||
|
tableName.table = tableColName.table;
|
||||||
|
tableName.schema = tableColName.schema;
|
||||||
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
|
||||||
|
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
||||||
|
CalpontSystemCatalog::ROPair roPair;
|
||||||
|
CalpontSystemCatalog::RIDList ridList;
|
||||||
|
CalpontSystemCatalog::DictOIDList dictOids;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ridList = systemCatalogPtr->columnRIDs(tableName, true);
|
||||||
|
roPair = systemCatalogPtr->tableRID( tableName);
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::vector<OID> dctnryStoreOids(ridList.size()) ;
|
||||||
|
std::vector<Column> columns;
|
||||||
|
DctnryStructList dctnryList;
|
||||||
|
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
|
||||||
|
|
||||||
|
uint32_t tblOid = roPair.objnum;
|
||||||
|
CalpontSystemCatalog::ColType colType;
|
||||||
|
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
|
||||||
|
bool bFirstExtentOnThisPM = false;
|
||||||
|
Convertor convertor;
|
||||||
|
if ( fIsFirstBatchPm )
|
||||||
|
{
|
||||||
|
dbRootExtTrackerVec.clear();
|
||||||
|
if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
|
||||||
|
fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
|
||||||
|
fWEWrapper.setIsInsert(true);
|
||||||
|
fWEWrapper.setBulkFlag(true);
|
||||||
|
fWEWrapper.setTransId(txnid.id);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// First gather HWM BRM information for all columns
|
||||||
|
std::vector<int> colWidths;
|
||||||
|
for (unsigned i=0; i < ridList.size(); i++)
|
||||||
|
{
|
||||||
|
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
|
||||||
|
//need handle error
|
||||||
|
|
||||||
|
CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
|
||||||
|
colWidths.push_back( convertor.getCorrectRowWidth(
|
||||||
|
colType2.colDataType, colType2.colWidth) );
|
||||||
|
}
|
||||||
|
|
||||||
|
for (unsigned i=0; i < ridList.size(); i++)
|
||||||
|
{
|
||||||
|
// Find DBRoot/segment file where we want to start adding rows
|
||||||
|
colType = systemCatalogPtr->colType(ridList[i].objnum);
|
||||||
|
boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
|
||||||
|
colWidths, dbRootHWMInfoColVec, i, 0) );
|
||||||
|
dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
|
||||||
|
DBRootExtentInfo dbRootExtent;
|
||||||
|
std::string trkErrMsg;
|
||||||
|
bool bEmptyPM;
|
||||||
|
if (i == 0)
|
||||||
|
{
|
||||||
|
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent,bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
|
||||||
|
/* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
|
||||||
|
" oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
|
||||||
|
<<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()),dbRootExtent);
|
||||||
|
|
||||||
|
|
||||||
|
colDBRootExtentInfo.push_back(dbRootExtent);
|
||||||
|
|
||||||
|
Column aColumn;
|
||||||
|
aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
|
||||||
|
aColumn.colDataType = colType.colDataType;
|
||||||
|
aColumn.compressionType = colType.compressionType;
|
||||||
|
aColumn.dataFile.oid = ridList[i].objnum;
|
||||||
|
aColumn.dataFile.fPartition = dbRootExtent.fPartition;
|
||||||
|
aColumn.dataFile.fSegment = dbRootExtent.fSegment;
|
||||||
|
aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
|
||||||
|
aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
|
||||||
|
columns.push_back(aColumn);
|
||||||
|
if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
|
||||||
|
{
|
||||||
|
DctnryStruct aDctnry;
|
||||||
|
aDctnry.dctnryOid = colType.ddn.dictOID;
|
||||||
|
aDctnry.fColPartition = dbRootExtent.fPartition;
|
||||||
|
aDctnry.fColSegment = dbRootExtent.fSegment;
|
||||||
|
aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
|
||||||
|
dctnryList.push_back(aDctnry);
|
||||||
|
}
|
||||||
|
if (colType.ddn.dictOID > 0)
|
||||||
|
{
|
||||||
|
dctnryStoreOids[i] = colType.ddn.dictOID;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dctnryStoreOids[i] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
//@Bug 5996 validate hwm before starts
|
||||||
|
rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
|
||||||
|
if ( rc != 0)
|
||||||
|
{
|
||||||
|
WErrorCodes ec;
|
||||||
|
err = ec.errorString(rc);
|
||||||
|
err += " Check err.log for detailed information.";
|
||||||
|
fIsFirstBatchPm = false;
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::vector<BRM::LBIDRange> rangeList;
|
||||||
|
|
||||||
|
// use of MetaFile for bulk rollback support
|
||||||
|
if ( fIsFirstBatchPm && isAutocommitOn)
|
||||||
|
{
|
||||||
|
//save meta data, version last block for each dbroot at the start of batch insert
|
||||||
|
try
|
||||||
|
{
|
||||||
|
fRBMetaWriter->init(tblOid, tableName.table);
|
||||||
|
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
|
||||||
|
//cout << "Saved meta files" << endl;
|
||||||
|
if (!bFirstExtentOnThisPM)
|
||||||
|
{
|
||||||
|
//cout << "Backing up hwm chunks" << endl;
|
||||||
|
for (unsigned i=0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
|
||||||
|
{
|
||||||
|
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
|
||||||
|
fRBMetaWriter->backupDctnryHWMChunk(
|
||||||
|
dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (WeException& ex) // catch exception to close file, then rethrow
|
||||||
|
{
|
||||||
|
rc = 1;
|
||||||
|
err = ex.what();
|
||||||
|
}
|
||||||
|
//Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited
|
||||||
|
if ( rc != 0)
|
||||||
|
return rc;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<string> colNames;
|
||||||
|
bool isWarningSet = false;
|
||||||
|
uint32_t columnCount;
|
||||||
|
bs >> columnCount;
|
||||||
|
if (columnCount)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (uint32_t current_column = 0; current_column < columnCount; current_column++)
|
||||||
|
{
|
||||||
|
uint32_t tmp32;
|
||||||
|
std::string colName;
|
||||||
|
bs >> tmp32;
|
||||||
|
bs >> colName;
|
||||||
|
colNames.push_back(colName);
|
||||||
|
CalpontSystemCatalog::OID oid = tmp32;
|
||||||
|
|
||||||
|
CalpontSystemCatalog::ColType colType;
|
||||||
|
colType = systemCatalogPtr->colType(oid);
|
||||||
|
|
||||||
|
WriteEngine::ColStruct colStruct;
|
||||||
|
WriteEngine::DctnryStruct dctnryStruct;
|
||||||
|
colStruct.dataOid = oid;
|
||||||
|
colStruct.tokenFlag = false;
|
||||||
|
colStruct.fCompressionType = colType.compressionType;
|
||||||
|
// Token
|
||||||
|
if ( isDictCol(colType) )
|
||||||
|
{
|
||||||
|
colStruct.colWidth = 8;
|
||||||
|
colStruct.tokenFlag = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
colStruct.colWidth = colType.colWidth;
|
||||||
|
}
|
||||||
|
colStruct.colDataType = colType.colDataType;
|
||||||
|
|
||||||
|
if (colStruct.tokenFlag)
|
||||||
|
{
|
||||||
|
dctnryStruct.dctnryOid = colType.ddn.dictOID;
|
||||||
|
dctnryStruct.columnOid = colStruct.dataOid;
|
||||||
|
dctnryStruct.fCompressionType = colType.compressionType;
|
||||||
|
dctnryStruct.colWidth = colType.colWidth;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dctnryStruct.dctnryOid = 0;
|
||||||
|
dctnryStruct.columnOid = colStruct.dataOid;
|
||||||
|
dctnryStruct.fCompressionType = colType.compressionType;
|
||||||
|
dctnryStruct.colWidth = colType.colWidth;
|
||||||
|
}
|
||||||
|
|
||||||
|
colStructs.push_back(colStruct);
|
||||||
|
dctnryStructList.push_back(dctnryStruct);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string tmpStr("");
|
||||||
|
uint32_t valuesPerColumn;
|
||||||
|
bs >> valuesPerColumn;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
WriteEngine::ColTupleList colTuples;
|
||||||
|
WriteEngine::DctColTupleList dctColTuples;
|
||||||
|
bool pushWarning = false;
|
||||||
|
for (uint32_t j = 0; j < columnCount; j++)
|
||||||
|
{
|
||||||
|
tableColName.column = colNames[j];
|
||||||
|
CalpontSystemCatalog::OID oid = colStructs[j].dataOid;
|
||||||
|
|
||||||
|
CalpontSystemCatalog::ColType colType;
|
||||||
|
colType = systemCatalogPtr->colType(oid);
|
||||||
|
|
||||||
|
boost::any datavalue;
|
||||||
|
bool isNULL = false;
|
||||||
|
WriteEngine::dictStr dicStrings;
|
||||||
|
// token
|
||||||
|
if ( isDictCol(colType) )
|
||||||
|
{
|
||||||
|
for ( uint32_t i=0; i < valuesPerColumn; i++ )
|
||||||
|
{
|
||||||
|
bs >> tmp8;
|
||||||
|
isNULL = tmp8;
|
||||||
|
bs >> tmpStr;
|
||||||
|
if ( tmpStr.length() == 0 )
|
||||||
|
isNULL = true;
|
||||||
|
|
||||||
|
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
|
||||||
|
{
|
||||||
|
if (isNULL && colType.defaultValue.empty()) //error out
|
||||||
|
{
|
||||||
|
Message::Args args;
|
||||||
|
args.add(tableColName.column);
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
else if (isNULL && !(colType.defaultValue.empty()))
|
||||||
|
{
|
||||||
|
tmpStr = colType.defaultValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( tmpStr.length() > (unsigned int)colType.colWidth )
|
||||||
|
{
|
||||||
|
tmpStr = tmpStr.substr(0, colType.colWidth);
|
||||||
|
if ( !pushWarning )
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
WriteEngine::ColTuple colTuple;
|
||||||
|
colTuple.data = datavalue;
|
||||||
|
|
||||||
|
colTuples.push_back(colTuple);
|
||||||
|
//@Bug 2515. Only pass string values to write engine
|
||||||
|
dicStrings.push_back( tmpStr );
|
||||||
|
}
|
||||||
|
colValuesList.push_back(colTuples);
|
||||||
|
//@Bug 2515. Only pass string values to write engine
|
||||||
|
dicStringList.push_back( dicStrings );
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
string x;
|
||||||
|
//scan once to check how many autoincrement value needed
|
||||||
|
uint32_t nextValNeeded = 0;
|
||||||
|
uint64_t nextVal = 1;
|
||||||
|
if (colType.autoincrement)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
|
||||||
|
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for ( uint32_t i=0; i < valuesPerColumn; i++ )
|
||||||
|
{
|
||||||
|
bs >> tmp8;
|
||||||
|
isNULL = tmp8;
|
||||||
|
|
||||||
|
int8_t val8;
|
||||||
|
int16_t val16;
|
||||||
|
int32_t val32;
|
||||||
|
int64_t val64;
|
||||||
|
float valF;
|
||||||
|
double valD;
|
||||||
|
std::string valStr;
|
||||||
|
bool valZero = false; // Needed for autoinc check
|
||||||
|
switch (colType.colDataType)
|
||||||
|
{
|
||||||
|
case execplan::CalpontSystemCatalog::TINYINT:
|
||||||
|
bs >> val8;
|
||||||
|
if (val8 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (char)val8;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
||||||
|
bs >> val16;
|
||||||
|
if (val16 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (short)val16;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::MEDINT:
|
||||||
|
case execplan::CalpontSystemCatalog::INT:
|
||||||
|
bs >> val32;
|
||||||
|
if (val32 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (int)val32;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::BIGINT:
|
||||||
|
bs >> val64;
|
||||||
|
if (val64 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (long long)val64;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||||
|
bs >> val8;
|
||||||
|
if (val8 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (uint8_t)val8;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::DATE:
|
||||||
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
||||||
|
bs >> val16;
|
||||||
|
if (val16 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (uint16_t)val16;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
||||||
|
case execplan::CalpontSystemCatalog::UINT:
|
||||||
|
bs >> val32;
|
||||||
|
if (val32 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (uint32_t)val32;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::DATETIME:
|
||||||
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
||||||
|
bs >> val64;
|
||||||
|
if (val64 == 0)
|
||||||
|
valZero = true;
|
||||||
|
datavalue = (uint64_t)val64;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||||
|
switch (colType.colWidth)
|
||||||
|
{
|
||||||
|
case 1:
|
||||||
|
{
|
||||||
|
bs >> val8;
|
||||||
|
datavalue = (char) val8;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
{
|
||||||
|
bs >> val16;
|
||||||
|
datavalue = (short) val16;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 4:
|
||||||
|
{
|
||||||
|
bs >> val32;
|
||||||
|
datavalue = (int) val32;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
bs >> val64;
|
||||||
|
datavalue = (long long) val64;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||||
|
// UDECIMAL numbers may not be negative
|
||||||
|
if (colType.colWidth == 1)
|
||||||
|
{
|
||||||
|
bs >> val8;
|
||||||
|
if (val8 < 0 &&
|
||||||
|
val8 != static_cast<int8_t>(joblist::TINYINTEMPTYROW) &&
|
||||||
|
val8 != static_cast<int8_t>(joblist::TINYINTNULL))
|
||||||
|
{
|
||||||
|
val8 = 0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
datavalue = (char)val8;
|
||||||
|
}
|
||||||
|
else if (colType.colWidth == 2)
|
||||||
|
{
|
||||||
|
bs >> val16;
|
||||||
|
if (val16 < 0 &&
|
||||||
|
val16 != static_cast<int16_t>(joblist::SMALLINTEMPTYROW) &&
|
||||||
|
val16 != static_cast<int16_t>(joblist::SMALLINTNULL))
|
||||||
|
{
|
||||||
|
val16 = 0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
datavalue = (short)val16;
|
||||||
|
}
|
||||||
|
else if (colType.colWidth == 4)
|
||||||
|
{
|
||||||
|
bs >> val32;
|
||||||
|
if (val32 < 0 &&
|
||||||
|
val32 != static_cast<int>(joblist::INTEMPTYROW) &&
|
||||||
|
val32 != static_cast<int>(joblist::INTNULL))
|
||||||
|
{
|
||||||
|
val32 = 0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
datavalue = (int)val32;
|
||||||
|
}
|
||||||
|
else if (colType.colWidth == 8)
|
||||||
|
{
|
||||||
|
bs >> val64;
|
||||||
|
if (val64 < 0 &&
|
||||||
|
val64 != static_cast<long long>(joblist::BIGINTEMPTYROW) &&
|
||||||
|
val64 != static_cast<long long>(joblist::BIGINTNULL))
|
||||||
|
{
|
||||||
|
val64 = 0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
datavalue = (long long)val64;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
||||||
|
bs >> val64;
|
||||||
|
memcpy(&valD, &val64, 8);
|
||||||
|
datavalue = valD;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
||||||
|
bs >> val64;
|
||||||
|
memcpy(&valD, &val64, 8);
|
||||||
|
if (valD < 0.0 && valD != joblist::DOUBLEEMPTYROW && valD != joblist::DOUBLENULL)
|
||||||
|
{
|
||||||
|
valD = 0.0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
datavalue = valD;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::FLOAT:
|
||||||
|
bs >> val32;
|
||||||
|
memcpy(&valF, &val32, 4);
|
||||||
|
datavalue = valF;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
||||||
|
bs >> val32;
|
||||||
|
memcpy(&valF, &val32, 4);
|
||||||
|
if (valF < 0.0 && valF != joblist::FLOATEMPTYROW && valF != joblist::FLOATNULL)
|
||||||
|
{
|
||||||
|
valF = 0.0;
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
datavalue = valF;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case execplan::CalpontSystemCatalog::CHAR:
|
||||||
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
||||||
|
case execplan::CalpontSystemCatalog::TEXT:
|
||||||
|
case execplan::CalpontSystemCatalog::BLOB:
|
||||||
|
bs >> valStr;
|
||||||
|
if (valStr.length() > (unsigned int)colType.colWidth)
|
||||||
|
{
|
||||||
|
valStr = valStr.substr(0, colType.colWidth);
|
||||||
|
pushWarning = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if ( (unsigned int)colType.colWidth > valStr.length())
|
||||||
|
{
|
||||||
|
//Pad null character to the string
|
||||||
|
valStr.resize(colType.colWidth, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
datavalue = valStr;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
rc = 1;
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
//check if autoincrement column and value is 0 or null
|
||||||
|
if (colType.autoincrement && ( isNULL || valZero))
|
||||||
|
{
|
||||||
|
ostringstream oss;
|
||||||
|
oss << nextVal++;
|
||||||
|
isNULL = false;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
nextValNeeded++;
|
||||||
|
bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
|
||||||
|
if (!reserved)
|
||||||
|
{
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
switch (colType.colDataType)
|
||||||
|
{
|
||||||
|
case execplan::CalpontSystemCatalog::TINYINT:
|
||||||
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||||
|
datavalue = (uint8_t) nextVal;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
||||||
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
||||||
|
datavalue = (uint16_t) nextVal;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::MEDINT:
|
||||||
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
||||||
|
case execplan::CalpontSystemCatalog::INT:
|
||||||
|
case execplan::CalpontSystemCatalog::UINT:
|
||||||
|
datavalue = (uint32_t) nextVal;
|
||||||
|
break;
|
||||||
|
case execplan::CalpontSystemCatalog::BIGINT:
|
||||||
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
||||||
|
default:
|
||||||
|
datavalue = (uint64_t) nextVal;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
|
||||||
|
{
|
||||||
|
if (isNULL && colType.defaultValue.empty()) //error out
|
||||||
|
{
|
||||||
|
Message::Args args;
|
||||||
|
args.add(tableColName.column);
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
else if (isNULL && !(colType.defaultValue.empty()))
|
||||||
|
{
|
||||||
|
datavalue = colType.defaultValue;
|
||||||
|
isNULL = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//@Bug 1806
|
||||||
|
if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
||||||
|
{
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) )
|
||||||
|
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
||||||
|
|
||||||
|
WriteEngine::ColTuple colTuple;
|
||||||
|
colTuple.data = datavalue;
|
||||||
|
|
||||||
|
colTuples.push_back(colTuple);
|
||||||
|
//@Bug 2515. Only pass string values to write engine
|
||||||
|
dicStrings.push_back( valStr );
|
||||||
|
}
|
||||||
|
colValuesList.push_back(colTuples);
|
||||||
|
dicStringList.push_back( dicStrings );
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pushWarning)
|
||||||
|
{
|
||||||
|
colNames.push_back(tableColName.column);
|
||||||
|
isWarningSet = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::exception& ex)
|
||||||
|
{
|
||||||
|
err = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// call the write engine to write the rows
|
||||||
|
int error = NO_ERROR;
|
||||||
|
//fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
|
||||||
|
//cout << "Batch inserting a row with transaction id " << txnid.id << endl;
|
||||||
|
if (colValuesList.size() > 0)
|
||||||
|
{
|
||||||
|
if (colValuesList[0].size() > 0)
|
||||||
|
{
|
||||||
|
if (NO_ERROR !=
|
||||||
|
(error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
|
||||||
|
dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
|
||||||
|
{
|
||||||
|
if (error == ERR_BRM_DEAD_LOCK)
|
||||||
|
{
|
||||||
|
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
|
||||||
|
WErrorCodes ec;
|
||||||
|
err = ec.errorString(error);
|
||||||
|
}
|
||||||
|
else if ( error == ERR_BRM_VB_OVERFLOW )
|
||||||
|
{
|
||||||
|
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
|
||||||
|
WErrorCodes ec;
|
||||||
|
err = ec.errorString(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (fIsFirstBatchPm && isAutocommitOn)
|
||||||
|
{
|
||||||
|
//fWEWrapper.writeVBEnd(txnid.id, rangeList);
|
||||||
|
fIsFirstBatchPm = false;
|
||||||
|
}
|
||||||
|
else if (fIsFirstBatchPm)
|
||||||
|
{
|
||||||
|
fIsFirstBatchPm = false;
|
||||||
|
}
|
||||||
|
if ( isWarningSet && ( rc == NO_ERROR ) )
|
||||||
|
{
|
||||||
|
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
||||||
|
//cout << "Got warning" << endl;
|
||||||
|
Message::Args args;
|
||||||
|
string cols = "'" + colNames[0] + "'";
|
||||||
|
|
||||||
|
for (unsigned i=1; i<colNames.size();i++)
|
||||||
|
{
|
||||||
|
cols = cols + ", " + "'" + colNames[i] + "'";
|
||||||
|
}
|
||||||
|
args.add(cols);
|
||||||
|
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC,args);
|
||||||
|
|
||||||
|
// Strict mode enabled, so rollback on warning
|
||||||
|
// NOTE: This doesn't seem to be a possible code path yet
|
||||||
|
/*if (insertPkg.get_isWarnToError())
|
||||||
|
{
|
||||||
|
string applName ("BatchInsert");
|
||||||
|
fWEWrapper.bulkRollback(tblOid,txnid.id,tableName.toString(),
|
||||||
|
applName, false, err);
|
||||||
|
BulkRollbackMgr::deleteMetaFile( tblOid );
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
//cout << "Batch insert return code " << rc << endl;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err)
|
uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err)
|
||||||
{
|
{
|
||||||
uint8_t rc = 0;
|
uint8_t rc = 0;
|
||||||
|
@ -79,6 +79,7 @@ class WE_DMLCommandProc
|
|||||||
EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string & err);
|
EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string & err);
|
||||||
EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string & err);
|
EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string & err);
|
||||||
EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId);
|
EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId);
|
||||||
|
EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId);
|
||||||
EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err);
|
EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err);
|
||||||
EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string & err);
|
EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string & err);
|
||||||
EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err);
|
EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err);
|
||||||
|
@ -61,8 +61,8 @@ enum ServerMessages
|
|||||||
WE_SVR_COMMIT_BATCH_AUTO_OFF,
|
WE_SVR_COMMIT_BATCH_AUTO_OFF,
|
||||||
WE_SVR_ROLLBACK_BATCH_AUTO_OFF,
|
WE_SVR_ROLLBACK_BATCH_AUTO_OFF,
|
||||||
WE_SVR_BATCH_AUTOON_REMOVE_META,
|
WE_SVR_BATCH_AUTOON_REMOVE_META,
|
||||||
WE_SVR_UPDATE, //35
|
WE_SVR_UPDATE,
|
||||||
WE_SVR_FLUSH_FILES,
|
WE_SVR_FLUSH_FILES, //35
|
||||||
WE_SVR_DELETE,
|
WE_SVR_DELETE,
|
||||||
WE_SVR_DML_BULKROLLBACK,
|
WE_SVR_DML_BULKROLLBACK,
|
||||||
WE_SVR_DML_BULKROLLBACK_CLEANUP,
|
WE_SVR_DML_BULKROLLBACK_CLEANUP,
|
||||||
@ -82,6 +82,7 @@ enum ServerMessages
|
|||||||
WE_END_TRANSACTION,
|
WE_END_TRANSACTION,
|
||||||
WE_SRV_FIX_ROWS,
|
WE_SRV_FIX_ROWS,
|
||||||
WE_SVR_WRITE_CREATE_SYSCOLUMN,
|
WE_SVR_WRITE_CREATE_SYSCOLUMN,
|
||||||
|
WE_SVR_BATCH_INSERT_BINARY,
|
||||||
|
|
||||||
WE_CLT_SRV_DATA=100,
|
WE_CLT_SRV_DATA=100,
|
||||||
WE_CLT_SRV_EOD,
|
WE_CLT_SRV_EOD,
|
||||||
|
@ -149,6 +149,11 @@ void DmlReadThread::operator()()
|
|||||||
//cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
|
//cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case WE_SVR_BATCH_INSERT_BINARY:
|
||||||
|
{
|
||||||
|
rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case WE_SVR_BATCH_INSERT_END:
|
case WE_SVR_BATCH_INSERT_END:
|
||||||
{
|
{
|
||||||
rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);
|
rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);
|
||||||
|
Reference in New Issue
Block a user