You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-267 Fix LONGBLOB issues
* Set max column length in DDL to 2,100,000,000 bytes (just under 2 GiB)
* Fix token edge case
* Rewrite RowGroup string handling to accept more than 64 KB in a single string
This commit is contained in:
@@ -895,7 +895,7 @@ blob_type:
|
|||||||
| LONGBLOB
|
| LONGBLOB
|
||||||
{
|
{
|
||||||
$$ = new ColumnType(DDL_BLOB);
|
$$ = new ColumnType(DDL_BLOB);
|
||||||
$$->fLength = 4294967295;
|
$$->fLength = 2100000000;
|
||||||
}
|
}
|
||||||
;
|
;
|
||||||
|
|
||||||
@@ -923,7 +923,7 @@ text_type:
|
|||||||
| LONGTEXT
|
| LONGTEXT
|
||||||
{
|
{
|
||||||
$$ = new ColumnType(DDL_BLOB);
|
$$ = new ColumnType(DDL_BLOB);
|
||||||
$$->fLength = 4294967295;
|
$$->fLength = 2100000000;
|
||||||
}
|
}
|
||||||
;
|
;
|
||||||
|
|
||||||
|
@@ -435,7 +435,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
|
|||||||
int64_t l_lbid=0;
|
int64_t l_lbid=0;
|
||||||
int64_t o_lbid=0;
|
int64_t o_lbid=0;
|
||||||
OldGetSigParams *pt;
|
OldGetSigParams *pt;
|
||||||
StringPtr tmpStrings[LOGICAL_BLOCK_RIDS];
|
StringPtr *tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS];
|
||||||
rowgroup::Row r;
|
rowgroup::Row r;
|
||||||
boost::scoped_array<OrderedToken> newRidList;
|
boost::scoped_array<OrderedToken> newRidList;
|
||||||
|
|
||||||
@@ -524,12 +524,12 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
|
|||||||
// If this is a multi-block blob, get all the blocks
|
// If this is a multi-block blob, get all the blocks
|
||||||
// We do string copy here, should maybe have a RowGroup
|
// We do string copy here, should maybe have a RowGroup
|
||||||
// function to append strings or something?
|
// function to append strings or something?
|
||||||
if ((newRidList[i].token != 0xffffffffffffffffLL) &&
|
if (((newRidList[i].token >> 46) < 0x3FFFF) &&
|
||||||
((newRidList[i].token >> 46) > 0))
|
((newRidList[i].token >> 46) > 0))
|
||||||
{
|
{
|
||||||
StringPtr multi_part[1];
|
StringPtr multi_part[1];
|
||||||
uint16_t old_offset = primMsg->tokens[0].offset;
|
uint16_t old_offset = primMsg->tokens[0].offset;
|
||||||
string result((char*)tmpStrings[i].ptr, tmpStrings[i].len);
|
string *result = new string((char*)tmpStrings[i].ptr, tmpStrings[i].len);
|
||||||
uint64_t origin_lbid = primMsg->LBID;
|
uint64_t origin_lbid = primMsg->LBID;
|
||||||
uint32_t lbid_count = newRidList[i].token >> 46;
|
uint32_t lbid_count = newRidList[i].token >> 46;
|
||||||
primMsg->tokens[0].offset = 1; // first offset of a sig
|
primMsg->tokens[0].offset = 1; // first offset of a sig
|
||||||
@@ -541,11 +541,12 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
|
|||||||
primMsg->tokens[0].LBID = origin_lbid + j;
|
primMsg->tokens[0].LBID = origin_lbid + j;
|
||||||
issuePrimitive(false);
|
issuePrimitive(false);
|
||||||
projectResult(multi_part);
|
projectResult(multi_part);
|
||||||
result.append((char*)multi_part[0].ptr, multi_part[0].len);
|
result->append((char*)multi_part[0].ptr, multi_part[0].len);
|
||||||
}
|
}
|
||||||
primMsg->tokens[0].offset = old_offset;
|
primMsg->tokens[0].offset = old_offset;
|
||||||
tmpResultCounter = firstTmpResultCounter;
|
tmpResultCounter = firstTmpResultCounter;
|
||||||
r.setVarBinaryField((unsigned char*)result.c_str(), result.length(), col);
|
r.setVarBinaryField((unsigned char*)result->c_str(), result->length(), col);
|
||||||
|
delete result;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -559,6 +560,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
|
|||||||
//cout << "_projectToRG() total length = " << totalResultLength << endl;
|
//cout << "_projectToRG() total length = " << totalResultLength << endl;
|
||||||
idbassert(tmpResultCounter == bpp->ridCount);
|
idbassert(tmpResultCounter == bpp->ridCount);
|
||||||
|
|
||||||
|
delete [] tmpStrings;
|
||||||
//cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
|
//cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
|
||||||
// << " len: " << tmpResultCounter
|
// << " len: " << tmpResultCounter
|
||||||
// << endl;
|
// << endl;
|
||||||
|
@@ -38,6 +38,7 @@
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
#include <boost/shared_array.hpp>
|
#include <boost/shared_array.hpp>
|
||||||
|
#include <boost/shared_ptr.hpp>
|
||||||
using namespace boost;
|
using namespace boost;
|
||||||
|
|
||||||
#include "bytestream.h"
|
#include "bytestream.h"
|
||||||
@@ -73,9 +74,9 @@ StringStore::~StringStore()
|
|||||||
uint64_t inUse = 0, allocated = 0;
|
uint64_t inUse = 0, allocated = 0;
|
||||||
|
|
||||||
for (i = 0; i < mem.size(); i++) {
|
for (i = 0; i < mem.size(); i++) {
|
||||||
MemChunk *tmp = (MemChunk *) mem.back().get();
|
std::string *tmp = mem.back().get();
|
||||||
inUse += tmp->currentSize;
|
inUse += tmp->length();
|
||||||
allocated += tmp->capacity;
|
allocated += tmp->length();
|
||||||
}
|
}
|
||||||
if (allocated > 0)
|
if (allocated > 0)
|
||||||
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse/(float) allocated << endl;
|
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse/(float) allocated << endl;
|
||||||
@@ -84,7 +85,6 @@ StringStore::~StringStore()
|
|||||||
|
|
||||||
uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
|
uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
|
||||||
{
|
{
|
||||||
MemChunk *lastMC = NULL;
|
|
||||||
uint32_t ret = 0;
|
uint32_t ret = 0;
|
||||||
|
|
||||||
empty = false; // At least a NULL is being stored.
|
empty = false; // At least a NULL is being stored.
|
||||||
@@ -102,31 +102,10 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
|
|||||||
if (fUseStoreStringMutex)
|
if (fUseStoreStringMutex)
|
||||||
lk.lock();
|
lk.lock();
|
||||||
|
|
||||||
if (mem.size() > 0)
|
shared_ptr<std::string> newString(new std::string((char*)data, len));
|
||||||
lastMC = (MemChunk *) mem.back().get();
|
mem.push_back(newString);
|
||||||
|
|
||||||
if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < len)) {
|
ret = mem.size();
|
||||||
// mem usage debugging
|
|
||||||
//if (lastMC)
|
|
||||||
//cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl;
|
|
||||||
shared_array<uint8_t> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
|
|
||||||
mem.push_back(newOne);
|
|
||||||
lastMC = (MemChunk *) mem.back().get();
|
|
||||||
lastMC->currentSize = 0;
|
|
||||||
lastMC->capacity = CHUNK_SIZE;
|
|
||||||
memset(lastMC->data, 0, CHUNK_SIZE);
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = ((mem.size()-1) * CHUNK_SIZE) + lastMC->currentSize;
|
|
||||||
memcpy(&(lastMC->data[lastMC->currentSize]), data, len);
|
|
||||||
/*
|
|
||||||
cout << "stored: '" << hex;
|
|
||||||
for (uint32_t i = 0; i < len ; i++) {
|
|
||||||
cout << (char) lastMC->data[lastMC->currentSize + i];
|
|
||||||
}
|
|
||||||
cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl;
|
|
||||||
*/
|
|
||||||
lastMC->currentSize += len;
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@@ -134,16 +113,17 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
|
|||||||
void StringStore::serialize(ByteStream &bs) const
|
void StringStore::serialize(ByteStream &bs) const
|
||||||
{
|
{
|
||||||
uint32_t i;
|
uint32_t i;
|
||||||
MemChunk *mc;
|
std::string empty_str;
|
||||||
|
|
||||||
bs << (uint32_t) mem.size();
|
bs << (uint32_t) mem.size();
|
||||||
bs << (uint8_t) empty;
|
bs << (uint8_t) empty;
|
||||||
for (i = 0; i < mem.size(); i++) {
|
for (i = 0; i < mem.size(); i++) {
|
||||||
mc = (MemChunk *) mem[i].get();
|
if (mem[i].get() == NULL)
|
||||||
bs << (uint32_t) mc->currentSize;
|
bs << empty_str;
|
||||||
|
else
|
||||||
|
bs << *mem[i].get();
|
||||||
//cout << "serialized " << mc->currentSize << " bytes\n";
|
//cout << "serialized " << mc->currentSize << " bytes\n";
|
||||||
bs.append(mc->data, mc->currentSize);
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t StringStore::deserialize(ByteStream &bs)
|
uint32_t StringStore::deserialize(ByteStream &bs)
|
||||||
@@ -151,27 +131,22 @@ uint32_t StringStore::deserialize(ByteStream &bs)
|
|||||||
uint32_t i;
|
uint32_t i;
|
||||||
uint32_t count;
|
uint32_t count;
|
||||||
uint32_t size;
|
uint32_t size;
|
||||||
uint8_t *buf;
|
std::string buf;
|
||||||
MemChunk *mc;
|
|
||||||
uint8_t tmp8;
|
uint8_t tmp8;
|
||||||
uint32_t ret = 0;
|
uint32_t ret = 0;
|
||||||
|
|
||||||
//mem.clear();
|
//mem.clear();
|
||||||
bs >> count;
|
bs >> count;
|
||||||
mem.resize(count);
|
mem.reserve(count);
|
||||||
bs >> tmp8;
|
bs >> tmp8;
|
||||||
empty = (bool) tmp8;
|
empty = (bool) tmp8;
|
||||||
ret += 5;
|
ret += 5;
|
||||||
for (i = 0; i < count; i++) {
|
for (i = 0; i < count; i++) {
|
||||||
bs >> size;
|
|
||||||
//cout << "deserializing " << size << " bytes\n";
|
//cout << "deserializing " << size << " bytes\n";
|
||||||
buf = bs.buf();
|
bs >> buf;
|
||||||
mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
|
shared_ptr<std::string> newString(new std::string(buf));
|
||||||
mc = (MemChunk *) mem[i].get();
|
mem.push_back(newString);
|
||||||
mc->currentSize = size;
|
//bs.advance(size);
|
||||||
mc->capacity = size;
|
|
||||||
memcpy(mc->data, buf, size);
|
|
||||||
bs.advance(size);
|
|
||||||
ret += (size + 4);
|
ret += (size + 4);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@@ -179,7 +154,7 @@ uint32_t StringStore::deserialize(ByteStream &bs)
|
|||||||
|
|
||||||
void StringStore::clear()
|
void StringStore::clear()
|
||||||
{
|
{
|
||||||
vector<shared_array<uint8_t> > emptyv;
|
vector<shared_ptr<std::string> > emptyv;
|
||||||
mem.swap(emptyv);
|
mem.swap(emptyv);
|
||||||
empty = true;
|
empty = true;
|
||||||
}
|
}
|
||||||
|
@@ -119,14 +119,8 @@ private:
|
|||||||
|
|
||||||
// This is an overlay b/c the underlying data needs to be any size,
|
// This is an overlay b/c the underlying data needs to be any size,
|
||||||
// and alloc'd in one chunk. data can't be a separate dynamic chunk.
|
// and alloc'd in one chunk. data can't be a separate dynamic chunk.
|
||||||
struct MemChunk
|
|
||||||
{
|
std::vector<boost::shared_ptr<std::string> > mem;
|
||||||
uint32_t currentSize;
|
|
||||||
uint32_t capacity;
|
|
||||||
uint8_t data[];
|
|
||||||
};
|
|
||||||
|
|
||||||
std::vector<boost::shared_array<uint8_t> > mem;
|
|
||||||
bool empty;
|
bool empty;
|
||||||
bool fUseStoreStringMutex; //@bug6065, make StringStore::storeString() thread safe
|
bool fUseStoreStringMutex; //@bug6065, make StringStore::storeString() thread safe
|
||||||
boost::mutex fMutex;
|
boost::mutex fMutex;
|
||||||
@@ -1447,17 +1441,13 @@ inline std::string StringStore::getString(uint32_t off, uint32_t len) const
|
|||||||
if (off == std::numeric_limits<uint32_t>::max())
|
if (off == std::numeric_limits<uint32_t>::max())
|
||||||
return joblist::CPNULLSTRMARK;
|
return joblist::CPNULLSTRMARK;
|
||||||
|
|
||||||
MemChunk *mc;
|
if ((mem.size() < off) || off == 0)
|
||||||
uint32_t chunk = off / CHUNK_SIZE;
|
|
||||||
uint32_t offset = off % CHUNK_SIZE;
|
|
||||||
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
|
|
||||||
// what gets returned, it just can't go out of bounds.
|
|
||||||
if (mem.size() <= chunk)
|
|
||||||
return joblist::CPNULLSTRMARK;
|
return joblist::CPNULLSTRMARK;
|
||||||
mc = (MemChunk *) mem[chunk].get();
|
|
||||||
if ((offset + len) > mc->currentSize)
|
if (mem[off-1].get() == NULL)
|
||||||
return joblist::CPNULLSTRMARK;
|
return joblist::CPNULLSTRMARK;
|
||||||
return std::string((char *) &(mc->data[offset]), len);
|
|
||||||
|
return *mem[off-1].get();
|
||||||
}
|
}
|
||||||
|
|
||||||
inline const uint8_t * StringStore::getPointer(uint32_t off) const
|
inline const uint8_t * StringStore::getPointer(uint32_t off) const
|
||||||
@@ -1465,17 +1455,15 @@ inline const uint8_t * StringStore::getPointer(uint32_t off) const
|
|||||||
if (off == std::numeric_limits<uint32_t>::max())
|
if (off == std::numeric_limits<uint32_t>::max())
|
||||||
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
||||||
|
|
||||||
uint32_t chunk = off / CHUNK_SIZE;
|
|
||||||
uint32_t offset = off % CHUNK_SIZE;
|
|
||||||
MemChunk *mc;
|
|
||||||
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
|
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
|
||||||
// what gets returned, it just can't go out of bounds.
|
// what gets returned, it just can't go out of bounds.
|
||||||
if (UNLIKELY(mem.size() <= chunk))
|
if (UNLIKELY(mem.size() < off))
|
||||||
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
||||||
mc = (MemChunk *) mem[chunk].get();
|
|
||||||
if (offset > mc->currentSize)
|
if (off == 0 || (mem[off-1].get() == NULL))
|
||||||
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
|
||||||
return &(mc->data[offset]);
|
|
||||||
|
return (uint8_t*)mem[off-1].get()->c_str();
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
|
inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
|
||||||
@@ -1486,17 +1474,15 @@ inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
|
|||||||
if (len < 8)
|
if (len < 8)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
uint32_t chunk = off / CHUNK_SIZE;
|
if ((mem.size() < off) || off == 0)
|
||||||
uint32_t offset = off % CHUNK_SIZE;
|
|
||||||
MemChunk *mc;
|
|
||||||
if (mem.size() <= chunk)
|
|
||||||
return true;
|
return true;
|
||||||
mc = (MemChunk *) mem[chunk].get();
|
|
||||||
if ((offset + len) > mc->currentSize)
|
if (mem[off-1].get() == NULL)
|
||||||
return true;
|
return true;
|
||||||
if (mc->data[offset] == 0) // "" = NULL string for some reason...
|
|
||||||
return true;
|
if (mem[off-1].get()->empty()) // Empty string is NULL
|
||||||
return (*((uint64_t *) &mc->data[offset]) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str()));
|
return true;
|
||||||
|
return (mem[off-1].get()->compare(joblist::CPNULLSTRMARK) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const
|
inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const
|
||||||
@@ -1504,15 +1490,13 @@ inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const
|
|||||||
if (off == std::numeric_limits<uint32_t>::max() || len == 0)
|
if (off == std::numeric_limits<uint32_t>::max() || len == 0)
|
||||||
return str == joblist::CPNULLSTRMARK;
|
return str == joblist::CPNULLSTRMARK;
|
||||||
|
|
||||||
uint32_t chunk = off / CHUNK_SIZE;
|
if ((mem.size() < off) || off == 0)
|
||||||
uint32_t offset = off % CHUNK_SIZE;
|
|
||||||
if (mem.size() <= chunk)
|
|
||||||
return false;
|
|
||||||
MemChunk *mc = (MemChunk *) mem[chunk].get();
|
|
||||||
if ((offset + len) > mc->currentSize)
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
return (strncmp(str.c_str(), (const char *) &mc->data[offset], len) == 0);
|
if (mem[off-1].get() == NULL)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return (mem[off-1].get()->compare(str) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool StringStore::isEmpty() const
|
inline bool StringStore::isEmpty() const
|
||||||
@@ -1524,11 +1508,9 @@ inline uint64_t StringStore::getSize() const
|
|||||||
{
|
{
|
||||||
uint32_t i;
|
uint32_t i;
|
||||||
uint64_t ret = 0;
|
uint64_t ret = 0;
|
||||||
MemChunk *mc;
|
|
||||||
|
|
||||||
for (i = 0; i < mem.size(); i++) {
|
for (i = 0; i < mem.size(); i++) {
|
||||||
mc = (MemChunk *) mem[i].get();
|
ret+= mem[i].get()->length();
|
||||||
ret += mc->capacity;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user