mirror of
https://github.com/MariaDB/server.git
synced 2025-11-19 19:03:26 +03:00
ndb-wl2325.patch
This commit is contained in:
@@ -29,6 +29,7 @@
|
||||
#include <signaldata/AlterTable.hpp>
|
||||
#include <signaldata/DropIndx.hpp>
|
||||
#include <signaldata/ListTables.hpp>
|
||||
#include <signaldata/WaitGCP.hpp>
|
||||
#include <SimpleProperties.hpp>
|
||||
#include <Bitmask.hpp>
|
||||
#include <AttributeList.hpp>
|
||||
@@ -42,6 +43,8 @@
|
||||
#define DEBUG_PRINT 0
|
||||
#define INCOMPATIBLE_VERSION -2
|
||||
|
||||
extern Uint64 g_latest_trans_gci;
|
||||
|
||||
//#define EVENT_DEBUG
|
||||
|
||||
/**
|
||||
@@ -393,6 +396,7 @@ NdbTableImpl::assign(const NdbTableImpl& org)
|
||||
{
|
||||
m_tableId = org.m_tableId;
|
||||
m_internalName.assign(org.m_internalName);
|
||||
updateMysqlName();
|
||||
m_externalName.assign(org.m_externalName);
|
||||
m_newExternalName.assign(org.m_newExternalName);
|
||||
m_frm.assign(org.m_frm.get_data(), org.m_frm.length());
|
||||
@@ -439,6 +443,17 @@ NdbTableImpl::getName() const
|
||||
return m_newExternalName.c_str();
|
||||
}
|
||||
|
||||
void
|
||||
NdbTableImpl::updateMysqlName()
|
||||
{
|
||||
Vector<BaseString> v;
|
||||
if (m_internalName.split(v,"/") == 3)
|
||||
{
|
||||
m_mysqlName.assfmt("%s/%s",v[0].c_str(),v[2].c_str());
|
||||
return;
|
||||
}
|
||||
m_mysqlName.assign("");
|
||||
}
|
||||
|
||||
void
|
||||
NdbTableImpl::buildColumnHash(){
|
||||
@@ -629,8 +644,6 @@ void NdbEventImpl::init()
|
||||
mi_type= 0;
|
||||
m_dur= NdbDictionary::Event::ED_UNDEFINED;
|
||||
m_tableImpl= NULL;
|
||||
m_bufferId= RNIL;
|
||||
eventOp= NULL;
|
||||
}
|
||||
|
||||
NdbEventImpl::~NdbEventImpl()
|
||||
@@ -641,12 +654,12 @@ NdbEventImpl::~NdbEventImpl()
|
||||
|
||||
void NdbEventImpl::setName(const char * name)
|
||||
{
|
||||
m_externalName.assign(name);
|
||||
m_name.assign(name);
|
||||
}
|
||||
|
||||
const char *NdbEventImpl::getName() const
|
||||
{
|
||||
return m_externalName.c_str();
|
||||
return m_name.c_str();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -671,12 +684,7 @@ NdbEventImpl::getTableName() const
|
||||
void
|
||||
NdbEventImpl::addTableEvent(const NdbDictionary::Event::TableEvent t = NdbDictionary::Event::TE_ALL)
|
||||
{
|
||||
switch (t) {
|
||||
case NdbDictionary::Event::TE_INSERT : mi_type |= 1; break;
|
||||
case NdbDictionary::Event::TE_DELETE : mi_type |= 2; break;
|
||||
case NdbDictionary::Event::TE_UPDATE : mi_type |= 4; break;
|
||||
default: mi_type = 4 | 2 | 1; // all types
|
||||
}
|
||||
mi_type |= (unsigned)t;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -939,12 +947,6 @@ NdbDictInterface::execSignal(void* dictImpl,
|
||||
case GSN_SUB_START_REF:
|
||||
tmp->execSUB_START_REF(signal, ptr);
|
||||
break;
|
||||
case GSN_SUB_TABLE_DATA:
|
||||
tmp->execSUB_TABLE_DATA(signal, ptr);
|
||||
break;
|
||||
case GSN_SUB_GCP_COMPLETE_REP:
|
||||
tmp->execSUB_GCP_COMPLETE_REP(signal, ptr);
|
||||
break;
|
||||
case GSN_SUB_STOP_CONF:
|
||||
tmp->execSUB_STOP_CONF(signal, ptr);
|
||||
break;
|
||||
@@ -960,6 +962,12 @@ NdbDictInterface::execSignal(void* dictImpl,
|
||||
case GSN_LIST_TABLES_CONF:
|
||||
tmp->execLIST_TABLES_CONF(signal, ptr);
|
||||
break;
|
||||
case GSN_WAIT_GCP_CONF:
|
||||
tmp->execWAIT_GCP_CONF(signal, ptr);
|
||||
break;
|
||||
case GSN_WAIT_GCP_REF:
|
||||
tmp->execWAIT_GCP_REF(signal, ptr);
|
||||
break;
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
@@ -1062,7 +1070,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
if ( (temporaryMask & m_error.code) != 0 ) {
|
||||
if ( temporaryMask == -1)
|
||||
{
|
||||
const NdbError &error= getNdbError();
|
||||
if (error.status == NdbError::TemporaryError)
|
||||
continue;
|
||||
}
|
||||
else if ( (temporaryMask & m_error.code) != 0 ) {
|
||||
continue;
|
||||
}
|
||||
if (errcodes) {
|
||||
@@ -1320,6 +1334,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
|
||||
impl->m_version = tableDesc.TableVersion;
|
||||
impl->m_status = NdbDictionary::Object::Retrieved;
|
||||
impl->m_internalName.assign(internalName);
|
||||
impl->updateMysqlName();
|
||||
impl->m_externalName.assign(externalName);
|
||||
|
||||
impl->m_frm.assign(tableDesc.FrmData, tableDesc.FrmLen);
|
||||
@@ -1629,6 +1644,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
|
||||
const BaseString internalName(
|
||||
ndb.internalize_table_name(impl.m_externalName.c_str()));
|
||||
impl.m_internalName.assign(internalName);
|
||||
impl.updateMysqlName();
|
||||
UtilBufferWriter w(m_buffer);
|
||||
DictTabInfo::Table tmpTab;
|
||||
tmpTab.init();
|
||||
@@ -2444,26 +2460,26 @@ NdbDictInterface::execDROP_INDX_REF(NdbApiSignal * signal,
|
||||
int
|
||||
NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
{
|
||||
DBUG_ENTER("NdbDictionaryImpl::createEvent");
|
||||
int i;
|
||||
NdbTableImpl* tab = getTable(evnt.getTableName());
|
||||
|
||||
if(tab == 0){
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("NdbDictionaryImpl::createEvent: table not found: %s",
|
||||
evnt.getTableName());
|
||||
#endif
|
||||
return -1;
|
||||
NdbTableImpl* tab= evnt.m_tableImpl;
|
||||
if (tab == 0)
|
||||
{
|
||||
tab= getTable(evnt.getTableName());
|
||||
if(tab == 0){
|
||||
DBUG_PRINT("info",("NdbDictionaryImpl::createEvent: table not found: %s",
|
||||
evnt.getTableName()));
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_tableId, tab->m_version));
|
||||
|
||||
evnt.m_tableId = tab->m_tableId;
|
||||
evnt.m_tableVersion = tab->m_version;
|
||||
evnt.m_tableImpl = tab;
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("Event on tableId=%d", evnt.m_tableId);
|
||||
#endif
|
||||
|
||||
NdbTableImpl &table = *evnt.m_tableImpl;
|
||||
|
||||
|
||||
int attributeList_sz = evnt.m_attrIds.size();
|
||||
|
||||
for (i = 0; i < attributeList_sz; i++) {
|
||||
@@ -2474,17 +2490,19 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
ndbout_c("Attr id %u in table %s not found", evnt.m_attrIds[i],
|
||||
evnt.getTableName());
|
||||
m_error.code= 4713;
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
|
||||
evnt.m_attrIds.clear();
|
||||
|
||||
attributeList_sz = evnt.m_columns.size();
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("creating event %s", evnt.m_externalName.c_str());
|
||||
ndbout_c("no of columns %d", evnt.m_columns.size());
|
||||
#endif
|
||||
|
||||
DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
|
||||
evnt.m_tableId, evnt.m_tableVersion,
|
||||
evnt.m_name.c_str(),
|
||||
evnt.m_columns.size()));
|
||||
|
||||
int pk_count = 0;
|
||||
evnt.m_attrListBitmask.clear();
|
||||
|
||||
@@ -2493,7 +2511,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
table.getColumn(evnt.m_columns[i]->m_name.c_str());
|
||||
if(col == 0){
|
||||
m_error.code= 4247;
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
// Copy column definition
|
||||
*evnt.m_columns[i] = *col;
|
||||
@@ -2519,7 +2537,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
for(i = 1; i<attributeList_sz; i++) {
|
||||
if (evnt.m_columns[i-1]->m_attrId == evnt.m_columns[i]->m_attrId) {
|
||||
m_error.code= 4258;
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2530,7 +2548,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
#endif
|
||||
|
||||
// NdbDictInterface m_receiver;
|
||||
return m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */);
|
||||
DBUG_RETURN(m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */));
|
||||
}
|
||||
|
||||
int
|
||||
@@ -2538,6 +2556,9 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
NdbEventImpl & evnt,
|
||||
int getFlag)
|
||||
{
|
||||
DBUG_ENTER("NdbDictInterface::createEvent");
|
||||
DBUG_PRINT("enter",("getFlag=%d",getFlag));
|
||||
|
||||
NdbApiSignal tSignal(m_reference);
|
||||
tSignal.theReceiversBlockNumber = DBDICT;
|
||||
tSignal.theVerId_signalNumber = GSN_CREATE_EVNT_REQ;
|
||||
@@ -2555,22 +2576,25 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
// getting event from Dictionary
|
||||
req->setRequestType(CreateEvntReq::RT_USER_GET);
|
||||
} else {
|
||||
DBUG_PRINT("info",("tableId: %u tableVersion: %u",
|
||||
evnt.m_tableId, evnt.m_tableVersion));
|
||||
// creating event in Dictionary
|
||||
req->setRequestType(CreateEvntReq::RT_USER_CREATE);
|
||||
req->setTableId(evnt.m_tableId);
|
||||
req->setTableVersion(evnt.m_tableVersion);
|
||||
req->setAttrListBitmask(evnt.m_attrListBitmask);
|
||||
req->setEventType(evnt.mi_type);
|
||||
}
|
||||
|
||||
UtilBufferWriter w(m_buffer);
|
||||
|
||||
const size_t len = strlen(evnt.m_externalName.c_str()) + 1;
|
||||
const size_t len = strlen(evnt.m_name.c_str()) + 1;
|
||||
if(len > MAX_TAB_NAME_SIZE) {
|
||||
m_error.code= 4241;
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str());
|
||||
w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
|
||||
|
||||
if (getFlag == 0)
|
||||
{
|
||||
@@ -2587,7 +2611,7 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
int ret = createEvent(&tSignal, ptr, 1);
|
||||
|
||||
if (ret) {
|
||||
return ret;
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
char *dataPtr = (char *)m_buffer.get_data();
|
||||
@@ -2600,89 +2624,86 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
|
||||
if (getFlag) {
|
||||
evnt.m_tableId = evntConf->getTableId();
|
||||
evnt.m_tableVersion = evntConf->getTableVersion();
|
||||
evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
|
||||
evnt.mi_type = evntConf->getEventType();
|
||||
evnt.setTable(dataPtr);
|
||||
} else {
|
||||
if (evnt.m_tableId != evntConf->getTableId() ||
|
||||
evnt.m_tableVersion != evntConf->getTableVersion() ||
|
||||
//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
|
||||
evnt.mi_type != evntConf->getEventType()) {
|
||||
ndbout_c("ERROR*************");
|
||||
return 1;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
}
|
||||
|
||||
evnt.m_eventId = evntConf->getEventId();
|
||||
evnt.m_eventKey = evntConf->getEventKey();
|
||||
|
||||
return ret;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::createEvent(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3], int noLSP)
|
||||
{
|
||||
const int noErrCodes = 1;
|
||||
int errCodes[noErrCodes] = {CreateEvntRef::Busy};
|
||||
return dictSignal(signal,ptr,noLSP,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
|
||||
-1,
|
||||
errCodes,noErrCodes, CreateEvntRef::Temporary);
|
||||
NULL,0, -1);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictionaryImpl::executeSubscribeEvent(NdbEventImpl & ev)
|
||||
NdbDictionaryImpl::executeSubscribeEvent(NdbEventOperationImpl & ev_op)
|
||||
{
|
||||
// NdbDictInterface m_receiver;
|
||||
return m_receiver.executeSubscribeEvent(m_ndb, ev);
|
||||
return m_receiver.executeSubscribeEvent(m_ndb, ev_op);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
|
||||
NdbEventImpl & evnt)
|
||||
NdbEventOperationImpl & ev_op)
|
||||
{
|
||||
DBUG_ENTER("NdbDictInterface::executeSubscribeEvent");
|
||||
NdbApiSignal tSignal(m_reference);
|
||||
// tSignal.theReceiversBlockNumber = SUMA;
|
||||
tSignal.theReceiversBlockNumber = DBDICT;
|
||||
tSignal.theVerId_signalNumber = GSN_SUB_START_REQ;
|
||||
tSignal.theLength = SubStartReq::SignalLength2;
|
||||
|
||||
SubStartReq * sumaStart = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
|
||||
SubStartReq * req = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
|
||||
|
||||
sumaStart->subscriptionId = evnt.m_eventId;
|
||||
sumaStart->subscriptionKey = evnt.m_eventKey;
|
||||
sumaStart->part = SubscriptionData::TableData;
|
||||
sumaStart->subscriberData = evnt.m_bufferId & 0xFF;
|
||||
sumaStart->subscriberRef = m_reference;
|
||||
req->subscriptionId = ev_op.m_eventImpl->m_eventId;
|
||||
req->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
|
||||
req->part = SubscriptionData::TableData;
|
||||
req->subscriberData = ev_op.m_oid;
|
||||
req->subscriberRef = m_reference;
|
||||
|
||||
DBUG_RETURN(executeSubscribeEvent(&tSignal, NULL));
|
||||
DBUG_PRINT("info",("GSN_SUB_START_REQ subscriptionId=%d,subscriptionKey=%d,"
|
||||
"subscriberData=%d",req->subscriptionId,
|
||||
req->subscriptionKey,req->subscriberData));
|
||||
|
||||
int errCodes[] = { SubStartRef::Busy };
|
||||
DBUG_RETURN(dictSignal(&tSignal,NULL,0,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
|
||||
-1,
|
||||
errCodes, sizeof(errCodes)/sizeof(errCodes[0])));
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::executeSubscribeEvent(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
return dictSignal(signal,NULL,0,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
|
||||
-1,
|
||||
NULL,0);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictionaryImpl::stopSubscribeEvent(NdbEventImpl & ev)
|
||||
NdbDictionaryImpl::stopSubscribeEvent(NdbEventOperationImpl & ev_op)
|
||||
{
|
||||
// NdbDictInterface m_receiver;
|
||||
return m_receiver.stopSubscribeEvent(m_ndb, ev);
|
||||
return m_receiver.stopSubscribeEvent(m_ndb, ev_op);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
|
||||
NdbEventImpl & evnt)
|
||||
NdbEventOperationImpl & ev_op)
|
||||
{
|
||||
DBUG_ENTER("NdbDictInterface::stopSubscribeEvent");
|
||||
|
||||
@@ -2692,36 +2713,36 @@ NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
|
||||
tSignal.theVerId_signalNumber = GSN_SUB_STOP_REQ;
|
||||
tSignal.theLength = SubStopReq::SignalLength;
|
||||
|
||||
SubStopReq * sumaStop = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
|
||||
SubStopReq * req = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
|
||||
|
||||
sumaStop->subscriptionId = evnt.m_eventId;
|
||||
sumaStop->subscriptionKey = evnt.m_eventKey;
|
||||
sumaStop->subscriberData = evnt.m_bufferId & 0xFF;
|
||||
sumaStop->part = (Uint32) SubscriptionData::TableData;
|
||||
sumaStop->subscriberRef = m_reference;
|
||||
req->subscriptionId = ev_op.m_eventImpl->m_eventId;
|
||||
req->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
|
||||
req->subscriberData = ev_op.m_oid;
|
||||
req->part = (Uint32) SubscriptionData::TableData;
|
||||
req->subscriberRef = m_reference;
|
||||
|
||||
DBUG_RETURN(stopSubscribeEvent(&tSignal, NULL));
|
||||
}
|
||||
DBUG_PRINT("info",("GSN_SUB_STOP_REQ subscriptionId=%d,subscriptionKey=%d,"
|
||||
"subscriberData=%d",req->subscriptionId,
|
||||
req->subscriptionKey,req->subscriberData));
|
||||
|
||||
int
|
||||
NdbDictInterface::stopSubscribeEvent(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
return dictSignal(signal,NULL,0,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
|
||||
-1,
|
||||
NULL,0);
|
||||
int errCodes[] = { SubStopRef::Busy };
|
||||
DBUG_RETURN(dictSignal(&tSignal,NULL,0,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
|
||||
-1,
|
||||
errCodes, sizeof(errCodes)/sizeof(errCodes[0])));
|
||||
}
|
||||
|
||||
NdbEventImpl *
|
||||
NdbDictionaryImpl::getEvent(const char * eventName)
|
||||
{
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
DBUG_ENTER("NdbDictionaryImpl::getEvent");
|
||||
DBUG_PRINT("enter",("eventName= %s", eventName));
|
||||
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
if (ev == NULL) {
|
||||
return NULL;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
ev->setName(eventName);
|
||||
@@ -2730,48 +2751,83 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
||||
|
||||
if (ret) {
|
||||
delete ev;
|
||||
return NULL;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
// We only have the table name with internal name
|
||||
DBUG_PRINT("info",("table %s", ev->getTableName()));
|
||||
Ndb_local_table_info *info;
|
||||
int retry= 0;
|
||||
while (1)
|
||||
{
|
||||
info= get_local_table_info(ev->getTableName(), true);
|
||||
if (info == 0)
|
||||
{
|
||||
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
if (ev->m_tableId == info->m_table_impl->m_tableId &&
|
||||
ev->m_tableVersion == info->m_table_impl->m_version)
|
||||
break;
|
||||
if (retry)
|
||||
{
|
||||
m_error.code= 241;
|
||||
DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
|
||||
ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
|
||||
info->m_table_impl->m_tableId, info->m_table_impl->m_version));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
invalidateObject(*info->m_table_impl);
|
||||
retry++;
|
||||
}
|
||||
|
||||
ev->m_tableImpl= info->m_table_impl;
|
||||
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
|
||||
ev->m_tableImpl = getTable(ev->getTableName());
|
||||
|
||||
// get the columns from the attrListBitmask
|
||||
|
||||
NdbTableImpl &table = *ev->m_tableImpl;
|
||||
AttributeMask & mask = ev->m_attrListBitmask;
|
||||
int attributeList_sz = mask.count();
|
||||
int id = -1;
|
||||
unsigned attributeList_sz = mask.count();
|
||||
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("NdbDictionaryImpl::getEvent attributeList_sz = %d",
|
||||
attributeList_sz);
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", table.m_tableId, table.m_version));
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
char buf[128] = {0};
|
||||
mask.getText(buf);
|
||||
ndbout_c("mask = %s", buf);
|
||||
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", attributeList_sz, buf));
|
||||
#endif
|
||||
|
||||
for(int i = 0; i < attributeList_sz; i++) {
|
||||
id++; while (!mask.get(id)) id++;
|
||||
|
||||
if ( attributeList_sz > table.getNoOfColumns() )
|
||||
{
|
||||
DBUG_PRINT("error",("Invalid version, too many columns"));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
assert( (int)attributeList_sz <= table.getNoOfColumns() );
|
||||
for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
|
||||
if ( id >= table.getNoOfColumns())
|
||||
{
|
||||
DBUG_PRINT("error",("Invalid version, column %d out of range", id));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
if (!mask.get(id))
|
||||
continue;
|
||||
|
||||
const NdbColumnImpl* col = table.getColumn(id);
|
||||
if(col == 0) {
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("NdbDictionaryImpl::getEvent could not find column id %d", id);
|
||||
#endif
|
||||
m_error.code= 4247;
|
||||
delete ev;
|
||||
return NULL;
|
||||
}
|
||||
DBUG_PRINT("info",("column %d %s", id, col->getName()));
|
||||
NdbColumnImpl* new_col = new NdbColumnImpl;
|
||||
// Copy column definition
|
||||
*new_col = *col;
|
||||
|
||||
ev->m_columns.push_back(new_col);
|
||||
}
|
||||
|
||||
return ev;
|
||||
DBUG_RETURN(ev);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2795,7 +2851,8 @@ NdbDictInterface::execCREATE_EVNT_CONF(NdbApiSignal * signal,
|
||||
Uint32 subscriptionId = createEvntConf->getEventId();
|
||||
Uint32 subscriptionKey = createEvntConf->getEventKey();
|
||||
|
||||
DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d",
|
||||
DBUG_PRINT("info",("nodeid=%d,subscriptionId=%d,subscriptionKey=%d",
|
||||
refToNode(signal->theSendersBlockRef),
|
||||
subscriptionId,subscriptionKey));
|
||||
m_waiter.signal(NO_WAIT);
|
||||
DBUG_VOID_RETURN;
|
||||
@@ -2900,74 +2957,6 @@ NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal,
|
||||
m_waiter.signal(NO_WAIT);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
void
|
||||
NdbDictInterface::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
const SubGcpCompleteRep * const rep=
|
||||
CAST_CONSTPTR(SubGcpCompleteRep, signal->getDataPtr());
|
||||
|
||||
const Uint32 gci = rep->gci;
|
||||
// const Uint32 senderRef = rep->senderRef;
|
||||
const Uint32 subscriberData = rep->subscriberData;
|
||||
|
||||
const Uint32 bufferId = subscriberData;
|
||||
|
||||
const Uint32 ref = signal->theSendersBlockRef;
|
||||
|
||||
NdbApiSignal tSignal(m_reference);
|
||||
SubGcpCompleteAcc * acc=
|
||||
CAST_PTR(SubGcpCompleteAcc, tSignal.getDataPtrSend());
|
||||
|
||||
acc->rep = *rep;
|
||||
|
||||
tSignal.theReceiversBlockNumber = refToBlock(ref);
|
||||
tSignal.theVerId_signalNumber = GSN_SUB_GCP_COMPLETE_ACC;
|
||||
tSignal.theLength = SubGcpCompleteAcc::SignalLength;
|
||||
|
||||
Uint32 aNodeId = refToNode(ref);
|
||||
|
||||
// m_transporter->lock_mutex();
|
||||
int r;
|
||||
r = m_transporter->sendSignal(&tSignal, aNodeId);
|
||||
// m_transporter->unlock_mutex();
|
||||
|
||||
NdbGlobalEventBufferHandle::latestGCI(bufferId, gci);
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictInterface::execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
#ifdef EVENT_DEBUG
|
||||
const char * FNAME = "NdbDictInterface::execSUB_TABLE_DATA";
|
||||
#endif
|
||||
//TODO
|
||||
const SubTableData * const sdata = CAST_CONSTPTR(SubTableData, signal->getDataPtr());
|
||||
|
||||
// const Uint32 gci = sdata->gci;
|
||||
// const Uint32 operation = sdata->operation;
|
||||
// const Uint32 tableId = sdata->tableId;
|
||||
// const Uint32 noOfAttrs = sdata->noOfAttributes;
|
||||
// const Uint32 dataLen = sdata->dataSize;
|
||||
const Uint32 subscriberData = sdata->subscriberData;
|
||||
// const Uint32 logType = sdata->logType;
|
||||
|
||||
for (int i=signal->m_noOfSections;i < 3; i++) {
|
||||
ptr[i].p = NULL;
|
||||
ptr[i].sz = 0;
|
||||
}
|
||||
#ifdef EVENT_DEBUG
|
||||
ndbout_c("%s: senderData %d, gci %d, operation %d, tableId %d, noOfAttrs %d, dataLen %d",
|
||||
FNAME, subscriberData, gci, operation, tableId, noOfAttrs, dataLen);
|
||||
ndbout_c("ptr[0] %u %u ptr[1] %u %u ptr[2] %u %u\n",
|
||||
ptr[0].p,ptr[0].sz,ptr[1].p,ptr[1].sz,ptr[2].p,ptr[2].sz);
|
||||
#endif
|
||||
const Uint32 bufferId = subscriberData;
|
||||
|
||||
NdbGlobalEventBufferHandle::insertDataL(bufferId,
|
||||
sdata, ptr);
|
||||
}
|
||||
|
||||
/*****************************************************************
|
||||
* Drop event
|
||||
@@ -2999,7 +2988,7 @@ NdbDictInterface::dropEvent(const NdbEventImpl &evnt)
|
||||
|
||||
UtilBufferWriter w(m_buffer);
|
||||
|
||||
w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str());
|
||||
w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
|
||||
|
||||
LinearSectionPtr ptr[1];
|
||||
ptr[0].p = (Uint32*)m_buffer.get_data();
|
||||
@@ -3013,14 +3002,12 @@ NdbDictInterface::dropEvent(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3], int noLSP)
|
||||
{
|
||||
//TODO
|
||||
const int noErrCodes = 1;
|
||||
int errCodes[noErrCodes] = {DropEvntRef::Busy};
|
||||
return dictSignal(signal,ptr,noLSP,
|
||||
1 /*use masternode id*/,
|
||||
100,
|
||||
WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
|
||||
-1,
|
||||
errCodes,noErrCodes, DropEvntRef::Temporary);
|
||||
NULL,0, -1);
|
||||
}
|
||||
void
|
||||
NdbDictInterface::execDROP_EVNT_CONF(NdbApiSignal * signal,
|
||||
@@ -3213,6 +3200,65 @@ NdbDictInterface::execLIST_TABLES_CONF(NdbApiSignal* signal,
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictionaryImpl::forceGCPWait()
|
||||
{
|
||||
return m_receiver.forceGCPWait();
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::forceGCPWait()
|
||||
{
|
||||
NdbApiSignal tSignal(m_reference);
|
||||
WaitGCPReq* const req = CAST_PTR(WaitGCPReq, tSignal.getDataPtrSend());
|
||||
req->senderRef = m_reference;
|
||||
req->senderData = 0;
|
||||
req->requestType = WaitGCPReq::CompleteForceStart;
|
||||
tSignal.theReceiversBlockNumber = DBDIH;
|
||||
tSignal.theVerId_signalNumber = GSN_WAIT_GCP_REQ;
|
||||
tSignal.theLength = WaitGCPReq::SignalLength;
|
||||
|
||||
const Uint32 RETRIES = 100;
|
||||
for (Uint32 i = 0; i < RETRIES; i++)
|
||||
{
|
||||
m_transporter->lock_mutex();
|
||||
Uint16 aNodeId = m_transporter->get_an_alive_node();
|
||||
if (aNodeId == 0) {
|
||||
m_error.code= 4009;
|
||||
m_transporter->unlock_mutex();
|
||||
return -1;
|
||||
}
|
||||
if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
|
||||
m_transporter->unlock_mutex();
|
||||
continue;
|
||||
}
|
||||
m_error.code= 0;
|
||||
m_waiter.m_node = aNodeId;
|
||||
m_waiter.m_state = WAIT_LIST_TABLES_CONF;
|
||||
m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
|
||||
m_transporter->unlock_mutex();
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictInterface::execWAIT_GCP_CONF(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
const WaitGCPConf * const conf=
|
||||
CAST_CONSTPTR(WaitGCPConf, signal->getDataPtr());
|
||||
g_latest_trans_gci= conf->gcp;
|
||||
m_waiter.signal(NO_WAIT);
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictInterface::execWAIT_GCP_REF(NdbApiSignal* signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
m_waiter.signal(NO_WAIT);
|
||||
}
|
||||
|
||||
template class Vector<int>;
|
||||
template class Vector<Uint16>;
|
||||
template class Vector<Uint32>;
|
||||
|
||||
Reference in New Issue
Block a user