/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // // $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $ // C++ Implementation: batchprimitiveprocessor // // Description: // // // Author: Patrick LeBlanc , (C) 2008 // // Copyright: See COPYING file that comes with this distribution // // #include //#define NDEBUG #include #include #include #include using namespace std; #include #include namespace bu = boost::uuids; #include "calpontsystemcatalog.h" using namespace execplan; #include "bpp-jl.h" #include "jlf_common.h" using namespace messageqcpp; using namespace rowgroup; using namespace joiner; #define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX namespace joblist { BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) : ot(BPS_ELEMENT_TYPE) , needToSetLBID(true) , count(1) , baseRid(0) , ridCount(0) , needStrValues(false) , wideColumnsWidths(0) , filterCount(0) , projectCount(0) , needRidsAtDelivery(false) , ridMap(0) , sendValues(false) , sendAbsRids(false) , _hasScan(false) , LBIDTrace(false) , tupleLength(0) , status(0) , valueColumn(0) , sendTupleJoinRowGroupData(false) , bop(BOP_AND) , forHJ(false) , threadCount(1) , fJoinerChunkSize(rm->getJlJoinerChunkSize()) , hasSmallOuterJoin(false) , _priority(1) { PMJoinerCount = 0; uuid = bu::nil_generator()(); } 
BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() { } void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector lastScannedLBID, bool hasAuxCol, const std::vector& extentsAux, execplan::CalpontSystemCatalog::OID oidAux) { SCommand cc; tableOID = scan.tableOid(); cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(scan.queryUuid()); cc->setStepUuid(uuid); filterSteps.push_back(cc); filterCount++; _hasScan = true; if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); idbassert(sessionID == scan.sessionId()); } void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs) { SCommand cc; tableOID = pcs.tableOid(); cc.reset(new PseudoCCJL(pcs)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(pcs.queryUuid()); cc->setStepUuid(uuid); filterSteps.push_back(cc); filterCount++; idbassert(sessionID == pcs.sessionId()); } void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step) { SCommand cc; tableOID = step.tableOid(); cc.reset(new ColumnCommandJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); filterSteps.push_back(cc); filterCount++; if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); idbassert(sessionID == step.sessionId()); } void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step) { SCommand cc; tableOID = step.tableOid(); if (filterCount == 0) { sendAbsRids = true; sendValues = true; absRids.reset(new uint64_t[8192]); } cc.reset(new DictStepJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); #if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX) if (filterSteps.size() > 0) { size_t stepsIndex = filterSteps.size() - 1; SCommand prevCC = filterSteps[stepsIndex]; ColumnCommandJL* pcc = dynamic_cast(prevCC.get()); DictStepJL* ccc = dynamic_cast(cc.get()); if (pcc && ccc) { 
filterSteps[stepsIndex].reset( new ColumnCommandJL(*pcc, *ccc)); // column command will use same filters. } } #endif filterSteps.push_back(cc); filterCount++; needStrValues = true; idbassert(sessionID == step.sessionId()); } void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step) { SCommand cc; tableOID = step.tableOid(); cc.reset(new FilterCommandJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); filterSteps.push_back(cc); filterCount++; idbassert(sessionID == step.sessionId()); } void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step) { SCommand cc; cc.reset(new PseudoCCJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setTupleKey(step.tupleId()); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); projectSteps.push_back(cc); colWidths.push_back(cc->getWidth()); tupleLength += cc->getWidth(); projectCount++; idbassert(sessionID == step.sessionId()); } void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step) { SCommand cc; cc.reset(new ColumnCommandJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setTupleKey(step.tupleId()); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); projectSteps.push_back(cc); colWidths.push_back(cc->getWidth()); tupleLength += cc->getWidth(); projectCount++; if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); idbassert(sessionID == step.sessionId()); } void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step) { SCommand cc; cc.reset(new PassThruCommandJL(step)); cc->setBatchPrimitiveProcessor(this); cc->setTupleKey(step.tupleId()); cc->setQueryUuid(step.queryUuid()); cc->setStepUuid(uuid); projectSteps.push_back(cc); colWidths.push_back(cc->getWidth()); tupleLength += cc->getWidth(); projectCount++; if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); if (filterCount == 0) sendValues = true; idbassert(sessionID == step.sessionId()); } void 
// NOTE: the `void` return type of this definition is the last token of the
// preceding source line.
//
// Appends a rid-to-string projection built from a column step and its
// dictionary step. The tuple key comes from the dictionary step, the query
// UUID from the column step. Marks the BPP as needing string values.
BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
{
  SCommand cc;

  cc.reset(new RTSCommandJL(col, dict));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(dict.tupleId());
  cc->setQueryUuid(col.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  needStrValues = true;
  idbassert(sessionID == col.sessionId());
  idbassert(sessionID == dict.sessionId());
}

/**
 * Appends a rid-to-string projection built from a pass-through step and a
 * dictionary step. With no filter steps present, the BPP switches to sending
 * values and absolute rids and allocates the absolute-rid buffer.
 */
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
{
  SCommand cc;

  cc.reset(new RTSCommandJL(p, dict));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(dict.tupleId());
  cc->setQueryUuid(p.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  needStrValues = true;

  if (filterCount == 0)
  {
    sendValues = true;
    sendAbsRids = true;
    absRids.reset(new uint64_t[8192]);
  }

  idbassert(sessionID == p.sessionId());
  idbassert(sessionID == dict.sessionId());
}

// Disabled code, kept for reference only (never compiled).
#if 0
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
{
    idbassert(sessionID == ds.sessionId());

    uint32_t i;

    templateTB = ds.fEmptyTableBand;
    tableOID = ds.fTableOID;
    tableColumnCount = templateTB.getColumnCount();
    idbassert(tableColumnCount > 0);
    tablePositions.reset(new int[tableColumnCount]);

    for (i = 0; i < tableColumnCount; i++)
        tablePositions[i] = -1;

    idbassert(projectCount <= projectSteps.size());

    /* In theory, tablePositions maps a column's table position to a projection step
     -1 means it's a null column */
    CalpontSystemCatalog::OID oid;
    int idx;

    for (i = 0; i < projectCount; i++)
    {
        oid = static_cast(projectSteps[i]->getOID());

        if (oid > 0)
        {
            idx = templateTB.find(oid);

            if (idx >= 0)
            {
                tablePositions[idx] = i;
            }
            else
            {
                cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid << " in tableband at pjstep idx " << i << endl;
            }
        }
        else
        {
            cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i << " doesn't have a valid OID" << endl;
        }
    }
}
#endif

/**
 * Queues one (rid, value) element for the next run message.
 *
 * The first element after a reset() also hands its rid and the dbroot to
 * every filter and projection command and fixes baseRid to the rid with its
 * low 13 bits cleared. The rid is stored either as an absolute rid or as a
 * 13-bit relative rid whose 512-row range is flagged in ridMap; the value is
 * stored only when sendValues is set.
 *
 * NOTE(review): the `ridCount <= 8192` check runs after the stores indexed by
 * ridCount, so it reports an overrun only once the write has happened.
 */
void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
{
  uint32_t i;
  // rowCounter++;

  if (needToSetLBID)
  {
    needToSetLBID = false;

    for (i = 0; i < filterCount; ++i)
      filterSteps[i]->setLBID(et.first, dbroot);

    for (i = 0; i < projectCount; ++i)
      projectSteps[i]->setLBID(et.first, dbroot);

    baseRid = et.first & 0xffffffffffffe000ULL;
  }

  // TODO: get rid of magics
  if (sendAbsRids)
    absRids[ridCount] = et.first;
  else
  {
    relRids[ridCount] = et.first & 0x1fff;  // 8192 rows per logical block
    ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
  }

  if (sendValues)
  {
    // cout << "adding value " << et.second << endl;
    values[ridCount] = et.second;
  }

  ridCount++;
  idbassert(ridCount <= 8192);
}

// Disabled code, kept for reference only (never compiled).
#if 0
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
{
    uint32_t i;
    rowgroup::Row r;

    inputRG.setData(rg.getData());
    inputRG.initRow(&r);
    inputRG.getRow(0, &r);

    if (needToSetLBID)
    {
        needToSetLBID = false;

        for (i = 0; i < filterCount; ++i)
            filterSteps[i]->setLBID(r.getRid(), 1);

        for (i = 0; i < projectCount; ++i)
            projectSteps[i]->setLBID(r.getRid(), 1);

        baseRid = r.getRid() & 0xffffffffffffe000ULL;
    }
}
#endif

/**
 * Queues a string element by forwarding only its rid: the rid is passed as
 * both members of the ElementType, so the string itself is not sent.
 *
 * @throws logic_error when no filter step has been added.
 */
void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
{
  if (filterCount == 0)
    throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");

  addElementType(ElementType(et.first, et.first), dbroot);
}

/**
 * When the output type is ElementType, the messages have the following format:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * --
 * If the BPP starts with a scan
 *    casual partitioning data from the scanned column
 * Base Rid for the returned logical block
 * rid count
 * (rid count)x 16-bit rids
 * (rid count)x 64-bit values
 * If there's a join performed by this BPP
 *    pre-Join rid count (for logging purposes only)
 *    small-side new match count
 *    (match count)x ElementTypes
 * cached IO count
 * physical IO count
 * block touches
 */

// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
//
// Parses an ElementType response. The rids and values are read in place from
// the stream buffer before the stream is advanced past them; each output rid
// is the 16-bit relative rid plus the base rid. min/max are written only when
// the message carries valid CP data, and validCPData/lbid only when this BPP
// has a scan. Asserts that the message is fully consumed.
//
// NOTE(review): the element type of `out` is missing in this copy of the file
// (template argument looks stripped).
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector* out, bool* validCPData, uint64_t* lbid,
                                                int64_t* min, int64_t* max, uint32_t* cachedIO,
                                                uint32_t* physIO, uint32_t* touchedBlocks) const
{
  uint32_t i;
  uint16_t l_count;
  uint64_t l_baseRid;
  uint16_t* rids;
  uint64_t* vals;
  uint8_t* buf;
  uint64_t tmp64;
  uint8_t tmp8;

  /* skip the header */
  idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_baseRid;
  in >> l_count;
  // rowsProcessed += l_count;
  idbassert(l_count <= 8192);
  out->resize(l_count);

  // Rids (2 bytes each) are followed immediately by the values (8 bytes each).
  buf = (uint8_t*)in.buf();
  rids = (uint16_t*)buf;
  vals = (uint64_t*)(buf + (l_count << 1));
  idbassert(in.length() > (uint32_t)((l_count << 1) + (l_count << 3)));
  in.advance((l_count << 1) + (l_count << 3));

  for (i = 0; i < l_count; ++i)
  {
    (*out)[i].first = rids[i] + l_baseRid;
    // if (tableOID >= 3000)
    //   idbassert((*out)[i].first > 1023);
    (*out)[i].second = vals[i];
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // cout << "ET: got physIO=" << (int) *physIO << " cachedIO=" <<
  //   (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}

/**
 * When the output type is StringElementType the messages have the following format:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * ---
 * If the BPP starts with a scan
 *   Casual partitioning data from the column scanned
 * Rid count
 * (rid count)x 64-bit absolute rids
 * (rid count)x serialized strings
 * cached IO count
 * physical IO count
 * blocks touched
 */

// TODO MCOL-641 Add support
// here. Refer to BatchPrimitiveProcessor::makeResponse()
//
// Parses a StringElementType response. The absolute rids are read in place
// from the stream buffer, then the stream is advanced past them and one
// serialized string per rid is extracted. min/max are written only when the
// message carries valid CP data, and validCPData/lbid only when this BPP has
// a scan. Asserts that the message is fully consumed.
//
// NOTE(review): unlike getElementTypes(), the row count read from the message
// is not bounded here. The element type of `out` is missing in this copy of
// the file (template argument looks stripped).
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector* out, bool* validCPData,
                                                      uint64_t* lbid, int64_t* min, int64_t* max,
                                                      uint32_t* cachedIO, uint32_t* physIO,
                                                      uint32_t* touchedBlocks) const
{
  uint32_t i;
  uint16_t l_count;
  uint64_t* l_absRids;
  uint64_t tmp64;
  uint8_t tmp8;

  // cout << "get String ETs uniqueID\n";
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_count;
  // cout << "parsing " << l_count << " strings\n";
  // Alias the rid array where it sits in the buffer, then step over it
  // (8 bytes per rid) to reach the strings.
  l_absRids = (uint64_t*)in.buf();
  out->resize(l_count);
  in.advance(l_count << 3);

  for (i = 0; i < l_count; ++i)
  {
    (*out)[i].first = l_absRids[i];
    in >> (*out)[i].second;
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // cout << "SET: got physIO=" << (int) *physIO << " cachedIO=" <<
  //   (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}

/**
 * When the output type is Tuples, the input message format is the same
 * as when the output type is TableBands
 */

// TODO MCOL-641 Add support here.
Refer to BatchPrimitiveProcessor::makeResponse() void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector* out, bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const { uint32_t i, j, pos, len; uint16_t l_rowCount; uint64_t l_baseRid; uint16_t* l_relRids; uint64_t absRids[8192]; // const uint8_t* columnData[projectCount]; const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*)); memset(columnData, 0, projectCount * sizeof(uint8_t*)); const uint8_t* buf; // uint32_t colLengths[projectCount]; uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t)); uint64_t tmp64; uint8_t tmp8; // cout << "getTuples msg is " << in.length() << " bytes\n"; /* skip the header */ in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); if (_hasScan) { in >> tmp8; *validCPData = (tmp8 != 0); if (*validCPData) { in >> *lbid; in >> tmp64; *min = (int64_t)tmp64; in >> tmp64; *max = (int64_t)tmp64; } else in >> *lbid; } in >> l_rowCount; // cout << "read " << l_rowCount << " rows\n"; if (needRidsAtDelivery) { in >> l_baseRid; l_relRids = (uint16_t*)in.buf(); for (i = 0; i < l_rowCount; i++) absRids[i] = l_relRids[i] + l_baseRid; in.advance(l_rowCount << 1); } /* Set up pointers to the column data */ pos = 0; buf = in.buf(); for (i = 0; i < projectCount; i++) { colLengths[i] = *((uint32_t*)&buf[pos]); pos += 4; columnData[i] = &buf[pos]; // cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl; pos += colLengths[i]; idbassert(pos < in.length()); } in.advance(pos); out->resize(l_rowCount); for (i = 0; i < l_rowCount; i++) { (*out)[i].first = absRids[i]; (*out)[i].second = new char[tupleLength]; for (j = 0, pos = 0; j < projectCount; j++) { idbassert(pos + colWidths[j] <= tupleLength); if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING) { len = *((uint32_t*)columnData[j]); columnData[j] += 4; 
memcpy(&(*out)[i].second[pos], columnData[j], len); pos += len; columnData[j] += len; // insert padding... memset(&(*out)[i].second[pos], 0, colWidths[j] - len); pos += colWidths[j] - len; } else { switch (colWidths[j]) { case 8: *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]); columnData[j] += 8; pos += 8; break; case 4: *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]); columnData[j] += 4; pos += 4; break; case 2: *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]); columnData[j] += 2; pos += 4; break; case 1: (*out)[i].second[pos] = (char)*columnData[j]; columnData[j]++; pos++; break; // TODO MCOL-641 case 16: // fallthrough default: cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl; throw logic_error("BPP::getTuples(): bad column width"); } } } } in >> *cachedIO; in >> *physIO; in >> *touchedBlocks; idbassert(in.length() == 0); } bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const { const uint8_t* data = in.buf(); uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader); // skip the headers ISMPacketHeader* hdr = (ISMPacketHeader*)(data); if (_hasScan && in.length() > offset) { // This is a legitimate error message sent by PrimProc // so we need to return to allow upper layer to throw an error // if needed. if (hdr->Status > 0) { return true; } if (data[offset] != 0) offset += (data[offset + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 + 1; // skip the CP data with wide min/max values (16/32 bytes each). we also skip // cpFromDictScan flag. else offset += CP_FLAG_AND_LBID; // skip only the "valid CP data" & LBID bytes } // Throw b/c PP throws and sends here error msg. // See BatchPrimitiveProcessor::writeErrorMsg() for details. // The inversion of the assert used here previously. 
// (Continuation of countThisMsg(), which starts on the preceding source line.)
  // Nothing left at the computed offset: report it as an error.
  if (in.length() <= offset)
  {
    if (hdr->Status > 0)
    {
      throw std::runtime_error(" an exception originally thrown by PrimProc: ");
    }

    // NOTE(review): the backslash inside this literal looks like the remains
    // of a source line continuation; left untouched because it is runtime text.
    throw std::runtime_error(
        " an exception because there is not enough \ data in the Primitive message from PrimProc.");
  }

  return (data[offset] != 0);
}

/**
 * Reads a 32-bit count from the stream, then deserializes that many RGData
 * objects and appends each to `out`.
 *
 * NOTE(review): the element type of `out` is missing in this copy of the file
 * (template argument looks stripped).
 */
void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector* out) const
{
  RGData rgData;
  uint32_t count, i;

  *in >> count;

  for (i = 0; i < count; i++)
  {
    rgData.deserialize(*in, true);
    out->push_back(rgData);
  }
}

/**
 * Parses a RowGroup response for the given thread.
 *
 * An empty message yields a single empty RGData and zeroed IO counters.
 * Otherwise the headers are skipped, optional CP data is read (wide min/max
 * only for wide decimal column types), then the "count this" flag, then
 * either the aggregate results or a single RGData followed by per-joiner
 * match vectors, and finally the IO counters when the flag is set. Asserts
 * that the message is fully consumed.
 *
 * NOTE(review): several template argument lists in this function are missing
 * in this copy of the file (`vector* out`, `static_cast(tmp64)`,
 * `std::shared_ptr[]>`, `new vector[8192]`); they are reproduced as found.
 */
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector* out, bool* validCPData, uint64_t* lbid,
                                                bool* fromDictScan, int128_t* min, int128_t* max,
                                                uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks,
                                                bool* countThis, uint32_t threadID, bool* hasWideColumn,
                                                const execplan::CalpontSystemCatalog::ColType& colType) const
{
  uint64_t tmp64;
  int128_t tmp128;
  uint8_t tmp8;
  RGData rgData;
  uint32_t rowCount;
  RowGroup& org = primprocRG[threadID];

  out->clear();

  if (in.length() == 0)
  {
    // done, return an empty RG
    rgData = RGData(org, 0);
    org.setData(&rgData);
    org.resetRowGroup(0);
    out->push_back(rgData);
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
    return;
  }

  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp8;
      *fromDictScan = tmp8 != 0;
      // The next byte is compared against the legacy maximum width to decide
      // whether the min/max that follow are 128-bit.
      in >> tmp8;
      *hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);

      if (UNLIKELY(*hasWideColumn))
      {
        idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);

        if (LIKELY(colType.isWideDecimalType()))
        {
          in >> tmp128;
          *min = tmp128;
          in >> tmp128;
          *max = tmp128;
        }
        else
        {
          std::ostringstream oss;
          oss << __func__ << " WARNING!!! Not implemented for the data type ";
          oss << colType.colDataType << std::endl;
          std::cout << oss.str();
          idbassert(false);
        }
      }
      else
      {
        in >> tmp64;
        *min = static_cast(tmp64);
        in >> tmp64;
        *max = static_cast(tmp64);
      }
    }
    else
      in >> *lbid;
  }

  in >> tmp8;
  *countThis = (tmp8 != 0);

  /* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
   * For later... */
  if (aggregatorPM)
  {
    deserializeAggregateResult(&in, out);
    // for (uint32_t z = 0; z < out->size(); z++) {
    //   org.setData(&(*out)[z]);
    //   cout << "BPPJL: " << org.toString() << endl;
    //}
  }
  else
  {
    rgData.deserialize(in, true);
    out->push_back(rgData);
    org.setData(&rgData);
    rowCount = org.getRowCount();

    bool pmSendsMatchesAnyway =
        (hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));

    if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
    {
      std::shared_ptr[]> joinResults;
      uint32_t i, j;

      if (pmSendsMatchesAnyway)
      {
        // In this case the match vectors are counted by a separate 16-bit
        // row count rather than by the RowGroup's own row count.
        uint16_t joinRowCount;
        in >> joinRowCount;
        rowCount = joinRowCount;
      }

      for (j = 0; j < PMJoinerCount; j++)
      {
        /* Reuse the result space if possible */
        joinResults = tJoiners[j]->getPMJoinArrays(threadID);

        if (joinResults.get() == NULL)
        {
          auto v = new vector[8192];
          joinResults.reset(v);
          tJoiners[j]->setPMJoinResults(joinResults, threadID);
        }

        for (i = 0; i < rowCount; i++)
          deserializeInlineVector(in, joinResults[i]);

        if (tJoiners[j]->smallOuterJoin())
          tJoiners[j]->markMatches(threadID, rowCount);
      }
    }
  }

  if (*countThis)
  {
    // cout << "grabbing io stats\n";
    in >> *cachedIO;
    in >> *physIO;
    in >> *touchedBlocks;
  }
  else
  {
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
  }

  idbassert(in.length() == 0);
}

/**
 * Returns an empty RGData, shaped like the projection row group, whose status
 * field is set to `error`.
 */
RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  RGData ret;
  rowgroup::RowGroup rg(projectionRG);

  ret = RGData(rg, 0);
  rg.setData(&ret);
  // rg.convertToInlineDataInPlace();
  rg.resetRowGroup(0);
  rg.setStatus(error);
  return ret;
  // return ret->rowData;
}

/**
 * Clears the queued rids and the rid range bitmap, and arms the "first
 * element sets the LBID" logic in addElementType() again.
 */
void BatchPrimitiveProcessorJL::reset()
{
  ridCount = 0;
  ridMap = 0;
  needToSetLBID = true;
}

// Return type of the setLBID() definition that continues on the next source line.
void
BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent) { uint32_t i; dbRoot = scannedExtent.dbRoot; baseRid = rowgroup::convertToRid( scannedExtent.partitionNum, scannedExtent.segmentNum, scannedExtent.blockOffset / (scannedExtent.range.size * 1024), // the extent # (l - scannedExtent.range.start) / scannedExtent.range.size); // the logical block # /* cout << "got baserid=" << baseRid << " from partnum=" << scannedExtent.partitionNum << " segnum=" << scannedExtent.segmentNum << " extentnum=" << scannedExtent.blockOffset/(scannedExtent.range.size * 1024) << " blocknum=" << (l - scannedExtent.range.start)/scannedExtent.range.size << endl; */ for (i = 0; i < filterCount; ++i) filterSteps[i]->setLBID(baseRid, dbRoot); for (i = 0; i < projectCount; ++i) projectSteps[i]->setLBID(baseRid, dbRoot); } string BatchPrimitiveProcessorJL::toString() const { ostringstream ret; uint32_t i; ret << "BatchPrimitiveProcessorJL:" << endl; if (!_hasScan) { if (sendValues) ret << " -- serializing values" << endl; if (sendAbsRids) ret << " -- serializing absolute rids" << endl; else ret << " -- serializing relative rids" << endl; } else ret << " -- scan driven" << endl; ret << " " << (int)filterCount << " filter steps:\n"; for (i = 0; i < filterCount; i++) ret << " " << filterSteps[i]->toString() << endl; ret << " " << (int)projectCount << " projection steps:\n"; for (i = 0; i < projectCount; i++) ret << " " << projectSteps[i]->toString() << endl; return ret.str(); } /** * The creation messages have the following format: * * ISMPacketHeader (necessary for DEC and ReadThread classes) * --- * output type (ElementType, StringElementType, etc) * version # * transaction # * session # (unnecessary except for debugging) * step ID ( same ) * unique ID (uniquely identifies the DEC queue on the UM) * 8-bit flags * if there's a join... 
* # of elements in the Joiner object
 * a flag whether or not to match every element (for inner joins)
 * filter step count
 * (filter count)x serialized Commands
 * projection step count
 * (projection count)x serialized Commands
 */
// Serializes the "create" message for this BPP into `bs`: header, identifiers,
// the flag word, row group / function-expression data for ROW_GROUP output,
// joiner parameters when joiners are attached, every filter and projection
// command, and the PM-side aggregator. For ROW_GROUP output it also
// (re)allocates primprocRG, one RowGroup per thread, choosing the row group
// shape PrimProc will send back.
void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;
  uint32_t i;
  uint16_t flags = 0;

  ism.Command = BATCH_PRIMITIVE_CREATE;

  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (uint8_t)ot;
  bs << (messageqcpp::ByteStream::quadbyte)txnID;
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << versionInfo;

  if (needStrValues)
    flags |= NEED_STR_VALUES;

  if (sendAbsRids)
    flags |= GOT_ABS_RIDS;

  if (sendValues)
    flags |= GOT_VALUES;

  if (LBIDTrace)
    flags |= LBID_TRACE;

  if (needRidsAtDelivery)
    flags |= SEND_RIDS_AT_DELIVERY;

  if (tJoiners.size() > 0)
    flags |= HAS_JOINER;

  if (sendTupleJoinRowGroupData)
    flags |= JOIN_ROWGROUP_DATA;

  if (wideColumnsWidths)
    flags |= HAS_WIDE_COLUMNS;

  bs << flags;

  // The widths mask is only on the wire when HAS_WIDE_COLUMNS is set.
  if (wideColumnsWidths)
    bs << wideColumnsWidths;

  bs << bop;
  bs << (uint8_t)(forHJ ? 1 : 0);

  if (ot == ROW_GROUP)
  {
    bs << projectionRG;
    // cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;

    /* F&E serialization */
    if (fe1)
    {
      bs << (uint8_t)1;
      bs << *fe1;
      bs << fe1Input;
    }
    else
      bs << (uint8_t)0;

    if (fe2)
    {
      bs << (uint8_t)1;
      bs << *fe2;
      bs << fe2Output;
    }
    else
      bs << (uint8_t)0;
  }

  /* if HAS_JOINER, send the init params */
  if (flags & HAS_JOINER)
  {
    bs << (uint32_t)maxPmJoinResultCount;

    if (ot == ROW_GROUP)
    {
      idbassert(tJoiners.size() > 0);
      bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;

      bool atLeastOneFE = false;
#ifdef JLF_DEBUG
      cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif

      // The small-side row group is written at most once, by the first
      // typeless joiner that reports a skewed key column.
      bool smallSideRGSent = false;

      for (i = 0; i < PMJoinerCount; i++)
      {
        bs << (uint32_t)tJoiners[i]->size();
        bs << tJoiners[i]->getJoinType();

        // bs << (uint64_t) tJoiners[i]->smallNullValue();

        bs << (uint8_t)tJoiners[i]->isTypelessJoin();

        if (tJoiners[i]->hasFEFilter())
        {
          atLeastOneFE = true;
#ifdef JLF_DEBUG
          cout << "serializing join FE object\n";
#endif
          bs << *tJoiners[i]->getFcnExpFilter();
        }

        if (!tJoiners[i]->isTypelessJoin())
        {
          bs << (uint64_t)tJoiners[i]->smallNullValue();
          bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
          // cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
        }
        else
        {
          serializeVector(bs, tJoiners[i]->getLargeKeyColumns());
          bs << (uint32_t)tJoiners[i]->getKeyLength();
          // MCOL-4173 Notify PP if smallSide and largeSide have different column widths
          // and send smallSide RG to PP.
          bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
          bs << (uint8_t)joinHasSkewedKeyColumn;

          if (!smallSideRGSent && joinHasSkewedKeyColumn)
          {
            idbassert(!smallSideRGs.empty());
            bs << smallSideRGs[0];
            serializeVector(bs, tJoiners[i]->getSmallKeyColumns());
            smallSideRGSent = true;
          }
        }
      }

      if (atLeastOneFE)
        bs << joinFERG;

      if (sendTupleJoinRowGroupData)
      {
#ifdef JLF_DEBUG
        cout << "sending smallside data\n";
#endif
        serializeVector(bs, smallSideRGs);
        bs << largeSideRG;
        bs << joinedRG;
        // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
        // cout << "joined RG: " << joinedRG.toString() << endl;
      }
    }
  }

  bs << filterCount;

  for (i = 0; i < filterCount; ++i)
  {
    // cout << "serializing step " << i << endl;
    filterSteps[i]->createCommand(bs);
  }

  bs << projectCount;

  for (i = 0; i < projectCount; ++i)
    projectSteps[i]->createCommand(bs);

  // aggregate step only when output is row group
  if (ot == ROW_GROUP)
  {
    if (aggregatorPM.get() != NULL)
    {
      bs << (uint8_t)1;
      bs << aggregateRGPM;
      bs << *(aggregatorPM.get());
    }
    else
    {
      bs << (uint8_t)0;
    }
  }

  // decide which rowgroup is received by PrimProc
  if (ot == ROW_GROUP)
  {
    primprocRG.reset(new RowGroup[threadCount]);

    for (uint32_t i = 0; i < threadCount; i++)
      if (aggregatorPM)
        primprocRG[i] = aggregateRGPM;
      else if (fe2)
        primprocRG[i] = fe2Output;
      // This shouldn't be necessary. As of 2-17-14, PrimProc
      // will only send joined results if fe2 || aggregatorPM,
      // so it will never send back data for joinedRG.
      // else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
      //   primprocRG[i] = joinedRG;
      else
        primprocRG[i] = projectionRG;
  }
}

/**
 * The BPP Run messages have the following format:
 *
 * ISMPacketHeader
 *    -- Interleave field is used for DEC interleaving data
 *    -- Size field is used for the relative weight to process the message
 * -----
 * Session ID
 * Step ID
 * Unique ID
 * Sequence #
 * Iteration count
 * Rid count
 * If absolute rids are sent
 *    (rid count)x 64-bit absolute rids
 * else
 *    RID range bitmap
 *    base RID
 *    (rid count)x 16-bit relative rids
 * If values are sent
 *    (rid count)x 64-bit values
 * (filter count)x run msgs for filter Commands
 * (projection count)x run msgs for projection Commands
 */

// The deser counterpart function is BPP::resetBPP
//
// Restarts `bs` and serializes one run message: header (Interleave = pmNum,
// Size = command weight), identifiers, priority, weight, dbroot, count, the
// sent-by-ExeMgr flag, the queued rids (absolute, or bitmap + base rid +
// relative rids), the values when sendValues is set, then each command's run
// message. Asserts that a scan-driven BPP has no queued rids and that any
// other BPP has at least one.
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
  ISMPacketHeader ism;
  uint32_t i;

  /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
  save bandwidth. They're completely unused otherwise. We need to throw out all unused
  fields of every defined header. */

  bs.restart();

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_RUN;

  // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
  // defined for each PM.
  ism.Interleave = pmNum;
  // ism.Interleave = pmNum - 1;

  /* ... and the Size field is used for the "weight" of processing a BPP
  where 1 is one Command on one logical block. */
  ism.Size = count * (filterCount + projectCount);

  bs.append((uint8_t*)&ism, sizeof(ism));

  /* The next 4 vars are for BPPSeeder; BPP itself skips them */
  bs << sessionID;
  bs << stepID;
  bs << uniqueID;
  bs << _priority;

  // The weight is used by PrimProc thread pool algo
  uint32_t weight = calculateBPPWeight();
  bs << weight;

  bs << dbRoot;
  bs << count;
  uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0;
  bs << sentByEM;

  if (_hasScan)
  {
    idbassert(ridCount == 0);
  }
  else
  {
    idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
  }

  bs << ridCount;

  if (sendAbsRids)
    bs.append((uint8_t*)absRids.get(), ridCount << 3);
  else
  {
    bs << ridMap;
    bs << baseRid;
    bs.append((uint8_t*)relRids, ridCount << 1);
  }

  if (sendValues)
    bs.append((uint8_t*)values, ridCount << 3);

  for (i = 0; i < filterCount; i++)
    filterSteps[i]->runCommand(bs);

  for (i = 0; i < projectCount; i++)
    projectSteps[i]->runCommand(bs);
}

/**
 * Restarts `bs` and serializes a run message that carries only the error
 * status: a header with Status set, the identifiers, count and ridCount.
 * No rids, values or command messages follow.
 */
void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  ISMPacketHeader ism;
  bs.restart();

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_RUN;
  ism.Status = status;
  ism.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  bs.append((uint8_t*)&ism, sizeof(ism));

  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}

/**
 * Appends the "destroy" message for this BPP to `bs` (the stream is not
 * restarted first): a zeroed header with the destroy command, followed by the
 * session, step and unique IDs.
 */
void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;

  // if (!(sessionID & 0x80000000))
  //   cout << "step ID " << stepID << " added " << rowCounter << " rows, processed "
  //     << rowsProcessed << " rows" << endl;

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_DESTROY;

  bs.append((uint8_t*)&ism, sizeof(ism));
  /* XXXPAT: We can get rid of sessionID and stepID;
  it's there for easier debugging only */
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
}

// NOTE(review): this definition runs past the end of the visible source; it is
// reproduced exactly as found (including the stripped template argument list
// in its parameter) and is not documented further here.
void BatchPrimitiveProcessorJL::useJoiners(const vector >& j)
{
  pos = 0;
  joinerNum = 0;
  tJoiners = j;

  PMJoinerCount = 0;
  tlKeyLens.reset(new uint32_t[tJoiners.size()]);

  for (uint32_t i = 0; i < tJoiners.size(); i++)
  {
    if (tJoiners[i]->inPM())
    {
      PMJoinerCount++;
      smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
      smallSideRGs.push_back(tJoiners[i]->getSmallRG());

      if (tJoiners[i]->isTypelessJoin())
        tlKeyLens[i] = tJoiners[i]->getKeyLength();

      if (tJoiners[i]->hasFEFilter())
sendTupleJoinRowGroupData = true; if (tJoiners[i]->smallOuterJoin()) hasSmallOuterJoin = true; } } largeSideRG = tJoiners[0]->getLargeRG(); if (aggregatorPM || fe2) { sendTupleJoinRowGroupData = true; #ifdef JLF_DEBUG cout << "will send small side row data\n"; #endif } posByJoinerNum.reset(new uint32_t[PMJoinerCount]); memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t)); } // helper fcn to interleave small side data by joinernum bool BatchPrimitiveProcessorJL::pickNextJoinerNum() { uint i; // find the next joiner that still has more data to send. Set joinerNum & pos. for (i = 0; i < PMJoinerCount; i++) { joinerNum = (joinerNum + 1) % PMJoinerCount; if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size()) break; } if (i == PMJoinerCount) return false; pos = posByJoinerNum[joinerNum]; return true; } /* This algorithm relies on the joiners being sorted by size atm */ /* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs) { uint32_t size = 0, toSend, i, j; ISMPacketHeader ism; Row r; vector* tSmallSide; joiner::TypelessData tlData; uint32_t smallKeyCol; uint32_t largeKeyCol; uint64_t smallkey; bool isNull; bool bSignedUnsigned; bool moreMsgs = pickNextJoinerNum(); if (!moreMsgs) { /* last message */ // cout << "sending last joiner msg\n"; ism.Command = BATCH_PRIMITIVE_END_JOINER; bs.load((uint8_t*)&ism, sizeof(ism)); bs << (messageqcpp::ByteStream::quadbyte)sessionID; bs << (messageqcpp::ByteStream::quadbyte)stepID; bs << uniqueID; return false; } memset((void*)&ism, 0, sizeof(ism)); tSmallSide = tJoiners[joinerNum]->getSmallSide(); size = tSmallSide->size(); #if 0 if (joinerNum == PMJoinerCount - 1 && pos == size) { /* last message */ // cout << "sending last joiner msg\n"; ism.Command = BATCH_PRIMITIVE_END_JOINER; bs.load((uint8_t*) &ism, sizeof(ism)); bs << (messageqcpp::ByteStream::quadbyte)sessionID; bs << 
(messageqcpp::ByteStream::quadbyte)stepID; bs << uniqueID; pos = 0; return false; } else if (pos == size) { joinerNum++; tSmallSide = tJoiners[joinerNum]->getSmallSide(); size = tSmallSide->size(); pos = 0; } #endif ism.Command = BATCH_PRIMITIVE_ADD_JOINER; bs.load((uint8_t*)&ism, sizeof(ism)); bs << (messageqcpp::ByteStream::quadbyte)sessionID; bs << (messageqcpp::ByteStream::quadbyte)stepID; bs << uniqueID; smallSideRGs[joinerNum].initRow(&r); unsigned metasize = 12; // 12 = sizeof(struct JoinerElements) if (tJoiners[joinerNum]->isTypelessJoin()) metasize = 0; unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize); toSend = std::min(size - pos, rowunits); bs << toSend; bs << pos; bs << joinerNum; if (tJoiners[joinerNum]->isTypelessJoin()) { utils::FixedAllocator fa(tlKeyLens[joinerNum], true); for (i = pos; i < pos + toSend; i++) { r.setPointer((*tSmallSide)[i]); isNull = false; bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin(); for (j = 0; j < smallSideKeys[joinerNum].size(); j++) { isNull |= r.isNullValue(smallSideKeys[joinerNum][j]); if (UNLIKELY(bSignedUnsigned)) { // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side, // then it should not compare. 
Send null to PM to prevent compare smallKeyCol = smallSideKeys[joinerNum][j]; largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j]; if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol)) { if (r.isUnsigned(smallKeyCol)) smallkey = r.getUintField(smallKeyCol); else smallkey = r.getIntField(smallKeyCol); if (smallkey & 0x8000000000000000ULL) { isNull = true; break; } } } } if (!isNull) { tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG, tJoiners[joinerNum]->getLargeKeyColumns()); if (tlData.len == 0) { isNull = true; } } bs << (uint8_t)isNull; if (!isNull) { tlData.serialize(bs); bs << i; } } } else { #pragma pack(push, 1) struct JoinerElements { int64_t key; uint32_t value; } * arr; #pragma pack(pop) bs.needAtLeast(toSend * sizeof(JoinerElements)); arr = (JoinerElements*)bs.getInputPtr(); smallKeyCol = smallSideKeys[joinerNum][0]; bSignedUnsigned = r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]); j = 0; for (i = pos, j = 0; i < pos + toSend; ++i, ++j) { r.setPointer((*tSmallSide)[i]); if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE) { // Small side is a long double. 
Since CS can't store larger than DOUBLE, // we need to convert to whatever type large side is -- double or int64 long double smallkeyld = r.getLongDoubleField(smallKeyCol); switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0])) { case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE) { smallkey = joblist::UBIGINTEMPTYROW; } else { double d = (double)smallkeyld; smallkey = *(int64_t*)&d; } break; } default: { if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT) { smallkey = joblist::UBIGINTEMPTYROW; } else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT) { smallkey = joblist::UBIGINTEMPTYROW; } else { smallkey = (int64_t)smallkeyld; } break; } } } else if (r.isUnsigned(smallKeyCol)) smallkey = r.getUintField(smallKeyCol); else smallkey = r.getIntField(smallKeyCol); // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares // against the large side should fall. UBIGINTEMPTYROW is not a valid value, so nothing will match. 
if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL)) smallkey = joblist::UBIGINTEMPTYROW; arr[j].key = (int64_t)smallkey; arr[j].value = i; // cout << "sending " << arr[j].key << ", " << arr[j].value << endl; } bs.advanceInputPtr(toSend * sizeof(JoinerElements)); } if (sendTupleJoinRowGroupData) { RowGroup& smallSide = smallSideRGs[joinerNum]; RGData tmpData(smallSide, toSend); Row tmpRow; smallSide.setData(&tmpData); smallSide.initRow(&tmpRow); smallSide.getRow(0, &tmpRow); for (i = pos; i < pos + toSend; i++, tmpRow.nextRow()) { r.setPointer((*tSmallSide)[i]); copyRow(r, &tmpRow); } smallSide.setRowCount(toSend); tmpData.serialize(bs, smallSide.getDataSize()); } pos += toSend; posByJoinerNum[joinerNum] = pos; return true; } void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg) { ot = ROW_GROUP; projectionRG = rg; } void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg) { joinedRG = rg; } void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg) { sendAbsRids = false; sendValues = false; inputRG = rg; } void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm, const rowgroup::RowGroup& argpm) { aggregatorPM = aggpm; aggregateRGPM = argpm; if (tJoiners.size() > 0) sendTupleJoinRowGroupData = true; } /* OR hacks */ void BatchPrimitiveProcessorJL::setBOP(uint32_t op) { bop = op; if (op == BOP_OR && filterCount > 1) { for (int i = 1; i < filterCount; ++i) { ColumnCommandJL* colcmd = dynamic_cast(filterSteps[i].get()); if (colcmd != NULL) colcmd->scan(false); } } } void BatchPrimitiveProcessorJL::setForHJ(bool b) { forHJ = b; } void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr fe, const RowGroup& input) { fe1 = fe; fe1Input = input; } void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr fe, const RowGroup& output) { fe2 = fe; fe2Output = output; if (tJoiners.size() > 0 && PMJoinerCount > 0) sendTupleJoinRowGroupData = true; } const string 
BatchPrimitiveProcessorJL::toMiniString() const { ostringstream oss; int i; set colset; string col; for (i = 0; i < filterCount; i++) { col = filterSteps[i]->getColName(); // FilterCommandJL has two referenced columns, needs special handling. FilterCommandJL* filterCmd = dynamic_cast(filterSteps[i].get()); if (filterCmd == NULL) { colset.insert(col); } else { // is a FilterCommandJL size_t sep = col.find(','); colset.insert(col.substr(0, sep)); if (sep != string::npos) colset.insert(col.substr(++sep)); } } for (i = 0; i < projectCount; i++) { col = projectSteps[i]->getColName(); colset.insert(col); } set::iterator iter = colset.begin(); oss << '(' << *iter++; while (iter != colset.end()) oss << ',' << *iter++; oss << ')'; return oss.str(); } void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg) { joinFERG = rg; } void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs) { ISMPacketHeader ism; memset((void*)&ism, 0, sizeof(ism)); ism.Command = BATCH_PRIMITIVE_ABORT; bs->load((uint8_t*)&ism, sizeof(ism)); *bs << uniqueID; } void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b) { if (aggregatorPM) aggregateRGPM.setUseStringTable(b); else if (fe2) fe2Output.setUseStringTable(b); else projectionRG.setUseStringTable(b); } } // namespace joblist