1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-20 01:42:27 +03:00

Merge pull request #497 from mariadb-corporation/develop-1.1

Develop 1.1
This commit is contained in:
benthompson15
2018-06-18 15:11:36 -05:00
committed by GitHub
32 changed files with 772 additions and 411 deletions

View File

@@ -1,4 +1,4 @@
COLUMNSTORE_VERSION_MAJOR=1
COLUMNSTORE_VERSION_MINOR=1
COLUMNSTORE_VERSION_PATCH=4
COLUMNSTORE_VERSION_PATCH=5
COLUMNSTORE_VERSION_RELEASE=1

View File

@@ -9,6 +9,10 @@ ADD_CUSTOM_COMMAND(
DEPENDS ddl.y ddl.l
)
# Parser puts extra info to stderr.
INCLUDE(../../check_compiler_flag.cmake)
MY_CHECK_AND_SET_COMPILER_FLAG("-DYYDEBUG" DEBUG)
########### next target ###############
set(ddlpackage_LIB_SRCS

View File

@@ -18,6 +18,7 @@
/* $Id: ddl.l 9341 2013-03-27 14:10:35Z chao $ */
%{
#include <string.h>
#include <iostream>
#include <vector>
#include <stdio.h>
@@ -31,10 +32,11 @@
#endif
using namespace ddlpackage;
typedef enum { NOOP, STRIP_QUOTES } copy_action_t;
int lineno = 1;
void ddlerror(struct pass_to_bison* x, char const *s);
static char* scanner_copy(char *str, yyscan_t yyscanner);
static char* scanner_copy(char *str, yyscan_t yyscanner, copy_action_t action = NOOP );
%}
@@ -54,6 +56,10 @@ horiz_space [ \t\f]
newline [\n\r]
non_newline [^\n\r]
quote '
double_quote \"
grave_accent `
comment ("--"{non_newline}*)
self [,()\[\].;\:\+\-\*\/\%\^\<\>\=]
whitespace ({space}+|{comment})
@@ -62,6 +68,10 @@ digit [0-9]
ident_start [A-Za-z\200-\377_]
ident_cont [A-Za-z\200-\377_0-9\$]
identifier {ident_start}{ident_cont}*
/* fully qualified names regexes */
fq_identifier {identifier}\.{identifier}
identifier_quoted {grave_accent}{identifier}{grave_accent}
identifier_double_quoted {double_quote}{identifier}{double_quote}
integer [-+]?{digit}+
decimal ([-+]?({digit}*\.{digit}+)|({digit}+\.{digit}*))
@@ -69,11 +79,13 @@ real ({integer}|{decimal})[Ee][-+]?{digit}+
realfail1 ({integer}|{decimal})[Ee]
realfail2 ({integer}|{decimal})[Ee][-+]
quote '
grave_accent `
%%
{identifier_quoted} { ddlget_lval(yyscanner)->str = scanner_copy( ddlget_text(yyscanner), yyscanner, STRIP_QUOTES ); return IDENT; }
{identifier_double_quoted} { ddlget_lval(yyscanner)->str = scanner_copy( ddlget_text(yyscanner), yyscanner, STRIP_QUOTES ); return DQ_IDENT; }
ACTION {return ACTION;}
ADD {return ADD;}
ALTER {return ALTER;}
@@ -171,14 +183,14 @@ LONGTEXT {return LONGTEXT;}
/* ignore */
}
{identifier} {ddlget_lval(yyscanner)->str = scanner_copy(ddlget_text(yyscanner), yyscanner); return IDENT;}
{identifier} { ddlget_lval(yyscanner)->str = scanner_copy(ddlget_text(yyscanner), yyscanner); return DQ_IDENT;}
{self} {
return ddlget_text(yyscanner)[0];
}
{grave_accent} {
/* ignore */
return ddlget_text(yyscanner)[0];
}
%%
@@ -198,6 +210,11 @@ using namespace ddlpackage;
*/
void scanner_init(const char* str, yyscan_t yyscanner)
{
#ifdef YYDEBUG
extern int ddldebug;
ddldebug = 1;
#endif
size_t slen = strlen(str);
scan_data* pScanData = (scan_data*)ddlget_extra(yyscanner);
@@ -246,10 +263,20 @@ void scanner_finish(yyscan_t yyscanner)
pScanData->valbuf.clear();
}
char* scanner_copy (char *str, yyscan_t yyscanner)
char* scanner_copy (char *str, yyscan_t yyscanner, copy_action_t action)
{
char* nv = strdup(str);
if(nv)
((scan_data*)ddlget_extra(yyscanner))->valbuf.push_back(nv);
return nv;
char* result;
char* nv = strdup(str);
result = nv;
// free strduped memory later to prevent possible memory leak
if(nv)
((scan_data*)ddlget_extra(yyscanner))->valbuf.push_back(nv);
if(action == STRIP_QUOTES)
{
nv[strlen(str) - 1] = '\0';
result = nv + 1;
}
return result;
}

View File

@@ -29,20 +29,13 @@
Understanding the New Sql book
The postgres and mysql sources. find x -name \*.y -o -name \*.yy.
We don't support delimited identifiers.
We support quoted identifiers.
All literals are stored as unconverted strings.
You can't say "NOT DEFERRABLE". See the comment below.
This is not a reentrant parser. It uses the original global
variable style method of communication between the parser and
scanner. If we ever needed more than one parser thread per
processes, we would use the pure/reentrant options of bison and
flex. In that model, things that are traditionally global live
inside a struct that is passed around. We would need to upgrade to
a more recent version of flex. At the time of this writing, our
development systems have: flex version 2.5.4
This is a reentrant parser.
MCOL-66 Modify to be a reentrant parser
*/
@@ -121,7 +114,7 @@ REFERENCES RENAME RESTRICT SET SMALLINT TABLE TEXT TIME TINYBLOB TINYTEXT
TINYINT TO UNIQUE UNSIGNED UPDATE USER SESSION_USER SYSTEM_USER VARCHAR VARBINARY
VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE
%token <str> IDENT FCONST SCONST CP_SEARCH_CONDITION_TEXT ICONST DATE
%token <str> DQ_IDENT IDENT FCONST SCONST CP_SEARCH_CONDITION_TEXT ICONST DATE
/* Notes:
* 1. "ata" stands for alter_table_action
@@ -205,6 +198,7 @@ VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE
%type <str> opt_if_exists
%type <str> opt_if_not_exists
%type <sqlStmt> trunc_table_statement
%type <str> ident
%%
stmtblock: stmtmulti { x->fParseTree = $1; }
@@ -464,7 +458,7 @@ opt_equal:
;
table_option:
ENGINE opt_equal IDENT {$$ = new pair<string,string>("engine", $3);}
ENGINE opt_equal ident {$$ = new pair<string,string>("engine", $3);}
|
MAX_ROWS opt_equal ICONST {$$ = new pair<string,string>("max_rows", $3);}
|
@@ -479,9 +473,9 @@ table_option:
$$ = new pair<string,string>("auto_increment", $3);
}
|
DEFAULT CHARSET opt_equal IDENT {$$ = new pair<string,string>("default charset", $4);}
DEFAULT CHARSET opt_equal ident {$$ = new pair<string,string>("default charset", $4);}
|
DEFAULT IDB_CHAR SET opt_equal IDENT {$$ = new pair<string,string>("default charset", $5);}
DEFAULT IDB_CHAR SET opt_equal ident {$$ = new pair<string,string>("default charset", $5);}
;
alter_table_statement:
@@ -611,15 +605,23 @@ table_name:
;
qualified_name:
IDENT '.' IDENT {$$ = new QualifiedName($1, $3);}
| IDENT {
| ident {
if (x->fDBSchema.size())
$$ = new QualifiedName((char*)x->fDBSchema.c_str(), $1);
else
$$ = new QualifiedName($1);
}
| ident '.' ident
{
$$ = new QualifiedName($1, $3);
}
;
ident:
DQ_IDENT
| IDENT
;
ata_add_column:
/* See the documentation for SchemaObject for an explanation of why we are using
* dynamic_cast here.
@@ -632,11 +634,11 @@ ata_add_column:
column_name:
DATE
|IDENT
|ident
;
constraint_name:
IDENT
ident
;
column_option:
@@ -696,6 +698,10 @@ default_clause:
{
$$ = new ColumnDefaultValue($2);
}
| DEFAULT DQ_IDENT /* MCOL-1406 */
{
$$ = new ColumnDefaultValue($2);
}
| DEFAULT NULL_TOK {$$ = new ColumnDefaultValue(NULL);}
| DEFAULT USER {$$ = new ColumnDefaultValue("$USER");}
| DEFAULT CURRENT_USER {$$ = new ColumnDefaultValue("$CURRENT_USER");}

View File

@@ -1500,10 +1500,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
return doExpressionFilter(sf, jobInfo);
}
// trim trailing space char in the predicate
string constval(cc->constval());
size_t spos = constval.find_last_not_of(" ");
if (spos != string::npos) constval = constval.substr(0, spos+1);
CalpontSystemCatalog::OID dictOid = 0;
CalpontSystemCatalog::ColType ct = sc->colType();
@@ -2569,10 +2566,7 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
if (ConstantColumn::NULLDATA == cc->type() && (opeq == *sop || opne == *sop))
cop = COMPARE_NIL;
// trim trailing space char
string value = cc->constval();
size_t spos = value.find_last_not_of(" ");
if (spos != string::npos) value = value.substr(0, spos+1);
pds->addFilter(cop, value);
}
@@ -2652,10 +2646,7 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
if (ConstantColumn::NULLDATA == cc->type() && (opeq == *sop || opne == *sop))
cop = COMPARE_NIL;
// trim trailing space char
string value = cc->constval();
size_t spos = value.find_last_not_of(" ");
if (spos != string::npos) value = value.substr(0, spos+1);
pds->addFilter(cop, value);
}
@@ -2759,9 +2750,6 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
int8_t cop = op2num(sop);
int64_t value = 0;
string constval = cc->constval();
// trim trailing space char
size_t spos = constval.find_last_not_of(" ");
if (spos != string::npos) constval = constval.substr(0, spos+1);
// @bug 1151 string longer than colwidth of char/varchar.
uint8_t rf = 0;

View File

@@ -958,6 +958,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
vector<SP_ROWAGG_FUNC_t> functionVec;
uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t);
uint32_t projColsUDAFIndex = 0;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// for count column of average function
@@ -1135,18 +1136,27 @@ void TupleAggregateStep::prep1PhaseAggregate(
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
{
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i));
}
else
{
throw logic_error("prep1PhasesAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i));
@@ -1484,6 +1494,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
AGG_MAP aggFuncMap;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
set<uint32_t> avgSet;
uint32_t projColsUDAFIndex = 0;
// for count column of average function
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
@@ -1636,19 +1647,27 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
}
else
{
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
@@ -2579,6 +2598,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
vector<pair<uint32_t, int> > aggColVec;
set<uint32_t> avgSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
uint32_t projColsUDAFIndex = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
// skip if not an aggregation column
@@ -2746,18 +2766,26 @@ void TupleAggregateStep::prep2PhasesAggregate(
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
}
else
{
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
@@ -3301,6 +3329,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec;
set<uint32_t> avgSet, avgDistSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
uint32_t projColsUDAFIndex = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
// col should be an aggregate or groupBy or window function
@@ -3498,18 +3527,25 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
}
else
{
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));

View File

@@ -47,7 +47,7 @@ using namespace dataconvert;
#endif
namespace {
// Returns the value of 10 raised to the power x.
// Named exp10 rather than pow10 to avoid clashing with the C library's
// pow10() function.
inline double exp10(double x)
{
    return exp(x * M_LN10);
}
@@ -406,7 +406,7 @@ void TupleUnion::normalize(const Row &in, Row *out)
ostringstream os;
if (in.getScale(i)) {
double d = in.getIntField(i);
d /= pow10(in.getScale(i));
d /= exp10(in.getScale(i));
os.precision(15);
os << d;
}
@@ -488,7 +488,7 @@ dec1: uint64_t val = in.getIntField(i);
ostringstream os;
if (in.getScale(i)) {
double d = in.getUintField(i);
d /= pow10(in.getScale(i));
d /= exp10(in.getScale(i));
os.precision(15);
os << d;
}

View File

@@ -2039,51 +2039,53 @@ int ha_calpont_impl_delete_table_(const char *db, const char *name, cal_connecti
/*
 * Handle RENAME TABLE for a ColumnStore table by translating it into an
 * "alter table ... rename to ..." DDL statement and dispatching it to the
 * DDL processor.
 *
 * Returns 0 on success (or when running on a replication slave thread),
 * 1 on a slave-node error, -1 when the rename crosses databases, otherwise
 * the ProcessDDLStatement() return code (a warning is pushed on failure).
 */
int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connection_info& ci)
{
    THD* thd = current_thd;
    string emsg;

    pair<string, string> fromPair;
    pair<string, string> toPair;
    string stmt;

    // this is replicated DDL, treat it just like SSO
    if (thd->slave_thread)
        return 0;

    //@bug 5660. Error out REAL DDL/DML on slave node.
    // When the statement gets here, it's NOT SSO or RESTRICT
    if (ci.isSlaveNode)
    {
        string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
        setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
        return 1;
    }

    fromPair = parseTableName(from);
    toPair = parseTableName(to);

    // Cross-database renames are not supported.
    if (fromPair.first != toPair.first)
    {
        thd->get_stmt_da()->set_overwrite_status(true);
        thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "Both tables must be in the same database to use RENAME TABLE");
        return -1;
    }

    // This explicitly shields both db objects with quotes that the lexer strips down later.
    stmt = "alter table `" + fromPair.second + "` rename to `" + toPair.second + "`;";

    // Pick the schema for the DDL: the session's current db first, then the
    // db parsed from the source name, then from the target name.
    string db;
    if ( thd->db )
        db = thd->db;
    else if ( fromPair.first.length() != 0 )
        db = fromPair.first;
    else
        db = toPair.first;

    int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg);

    if (rc != 0)
        push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str());

    return rc;
}
@@ -2121,7 +2123,7 @@ long long calonlinealter(UDF_INIT* initid, UDF_ARGS* args,
int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg, compressiontype);
if (rc != 0)
push_warning(thd, Sql_condition::WARN_LEVEL_ERROR, 9999, emsg.c_str());
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str());
return rc;
}

View File

@@ -801,8 +801,8 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
// View is already processed in view::transform
// @bug5319. view is sometimes treated as derived table and
// fromSub::transform does not build outer join filters.
//if (!table_ptr->derived && table_ptr->view)
// continue;
if (!table_ptr->derived && table_ptr->view)
continue;
CalpontSystemCatalog:: TableAliasName tan = make_aliasview(
(table_ptr->db ? table_ptr->db : ""),
@@ -4099,7 +4099,6 @@ void gp_walk(const Item *item, void *arg)
Item_string* isp = (Item_string*)item;
if (isp)
{
// @bug 3669. trim trailing spaces for the compare value
if (isp->result_type() == STRING_RESULT)
{
String val, *str = isp->val_str(&val);
@@ -4108,9 +4107,6 @@ void gp_walk(const Item *item, void *arg)
{
cval.assign(str->ptr(), str->length());
}
size_t spos = cval.find_last_not_of(" ");
if (spos != string::npos)
cval = cval.substr(0, spos+1);
gwip->rcWorkStack.push(new ConstantColumn(cval));
break;
}

View File

@@ -27,6 +27,8 @@
#include <boost/shared_ptr.hpp>
#include "calpontsystemcatalog.h"
#include "dataconvert.h"
#include "exceptclasses.h"
using namespace logging;
// Required declaration as it isn't in a MariaDB include
@@ -70,7 +72,22 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond)
for (std::vector<std::pair<execplan::CalpontSystemCatalog::OID, execplan::CalpontSystemCatalog::TableName> >::const_iterator it = catalog_tables.begin();
it != catalog_tables.end(); ++it)
{
execplan::CalpontSystemCatalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true);
execplan::CalpontSystemCatalog::RIDList column_rid_list;
// Note a table may get dropped as you iterate over the list of tables.
// So simply ignore the dropped table.
try {
column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true);
}
catch (IDBExcept& ex)
{
if (ex.errorCode() == ERR_TABLE_NOT_IN_CATALOG) {
continue;
}
else {
return 1;
}
}
for (size_t col_num = 0; col_num < column_rid_list.size(); col_num++)
{
execplan::CalpontSystemCatalog::TableColName tcn = systemCatalogPtr->colName(column_rid_list[col_num].objnum);

View File

@@ -202,7 +202,7 @@ detachvolume() {
checkInfostatus
if [ $STATUS == "detaching" ]; then
retries=1
while [ $retries -ne 60 ]; do
while [ $retries -ne 10 ]; do
#retry until it's attached
$AWSCLI detach-volume --volume-id $volumeName --region $Region > /tmp/volumeInfo_$volumeName 2>&1
@@ -239,7 +239,7 @@ attachvolume() {
checkInfostatus
if [ $STATUS == "attaching" -o $STATUS == "already-attached" ]; then
retries=1
while [ $retries -ne 60 ]; do
while [ $retries -ne 10 ]; do
#check status until it's attached
describevolume
if [ $STATUS == "attached" ]; then

View File

@@ -13,6 +13,7 @@ syslog_conf=nofile
rsyslog7=0
user=`whoami 2>/dev/null`
group=user
SUDO=" "
if [ "$user" != "root" ]; then
@@ -167,9 +168,24 @@ if [ ! -z "$syslog_conf" ] ; then
# remove older version incase it was installed by previous build
$SUDO rm -rf /etc/rsyslog.d/columnstore.conf
// determine username/groupname
if [ -f /var/log/messages ]; then
user=`stat -c "%U %G" /var/log/messages | awk '{print $1}'`
group=`stat -c "%U %G" /var/log/messages | awk '{print $2}'`
fi
if [ -f /var/log/syslog ]; then
user=`stat -c "%U %G" /var/log/syslog | awk '{print $1}'`
group=`stat -c "%U %G" /var/log/syslog | awk '{print $2}'`
fi
//set permissions
$SUDO chown $user:$group -R /var/log/mariadb > /dev/null 2>&1
if [ $rsyslog7 == 1 ]; then
sed -i -e s/groupname/adm/g ${columnstoreSyslogFile7}
sed -i -e s/username/syslog/g ${columnstoreSyslogFile7}
sed -i -e s/groupname/$group/g ${columnstoreSyslogFile7}
sed -i -e s/username/$user/g ${columnstoreSyslogFile7}
$SUDO rm -f /etc/rsyslog.d/49-columnstore.conf
$SUDO cp ${columnstoreSyslogFile7} ${syslog_conf}

View File

@@ -5477,6 +5477,35 @@ namespace oam
exceptionControl("autoMovePmDbroot", API_INVALID_PARAMETER);
}
//detach first to make sure DBS can be detach before trying to move to another pm
DBRootConfigList::iterator pt3 = residedbrootConfigList.begin();
for( ; pt3 != residedbrootConfigList.end() ; pt3++ )
{
int dbrootID = *pt3;
try
{
typedef std::vector<string> dbrootList;
dbrootList dbrootlist;
dbrootlist.push_back(itoa(dbrootID));
amazonDetach(dbrootlist);
}
catch (exception& )
{
writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR );
//reattach
typedef std::vector<string> dbrootList;
dbrootList dbrootlist;
dbrootlist.push_back(itoa(dbrootID));
amazonAttach(residePM, dbrootlist);
exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE);
}
}
//get dbroot id for other PMs
systemStorageInfo_t t;
DeviceDBRootList moduledbrootlist;
@@ -5951,9 +5980,8 @@ namespace oam
}
if (!found) {
writeLog("ERROR: no dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_ERROR );
cout << "ERROR: no dbroots found in " << fileName << endl;
exceptionControl("autoUnMovePmDbroot", API_FAILURE);
writeLog("No dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_DEBUG );
cout << "No dbroots found in " << fileName << endl;
}
oldFile.close();
@@ -7248,7 +7276,7 @@ namespace oam
else
return;
// check if mysql-Capont is installed
// check if mysql-Columnstore is installed
string mysqlscript = InstallDir + "/mysql/mysql-Columnstore";
if (access(mysqlscript.c_str(), X_OK) != 0)
return;
@@ -9644,6 +9672,146 @@ namespace oam
}
/***************************************************************************
*
* Function: amazonDetach
*
* Purpose: Amazon EC2 volume detach needed
*
****************************************************************************/
// Detach the EC2 volumes backing the given dbroot IDs from their current
// instance.  No-op unless the system is configured as an Amazon cloud
// ("amazon-ec2"/"amazon-vpc") with external DBRoot storage.
// Throws via exceptionControl() on bad config (API_INVALID_PARAMETER) or a
// failed detach (API_FAILURE); a failed unmount is only logged.
void Oam::amazonDetach(dbrootList dbrootConfigList)
{
//if amazon cloud with external volumes, do the detach/attach moves
string cloud;
string DBRootStorageType;
// Best-effort config reads; missing keys just leave the defaults empty,
// which makes the cloud check below fail and the function return quietly.
try {
getSystemConfig("Cloud", cloud);
getSystemConfig("DBRootStorageType", DBRootStorageType);
}
catch(...) {}
if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") &&
DBRootStorageType == "external" )
{
writeLog("amazonDetach function started ", LOG_TYPE_DEBUG );
dbrootList::iterator pt3 = dbrootConfigList.begin();
for( ; pt3 != dbrootConfigList.end() ; pt3++)
{
string dbrootid = *pt3;
// Per-dbroot config keys, e.g. "PMVolumeName1" / "PMVolumeDeviceName1".
string volumeNameID = "PMVolumeName" + dbrootid;
string volumeName = oam::UnassignedName;
string deviceNameID = "PMVolumeDeviceName" + dbrootid;
string deviceName = oam::UnassignedName;
try {
getSystemConfig( volumeNameID, volumeName);
getSystemConfig( deviceNameID, deviceName);
}
catch(...)
{}
// Either name still unassigned means the dbroot has no usable volume
// configuration -> hard error.
if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName )
{
cout << " ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName << endl;
writeLog("ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR );
exceptionControl("amazonDetach", API_INVALID_PARAMETER);
}
//send msg to to-pm to umount volume
// Unmount failure is logged but not fatal; the detach is still attempted.
int returnStatus = sendMsgToProcMgr(UNMOUNT, dbrootid, FORCEFUL, ACK_YES);
if (returnStatus != API_SUCCESS) {
writeLog("ERROR: amazonDetach, umount failed on " + dbrootid, LOG_TYPE_ERROR );
}
if (!detachEC2Volume(volumeName)) {
cout << " ERROR: amazonDetach, detachEC2Volume failed on " + volumeName << endl;
writeLog("ERROR: amazonDetach, detachEC2Volume failed on " + volumeName , LOG_TYPE_ERROR );
exceptionControl("amazonDetach", API_FAILURE);
}
writeLog("amazonDetach, detachEC2Volume passed on " + volumeName , LOG_TYPE_DEBUG );
}
}
}
/***************************************************************************
*
* Function: amazonAttach
*
* Purpose: Amazon EC2 volume Attach needed
*
****************************************************************************/
// Attach the EC2 volumes backing the given dbroot IDs to the instance that
// hosts module toPM (e.g. "pm2").  No-op unless the system is configured as
// an Amazon cloud ("amazon-ec2"/"amazon-vpc") with external DBRoot storage.
// Throws via exceptionControl() on bad config (API_INVALID_PARAMETER) or a
// failed attach (API_FAILURE).  Note: unlike amazonReattach, this does not
// mount the volume afterwards.
void Oam::amazonAttach(std::string toPM, dbrootList dbrootConfigList)
{
//if amazon cloud with external volumes, do the detach/attach moves
string cloud;
string DBRootStorageType;
// Best-effort config reads; missing keys leave the defaults empty so the
// cloud check below fails and the function returns quietly.
try {
getSystemConfig("Cloud", cloud);
getSystemConfig("DBRootStorageType", DBRootStorageType);
}
catch(...) {}
if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") &&
DBRootStorageType == "external" )
{
writeLog("amazonAttach function started ", LOG_TYPE_DEBUG );
//get Instance Name for to-pm
// The EC2 instance name is stored as the HostName of the module's first
// host config entry.
string toInstanceName = oam::UnassignedName;
try
{
ModuleConfig moduleconfig;
getSystemConfig(toPM, moduleconfig);
HostConfigList::iterator pt1 = moduleconfig.hostConfigList.begin();
toInstanceName = (*pt1).HostName;
}
catch(...)
{}
if ( toInstanceName == oam::UnassignedName || toInstanceName.empty() )
{
cout << " ERROR: amazonAttach, invalid Instance Name for " << toPM << endl;
writeLog("ERROR: amazonAttach, invalid Instance Name " + toPM, LOG_TYPE_ERROR );
exceptionControl("amazonAttach", API_INVALID_PARAMETER);
}
dbrootList::iterator pt3 = dbrootConfigList.begin();
for( ; pt3 != dbrootConfigList.end() ; pt3++)
{
string dbrootid = *pt3;
// Per-dbroot config keys, e.g. "PMVolumeName1" / "PMVolumeDeviceName1".
string volumeNameID = "PMVolumeName" + dbrootid;
string volumeName = oam::UnassignedName;
string deviceNameID = "PMVolumeDeviceName" + dbrootid;
string deviceName = oam::UnassignedName;
try {
getSystemConfig( volumeNameID, volumeName);
getSystemConfig( deviceNameID, deviceName);
}
catch(...)
{}
// Either name still unassigned means the dbroot has no usable volume
// configuration -> hard error.
if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName )
{
cout << " ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName << endl;
writeLog("ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR );
exceptionControl("amazonAttach", API_INVALID_PARAMETER);
}
if (!attachEC2Volume(volumeName, deviceName, toInstanceName)) {
cout << " ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName << endl;
writeLog("ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName, LOG_TYPE_ERROR );
exceptionControl("amazonAttach", API_FAILURE);
}
writeLog("amazonAttach, attachEC2Volume passed on " + volumeName + ":" + toPM, LOG_TYPE_DEBUG );
}
}
}
/***************************************************************************
*
* Function: amazonReattach
*
@@ -9736,6 +9904,7 @@ namespace oam
}
}
/***************************************************************************
*
* Function: mountDBRoot

View File

@@ -229,6 +229,7 @@ namespace oam
API_CONN_REFUSED,
API_CANCELLED,
API_STILL_WORKING,
API_DETACH_FAILURE,
API_MAX
};
@@ -2432,6 +2433,8 @@ namespace oam
void amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool attach = false);
void mountDBRoot(dbrootList dbrootConfigList, bool mount = true);
void amazonDetach(dbrootList dbrootConfigList);
void amazonAttach(std::string toPM, dbrootList dbrootConfigList);
/**
*@brief gluster control

View File

@@ -1553,7 +1553,7 @@ void pingDeviceThread()
processManager.restartProcessType("WriteEngineServer", moduleName);
//set module to enable state
processManager.enableModule(moduleName, oam::AUTO_OFFLINE);
processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true);
downActiveOAMModule = false;
int retry;
@@ -1647,7 +1647,7 @@ void pingDeviceThread()
}
else
//set module to enable state
processManager.enableModule(moduleName, oam::AUTO_OFFLINE);
processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true);
//restart module processes
int retry = 0;
@@ -1922,7 +1922,7 @@ void pingDeviceThread()
if ( PrimaryUMModuleName == moduleName )
downPrimaryUM = true;
// if not disabled and amazon, skip
// if disabled, skip
if (opState != oam::AUTO_DISABLED )
{
//Log failure, issue alarm, set moduleOpState
@@ -1968,6 +1968,7 @@ void pingDeviceThread()
if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ||
( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) {
string error;
try {
log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG);
oam.autoMovePmDbroot(moduleName);
@@ -1984,6 +1985,23 @@ void pingDeviceThread()
{
log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR);
}
if ( error == oam.itoa(oam::API_DETACH_FAILURE) )
{
processManager.setModuleState(moduleName, oam::AUTO_DISABLED);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready
processManager.setQuerySystemState(true);
break;
}
}
}

View File

@@ -3438,7 +3438,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule)
restartProcessType("ExeMgr");
sleep(1);
restartProcessType("mysql");
restartProcessType("mysqld");
restartProcessType("WriteEngineServer");
sleep(1);
@@ -3457,7 +3457,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule)
* purpose: Clear the Disable State on a specified module
*
******************************************************************************************/
int ProcessManager::enableModule(string target, int state)
int ProcessManager::enableModule(string target, int state, bool failover)
{
Oam oam;
ModuleConfig moduleconfig;
@@ -3496,7 +3496,8 @@ int ProcessManager::enableModule(string target, int state)
setStandbyModule(newStandbyModule);
//set recycle process
recycleProcess(target);
if (!failover)
recycleProcess(target);
log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG);
@@ -4256,7 +4257,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski
PMwithUM = "n";
}
// If mysql is the processName, then send to modules were ExeMgr is running
// If mysqld is the processName, then send to modules where ExeMgr is running
try
{
oam.getProcessStatus(systemprocessstatus);
@@ -4267,7 +4268,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski
if ( systemprocessstatus.processstatus[i].Module == skipModule )
continue;
if ( processName == "mysql" ) {
if ( processName == "mysqld" ) {
if ( systemprocessstatus.processstatus[i].ProcessName == "ExeMgr") {
ProcessStatus procstat;
oam.getProcessStatus("mysqld", systemprocessstatus.processstatus[i].Module, procstat);
@@ -8985,7 +8986,7 @@ int ProcessManager::OAMParentModuleChange()
if (systemstatus.SystemOpState == ACTIVE) {
log.writeLog(__LINE__, "System Active, restart needed processes", LOG_TYPE_DEBUG);
processManager.restartProcessType("mysql");
processManager.restartProcessType("mysqld");
processManager.restartProcessType("ExeMgr");
processManager.restartProcessType("WriteEngineServer");
processManager.reinitProcessType("DBRMWorkerNode");
@@ -10099,7 +10100,7 @@ void ProcessManager::stopProcessTypes(bool manualFlag)
log.writeLog(__LINE__, "stopProcessTypes Called");
//front-end first
processManager.stopProcessType("mysql", manualFlag);
processManager.stopProcessType("mysqld", manualFlag);
processManager.stopProcessType("DMLProc", manualFlag);
processManager.stopProcessType("DDLProc", manualFlag);
processManager.stopProcessType("ExeMgr", manualFlag);

View File

@@ -307,7 +307,7 @@ public:
/**
*@brief Enable a specified module
*/
int enableModule(std::string target, int state);
int enableModule(std::string target, int state, bool failover = false);
/**
*@brief Enable a specified module

View File

@@ -695,11 +695,11 @@ int main(int argc, char **argv)
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
//mysql status monitor thread
if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) ||
(PMwithUM == "y") )
//mysqld status monitor thread
if ( config.moduleType() == "um" ||
( config.moduleType() == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) ||
( config.moduleType() == "pm" && PMwithUM == "y") )
{
pthread_t mysqlThread;
ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL);
if ( ret != 0 )
@@ -1127,7 +1127,7 @@ static void mysqlMonitorThread(MonitorConfig config)
catch(...)
{}
sleep(10);
sleep(5);
}
}

View File

@@ -457,7 +457,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
log.writeLog(__LINE__, "MSG RECEIVED: Stop process request on " + processName);
int requestStatus = API_SUCCESS;
// check for mysql
// check for mysqld
if ( processName == "mysqld" ) {
try {
oam.actionMysqlCalpont(MYSQL_STOP);
@@ -520,7 +520,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
msg >> manualFlag;
log.writeLog(__LINE__, "MSG RECEIVED: Start process request on: " + processName);
// check for mysql
// check for mysqld
if ( processName == "mysqld" ) {
try {
oam.actionMysqlCalpont(MYSQL_START);
@@ -640,7 +640,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
log.writeLog(__LINE__, "MSG RECEIVED: Restart process request on " + processName);
int requestStatus = API_SUCCESS;
// check for mysql restart
// check for mysqld restart
if ( processName == "mysqld" ) {
try {
oam.actionMysqlCalpont(MYSQL_RESTART);
@@ -869,7 +869,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
log.writeLog(__LINE__, "Error running DBRM clearShm", LOG_TYPE_ERROR);
}
//stop the mysql daemon
//stop the mysqld daemon
try {
oam.actionMysqlCalpont(MYSQL_STOP);
log.writeLog(__LINE__, "Stop MySQL Process", LOG_TYPE_DEBUG);
@@ -995,12 +995,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
system(cmd.c_str());
//start the mysql daemon
//start the mysqld daemon
try {
oam.actionMysqlCalpont(MYSQL_START);
}
catch(...)
{ // mysql didn't start, return with error
{ // mysqld didn't start, return with error
log.writeLog(__LINE__, "STARTALL: MySQL failed to start, start-module failure", LOG_TYPE_CRITICAL);
ackMsg << (ByteStream::byte) ACK;
@@ -1265,7 +1265,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
//send down notification
oam.sendDeviceNotification(config.moduleName(), MODULE_DOWN);
//stop the mysql daemon and then columnstore
//stop the mysqld daemon and then columnstore
try {
oam.actionMysqlCalpont(MYSQL_STOP);
}
@@ -1444,7 +1444,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
}
}
// install mysql rpms if being reconfigured as a um
// install mysqld rpms if being reconfigured as a um
if ( reconfigureModuleName.find("um") != string::npos ) {
string cmd = startup::StartUp::installDir() + "/bin/post-mysqld-install >> /tmp/rpminstall";
system(cmd.c_str());

View File

@@ -54,15 +54,25 @@ namespace anyimpl
template<typename T>
struct big_any_policy : typed_base_any_policy<T>
{
virtual void static_delete(void** x) { if (*x)
delete(*reinterpret_cast<T**>(x)); *x = NULL; }
virtual void copy_from_value(void const* src, void** dest) {
*dest = new T(*reinterpret_cast<T const*>(src)); }
virtual void clone(void* const* src, void** dest) {
*dest = new T(**reinterpret_cast<T* const*>(src)); }
virtual void move(void* const* src, void** dest) {
virtual void static_delete(void** x)
{
if (*x)
delete(*reinterpret_cast<T**>(x));
*x = NULL;
}
virtual void copy_from_value(void const* src, void** dest)
{
*dest = new T(*reinterpret_cast<T const*>(src));
}
virtual void clone(void* const* src, void** dest)
{
*dest = new T(**reinterpret_cast<T* const*>(src));
}
virtual void move(void* const* src, void** dest)
{
(*reinterpret_cast<T**>(dest))->~T();
**reinterpret_cast<T**>(dest) = **reinterpret_cast<T* const*>(src); }
**reinterpret_cast<T**>(dest) = **reinterpret_cast<T* const*>(src);
}
virtual void* get_value(void** src) { return *src; }
};

View File

@@ -19,6 +19,7 @@
#include "configcpp.h"
#include "logger.h"
#include <fstream>
#include <iostream>
#include <boost/regex.hpp>
#ifdef _MSC_VER
#include "unistd.h"

View File

@@ -71,6 +71,9 @@ std::string Func_substring_index::getStrVal(rowgroup::Row& row,
if ( count > (int64_t) end )
return str;
if (( count < 0 ) && ((count * -1) > end))
return str;
string value = str;
if ( count > 0 ) {

View File

@@ -79,10 +79,10 @@ StringStore::~StringStore()
#endif
}
uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
uint64_t StringStore::storeString(const uint8_t *data, uint32_t len)
{
MemChunk *lastMC = NULL;
uint32_t ret = 0;
uint64_t ret = 0;
empty = false; // At least a NULL is being stored.
@@ -92,7 +92,7 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
if ((len == 8 || len == 9) &&
*((uint64_t *) data) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str()))
return numeric_limits<uint32_t>::max();
return numeric_limits<uint64_t>::max();
//@bug6065, make StringStore::storeString() thread safe
boost::mutex::scoped_lock lk(fMutex, defer_lock);
@@ -102,20 +102,21 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
if (mem.size() > 0)
lastMC = (MemChunk *) mem.back().get();
if (len >= CHUNK_SIZE)
if ((len+4) >= CHUNK_SIZE)
{
shared_array<uint8_t> newOne(new uint8_t[len + sizeof(MemChunk)]);
shared_array<uint8_t> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
longStrings.push_back(newOne);
lastMC = (MemChunk*) longStrings.back().get();
lastMC->capacity = lastMC->currentSize = len;
memcpy(lastMC->data, data, len);
lastMC->capacity = lastMC->currentSize = len + 4;
memcpy(lastMC->data, &len, 4);
memcpy(lastMC->data + 4, data, len);
// High bit to mark a long string
ret = 0x80000000;
ret = 0x8000000000000000;
ret += longStrings.size() - 1;
}
else
{
if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < len))
if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < (len + 4)))
{
// mem usage debugging
//if (lastMC)
@@ -130,7 +131,11 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
ret = ((mem.size()-1) * CHUNK_SIZE) + lastMC->currentSize;
memcpy(&(lastMC->data[lastMC->currentSize]), data, len);
// If this ever happens then we have big problems
if (ret & 0x8000000000000000)
throw logic_error("StringStore memory exceeded.");
memcpy(&(lastMC->data[lastMC->currentSize]), &len, 4);
memcpy(&(lastMC->data[lastMC->currentSize]) + 4, data, len);
/*
cout << "stored: '" << hex;
for (uint32_t i = 0; i < len ; i++) {
@@ -138,7 +143,7 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
}
cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl;
*/
lastMC->currentSize += len;
lastMC->currentSize += len + 4;
}
return ret;
@@ -146,31 +151,31 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
void StringStore::serialize(ByteStream &bs) const
{
uint32_t i;
uint64_t i;
MemChunk *mc;
bs << (uint32_t) mem.size();
bs << (uint64_t) mem.size();
bs << (uint8_t) empty;
for (i = 0; i < mem.size(); i++) {
mc = (MemChunk *) mem[i].get();
bs << (uint32_t) mc->currentSize;
bs << (uint64_t) mc->currentSize;
//cout << "serialized " << mc->currentSize << " bytes\n";
bs.append(mc->data, mc->currentSize);
}
bs << (uint32_t) longStrings.size();
bs << (uint64_t) longStrings.size();
for (i = 0; i < longStrings.size(); i++)
{
mc = (MemChunk *) longStrings[i].get();
bs << (uint32_t) mc->currentSize;
bs << (uint64_t) mc->currentSize;
bs.append(mc->data, mc->currentSize);
}
}
void StringStore::deserialize(ByteStream &bs)
{
uint32_t i;
uint32_t count;
uint32_t size;
uint64_t i;
uint64_t count;
uint64_t size;
uint8_t *buf;
MemChunk *mc;
uint8_t tmp8;
@@ -718,10 +723,9 @@ bool Row::isNullValue(uint32_t colIndex) const
case CalpontSystemCatalog::STRINT: {
uint32_t len = getColumnWidth(colIndex);
if (inStringTable(colIndex)) {
uint32_t offset, length;
offset = *((uint32_t *) &data[offsets[colIndex]]);
length = *((uint32_t *) &data[offsets[colIndex] + 4]);
return strings->isNullValue(offset, length);
uint64_t offset;
offset = *((uint64_t *) &data[offsets[colIndex]]);
return strings->isNullValue(offset);
}
if (data[offsets[colIndex]] == 0) // empty string
return true;
@@ -757,10 +761,9 @@ bool Row::isNullValue(uint32_t colIndex) const
case CalpontSystemCatalog::VARBINARY: {
uint32_t pos = offsets[colIndex];
if (inStringTable(colIndex)) {
uint32_t offset, length;
offset = *((uint32_t *) &data[pos]);
length = *((uint32_t *) &data[pos+4]);
return strings->isNullValue(offset, length);
uint64_t offset;
offset = *((uint64_t *) &data[pos]);
return strings->isNullValue(offset);
}
if (*((uint16_t*) &data[pos]) == 0)
return true;
@@ -1416,8 +1419,8 @@ RGData RowGroup::duplicate()
void Row::setStringField(const std::string &val, uint32_t colIndex)
{
uint32_t length;
uint32_t offset;
uint64_t offset;
uint64_t length;
//length = strlen(val.c_str()) + 1;
length = val.length();
@@ -1426,8 +1429,7 @@ void Row::setStringField(const std::string &val, uint32_t colIndex)
if (inStringTable(colIndex)) {
offset = strings->storeString((const uint8_t *) val.data(), length);
*((uint32_t *) &data[offsets[colIndex]]) = offset;
*((uint32_t *) &data[offsets[colIndex] + 4]) = length;
*((uint64_t *) &data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
// << " length " << *((uint32_t *) &data[offsets[colIndex] + 4])
// << endl;

View File

@@ -92,13 +92,14 @@ public:
StringStore();
virtual ~StringStore();
inline std::string getString(uint32_t offset, uint32_t length) const;
uint32_t storeString(const uint8_t *data, uint32_t length); //returns the offset
inline const uint8_t * getPointer(uint32_t offset) const;
inline std::string getString(uint64_t offset) const;
uint64_t storeString(const uint8_t *data, uint32_t length); //returns the offset
inline const uint8_t * getPointer(uint64_t offset) const;
inline uint32_t getStringLength(uint64_t offset);
inline bool isEmpty() const;
inline uint64_t getSize() const;
inline bool isNullValue(uint32_t offset, uint32_t length) const;
inline bool equals(const std::string &str, uint32_t offset, uint32_t length) const;
inline bool isNullValue(uint64_t offset) const;
inline bool equals(const std::string &str, uint64_t offset) const;
void clear();
@@ -541,9 +542,8 @@ inline bool Row::equals(uint64_t val, uint32_t colIndex) const
inline bool Row::equals(const std::string &val, uint32_t colIndex) const
{
if (inStringTable(colIndex)) {
uint32_t offset = *((uint32_t *) &data[offsets[colIndex]]);
uint32_t length = *((uint32_t *) &data[offsets[colIndex] + 4]);
return strings->equals(val, offset, length);
uint64_t offset = *((uint64_t *) &data[offsets[colIndex]]);
return strings->equals(val, offset);
}
else
return (strncmp(val.c_str(), (char *) &data[offsets[colIndex]], getColumnWidth(colIndex)) == 0);
@@ -609,28 +609,27 @@ inline int64_t Row::getIntField(uint32_t colIndex) const
inline const uint8_t * Row::getStringPointer(uint32_t colIndex) const
{
if (inStringTable(colIndex))
return strings->getPointer(*((uint32_t *) &data[offsets[colIndex]]));
return strings->getPointer(*((uint64_t *) &data[offsets[colIndex]]));
return &data[offsets[colIndex]];
}
inline uint32_t Row::getStringLength(uint32_t colIndex) const
{
if (inStringTable(colIndex))
return *((uint32_t *) &data[offsets[colIndex] + 4]);
return strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]]));
return strnlen((char *) &data[offsets[colIndex]], getColumnWidth(colIndex));
}
inline void Row::setStringField(const uint8_t *strdata, uint32_t length, uint32_t colIndex)
{
uint32_t offset;
uint64_t offset;
if (length > getColumnWidth(colIndex))
length = getColumnWidth(colIndex);
if (inStringTable(colIndex)) {
offset = strings->storeString(strdata, length);
*((uint32_t *) &data[offsets[colIndex]]) = offset;
*((uint32_t *) &data[offsets[colIndex] + 4]) = length;
*((uint64_t *) &data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
// << " length " << *((uint32_t *) &data[offsets[colIndex] + 4])
// << endl;
@@ -645,8 +644,7 @@ inline void Row::setStringField(const uint8_t *strdata, uint32_t length, uint32_
inline std::string Row::getStringField(uint32_t colIndex) const
{
if (inStringTable(colIndex))
return strings->getString(*((uint32_t *) &data[offsets[colIndex]]),
*((uint32_t *) &data[offsets[colIndex] + 4]));
return strings->getString(*((uint64_t *) &data[offsets[colIndex]]));
// Not all CHAR/VARCHAR are NUL terminated so use length
return std::string((char *) &data[offsets[colIndex]],
strnlen((char *) &data[offsets[colIndex]], getColumnWidth(colIndex)));
@@ -662,21 +660,21 @@ inline std::string Row::getVarBinaryStringField(uint32_t colIndex) const
inline uint32_t Row::getVarBinaryLength(uint32_t colIndex) const
{
if (inStringTable(colIndex))
return *((uint32_t *) &data[offsets[colIndex] + 4]);
return strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]]));;
return *((uint16_t*) &data[offsets[colIndex]]);
}
inline const uint8_t* Row::getVarBinaryField(uint32_t colIndex) const
{
if (inStringTable(colIndex))
return strings->getPointer(*((uint32_t *) &data[offsets[colIndex]]));
return strings->getPointer(*((uint64_t *) &data[offsets[colIndex]]));
return &data[offsets[colIndex] + 2];
}
inline const uint8_t* Row::getVarBinaryField(uint32_t& len, uint32_t colIndex) const
{
if (inStringTable(colIndex)) {
len = *((uint32_t *) &data[offsets[colIndex] + 4]);
len = strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]]));
return getVarBinaryField(colIndex);
}
else {
@@ -854,9 +852,8 @@ inline void Row::setVarBinaryField(const uint8_t *val, uint32_t len, uint32_t co
if (len > getColumnWidth(colIndex))
len = getColumnWidth(colIndex);
if (inStringTable(colIndex)) {
uint32_t offset = strings->storeString(val, len);
*((uint32_t *) &data[offsets[colIndex]]) = offset;
*((uint32_t *) &data[offsets[colIndex] + 4]) = len;
uint64_t offset = strings->storeString(val, len);
*((uint64_t *) &data[offsets[colIndex]]) = offset;
}
else {
*((uint16_t*) &data[offsets[colIndex]]) = len;
@@ -1535,49 +1532,53 @@ inline void copyRow(const Row &in, Row *out)
copyRow(in, out, std::min(in.getColumnCount(), out->getColumnCount()));
}
inline std::string StringStore::getString(uint32_t off, uint32_t len) const
inline std::string StringStore::getString(uint64_t off) const
{
if (off == std::numeric_limits<uint32_t>::max())
uint32_t length;
if (off == std::numeric_limits<uint64_t>::max())
return joblist::CPNULLSTRMARK;
MemChunk *mc;
if (off & 0x80000000)
if (off & 0x8000000000000000)
{
off = off - 0x80000000;
off = off - 0x8000000000000000;
if (longStrings.size() <= off)
return joblist::CPNULLSTRMARK;
mc = (MemChunk*) longStrings[off].get();
return std::string((char *) mc->data, len);
memcpy(&length, mc->data, 4);
return std::string((char *) mc->data+4, length);
}
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
uint64_t chunk = off / CHUNK_SIZE;
uint64_t offset = off % CHUNK_SIZE;
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
// what gets returned, it just can't go out of bounds.
if (mem.size() <= chunk)
return joblist::CPNULLSTRMARK;
mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
memcpy(&length, &mc->data[offset], 4);
if ((offset + length) > mc->currentSize)
return joblist::CPNULLSTRMARK;
return std::string((char *) &(mc->data[offset]), len);
return std::string((char *) &(mc->data[offset])+4, length);
}
inline const uint8_t * StringStore::getPointer(uint32_t off) const
inline const uint8_t * StringStore::getPointer(uint64_t off) const
{
if (off == std::numeric_limits<uint32_t>::max())
if (off == std::numeric_limits<uint64_t>::max())
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
uint64_t chunk = off / CHUNK_SIZE;
uint64_t offset = off % CHUNK_SIZE;
MemChunk *mc;
if (off & 0x80000000)
if (off & 0x8000000000000000)
{
off = off - 0x80000000;
off = off - 0x8000000000000000;
if (longStrings.size() <= off)
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
mc = (MemChunk*) longStrings[off].get();
return mc->data;
return mc->data+4;
}
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
// what gets returned, it just can't go out of bounds.
@@ -1587,19 +1588,17 @@ inline const uint8_t * StringStore::getPointer(uint32_t off) const
if (offset > mc->currentSize)
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
return &(mc->data[offset]);
return &(mc->data[offset]) + 4;
}
inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
inline bool StringStore::isNullValue(uint64_t off) const
{
if (off == std::numeric_limits<uint32_t>::max() || len == 0)
uint32_t length;
if (off == std::numeric_limits<uint64_t>::max())
return true;
if (len < 8)
return false;
// Long strings won't be NULL
if (off & 0x80000000)
if (off & 0x8000000000000000)
return false;
uint32_t chunk = off / CHUNK_SIZE;
@@ -1609,31 +1608,38 @@ inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
return true;
mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
memcpy(&length, &mc->data[offset], 4);
if (length == 0)
return true;
if (mc->data[offset] == 0) // "" = NULL string for some reason...
if (length < 8)
return false;
if ((offset + length) > mc->currentSize)
return true;
return (*((uint64_t *) &mc->data[offset]) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str()));
if (mc->data[offset+4] == 0) // "" = NULL string for some reason...
return true;
return (*((uint64_t *) &mc->data[offset]+4) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str()));
}
inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const
inline bool StringStore::equals(const std::string &str, uint64_t off) const
{
if (off == std::numeric_limits<uint32_t>::max() || len == 0)
uint32_t length;
if (off == std::numeric_limits<uint64_t>::max())
return str == joblist::CPNULLSTRMARK;
MemChunk *mc;
if (off & 0x80000000)
if (off & 0x8000000000000000)
{
if (longStrings.size() <= (off - 0x80000000))
if (longStrings.size() <= (off - 0x8000000000000000))
return false;
mc = (MemChunk *) longStrings[off - 0x80000000].get();
mc = (MemChunk *) longStrings[off - 0x8000000000000000].get();
memcpy(&length, mc->data, 4);
// Not sure if this check it needed, but adds safety
if (len > mc->currentSize)
if (length > mc->currentSize)
return false;
return (strncmp(str.c_str(), (const char*) mc->data, len) == 0);
return (strncmp(str.c_str(), (const char*) mc->data+4, length) == 0);
}
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
@@ -1641,10 +1647,37 @@ inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t l
return false;
mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
memcpy(&length, &mc->data[offset], 4);
if ((offset + length) > mc->currentSize)
return false;
return (strncmp(str.c_str(), (const char *) &mc->data[offset], len) == 0);
return (strncmp(str.c_str(), (const char *) &mc->data[offset]+4, length) == 0);
}
inline uint32_t StringStore::getStringLength(uint64_t off)
{
uint32_t length;
MemChunk *mc;
if (off == std::numeric_limits<uint64_t>::max())
return 0;
if (off & 0x8000000000000000)
{
off = off - 0x8000000000000000;
if (longStrings.size() <= off)
return 0;
mc = (MemChunk*) longStrings[off].get();
memcpy(&length, mc->data, 4);
}
else
{
uint64_t chunk = off / CHUNK_SIZE;
uint64_t offset = off % CHUNK_SIZE;
if (mem.size() <= chunk)
return 0;
mc = (MemChunk *) mem[chunk].get();
memcpy(&length, &mc->data[offset], 4);
}
return length;
}
inline bool StringStore::isEmpty() const

View File

@@ -21,6 +21,7 @@
*
***********************************************************************/
#include <stdexcept>
#include <iostream>
using namespace std;
#include "messageobj.h"

View File

@@ -77,6 +77,7 @@ class WECmdArgs
char getDelimChar() { return fColDelim; }
ImportDataMode getImportDataMode() const { return fImportDataMode; }
bool getConsoleLog() { return fConsoleLog; }
int getReadBufSize() { return fReadBufSize; }
bool isCpimportInvokeMode(){return (fBlockMode3)? false : fCpiInvoke;}
bool isQuiteMode() const { return fQuiteMode; }

View File

@@ -87,6 +87,15 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh):fSdh(aSdh),
{
//TODO batch qty to get from config
fBatchQty = 10000;
if (fSdh.getReadBufSize() < DEFAULTBUFFSIZE)
{
fBuffSize = DEFAULTBUFFSIZE;
}
else
{
fBuffSize = fSdh.getReadBufSize();
}
fBuff = new char [fBuffSize];
}
@@ -106,6 +115,7 @@ WEFileReadThread::~WEFileReadThread()
delete fpThread;
}
fpThread=0;
delete []fBuff;
//cout << "WEFileReadThread destructor called" << endl;
}
@@ -330,16 +340,16 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
if(fEnclEsc)
{
//pStart = aBuff;
aLen = getNextRow(fInFile, fBuff, sizeof(fBuff)-1);
aLen = getNextRow(fInFile, fBuff, fBuffSize-1);
}
else
{
fInFile.getline(fBuff, sizeof(fBuff)-1);
fInFile.getline(fBuff, fBuffSize-1);
aLen=fInFile.gcount();
}
////aLen chars incl \n, Therefore aLen-1; '<<' oper won't go past it
//cout << "Data Length " << aLen <<endl;
if((aLen < (sizeof(fBuff)-2)) && (aLen>0))
if((aLen < (fBuffSize-2)) && (aLen>0))
{
fBuff[aLen-1] = '\n';
fBuff[aLen]=0;
@@ -348,7 +358,7 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
aIdx++;
if(fSdh.getDebugLvl()>2) cout << "File data line = " << aIdx <<endl;
}
else if(aLen>=sizeof(fBuff)-2) //Didn't hit delim; BIG ROW
else if(aLen>=fBuffSize-2) //Didn't hit delim; BIG ROW
{
cout <<"Bad Row data " << endl;
cout << fBuff << endl;

View File

@@ -98,7 +98,7 @@ public:
void add2InputDataFileList(std::string& FileName);
private:
enum { MAXBUFFSIZE=1024*1024 };
enum { DEFAULTBUFFSIZE=1024*1024 };
// don't allow anyone else to set
void setTgtPmId(unsigned int fTgtPmId) { this->fTgtPmId = fTgtPmId; }
@@ -120,7 +120,8 @@ private:
char fEncl; // Encl char
char fEsc; // Esc char
char fDelim; // Column Delimit char
char fBuff[MAXBUFFSIZE]; // main data buffer
char* fBuff; // main data buffer
int fBuffSize;
};
} /* namespace WriteEngine */

View File

@@ -2301,6 +2301,13 @@ char WESDHandler::getEscChar()
//------------------------------------------------------------------------------
int WESDHandler::getReadBufSize()
{
return fRef.fCmdArgs.getReadBufSize();
}
//------------------------------------------------------------------------------
char WESDHandler::getDelimChar()
{
return fRef.fCmdArgs.getDelimChar();

View File

@@ -149,6 +149,7 @@ public:
char getEscChar();
char getDelimChar();
bool getConsoleLog();
int getReadBufSize();
ImportDataMode getImportDataMode() const;
void sysLog(const logging::Message::Args& msgArgs,
logging::LOG_TYPE logType, logging::Message::MessageID msgId);

View File

@@ -208,7 +208,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
//Find out where the rest rows go
BRM::LBID_t startLbid;
//need to put in a loop until newExtent is true
newExtent = dbRootExtentTrackers[0]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
newExtent = dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
TableMetaData* tableMetaData= TableMetaData::makeTableMetaData(tableOid);
while (!newExtent)
{
@@ -223,7 +223,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
for (i=0; i < dbRootExtentTrackers.size(); i++)
{
if (i != 0)
if (i != column.colNo)
dbRootExtentTrackers[i]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
// Round up HWM to the end of the current extent
@@ -278,7 +278,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
}
tableMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
}
newExtent = dbRootExtentTrackers[0]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
newExtent = dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
}
}
@@ -297,7 +297,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
}
rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partition, segment, extents);
newHwm = extents[0].startBlkOffset;
newHwm = extents[column.colNo].startBlkOffset;
if (rc != NO_ERROR)
return rc;

View File

@@ -1505,6 +1505,19 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
for (i = 0; i < colStructList.size(); i++)
Convertor::convertColType(&colStructList[i]);
// MCOL-984: find the smallest column width to calculate the RowID from so
// that all HWMs will be incremented by this operation
int32_t lowColLen = 8192;
int32_t colId = 0;
for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
{
if (colStructList[colIt].colWidth < lowColLen)
{
colId = colIt;
lowColLen = colStructList[colId].colWidth;
}
}
// rc = checkValid(txnid, colStructList, colValueList, ridList);
// if (rc != NO_ERROR)
// return rc;
@@ -1531,8 +1544,8 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
//--------------------------------------------------------------------------
if (isFirstBatchPm)
{
currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx();
extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList();
currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
partitionNum = extentInfo[currentDBrootIdx].fPartition;
@@ -1698,7 +1711,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
} // if (isFirstBatchPm)
else //get the extent info from tableMetaData
{
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
@@ -1730,20 +1743,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
//--------------------------------------------------------------------------
// allocate row id(s)
//--------------------------------------------------------------------------
// MCOL-984: find the smallest column width to calculate the RowID from so
// that all HWMs will be incremented by this operation
int32_t lowColLen = 8192;
int32_t colId = 0;
for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
{
if (colStructList[colIt].colWidth < lowColLen)
{
colId = colIt;
lowColLen = colStructList[colId].colWidth;
curColStruct = colStructList[colId];
}
}
curColStruct = colStructList[colId];
colOp = m_colOp[op(curColStruct.fCompressionType)];
colOp->initColumn(curCol);
@@ -1765,7 +1765,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
if (it != aColExtsInfo.end())
{
hwm = it->hwm;
//cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
//cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[colId].fColSegment << endl;
}
oldHwm = hwm; //Save this info for rollback
@@ -2008,7 +2008,6 @@ timer.stop("tokenize");
if (it != aColExtsInfo.end()) //update hwm info
{
oldHwm = it->hwm;
}
// save hwm for the old extent
colWidth = colStructList[i].colWidth;
@@ -2032,6 +2031,7 @@ timer.stop("tokenize");
else
return ERR_INVALID_PARAM;
}
//update hwm for the new extent
if (newExtent)
{
@@ -2043,6 +2043,7 @@ timer.stop("tokenize");
break;
it++;
}
colWidth = newColStructList[i].colWidth;
succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
if (succFlag)
{
@@ -2107,6 +2108,9 @@ timer.start("writeColumnRec");
curFbo));
}
}
else
return ERR_INVALID_PARAM;
}
}
// If we create a new extent for this batch
for (unsigned i = 0; i < newColStructList.size(); i++)
@@ -2123,7 +2127,8 @@ timer.start("writeColumnRec");
curFbo));
}
}
}
else
return ERR_INVALID_PARAM;
}
if (lbids.size() > 0)
@@ -4604,7 +4609,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
bool versioning)
{
int rc = 0;
void* valArray;
void* valArray = NULL;
string segFile;
Column curCol;
ColStructList::size_type totalColumn;
@@ -4629,132 +4634,135 @@ StopWatch timer;
totalRow2 = 0;
}
valArray = malloc(sizeof(uint64_t) * totalRow1);
if (totalRow1 == 0)
// It is possible totalRow1 is zero but totalRow2 has values
if ((totalRow1 == 0) && (totalRow2 == 0))
return rc;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
for (i = 0; i < totalColumn; i++)
if (totalRow1)
{
//@Bug 2205 Check if all rows go to the new extent
//Write the first batch
RID * firstPart = rowIdArray;
ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
// set params
colOp->initColumn(curCol);
// need to pass real dbRoot, partition, and segment to setColParam
colOp->setColParam(curCol, 0, colStructList[i].colWidth,
colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
colStructList[i].fColPartition, colStructList[i].fColSegment);
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
valArray = malloc(sizeof(uint64_t) * totalRow1);
for (i = 0; i < totalColumn; i++)
{
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
break;
it++;
}
//@Bug 2205 Check if all rows go to the new extent
//Write the first batch
RID * firstPart = rowIdArray;
ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
if (it == aColExtsInfo.end()) //add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot =colStructList[i].fColDbRoot;
aExt.partNum = colStructList[i].fColPartition;
aExt.segNum = colStructList[i].fColSegment;
aExt.compType = colStructList[i].fCompressionType;
aColExtsInfo.push_back(aExt);
aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
}
// set params
colOp->initColumn(curCol);
// need to pass real dbRoot, partition, and segment to setColParam
colOp->setColParam(curCol, 0, colStructList[i].colWidth,
colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
colStructList[i].fColPartition, colStructList[i].fColSegment);
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
if (rc != NO_ERROR)
break;
// handling versioning
vector<LBIDRange> rangeList;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
if (rc != NO_ERROR) {
if (colStructList[i].fCompressionType == 0)
{
curCol.dataFile.pFile->flush();
}
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
break;
}
}
//totalRow1 -= totalRow2;
// have to init the size here
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow1; j++)
{
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
switch (colStructList[i].colType)
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_CHAR:
case WriteEngine::WR_BLOB:
case WriteEngine::WR_TEXT:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_INT:
case WriteEngine::WR_UINT:
case WriteEngine::WR_FLOAT:
tmp32 = curValue;
((uint32_t*)valArray)[j] = tmp32;
break;
case WriteEngine::WR_ULONGLONG:
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_DOUBLE:
case WriteEngine::WR_TOKEN:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_BYTE:
case WriteEngine::WR_UBYTE:
tmp8 = curValue;
((uint8_t*)valArray)[j] = tmp8;
break;
case WriteEngine::WR_SHORT:
case WriteEngine::WR_USHORT:
tmp16 = curValue;
((uint16_t*)valArray)[j] = tmp16;
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
break;
it++;
}
if (it == aColExtsInfo.end()) //add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot =colStructList[i].fColDbRoot;
aExt.partNum = colStructList[i].fColPartition;
aExt.segNum = colStructList[i].fColSegment;
aExt.compType = colStructList[i].fCompressionType;
aColExtsInfo.push_back(aExt);
aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
}
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
if (rc != NO_ERROR)
break;
// handling versioning
vector<LBIDRange> rangeList;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
if (rc != NO_ERROR) {
if (colStructList[i].fCompressionType == 0)
{
curCol.dataFile.pFile->flush();
}
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
break;
}
}
//totalRow1 -= totalRow2;
// have to init the size here
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow1; j++)
{
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
switch (colStructList[i].colType)
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_CHAR:
case WriteEngine::WR_BLOB:
case WriteEngine::WR_TEXT:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_INT:
case WriteEngine::WR_UINT:
case WriteEngine::WR_FLOAT:
tmp32 = curValue;
((uint32_t*)valArray)[j] = tmp32;
break;
case WriteEngine::WR_ULONGLONG:
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_DOUBLE:
case WriteEngine::WR_TOKEN:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_BYTE:
case WriteEngine::WR_UBYTE:
tmp8 = curValue;
((uint8_t*)valArray)[j] = tmp8;
break;
case WriteEngine::WR_SHORT:
case WriteEngine::WR_USHORT:
tmp16 = curValue;
((uint16_t*)valArray)[j] = tmp16;
break;
}
}
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
colOp->closeColumnFile(curCol);
if (versioning)
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
// check error
if (rc != NO_ERROR)
break;
} // end of for (i = 0
if (valArray != NULL)
{
free(valArray);
valArray = NULL;
}
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
colOp->closeColumnFile(curCol);
if (versioning)
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
// check error
if (rc != NO_ERROR)
break;
} // end of for (i = 0
if (valArray != NULL)
{
free(valArray);
valArray = NULL;
}
// MCOL-1176 - Write second extent