/* Copyright (C) 2014 InfiniDB, Inc. Copyright (c) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // // C++ Implementation: rowgroup // // Description: // // Author: Patrick LeBlanc , (C) 2008 // // #define NDEBUG #include #include using namespace std; #include #include "bytestream.h" using namespace messageqcpp; #include "calpontsystemcatalog.h" using namespace execplan; #include "nullvaluemanip.h" #include "rowgroup.h" #include "dataconvert.h" #include "columnwidth.h" namespace rowgroup { using cscType = execplan::CalpontSystemCatalog::ColDataType; StringStore::~StringStore() { #if 0 // for mem usage debugging uint32_t i; uint64_t inUse = 0, allocated = 0; for (i = 0; i < mem.size(); i++) { MemChunk* tmp = (MemChunk*) mem.back().get(); inUse += tmp->currentSize; allocated += tmp->capacity; } if (allocated > 0) cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse / (float) allocated << endl; #endif } uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) { MemChunk* lastMC = nullptr; uint64_t ret = 0; empty = false; // At least a nullptr is being stored. // Sometimes the caller actually wants "" to be returned....... argggghhhh...... // if (len == 0) // return numeric_limits::max(); if (!data) return numeric_limits::max(); //@bug6065, make StringStore::storeString() thread safe boost::mutex::scoped_lock lk(fMutex, boost::defer_lock); if (fUseStoreStringMutex) lk.lock(); if (mem.size() > 0) lastMC = (MemChunk*)mem.back().get(); if ((len + 4) >= CHUNK_SIZE) { std::shared_ptr newOne(new uint8_t[len + sizeof(MemChunk) + 4]); longStrings.push_back(newOne); lastMC = (MemChunk*)longStrings.back().get(); lastMC->capacity = lastMC->currentSize = len + 4; memcpy(lastMC->data, &len, 4); memcpy(lastMC->data + 4, data, len); // High bit to mark a long string ret = 0x8000000000000000; ret += longStrings.size() - 1; } else { if ((lastMC == nullptr) || (lastMC->capacity - lastMC->currentSize < (len + 4))) { // mem usage debugging // if (lastMC) // cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl; std::shared_ptr newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); mem.push_back(newOne); lastMC = (MemChunk*)mem.back().get(); lastMC->currentSize = 0; lastMC->capacity = CHUNK_SIZE; memset(lastMC->data, 0, CHUNK_SIZE); } ret = ((mem.size() - 1) * CHUNK_SIZE) + lastMC->currentSize; // If this ever happens then we have big problems if (ret & 0x8000000000000000) throw logic_error("StringStore memory exceeded."); memcpy(&(lastMC->data[lastMC->currentSize]), &len, 4); memcpy(&(lastMC->data[lastMC->currentSize]) + 4, data, len); /* cout << "stored: '" << hex; for (uint32_t i = 0; i < len ; i++) { cout << (char) lastMC->data[lastMC->currentSize + i]; } cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl; */ lastMC->currentSize += len + 4; } return ret; } void StringStore::serialize(ByteStream& bs) const { uint64_t i; MemChunk* mc; bs << (uint64_t)mem.size(); bs << (uint8_t)empty; for (i = 0; i < mem.size(); i++) { mc = (MemChunk*)mem[i].get(); bs << (uint64_t)mc->currentSize; // cout << "serialized " << mc->currentSize << " bytes\n"; bs.append(mc->data, mc->currentSize); } bs.setLongStrings(longStrings); } void StringStore::deserialize(ByteStream& bs) { uint64_t i; uint64_t count; uint64_t size; uint8_t* buf; MemChunk* mc; uint8_t tmp8; // mem.clear(); bs >> count; mem.resize(count); bs >> tmp8; empty = (bool)tmp8; for (i = 0; i < count; i++) { bs >> size; // cout << "deserializing " << size << " bytes\n"; buf = bs.buf(); mem[i].reset(new uint8_t[size + sizeof(MemChunk)]); mc = (MemChunk*)mem[i].get(); mc->currentSize = size; mc->capacity = size; memcpy(mc->data, buf, size); bs.advance(size); } longStrings = bs.getLongStrings(); return; } void StringStore::clear() { vector > emptyv; vector > emptyv2; mem.swap(emptyv); longStrings.swap(emptyv2); empty = true; } uint32_t UserDataStore::storeUserData(mcsv1sdk::mcsv1Context& context, boost::shared_ptr data, uint32_t len) { uint32_t ret = 0; if (len == 0 || data == nullptr) { return numeric_limits::max(); } boost::mutex::scoped_lock lk(fMutex, boost::defer_lock); if (fUseUserDataMutex) lk.lock(); StoreData storeData; storeData.length = len; storeData.functionName = context.getName(); storeData.userData = data; vStoreData.push_back(storeData); ret = vStoreData.size(); return ret; } boost::shared_ptr UserDataStore::getUserData(uint32_t off) const { if (off == std::numeric_limits::max()) return boost::shared_ptr(); if ((vStoreData.size() < off) || off == 0) return boost::shared_ptr(); return vStoreData[off - 1].userData; } void UserDataStore::serialize(ByteStream& bs) const { size_t i; bs << (uint32_t)vStoreData.size(); for (i = 0; i < vStoreData.size(); ++i) { const StoreData& storeData = vStoreData[i]; bs << storeData.length; bs << storeData.functionName; storeData.userData->serialize(bs); } } void UserDataStore::deserialize(ByteStream& bs) { size_t i; uint32_t cnt; bs >> cnt; // vStoreData.clear(); vStoreData.resize(cnt); for (i = 0; i < cnt; i++) { bs >> vStoreData[i].length; bs >> vStoreData[i].functionName; // We don't have easy access to the context here, so we do our own lookup if (vStoreData[i].functionName.length() == 0) { throw std::logic_error("UserDataStore::deserialize: has empty name"); } mcsv1sdk::UDAF_MAP::iterator funcIter = mcsv1sdk::UDAFMap::getMap().find(vStoreData[i].functionName); if (funcIter == mcsv1sdk::UDAFMap::getMap().end()) { std::ostringstream errmsg; errmsg << "UserDataStore::deserialize: " << vStoreData[i].functionName << " is undefined"; throw std::logic_error(errmsg.str()); } mcsv1sdk::mcsv1_UDAF::ReturnCode rc; mcsv1sdk::UserData* userData = nullptr; rc = funcIter->second->createUserData(userData, vStoreData[i].length); if (rc != mcsv1sdk::mcsv1_UDAF::SUCCESS) { std::ostringstream errmsg; errmsg << "UserDataStore::deserialize: " << vStoreData[i].functionName << " createUserData failed(" << rc << ")"; throw std::logic_error(errmsg.str()); } userData->unserialize(bs); vStoreData[i].userData = boost::shared_ptr(userData); } return; } RGData::RGData(const RowGroup& rg, uint32_t rowCount) { // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); if (rg.usesStringTable() && rowCount > 0) strings.reset(new StringStore()); userDataStore.reset(); #ifdef VALGRIND /* In a PM-join, we can serialize entire tables; not every value has been * filled in yet. Need to look into that. Valgrind complains that * those bytes are uninitialized, this suppresses that error. */ memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily #endif memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } RGData::RGData(const RowGroup& rg) { // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; rowData.reset(new uint8_t[rg.getMaxDataSize()]); if (rg.usesStringTable()) strings.reset(new StringStore()); userDataStore.reset(); #ifdef VALGRIND /* In a PM-join, we can serialize entire tables; not every value has been * filled in yet. Need to look into that. Valgrind complains that * those bytes are uninitialized, this suppresses that error. */ memset(rowData.get(), 0, rg.getMaxDataSize()); #endif columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } void RGData::reinit(const RowGroup& rg, uint32_t rowCount) { rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); userDataStore.reset(); if (rg.usesStringTable()) strings.reset(new StringStore()); else strings.reset(); #ifdef VALGRIND /* In a PM-join, we can serialize entire tables; not every value has been * filled in yet. Need to look into that. Valgrind complains that * those bytes are uninitialized, this suppresses that error. */ memset(rowData.get(), 0, rg.getDataSize(rowCount)); #endif columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } void RGData::reinit(const RowGroup& rg) { reinit(rg, 8192); } void RGData::serialize(ByteStream& bs, uint32_t amount) const { // cout << "serializing!\n"; bs << (uint32_t)RGDATA_SIG; bs << (uint32_t)amount; bs << columnCount; bs << rowSize; bs.append(rowData.get(), amount); if (strings) { bs << (uint8_t)1; strings->serialize(bs); } else bs << (uint8_t)0; if (userDataStore) { bs << (uint8_t)1; userDataStore->serialize(bs); } else bs << (uint8_t)0; } void RGData::deserialize(ByteStream& bs, uint32_t defAmount) { uint32_t amount, sig; uint8_t* buf; uint8_t tmp8; bs.peek(sig); if (sig == RGDATA_SIG) { bs >> sig; bs >> amount; uint32_t colCountTemp; uint32_t rowSizeTemp; bs >> colCountTemp; bs >> rowSizeTemp; if (rowSize != 0 || columnCount != 0) { idbassert(rowSize == rowSizeTemp && colCountTemp == columnCount); } else { // if deserializing into an empty RGData created by default constructor // which sets columnCount = 0 and rowSize = 0, set columnCount and rowSize // from deserialized bytestream columnCount = colCountTemp; rowSize = rowSizeTemp; } rowData.reset(new uint8_t[std::max(amount, defAmount)]); buf = bs.buf(); memcpy(rowData.get(), buf, amount); bs.advance(amount); bs >> tmp8; if (tmp8) { strings.reset(new StringStore()); strings->deserialize(bs); } else strings.reset(); // UDAF user data bs >> tmp8; if (tmp8) { userDataStore.reset(new UserDataStore()); userDataStore->deserialize(bs); } else userDataStore.reset(); } return; } void RGData::clear() { rowData.reset(); strings.reset(); userDataStore.reset(); } // UserDataStore is only used for UDAF. // Just in time construction because most of the time we don't need one. UserDataStore* RGData::getUserDataStore() { if (!userDataStore) { userDataStore.reset(new UserDataStore); } return userDataStore.get(); } Row::Row(const Row& r) : columnCount(r.columnCount) , baseRid(r.baseRid) , oldOffsets(r.oldOffsets) , stOffsets(r.stOffsets) , offsets(r.offsets) , colWidths(r.colWidths) , types(r.types) , charsetNumbers(r.charsetNumbers) , charsets(r.charsets) , data(r.data) , scale(r.scale) , precision(r.precision) , strings(r.strings) , useStringTable(r.useStringTable) , hasCollation(r.hasCollation) , hasLongStringField(r.hasLongStringField) , sTableThreshold(r.sTableThreshold) , forceInline(r.forceInline) , userDataStore(nullptr) { } Row& Row::operator=(const Row& r) { columnCount = r.columnCount; baseRid = r.baseRid; oldOffsets = r.oldOffsets; stOffsets = r.stOffsets; offsets = r.offsets; colWidths = r.colWidths; types = r.types; charsetNumbers = r.charsetNumbers; charsets = r.charsets; data = r.data; scale = r.scale; precision = r.precision; strings = r.strings; useStringTable = r.useStringTable; hasCollation = r.hasCollation; hasLongStringField = r.hasLongStringField; sTableThreshold = r.sTableThreshold; forceInline = r.forceInline; return *this; } string Row::toString(uint32_t rownum) const { ostringstream os; uint32_t i; // os << getRid() << ": "; os << "[" << std::setw(5) << rownum << std::setw(0) << "]: "; os << (int)useStringTable << ": "; for (i = 0; i < columnCount; i++) { if (isNullValue(i)) os << "NULL "; else switch (types[i]) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: { const utils::ConstString tmp = getConstString(i); os << "(" << tmp.length() << ") '"; os.write(tmp.str(), tmp.length()); os << "' "; break; } case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: os << getFloatField(i) << " "; break; case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: os << getDoubleField(i) << " "; break; case CalpontSystemCatalog::LONGDOUBLE: os << getLongDoubleField(i) << " "; break; case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: { uint32_t len = getVarBinaryLength(i); const uint8_t* val = getVarBinaryField(i); os << "0x" << hex; while (len-- > 0) { os << (uint32_t)(*val >> 4); os << (uint32_t)(*val++ & 0x0F); } os << " " << dec; break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: if (colWidths[i] == datatypes::MAXDECIMALWIDTH) { datatypes::Decimal dec(getTSInt128Field(i), scale[i], precision[i]); os << dec << " "; break; } // fallthrough default: os << getIntField(i) << " "; break; } } return os.str(); } string Row::toCSV() const { ostringstream os; for (uint32_t i = 0; i < columnCount; i++) { if (i > 0) { os << ","; } if (isNullValue(i)) os << "NULL"; else switch (types[i]) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: os << getStringField(i).safeString(); break; case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: os << getFloatField(i); break; case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: os << getDoubleField(i); break; case CalpontSystemCatalog::LONGDOUBLE: os << getLongDoubleField(i); break; case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: { uint32_t len = getVarBinaryLength(i); const uint8_t* val = getVarBinaryField(i); os << "0x" << hex; while (len-- > 0) { os << (uint32_t)(*val >> 4); os << (uint32_t)(*val++ & 0x0F); } os << dec; break; } default: os << getIntField(i); break; } } return os.str(); } void Row::setToNull(uint32_t colIndex) { setNullMark(colIndex, true); // mark as null. switch (types[colIndex]) { case CalpontSystemCatalog::TINYINT: data[offsets[colIndex]] = joblist::TINYINTNULL; break; case CalpontSystemCatalog::SMALLINT: *((int16_t*)&data[offsets[colIndex]]) = static_cast(joblist::SMALLINTNULL); break; case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::INTNULL); break; case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::FLOATNULL); break; case CalpontSystemCatalog::DATE: *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::DATENULL); break; case CalpontSystemCatalog::BIGINT: if (precision[colIndex] != MagicPrecisionForCountAgg) *((uint64_t*)&data[offsets[colIndex]]) = joblist::BIGINTNULL; else // work around for count() in outer join result. *((uint64_t*)&data[offsets[colIndex]]) = 0; break; case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: *((uint64_t*)&data[offsets[colIndex]]) = joblist::DOUBLENULL; break; case CalpontSystemCatalog::LONGDOUBLE: *((long double*)&data[offsets[colIndex]]) = joblist::LONGDOUBLENULL; break; case CalpontSystemCatalog::DATETIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::DATETIMENULL; break; case CalpontSystemCatalog::TIMESTAMP: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMESTAMPNULL; break; case CalpontSystemCatalog::TIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMENULL; break; case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::TEXT: case CalpontSystemCatalog::STRINT: { if (inStringTable(colIndex)) { utils::NullString nullstr; setStringField(nullstr, colIndex); break; } uint32_t len = getColumnWidth(colIndex); switch (len) { case 1: data[offsets[colIndex]] = joblist::CHAR1NULL; break; case 2: *((uint16_t*)&data[offsets[colIndex]]) = joblist::CHAR2NULL; break; case 3: case 4: *((uint32_t*)&data[offsets[colIndex]]) = joblist::CHAR4NULL; break; case 5: case 6: case 7: case 8: *((uint64_t*)&data[offsets[colIndex]]) = joblist::CHAR8NULL; break; default: setNullMark(colIndex, true); break; } break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { uint32_t len = getColumnWidth(colIndex); switch (len) { case 1: data[offsets[colIndex]] = joblist::TINYINTNULL; break; case 2: *((int16_t*)&data[offsets[colIndex]]) = static_cast(joblist::SMALLINTNULL); break; case 4: *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::INTNULL); break; case 16: { int128_t* s128ValuePtr = (int128_t*)(&data[offsets[colIndex]]); datatypes::TSInt128::storeUnaligned(s128ValuePtr, datatypes::Decimal128Null); } break; default: *((int64_t*)&data[offsets[colIndex]]) = static_cast(joblist::BIGINTNULL); break; } break; } case CalpontSystemCatalog::UTINYINT: data[offsets[colIndex]] = joblist::UTINYINTNULL; break; case CalpontSystemCatalog::USMALLINT: *((uint16_t*)&data[offsets[colIndex]]) = joblist::USMALLINTNULL; break; case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: *((uint32_t*)&data[offsets[colIndex]]) = joblist::UINTNULL; break; case CalpontSystemCatalog::UBIGINT: *((uint64_t*)&data[offsets[colIndex]]) = joblist::UBIGINTNULL; break; default: ostringstream os; os << "Row::initToNull(): got bad column type (" << types[colIndex] << "). Width=" << getColumnWidth(colIndex) << endl; os << toString(); throw logic_error(os.str()); } } void Row::initToNull() { uint32_t i; for (i = 0; i < columnCount; i++) { setToNull(i); } } template inline bool Row::isNullValue_offset(uint32_t offset) const { ostringstream os; os << "Row::isNullValue(): got bad column type at offset("; os << offset; os << "). Width="; os << width << endl; throw logic_error(os.str()); } template <> inline bool Row::isNullValue_offset(uint32_t offset) const { const int128_t* intPtr = reinterpret_cast(&data[offset]); const datatypes::TSInt128 value(intPtr); return datatypes::Decimal::isWideDecimalNullValue(value.getValue()); } template <> inline bool Row::isNullValue_offset(uint32_t offset) const { return (*reinterpret_cast(&data[offset]) == static_cast(joblist::BIGINTNULL)); } template <> inline bool Row::isNullValue_offset(uint32_t offset) const { return (*reinterpret_cast(&data[offset]) == static_cast(joblist::INTNULL)); } template <> inline bool Row::isNullValue_offset(uint32_t offset) const { return (*reinterpret_cast(&data[offset]) == static_cast(joblist::SMALLINTNULL)); } template <> inline bool Row::isNullValue_offset(uint32_t offset) const { return (data[offset] == joblist::TINYINTNULL); } bool Row::isNullValue(uint32_t colIndex) const { switch (types[colIndex]) { case CalpontSystemCatalog::TINYINT: return (data[offsets[colIndex]] == joblist::TINYINTNULL); case CalpontSystemCatalog::SMALLINT: return (*((int16_t*)&data[offsets[colIndex]]) == static_cast(joblist::SMALLINTNULL)); case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: return (*((int32_t*)&data[offsets[colIndex]]) == static_cast(joblist::INTNULL)); case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: return (*((int32_t*)&data[offsets[colIndex]]) == static_cast(joblist::FLOATNULL)); case CalpontSystemCatalog::DATE: return (*((int32_t*)&data[offsets[colIndex]]) == static_cast(joblist::DATENULL)); case CalpontSystemCatalog::BIGINT: return (*((int64_t*)&data[offsets[colIndex]]) == static_cast(joblist::BIGINTNULL)); case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::DOUBLENULL); case CalpontSystemCatalog::DATETIME: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::DATETIMENULL); case CalpontSystemCatalog::TIMESTAMP: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::TIMESTAMPNULL); case CalpontSystemCatalog::TIME: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::TIMENULL); case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::STRINT: { uint32_t len = getColumnWidth(colIndex); if (inStringTable(colIndex)) { uint64_t offset; offset = *((uint64_t*)&data[offsets[colIndex]]); return strings->isNullValue(offset); } // if (data[offsets[colIndex]] == 0) // empty string // return true; switch (len) { case 1: return (data[offsets[colIndex]] == joblist::CHAR1NULL); case 2: return (*((uint16_t*)&data[offsets[colIndex]]) == joblist::CHAR2NULL); case 3: case 4: return (*((uint32_t*)&data[offsets[colIndex]]) == joblist::CHAR4NULL); case 5: case 6: case 7: case 8: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::CHAR8NULL); default: // a case for value stored with NULL flag prefix. // see setStringField method. return getNullMark(colIndex); } break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { // TODO MCOL-641 Allmighty hack. switch (getColumnWidth(colIndex)) { // MCOL-641 case 16: return isNullValue_offset(offsets[colIndex]); case 1: return (data[offsets[colIndex]] == joblist::TINYINTNULL); case 2: return (*((int16_t*)&data[offsets[colIndex]]) == static_cast(joblist::SMALLINTNULL)); case 4: return (*((int32_t*)&data[offsets[colIndex]]) == static_cast(joblist::INTNULL)); default: return (*((int64_t*)&data[offsets[colIndex]]) == static_cast(joblist::BIGINTNULL)); } break; } case CalpontSystemCatalog::UTINYINT: return (data[offsets[colIndex]] == joblist::UTINYINTNULL); case CalpontSystemCatalog::USMALLINT: return (*((uint16_t*)&data[offsets[colIndex]]) == joblist::USMALLINTNULL); case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: return (*((uint32_t*)&data[offsets[colIndex]]) == joblist::UINTNULL); case CalpontSystemCatalog::UBIGINT: return (*((uint64_t*)&data[offsets[colIndex]]) == joblist::UBIGINTNULL); case CalpontSystemCatalog::LONGDOUBLE: return (*((long double*)&data[offsets[colIndex]]) == joblist::LONGDOUBLENULL); break; default: { ostringstream os; os << "Row::isNullValue(): got bad column type ("; os << types[colIndex]; os << "). Width="; os << getColumnWidth(colIndex) << endl; throw logic_error(os.str()); } } return false; } uint64_t Row::getNullValue(uint32_t colIndex) const { return utils::getNullValue(types[colIndex], getColumnWidth(colIndex)); } /* This fcn might produce overflow warnings from the compiler, but that's OK. * The overflow is intentional... */ int64_t Row::getSignedNullValue(uint32_t colIndex) const { return utils::getSignedNullValue(types[colIndex], getColumnWidth(colIndex)); } bool Row::equals(const Row& r2, uint32_t lastCol) const { // This check fires with empty r2 only. if (lastCol >= columnCount) return true; // If there are no strings in the row, then we can just memcmp the whole row. // hasCollation is true if there is any column of type CHAR, VARCHAR or TEXT // useStringTable is true if any field declared > max inline field size, including BLOB // For memcmp to be correct, both must be false. if (!hasCollation && !useStringTable && !r2.hasCollation && !r2.useStringTable) return !(memcmp(&data[offsets[0]], &r2.data[offsets[0]], offsets[lastCol + 1] - offsets[0])); // There are strings involved, so we need to check each column // because binary equality is not equality for many charsets/collations for (uint32_t col = 0; col <= lastCol; col++) { cscDataType columnType = getColType(col); if (UNLIKELY(typeHasCollation(columnType))) { auto c1 = getConstString(col); auto c2 = r2.getConstString(col); if (c1.isNull() != c2.isNull()) { return false; } datatypes::Charset cs(getCharset(col)); if (cs.strnncollsp(c1, c2)) { return false; } } else if (UNLIKELY(columnType == execplan::CalpontSystemCatalog::BLOB)) { if (!getConstString(col).eq(r2.getConstString(col))) return false; } else { if (UNLIKELY(columnType == execplan::CalpontSystemCatalog::LONGDOUBLE)) { if (getLongDoubleField(col) != r2.getLongDoubleField(col)) return false; } else if (UNLIKELY(datatypes::isWideDecimalType(columnType, colWidths[col]))) { if (getTSInt128Field(col).getValue() != r2.getTSInt128Field(col).getValue()) return false; } else if (getUintField(col) != r2.getUintField(col)) { return false; } } } return true; } const CHARSET_INFO* Row::getCharset(uint32_t col) const { if (charsets[col] == nullptr) { const_cast(charsets)[col] = &datatypes::Charset(charsetNumbers[col]).getCharset(); } return charsets[col]; } RowGroup::RowGroup() { // 1024 is too generous to waste. oldOffsets.reserve(10); oids.reserve(10); keys.reserve(10); types.reserve(10); charsetNumbers.reserve(10); charsets.reserve(10); scale.reserve(10); precision.reserve(10); } RowGroup::RowGroup(uint32_t colCount, const vector& positions, const vector& roids, const vector& tkeys, const vector& colTypes, const vector& csNumbers, const vector& cscale, const vector& cprecision, uint32_t stringTableThreshold, bool stringTable, const vector& forceInlineData) : columnCount(colCount) , data(nullptr) , oldOffsets(positions) , oids(roids) , keys(tkeys) , types(colTypes) , charsetNumbers(csNumbers) , scale(cscale) , precision(cprecision) , rgData(nullptr) , strings(nullptr) , sTableThreshold(stringTableThreshold) { uint32_t i; forceInline.reset(new bool[columnCount]); if (forceInlineData.empty()) for (i = 0; i < columnCount; i++) forceInline[i] = false; else for (i = 0; i < columnCount; i++) forceInline[i] = forceInlineData[i]; colWidths.resize(columnCount); stOffsets.resize(columnCount + 1); stOffsets[0] = 2; // 2-byte rid hasLongStringField = false; hasCollation = false; for (i = 0; i < columnCount; i++) { colWidths[i] = positions[i + 1] - positions[i]; if (colWidths[i] >= sTableThreshold && !forceInline[i]) { hasLongStringField = true; stOffsets[i + 1] = stOffsets[i] + 8; } else stOffsets[i + 1] = stOffsets[i] + colWidths[i]; if (colHasCollation(i)) { hasCollation = true; } } useStringTable = (stringTable && hasLongStringField); offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); // Set all the charsets to nullptr for jit initialization. charsets.insert(charsets.begin(), charsetNumbers.size(), nullptr); } RowGroup::RowGroup(const RowGroup& r) : columnCount(r.columnCount) , data(r.data) , oldOffsets(r.oldOffsets) , stOffsets(r.stOffsets) , colWidths(r.colWidths) , oids(r.oids) , keys(r.keys) , types(r.types) , charsetNumbers(r.charsetNumbers) , charsets(r.charsets) , scale(r.scale) , precision(r.precision) , rgData(r.rgData) , strings(r.strings) , useStringTable(r.useStringTable) , hasCollation(r.hasCollation) , hasLongStringField(r.hasLongStringField) , sTableThreshold(r.sTableThreshold) , forceInline(r.forceInline) { // stOffsets and oldOffsets are sometimes empty... // offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); offsets = 0; if (useStringTable && !stOffsets.empty()) offsets = &stOffsets[0]; else if (!useStringTable && !oldOffsets.empty()) offsets = &oldOffsets[0]; } RowGroup& RowGroup::operator=(const RowGroup& r) { columnCount = r.columnCount; oldOffsets = r.oldOffsets; stOffsets = r.stOffsets; colWidths = r.colWidths; oids = r.oids; keys = r.keys; types = r.types; charsetNumbers = r.charsetNumbers; charsets = r.charsets; data = r.data; scale = r.scale; precision = r.precision; rgData = r.rgData; strings = r.strings; useStringTable = r.useStringTable; hasCollation = r.hasCollation; hasLongStringField = r.hasLongStringField; sTableThreshold = r.sTableThreshold; forceInline = r.forceInline; // offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); offsets = 0; if (useStringTable && !stOffsets.empty()) offsets = &stOffsets[0]; else if (!useStringTable && !oldOffsets.empty()) offsets = &oldOffsets[0]; return *this; } RowGroup::RowGroup(ByteStream& bs) { this->deserialize(bs); } RowGroup::~RowGroup() { } void RowGroup::resetRowGroup(uint64_t rid) { *((uint32_t*)&data[rowCountOffset]) = 0; *((uint64_t*)&data[baseRidOffset]) = rid; *((uint16_t*)&data[statusOffset]) = 0; *((uint32_t*)&data[dbRootOffset]) = 0; if (strings) strings->clear(); } void RowGroup::serialize(ByteStream& bs) const { bs << columnCount; serializeInlineVector(bs, oldOffsets); serializeInlineVector(bs, stOffsets); serializeInlineVector(bs, colWidths); serializeInlineVector(bs, oids); serializeInlineVector(bs, keys); serializeInlineVector(bs, types); serializeInlineVector(bs, charsetNumbers); serializeInlineVector(bs, scale); serializeInlineVector(bs, precision); bs << (uint8_t)useStringTable; bs << (uint8_t)hasCollation; bs << (uint8_t)hasLongStringField; bs << sTableThreshold; bs.append((uint8_t*)&forceInline[0], sizeof(bool) * columnCount); } void RowGroup::deserialize(ByteStream& bs) { uint8_t tmp8; bs >> columnCount; deserializeInlineVector(bs, oldOffsets); deserializeInlineVector(bs, stOffsets); deserializeInlineVector(bs, colWidths); deserializeInlineVector(bs, oids); deserializeInlineVector(bs, keys); deserializeInlineVector(bs, types); deserializeInlineVector(bs, charsetNumbers); deserializeInlineVector(bs, scale); deserializeInlineVector(bs, precision); bs >> tmp8; useStringTable = (bool)tmp8; bs >> tmp8; hasCollation = (bool)tmp8; bs >> tmp8; hasLongStringField = (bool)tmp8; bs >> sTableThreshold; forceInline.reset(new bool[columnCount]); memcpy(&forceInline[0], bs.buf(), sizeof(bool) * columnCount); bs.advance(sizeof(bool) * columnCount); // offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); offsets = 0; if (useStringTable && !stOffsets.empty()) offsets = &stOffsets[0]; else if (!useStringTable && !oldOffsets.empty()) offsets = &oldOffsets[0]; // Set all the charsets to nullptr for jit initialization. charsets.insert(charsets.begin(), charsetNumbers.size(), nullptr); } void RowGroup::serializeRGData(ByteStream& bs) const { rgData->serialize(bs, getDataSize()); } uint32_t RowGroup::getDataSize() const { return getDataSize(getRowCount()); } uint32_t RowGroup::getDataSize(uint64_t n) const { return headerSize + (n * getRowSize()); } uint32_t RowGroup::getMaxDataSize() const { return headerSize + (8192 * getRowSize()); } uint32_t RowGroup::getMaxDataSizeWithStrings() const { return headerSize + (8192 * (oldOffsets[columnCount] + columnCount)); } uint32_t RowGroup::getEmptySize() const { return headerSize; } uint32_t RowGroup::getStatus() const { return *((uint16_t*)&data[statusOffset]); } void RowGroup::setStatus(uint16_t err) { *((uint16_t*)&data[statusOffset]) = err; } uint32_t RowGroup::getColumnWidth(uint32_t col) const { return colWidths[col]; } uint32_t RowGroup::getColumnCount() const { return columnCount; } string RowGroup::toString(const std::vector& used) const { ostringstream os; ostream_iterator oIter1(os, "\t"); os << "columncount = " << columnCount << endl; os << "oids:\t\t"; copy(oids.begin(), oids.end(), oIter1); os << endl; os << "keys:\t\t"; copy(keys.begin(), keys.end(), oIter1); os << endl; os << "offsets:\t"; copy(&offsets[0], &offsets[columnCount + 1], oIter1); os << endl; os << "colWidths:\t"; copy(colWidths.begin(), colWidths.end(), oIter1); os << endl; os << "types:\t\t"; copy(types.begin(), types.end(), oIter1); os << endl; os << "scales:\t\t"; copy(scale.begin(), scale.end(), oIter1); os << endl; os << "precisions:\t"; copy(precision.begin(), precision.end(), oIter1); os << endl; if (useStringTable) os << "uses a string table\n"; else os << "doesn't use a string table\n"; if (!used.empty()) os << "sparse\n"; // os << "strings = " << hex << (int64_t) strings << "\n"; // os << "data = " << (int64_t) data << "\n" << dec; if (data != nullptr) { Row r; initRow(&r); getRow(0, &r); os << "rowcount = " << getRowCount() << endl; if (!used.empty()) { uint64_t cnt = std::accumulate(used.begin(), used.end(), 0ULL, [](uint64_t a, uint64_t bits) { return a + __builtin_popcountll(bits); }); os << "sparse row count = " << cnt << endl; } os << "base rid = " << getBaseRid() << endl; os << "status = " << getStatus() << endl; os << "dbroot = " << getDBRoot() << endl; os << "row data...\n"; uint32_t max_cnt = used.empty() ? getRowCount() : (used.size() * 64); for (uint32_t i = 0; i < max_cnt; i++) { if (!used.empty() && !(used[i / 64] & (1ULL << (i % 64)))) continue; os << r.toString(i) << endl; r.nextRow(); } } return os.str(); } std::shared_ptr makeMapping(const RowGroup& r1, const RowGroup& r2) { std::shared_ptr ret(new int[r1.getColumnCount()]); // bool reserved[r2.getColumnCount()]; bool* reserved = (bool*)alloca(r2.getColumnCount() * sizeof(bool)); uint32_t i, j; for (i = 0; i < r2.getColumnCount(); i++) reserved[i] = false; for (i = 0; i < r1.getColumnCount(); i++) { for (j = 0; j < r2.getColumnCount(); j++) if ((r1.getKeys()[i] == r2.getKeys()[j]) && !reserved[j]) { ret[i] = j; reserved[j] = true; break; } if (j == r2.getColumnCount()) ret[i] = -1; } return ret; } void applyMapping(const std::shared_ptr& mapping, const Row& in, Row* out) { applyMapping(mapping.get(), in, out); } void applyMapping(const std::vector& mapping, const Row& in, Row* out) { applyMapping((int*)&mapping[0], in, out); } void applyMapping(const int* mapping, const Row& in, Row* out) { uint32_t i; for (i = 0; i < in.getColumnCount(); i++) if (mapping[i] != -1) { if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB || in.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT)) { out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]); } else if (UNLIKELY(in.isLongString(i))) { out->setStringField(in.getConstString(i), mapping[i]); } else if (UNLIKELY(in.isShortString(i))) out->setUintField(in.getUintField(i), mapping[i]); else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE)) out->setLongDoubleField(in.getLongDoubleField(i), mapping[i]); // WIP this doesn't look right b/c we can pushdown colType // Migrate to offset based methods here // code precision 2 width convertor else if (UNLIKELY(datatypes::isWideDecimalType(in.getColTypes()[i], in.getColumnWidth(i)))) out->setInt128Field(in.getTSInt128Field(i).getValue(), mapping[i]); else if (in.isUnsigned(i)) out->setUintField(in.getUintField(i), mapping[i]); else out->setIntField(in.getIntField(i), mapping[i]); } } RowGroup& RowGroup::operator+=(const RowGroup& rhs) { std::shared_ptr tmp; uint32_t i, j; // not appendable if data is set assert(!data); tmp.reset(new bool[columnCount + rhs.columnCount]); for (i = 0; i < columnCount; i++) tmp[i] = forceInline[i]; for (j = 0; j < rhs.columnCount; i++, j++) tmp[i] = rhs.forceInline[j]; forceInline.swap(tmp); columnCount += rhs.columnCount; oids.insert(oids.end(), rhs.oids.begin(), rhs.oids.end()); keys.insert(keys.end(), rhs.keys.begin(), rhs.keys.end()); types.insert(types.end(), rhs.types.begin(), rhs.types.end()); charsetNumbers.insert(charsetNumbers.end(), rhs.charsetNumbers.begin(), rhs.charsetNumbers.end()); charsets.insert(charsets.end(), rhs.charsets.begin(), rhs.charsets.end()); scale.insert(scale.end(), rhs.scale.begin(), rhs.scale.end()); precision.insert(precision.end(), rhs.precision.begin(), rhs.precision.end()); colWidths.insert(colWidths.end(), rhs.colWidths.begin(), rhs.colWidths.end()); // +4 +4 +8 +2 +4 +8 // (2, 6, 10, 18) + (2, 4, 8, 16) = (2, 6, 10, 18, 20, 24, 32) for (i = 1; i < rhs.stOffsets.size(); i++) { stOffsets.push_back(stOffsets.back() + rhs.stOffsets[i] - rhs.stOffsets[i - 1]); oldOffsets.push_back(oldOffsets.back() + rhs.oldOffsets[i] - rhs.oldOffsets[i - 1]); } hasLongStringField = rhs.hasLongStringField || hasLongStringField; useStringTable = rhs.useStringTable || useStringTable; hasCollation = rhs.hasCollation || hasCollation; offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); return *this; } RowGroup operator+(const RowGroup& lhs, const RowGroup& rhs) { RowGroup temp(lhs); return temp += rhs; } uint32_t RowGroup::getDBRoot() const { return *((uint32_t*)&data[dbRootOffset]); } void RowGroup::addToSysDataList(execplan::CalpontSystemCatalog::NJLSysDataList& sysDataList) { execplan::ColumnResult* cr; rowgroup::Row row; initRow(&row); uint32_t rowCount = getRowCount(); uint32_t columnCount = getColumnCount(); for (uint32_t i = 0; i < rowCount; i++) { getRow(i, &row); for (uint32_t j = 0; j < columnCount; j++) { int idx = sysDataList.findColumn(getOIDs()[j]); if (idx >= 0) { cr = sysDataList.sysDataVec[idx]; } else { cr = new execplan::ColumnResult(); cr->SetColumnOID(getOIDs()[j]); sysDataList.push_back(cr); } // @todo more data type checking. for now only check string, midint and bigint switch ((getColTypes()[j])) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: { switch (getColumnWidth(j)) { case 1: cr->PutData(row.getUintField<1>(j)); break; case 2: cr->PutData(row.getUintField<2>(j)); break; case 4: cr->PutData(row.getUintField<4>(j)); break; case 8: cr->PutData(row.getUintField<8>(j)); break; case 16: default: { NullString s = row.getStringField(j); cr->PutStringData(s.str(), s.isNull() ? 0 : strlen(s.str())); } } break; } case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: case CalpontSystemCatalog::UINT: cr->PutData(row.getIntField<4>(j)); break; case CalpontSystemCatalog::DATE: cr->PutData(row.getUintField<4>(j)); break; default: cr->PutData(row.getIntField<8>(j)); } cr->PutRid(row.getFileRelativeRid()); } } } const CHARSET_INFO* RowGroup::getCharset(uint32_t col) { if (charsets[col] == nullptr) { charsets[col] = &datatypes::Charset(charsetNumbers[col]).getCharset(); } return charsets[col]; } void RowGroup::setDBRoot(uint32_t dbroot) { *((uint32_t*)&data[dbRootOffset]) = dbroot; } RGData RowGroup::duplicate() { RGData ret(*this, getRowCount()); if (useStringTable) { // this isn't a straight memcpy of everything b/c it might be remapping strings. // think about a big memcpy + a remap operation; might be faster. // SZ: copy columns (can even be donw COW style), not rows. even memcpy approach for // columns is safer. Row r1, r2; RowGroup rg(*this); rg.setData(&ret); rg.resetRowGroup(getBaseRid()); rg.setStatus(getStatus()); rg.setRowCount(getRowCount()); rg.setDBRoot(getDBRoot()); initRow(&r1); initRow(&r2); getRow(0, &r1); rg.getRow(0, &r2); for (uint32_t i = 0; i < getRowCount(); i++) { copyRow(r1, &r2); r1.nextRow(); r2.nextRow(); } } else { memcpy(ret.rowData.get(), data, getDataSize()); } return ret; } void RowGroup::append(RGData& rgd) { RowGroup tmp(*this); Row src, dest; tmp.setData(&rgd); initRow(&src); initRow(&dest); tmp.getRow(0, &src); getRow(getRowCount(), &dest); for (uint32_t i = 0; i < tmp.getRowCount(); i++, src.nextRow(), dest.nextRow()) { // cerr << "appending row: " << src.toString() << endl; copyRow(src, &dest); } setRowCount(getRowCount() + tmp.getRowCount()); } void RowGroup::append(RowGroup& rg) { append(*rg.getRGData()); } void RowGroup::append(RGData& rgd, uint32_t startPos) { RowGroup tmp(*this); Row src, dest; tmp.setData(&rgd); initRow(&src); initRow(&dest); tmp.getRow(0, &src); getRow(startPos, &dest); for (uint32_t i = 0; i < tmp.getRowCount(); i++, src.nextRow(), dest.nextRow()) { // cerr << "appending row: " << src.toString() << endl; copyRow(src, &dest); } setRowCount(getRowCount() + tmp.getRowCount()); } void RowGroup::append(RowGroup& rg, uint32_t startPos) { append(*rg.getRGData(), startPos); } RowGroup RowGroup::truncate(uint32_t cols) { idbassert(cols <= columnCount); RowGroup ret(*this); ret.columnCount = cols; ret.oldOffsets.resize(cols + 1); ret.stOffsets.resize(cols + 1); ret.colWidths.resize(cols); ret.oids.resize(cols); ret.keys.resize(cols); ret.types.resize(cols); ret.charsetNumbers.resize(cols); ret.charsets.resize(cols); ret.scale.resize(cols); ret.precision.resize(cols); ret.forceInline.reset(new bool[cols]); memcpy(ret.forceInline.get(), forceInline.get(), cols * sizeof(bool)); ret.hasLongStringField = false; ret.hasCollation = false; for (uint32_t i = 0; i < columnCount && (!ret.hasLongStringField || !ret.hasCollation); i++) { if (colWidths[i] >= sTableThreshold && !forceInline[i]) { ret.hasLongStringField = true; } if (colHasCollation(i)) { ret.hasCollation = true; } } ret.useStringTable = (ret.useStringTable && ret.hasLongStringField); ret.offsets = (ret.useStringTable ? &ret.stOffsets[0] : &ret.oldOffsets[0]); return ret; } } // namespace rowgroup