mirror of
https://github.com/MariaDB/server.git
synced 2026-01-06 05:22:24 +03:00
ndb - wl#2972 rbr blobs ndb api support
storage/ndb/include/ndbapi/NdbBlob.hpp: rbr blobs ndb api support storage/ndb/include/ndbapi/NdbDictionary.hpp: rbr blobs ndb api support storage/ndb/include/ndbapi/NdbEventOperation.hpp: rbr blobs ndb api support storage/ndb/ndbapi-examples/ndbapi_event/Makefile: rbr blobs ndb api support storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbBlob.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbDictionary.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbEventOperation.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp: rbr blobs ndb api support storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp: rbr blobs ndb api support storage/ndb/test/ndbapi/test_event_merge.cpp: rbr blobs ndb api support
This commit is contained in:
@@ -54,26 +54,32 @@
|
||||
#include <stdio.h>
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#ifdef VM_TRACE
|
||||
#include <my_global.h>
|
||||
#endif
|
||||
#ifndef assert
|
||||
#include <assert.h>
|
||||
#endif
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* Assume that there is a table t0 which is being updated by
|
||||
* Assume that there is a table which is being updated by
|
||||
* another process (e.g. flexBench -l 0 -stdtables).
|
||||
* We want to monitor what happens with columns c0,c1,c2,c3.
|
||||
* We want to monitor what happens with column values.
|
||||
*
|
||||
* or together with the mysql client;
|
||||
* Or using the mysql client:
|
||||
*
|
||||
* shell> mysql -u root
|
||||
* mysql> create database TEST_DB;
|
||||
* mysql> use TEST_DB;
|
||||
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
|
||||
* mysql> create table t0
|
||||
* (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
|
||||
* primary key(c0, c2)) engine ndb charset latin1;
|
||||
*
|
||||
* In another window start ndbapi_event, wait until properly started
|
||||
*
|
||||
insert into t0 values (1, 2, 'a', 'b');
|
||||
insert into t0 values (3, 4, 'c', 'd');
|
||||
|
||||
insert into t0 values (1, 2, 'a', 'b', null);
|
||||
insert into t0 values (3, 4, 'c', 'd', null);
|
||||
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
|
||||
update t0 set c3 = 'f'; -- use scan
|
||||
update t0 set c3 = 'F'; -- use scan update to 'same'
|
||||
@@ -81,7 +87,18 @@
|
||||
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
|
||||
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
|
||||
delete from t0;
|
||||
*
|
||||
|
||||
insert ...; update ...; -- see events w/ same pk merged (if -m option)
|
||||
delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
|
||||
update ...; update ...;
|
||||
|
||||
-- text requires -m flag
|
||||
set @a = repeat('a',256); -- inline size
|
||||
set @b = repeat('b',2000); -- part size
|
||||
set @c = repeat('c',2000*30); -- 30 parts
|
||||
|
||||
-- update the text field using combinations of @a, @b, @c ...
|
||||
|
||||
* you should see the data popping up in the example window
|
||||
*
|
||||
*/
|
||||
@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb,
|
||||
const char *eventName,
|
||||
const char *eventTableName,
|
||||
const char **eventColumnName,
|
||||
const int noEventColumnName);
|
||||
const int noEventColumnName,
|
||||
bool merge_events);
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
ndb_init();
|
||||
bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0;
|
||||
bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
|
||||
#ifdef VM_TRACE
|
||||
bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
|
||||
if (dbug) DBUG_PUSH("d:t:");
|
||||
if (dbug) putenv("API_SIGNAL_LOG=-");
|
||||
#endif
|
||||
|
||||
Ndb_cluster_connection *cluster_connection=
|
||||
new Ndb_cluster_connection(); // Object representing the cluster
|
||||
@@ -134,12 +157,13 @@ int main(int argc, char** argv)
|
||||
|
||||
const char *eventName= "CHNG_IN_t0";
|
||||
const char *eventTableName= "t0";
|
||||
const int noEventColumnName= 4;
|
||||
const int noEventColumnName= 5;
|
||||
const char *eventColumnName[noEventColumnName]=
|
||||
{"c0",
|
||||
"c1",
|
||||
"c2",
|
||||
"c3"
|
||||
"c3",
|
||||
"c4"
|
||||
};
|
||||
|
||||
// Create events
|
||||
@@ -147,9 +171,14 @@ int main(int argc, char** argv)
|
||||
eventName,
|
||||
eventTableName,
|
||||
eventColumnName,
|
||||
noEventColumnName);
|
||||
noEventColumnName,
|
||||
merge_events);
|
||||
|
||||
int j= 0;
|
||||
// Normal values and blobs are unfortunately handled differently..
|
||||
typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
|
||||
|
||||
int i, j, k, l;
|
||||
j = 0;
|
||||
while (j < 99) {
|
||||
|
||||
// Start "transaction" for handling events
|
||||
@@ -160,12 +189,17 @@ int main(int argc, char** argv)
|
||||
op->mergeEvents(merge_events);
|
||||
|
||||
printf("get values\n");
|
||||
NdbRecAttr* recAttr[noEventColumnName];
|
||||
NdbRecAttr* recAttrPre[noEventColumnName];
|
||||
RA_BH recAttr[noEventColumnName];
|
||||
RA_BH recAttrPre[noEventColumnName];
|
||||
// primary keys should always be a part of the result
|
||||
for (int i = 0; i < noEventColumnName; i++) {
|
||||
recAttr[i] = op->getValue(eventColumnName[i]);
|
||||
recAttrPre[i] = op->getPreValue(eventColumnName[i]);
|
||||
for (i = 0; i < noEventColumnName; i++) {
|
||||
if (i < 4) {
|
||||
recAttr[i].ra = op->getValue(eventColumnName[i]);
|
||||
recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
|
||||
} else if (merge_events) {
|
||||
recAttr[i].bh = op->getBlobHandle(eventColumnName[i]);
|
||||
recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// set up the callbacks
|
||||
@@ -174,13 +208,16 @@ int main(int argc, char** argv)
|
||||
if (op->execute())
|
||||
APIERROR(op->getNdbError());
|
||||
|
||||
int i= 0;
|
||||
while(i < 40) {
|
||||
NdbEventOperation* the_op = op;
|
||||
|
||||
i= 0;
|
||||
while (i < 40) {
|
||||
// printf("now waiting for event...\n");
|
||||
int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
|
||||
int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
|
||||
if (r > 0) {
|
||||
// printf("got data! %d\n", r);
|
||||
while ((op= myNdb->nextEvent())) {
|
||||
assert(the_op == op);
|
||||
i++;
|
||||
switch (op->getEventType()) {
|
||||
case NdbDictionary::Event::TE_INSERT:
|
||||
@@ -195,40 +232,66 @@ int main(int argc, char** argv)
|
||||
default:
|
||||
abort(); // should not happen
|
||||
}
|
||||
printf(" gci=%d\n", op->getGCI());
|
||||
printf("post: ");
|
||||
for (int i = 0; i < noEventColumnName; i++) {
|
||||
if (recAttr[i]->isNULL() >= 0) { // we have a value
|
||||
if (recAttr[i]->isNULL() == 0) { // we have a non-null value
|
||||
if (i < 2)
|
||||
printf("%-5u", recAttr[i]->u_32_value());
|
||||
else
|
||||
printf("%-5.4s", recAttr[i]->aRef());
|
||||
} else // we have a null value
|
||||
printf("%-5s", "NULL");
|
||||
} else
|
||||
printf("%-5s", "-");
|
||||
printf(" gci=%d\n", (int)op->getGCI());
|
||||
for (k = 0; k <= 1; k++) {
|
||||
printf(k == 0 ? "post: " : "pre : ");
|
||||
for (l = 0; l < noEventColumnName; l++) {
|
||||
if (l < 4) {
|
||||
NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
|
||||
if (ra->isNULL() >= 0) { // we have a value
|
||||
if (ra->isNULL() == 0) { // we have a non-null value
|
||||
if (l < 2)
|
||||
printf("%-5u", ra->u_32_value());
|
||||
else
|
||||
printf("%-5.4s", ra->aRef());
|
||||
} else
|
||||
printf("%-5s", "NULL");
|
||||
} else
|
||||
printf("%-5s", "-"); // no value
|
||||
} else if (merge_events) {
|
||||
int isNull;
|
||||
NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
|
||||
bh->getDefined(isNull);
|
||||
if (isNull >= 0) { // we have a value
|
||||
if (! isNull) { // we have a non-null value
|
||||
Uint64 length = 0;
|
||||
bh->getLength(length);
|
||||
// read into buffer
|
||||
unsigned char* buf = new unsigned char [length];
|
||||
memset(buf, 'X', length);
|
||||
Uint32 n = length;
|
||||
bh->readData(buf, n); // n is in/out
|
||||
assert(n == length);
|
||||
// pretty-print
|
||||
bool first = true;
|
||||
Uint32 i = 0;
|
||||
while (i < n) {
|
||||
unsigned char c = buf[i++];
|
||||
Uint32 m = 1;
|
||||
while (i < n && buf[i] == c)
|
||||
i++, m++;
|
||||
if (! first)
|
||||
printf("+");
|
||||
printf("%u%c", m, c);
|
||||
first = false;
|
||||
}
|
||||
printf("[%u]", n);
|
||||
delete [] buf;
|
||||
} else
|
||||
printf("%-5s", "NULL");
|
||||
} else
|
||||
printf("%-5s", "-"); // no value
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
printf("\npre : ");
|
||||
for (int i = 0; i < noEventColumnName; i++) {
|
||||
if (recAttrPre[i]->isNULL() >= 0) { // we have a value
|
||||
if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
|
||||
if (i < 2)
|
||||
printf("%-5u", recAttrPre[i]->u_32_value());
|
||||
else
|
||||
printf("%-5.4s", recAttrPre[i]->aRef());
|
||||
} else // we have a null value
|
||||
printf("%-5s", "NULL");
|
||||
} else
|
||||
printf("%-5s", "-");
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
} else
|
||||
;//printf("timed out\n");
|
||||
}
|
||||
// don't want to listen to events anymore
|
||||
if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());
|
||||
if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
|
||||
the_op = 0;
|
||||
|
||||
j++;
|
||||
}
|
||||
@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb,
|
||||
const char *eventName,
|
||||
const char *eventTableName,
|
||||
const char **eventColumnNames,
|
||||
const int noEventColumnNames)
|
||||
const int noEventColumnNames,
|
||||
bool merge_events)
|
||||
{
|
||||
NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
|
||||
if (!myDict) APIERROR(myNdb->getNdbError());
|
||||
@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb,
|
||||
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
|
||||
|
||||
myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
|
||||
myEvent.mergeEvents(merge_events);
|
||||
|
||||
// Add event to database
|
||||
if (myDict->createEvent(myEvent) == 0)
|
||||
|
||||
Reference in New Issue
Block a user