1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-641 Work of Ivan Zuniga on basic read and write support for Binary16

This commit is contained in:
Gagan Goel
2019-10-24 14:01:47 -04:00
committed by Roman Nozdrin
parent d943beb445
commit 32f6167067
48 changed files with 1114 additions and 75 deletions

View File

@ -273,6 +273,21 @@ inline bool colStrCompare_(uint64_t val1, uint64_t val2, uint8_t COP, const idb_
template<int>
inline bool isEmptyVal(uint8_t type, const uint8_t* val8);
template<>
inline bool isEmptyVal<32>(uint8_t type, const uint8_t* ival) // For BINARY
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYEMPTYROW) && (val[1] == joblist::BINARYEMPTYROW)
&& (val[2] == joblist::BINARYEMPTYROW) && (val[3] == joblist::BINARYEMPTYROW));
}
template<>
inline bool isEmptyVal<16>(uint8_t type, const uint8_t* ival) // For BINARY
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYEMPTYROW) && (val[1] == joblist::BINARYEMPTYROW));
}
template<>
inline bool isEmptyVal<8>(uint8_t type, const uint8_t* ival)
{
@ -394,6 +409,21 @@ inline bool isEmptyVal<1>(uint8_t type, const uint8_t* ival)
template<int>
inline bool isNullVal(uint8_t type, const uint8_t* val8);
template<>
inline bool isNullVal<16>(uint8_t type, const uint8_t* ival) // For BINARY
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYNULL) && (val[1] == joblist::BINARYNULL));
}
template<>
inline bool isNullVal<32>(uint8_t type, const uint8_t* ival) // For BINARY
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYNULL) && (val[1] == joblist::BINARYNULL)
&& (val[2] == joblist::BINARYNULL) && (val[3] == joblist::BINARYNULL));
}
template<>
inline bool isNullVal<8>(uint8_t type, const uint8_t* ival)
{
@ -521,6 +551,12 @@ inline bool isNullVal(uint32_t length, uint8_t type, const uint8_t* val8)
{
switch (length)
{
case 32:
return isNullVal<32>(type, val8);
case 16:
return isNullVal<16>(type, val8);
case 8:
return isNullVal<8>(type, val8);
@ -703,6 +739,16 @@ inline void store(const NewColRequestHeader* in,
switch (in->DataSize)
{
case 32:
ptr2 += (rid << 5);
memcpy(ptr1, ptr2, 32);
break;
case 16:
ptr2 += (rid << 4);
memcpy(ptr1, ptr2, 16);
break;
default:
case 8:
ptr2 += (rid << 3);
@ -724,7 +770,6 @@ inline void store(const NewColRequestHeader* in,
memcpy(ptr1, ptr2, 1);
break;
}
*written += in->DataSize;
}
@ -811,6 +856,66 @@ inline uint64_t nextUnsignedColValue(int type,
return -1;
}
}
template<int W>
inline uint8_t* nextBinColValue(int type,
const uint16_t* ridArray,
int NVALS,
int* index,
bool* done,
bool* isNull,
bool* isEmpty,
uint16_t* rid,
uint8_t OutputType, uint8_t* val8, unsigned itemsPerBlk)
{
if (ridArray == NULL)
{
while (static_cast<unsigned>(*index) < itemsPerBlk &&
isEmptyVal<W>(type, &val8[*index * W]) &&
(OutputType & OT_RID))
{
(*index)++;
}
if (static_cast<unsigned>(*index) >= itemsPerBlk)
{
*done = true;
return NULL;
}
*rid = (*index)++;
}
else
{
//FIXME: not complete nor tested . How make execution flow pass here
// whe is ridArray not NULL ? fidn by id? how?
while (*index < NVALS &&
isEmptyVal<W>(type, &val8[ridArray[*index] * W]))
{
(*index)++;
}
if (*index >= NVALS)
{
*done = true;
return NULL;
}
*rid = ridArray[(*index)++];
}
*isNull = isNullVal<W>(type, val8);
*isEmpty = isEmptyVal<W>(type, val8);
//cout << "nextUnsignedColValue index " << *index << " rowid " << *rid << endl;
// at this point, nextRid is the index to return, and index is...
// if RIDs are not specified, nextRid + 1,
// if RIDs are specified, it's the next index in the rid array.
return &val8[*rid * W];
#ifdef PRIM_DEBUG
throw logic_error("PrimitiveProcessor::nextColBinValue() bad width");
#endif
return NULL;
}
}
template<int W>
inline int64_t nextColValue(int type,
@ -1426,6 +1531,225 @@ inline void p_Col_ridArray(NewColRequestHeader* in,
#endif
}
// for BINARY
template<int W>
inline void p_Col_bin_ridArray(NewColRequestHeader* in,
NewColResultHeader* out,
unsigned outSize,
unsigned* written, int* block, Stats* fStatsPtr, unsigned itemsPerBlk,
boost::shared_ptr<ParsedColumnFilter> parsedColumnFilter)
{
uint16_t* ridArray = 0;
uint8_t* in8 = reinterpret_cast<uint8_t*>(in);
const uint8_t filterSize = sizeof(uint8_t) + sizeof(uint8_t) + W;
idb_regex_t placeholderRegex;
placeholderRegex.used = false;
//FIXME: pCol is setting it to 8192 cause logicalBlockMode is true
if(itemsPerBlk == BLOCK_SIZE){
itemsPerBlk = BLOCK_SIZE/W;
}
if (in->NVALS > 0)
ridArray = reinterpret_cast<uint16_t*>(&in8[sizeof(NewColRequestHeader) +
(in->NOPS * filterSize)]);
if (ridArray && 1 == in->sort )
{
qsort(ridArray, in->NVALS, sizeof(uint16_t), compareBlock<uint16_t>);
if (fStatsPtr)
#ifdef _MSC_VER
fStatsPtr->markEvent(in->LBID, GetCurrentThreadId(), in->hdr.SessionID, 'O');
#else
fStatsPtr->markEvent(in->LBID, pthread_self(), in->hdr.SessionID, 'O');
#endif
}
// Set boolean indicating whether to capture the min and max values.
out->ValidMinMax = isMinMaxValid(in);
if (out->ValidMinMax)
{
if (isUnsigned((CalpontSystemCatalog::ColDataType)in->DataType))
{
out->Min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
out->Max = 0;
}
else
{
out->Min = numeric_limits<int64_t>::max();
out->Max = numeric_limits<int64_t>::min();
}
}
else
{
out->Min = 0;
out->Max = 0;
}
typedef char binWtype [W];
const ColArgs* args = NULL;
int64_t val = 0;
binWtype* bval;
int nextRidIndex = 0, argIndex = 0;
bool done = false, cmp = false, isNull = false, isEmpty = false;
uint16_t rid = 0;
prestored_set_t::const_iterator it;
binWtype* argVals = (binWtype*)alloca(in->NOPS * W);
uint8_t* std_cops = (uint8_t*)alloca(in->NOPS * sizeof(uint8_t));
uint8_t* std_rfs = (uint8_t*)alloca(in->NOPS * sizeof(uint8_t));
uint8_t* cops = NULL;
uint8_t* rfs = NULL;
scoped_array<idb_regex_t> std_regex;
idb_regex_t* regex = NULL;
uint8_t likeOps = 0;
// no pre-parsed column filter is set, parse the filter in the message
if (parsedColumnFilter.get() == NULL) {
std_regex.reset(new idb_regex_t[in->NOPS]);
regex = &(std_regex[0]);
cops = std_cops;
rfs = std_rfs;
for (argIndex = 0; argIndex < in->NOPS; argIndex++) {
args = reinterpret_cast<const ColArgs*> (&in8[sizeof (NewColRequestHeader) +
(argIndex * filterSize)]);
cops[argIndex] = args->COP;
rfs[argIndex] = args->rf;
memcpy(argVals[argIndex],args->val, W);
}
regex[argIndex].used = false;
}
// else we have a pre-parsed filter, and it's an unordered set for quick == comparisons
bval = (binWtype*)nextBinColValue<W>(in->DataType, ridArray, in->NVALS, &nextRidIndex, &done, &isNull,
&isEmpty, &rid, in->OutputType, reinterpret_cast<uint8_t*>(block), itemsPerBlk);
while (!done)
{
// if((*((uint64_t *) (bval))) != 0)
// {
// cout << "rid "<< rid << " value ";
// if(W > 16) printf("%016X%016X ",( *(((uint64_t *) (bval)) +3)),(*(((uint64_t *) (bval)) +2)));
// printf("%016X%016X ",( *(((uint64_t *) (bval)) +1)),(*((uint64_t *) (bval))) );
//
// cout << endl;
// }
if (cops == NULL) // implies parsedColumnFilter && columnFilterMode == SET
{
/* bug 1920: ignore NULLs in the set and in the column data */
if (!(isNull && in->BOP == BOP_AND))
{
it = parsedColumnFilter->prestored_set->find(val);
if (in->BOP == BOP_OR)
{
// assume COP == COMPARE_EQ
if (it != parsedColumnFilter->prestored_set->end())
{
store(in, out, outSize, written, rid, reinterpret_cast<const uint8_t*>(block));
}
}
else if (in->BOP == BOP_AND)
{
// assume COP == COMPARE_NE
if (it == parsedColumnFilter->prestored_set->end())
{
store(in, out, outSize, written, rid, reinterpret_cast<const uint8_t*>(block));
}
}
}
}
else
{
for (argIndex = 0; argIndex < in->NOPS; argIndex++)
{
// if((*((uint64_t *) (uval))) != 0) cout << "comparing " << dec << (*((uint64_t *) (uval))) << " to " << (*((uint64_t *) (argVals[argIndex]))) << endl;
int val1 = memcmp(*bval, &argVals[argIndex], W);
switch (cops[argIndex]) {
case COMPARE_NIL:
cmp = false;
break;
case COMPARE_LT:
cmp = val1 < 0;
break;
case COMPARE_EQ:
cmp = val1 == 0;
break;
case COMPARE_LE:
cmp = val1 <= 0;
break;
case COMPARE_GT:
cmp = val1 > 0;
break;
case COMPARE_NE:
cmp = val1 != 0;
break;
case COMPARE_GE:
cmp = val1 >= 0;
break;
default:
logIt(34, cops[argIndex], "colCompare");
cmp = false; // throw an exception here?
}
// cout << cmp << endl;
if (in->NOPS == 1)
{
if (cmp == true)
{
store(in, out, outSize, written, rid, reinterpret_cast<const uint8_t*>(block));
}
break;
}
else if (in->BOP == BOP_AND && cmp == false)
{
break;
}
else if (in->BOP == BOP_OR && cmp == true)
{
store(in, out, outSize, written, rid, reinterpret_cast<const uint8_t*>(block));
break;
}
}
if ((argIndex == in->NOPS && in->BOP == BOP_AND) || in->NOPS == 0)
{
store(in, out, outSize, written, rid, reinterpret_cast<const uint8_t*>(block));
}
}
bval = (binWtype*)nextBinColValue<W>(in->DataType, ridArray, in->NVALS, &nextRidIndex, &done, &isNull,
&isEmpty, &rid, in->OutputType, reinterpret_cast<uint8_t*>(block), itemsPerBlk);
}
if (fStatsPtr)
#ifdef _MSC_VER
fStatsPtr->markEvent(in->LBID, GetCurrentThreadId(), in->hdr.SessionID, 'K');
#else
fStatsPtr->markEvent(in->LBID, pthread_self(), in->hdr.SessionID, 'K');
#endif
} //namespace anon
namespace primitives
@ -1476,6 +1800,14 @@ void PrimitiveProcessor::p_Col(NewColRequestHeader* in, NewColResultHeader* out,
switch (in->DataSize)
{
case 32:
p_Col_bin_ridArray<32>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 16:
p_Col_bin_ridArray<16>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 8:
p_Col_ridArray<8>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
@ -1578,7 +1910,9 @@ boost::shared_ptr<ParsedColumnFilter> parseColumnFilter
case 8:
ret->prestored_argVals[argIndex] = *reinterpret_cast<const uint64_t*>(args->val);
break;
break;
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
}
}
else
@ -1614,6 +1948,8 @@ boost::shared_ptr<ParsedColumnFilter> parseColumnFilter
case 8:
ret->prestored_argVals[argIndex] = *reinterpret_cast<const int64_t*>(args->val);
break;
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
}
}

View File

@ -39,9 +39,9 @@
#include <cppunit/extensions/HelperMacros.h>
#include "primitiveprocessor.h"
using namespace primitives;
using namespace std;
int done;
void alarm_handler(int sig)
@ -87,7 +87,6 @@ class PrimTest : public CppUnit::TestFixture
CPPUNIT_TEST(p_IdxList_1);
CPPUNIT_TEST(p_IdxList_2);
// whole block tests
CPPUNIT_TEST(p_Col_1);
CPPUNIT_TEST(p_Col_2);
@ -162,7 +161,11 @@ class PrimTest : public CppUnit::TestFixture
// CPPUNIT_TEST(p_Dictionary_like_prefixbench_1);
// CPPUNIT_TEST(p_Dictionary_like_substrbench_1);
// binary data type
CPPUNIT_TEST(p_Col_bin_16);
CPPUNIT_TEST(p_Col_bin_32);
CPPUNIT_TEST_SUITE_END();
private:
@ -3744,6 +3747,178 @@ public:
close(fd);
}
template<uint8_t W> struct binary;
typedef binary<16> binary16;
typedef binary<32> binary32;
template<uint8_t W>
struct binary {
unsigned char data[W]; // May be ok for empty value ?
void operator=(uint64_t v) {*((uint64_t *) data) = v; memset(data + 8, 0, W - 8);}
inline uint8_t& operator[](const int index) {return *((uint8_t*) (data + index));}
inline uint64_t& uint64(const int index) {return *((uint64_t*) (data + (index << 3)));}
};
void p_Col_bin_16()
{
PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in;
NewColResultHeader* out;
ColArgs* args;
binary16* results;
uint32_t written, i;
int fd;
binary16 tmp;
binary16* bin16 = (binary16*) block;
for(int i = 0; i < BLOCK_SIZE/16; i++)
{
bin16[i] = 0;
}
bin16[0].uint64(0) = 10UL;
bin16[1].uint64(0) = 1000UL;
bin16[3].uint64(0) = 1000UL;
bin16[3].uint64(1) = 1;
bin16[4].uint64(0) = 256;
bin16[4].uint64(1) = 1;
typedef char bin16_t[16];
*(uint64_t*)(((bin16_t*)block) + 5) = 500;
*(uint64_t*)&((bin16_t*)block)[6] = 501;
memset(input, 0, BLOCK_SIZE);
memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = sizeof(binary16);
in->DataType = execplan::CalpontSystemCatalog::BINARY;
in->OutputType = OT_DATAVALUE;
in->NOPS = 3;
in->BOP = BOP_OR;
in->NVALS = 0;
tmp = 10;
args->COP = COMPARE_EQ;
memcpy(args->val, &tmp, in->DataSize);
args = reinterpret_cast<ColArgs*> (args->val + in->DataSize);
args->COP = COMPARE_EQ;
tmp = 1000;
memcpy(args->val, &tmp, in->DataSize);
args = reinterpret_cast<ColArgs*> (args->val + in->DataSize);
tmp.uint64(0) = 256;
tmp.uint64(1) = 1;
args->COP = COMPARE_EQ;
memcpy(args->val, &tmp, in->DataSize);
pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<binary16*>(&output[sizeof(NewColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS);
CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0));
CPPUNIT_ASSERT_EQUAL((u_int64_t)1000, results[1].uint64(0));
for (i = 0; i < out->NVALS; i++) {
printf("Result %d Value %016X%016X\n",i ,results[i].uint64(1),results[i].uint64(0) );
// CPPUNIT_ASSERT(results[i] == (uint32_t) (i < 10 ? i : i - 10 + 1001));
}
}
void p_Col_bin_32()
{
PrimitiveProcessor pp;
uint8_t input[2 * BLOCK_SIZE], output[8 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in;
NewColResultHeader* out;
ColArgs* args;
binary32* results;
uint32_t written, i;
int fd;
binary32 tmp;
binary32* bin32 = (binary32*) block;
for(int i = 0; i < BLOCK_SIZE/32; i++)
{
bin32[i].uint64(0) = 0;
}
bin32[0].uint64(0) = 10UL;
bin32[1].uint64(0) = 1000UL;
bin32[3].uint64(0) = 1000UL;
bin32[3].uint64(1) = 1;
bin32[4].uint64(0) = 256;
bin32[4].uint64(1) = 254;
bin32[4].uint64(2) = 253;
bin32[4].uint64(3) = 252;
typedef char bin32_t[32];
*(uint64_t*)(((bin32_t*)block) + 5) = 500;
*(uint64_t*)&((bin32_t*)block)[6] = 501;
memset(input, 0, BLOCK_SIZE);
memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = sizeof(binary32);
in->DataType = execplan::CalpontSystemCatalog::BINARY;
in->OutputType = OT_DATAVALUE;
in->NOPS = 3;
in->BOP = BOP_OR;
in->NVALS = 0;
tmp = 10;
args->COP = COMPARE_EQ;
memcpy(args->val, &tmp, in->DataSize);
args = reinterpret_cast<ColArgs*> (args->val + in->DataSize);
args->COP = COMPARE_EQ;
tmp = 1000;
memcpy(args->val, &tmp, in->DataSize);
args = reinterpret_cast<ColArgs*> (args->val + in->DataSize);
tmp.uint64(0) = 256;
tmp.uint64(1) = 254;
tmp.uint64(2) = 253;
tmp.uint64(3) = 252;
args->COP = COMPARE_EQ;
memcpy(args->val, &tmp, in->DataSize);
pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<binary32*>(&output[sizeof(NewColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS);
// CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0));
// CPPUNIT_ASSERT_EQUAL((u_int64_t)1000, results[1].uint64(0));
for (i = 0; i < out->NVALS; i++) {
printf("Result %d Value %016X%016X%016X%016X\n",i ,results[i].uint64(3),results[i].uint64(2),results[i].uint64(1),results[i].uint64(0) );
// CPPUNIT_ASSERT(results[i] == (uint32_t) (i < 10 ? i : i - 10 + 1001));
}
}
void p_Dictionary_1()
{