diff --git a/mysql-test/r/ndb_blob.result b/mysql-test/r/ndb_blob.result new file mode 100644 index 00000000000..89b53aea7d1 --- /dev/null +++ b/mysql-test/r/ndb_blob.result @@ -0,0 +1,272 @@ +drop table if exists t1; +set autocommit=0; +create table t1 ( +a int not null primary key, +b text not null, +c int not null, +d longblob, +key (c) +) engine=ndbcluster; +set @x0 = '01234567012345670123456701234567'; +set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0); +set @b1 = 'b1'; +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@x0); +set @d1 = 'dd1'; +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @b2 = 'b2'; +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @d2 = 'dd2'; +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +select length(@x0),length(@b1),length(@d1) from dual; +length(@x0) length(@b1) length(@d1) +256 2256 3000 +select length(@x0),length(@b2),length(@d2) from dual; +length(@x0) length(@b2) length(@d2) +256 20000 30000 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where a = 1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=1; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=2; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +2 20000 b2 30000 dd2 +update t1 set b=@b2,d=@d2 where a=1; +update t1 set b=@b1,d=@d1 where a=2; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=1; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +1 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=2; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 2256 b1 3000 dd1 +update t1 set b=concat(b,b),d=concat(d,d) where a=1; +update t1 set b=concat(b,b),d=concat(d,d) where a=2; +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where a=1; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 40000 b2 60000 dd2 +select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3) +from t1 where a=2; +a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3) +2 4512 b1 6000 dd1 +update t1 set d=null where a=1; +commit; +select a from t1 where d is null; +a +1 +delete from t1 where a=1; +delete from t1 where a=2; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c = 111; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref c c 4 const 10 Using where +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=111; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=222; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +2 20000 b2 30000 dd2 +update t1 set b=@b2,d=@d2 where c=111; +update t1 set b=@b1,d=@d1 where c=222; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=111; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +1 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=222; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 2256 b1 3000 dd1 +update t1 set d=null where c=111; +commit; +select a from t1 where d is null; +a +1 +delete from t1 where c=111; +delete from t1 where c=222; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 100 +select * from t1 order by a; +a b c d +1 b1 111 dd1 +2 b2 222 dd2 +3 b3 333 dd3 +4 b4 444 dd4 +5 b5 555 dd5 +6 b6 666 dd6 +7 b7 777 dd7 +8 b8 888 dd8 +9 b9 999 dd9 +update t1 set b=concat(a,'x',b),d=concat(a,'x',d); +commit; +select * from t1 order by a; +a b c d +1 1xb1 111 1xdd1 +2 2xb2 222 2xdd2 +3 3xb3 333 3xdd3 +4 4xb4 444 4xdd4 +5 5xb5 555 5xdd5 +6 6xb6 666 6xdd6 +7 7xb7 777 7xdd7 +8 8xb8 888 8xdd8 +9 9xb9 999 9xdd9 +delete from t1; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 100 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 order by a; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 4512 6000 +2 40000 b2 60000 dd2 +delete from t1; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1 where c >= 100 order by a; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort +select * from t1 where c >= 100 order by a; +a b c d +1 b1 111 dd1 +2 b2 222 dd2 +3 b3 333 dd3 +4 b4 444 dd4 +5 b5 555 dd5 +6 b6 666 dd6 +7 b7 777 dd7 +8 b8 888 dd8 +9 b9 999 dd9 +update t1 set b=concat(a,'x',b),d=concat(a,'x',d) +where c >= 100; +commit; +select * from t1 where c >= 100 order by a; +a b c d +1 1xb1 111 1xdd1 +2 2xb2 222 2xdd2 +3 3xb3 333 3xdd3 +4 4xb4 444 4xdd4 +5 5xb5 555 5xdd5 +6 6xb6 666 6xdd6 +7 7xb7 777 7xdd7 +8 8xb8 888 8xdd8 +9 9xb9 999 9xdd9 +delete from t1 where c >= 100; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c >= 100 order by a; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c >= 100 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where c >= 100 order by a; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 4512 6000 +2 40000 b2 60000 dd2 +delete from t1 where c >= 100; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 0; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 1; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 2; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +rollback; +select count(*) from t1; +count(*) +0 diff --git a/mysql-test/t/ndb_blob.test b/mysql-test/t/ndb_blob.test new file mode 100644 index 00000000000..c1166a7a90c --- /dev/null +++ b/mysql-test/t/ndb_blob.test @@ -0,0 +1,249 @@ +--source include/have_ndb.inc + +--disable_warnings +drop table if exists t1; +--enable_warnings + +# +# Minimal NDB blobs test. +# +# On NDB API level there is an extensive test program "testBlobs". +# A prerequisite for this handler test is that "testBlobs" succeeds. +# + +# make test harder with autocommit off +set autocommit=0; + +create table t1 ( + a int not null primary key, + b text not null, + c int not null, + d longblob, + key (c) +) engine=ndbcluster; + +# -- values -- + +# x0 size 256 (current inline size) +set @x0 = '01234567012345670123456701234567'; +set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0); + +# b1 length 2000+256 (blob part aligned) +set @b1 = 'b1'; +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@x0); +# d1 length 3000 +set @d1 = 'dd1'; +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); + +# b2 length 20000 +set @b2 = 'b2'; +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +# d2 length 30000 +set @d2 = 'dd2'; +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); + +select length(@x0),length(@b1),length(@d1) from dual; +select length(@x0),length(@b2),length(@d2) from dual; + +# -- pk ops -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where a = 1; + +# pk read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=1; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=2; + +# pk update +update t1 set b=@b2,d=@d2 where a=1; +update t1 set b=@b1,d=@d1 where a=2; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=1; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=2; + +# pk update +update t1 set b=concat(b,b),d=concat(d,d) where a=1; +update t1 set b=concat(b,b),d=concat(d,d) where a=2; +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where a=1; +select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3) +from t1 where a=2; + +# pk update to null +update t1 set d=null where a=1; +commit; +select a from t1 where d is null; + +# pk delete +delete from t1 where a=1; +delete from t1 where a=2; +commit; +select count(*) from t1; + +# -- hash index ops -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c = 111; + +# hash key read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=111; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=222; + +# hash key update +update t1 set b=@b2,d=@d2 where c=111; +update t1 set b=@b1,d=@d1 where c=222; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=111; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=222; + +# hash key update to null +update t1 set d=null where c=111; +commit; +select a from t1 where d is null; + +# hash key delete +delete from t1 where c=111; +delete from t1 where c=222; +commit; +select count(*) from t1; + +# -- table scan ops, short values -- + +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1; + +# table scan read +select * from t1 order by a; + +# table scan update +update t1 set b=concat(a,'x',b),d=concat(a,'x',d); +commit; +select * from t1 order by a; + +# table scan delete +delete from t1; +commit; +select count(*) from t1; + +# -- table scan ops, long values -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1; + +# table scan read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; + +# table scan update +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 order by a; + +# table scan delete +delete from t1; +commit; +select count(*) from t1; + +# -- range scan ops, short values -- + +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1 where c >= 100 order by a; + +# range scan read +select * from t1 where c >= 100 order by a; + +# range scan update +update t1 set b=concat(a,'x',b),d=concat(a,'x',d) +where c >= 100; +commit; +select * from t1 where c >= 100 order by a; + +# range scan delete +delete from t1 where c >= 100; +commit; +select count(*) from t1; + +# -- range scan ops, long values -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c >= 100 order by a; + +# range scan read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c >= 100 order by a; + +# range scan update +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where c >= 100 order by a; + +# range scan delete +delete from t1 where c >= 100; +commit; +select count(*) from t1; + +# -- rollback -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +# 626 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 0; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 1; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 2; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +rollback; +select count(*) from t1; + +--drop table t1; diff --git a/ndb/include/kernel/signaldata/DictTabInfo.hpp b/ndb/include/kernel/signaldata/DictTabInfo.hpp index d5f27257eb8..dec7145c897 100644 --- a/ndb/include/kernel/signaldata/DictTabInfo.hpp +++ b/ndb/include/kernel/signaldata/DictTabInfo.hpp @@ -311,7 +311,7 @@ public: ExtDatetime = NdbSqlUtil::Type::Datetime, ExtTimespec = NdbSqlUtil::Type::Timespec, ExtBlob = NdbSqlUtil::Type::Blob, - ExtClob = NdbSqlUtil::Type::Clob + ExtText = NdbSqlUtil::Type::Text }; // Attribute data interpretation @@ -435,7 +435,7 @@ public: AttributeArraySize = 12 * AttributeExtLength; return true; case DictTabInfo::ExtBlob: - case DictTabInfo::ExtClob: + case DictTabInfo::ExtText: AttributeType = DictTabInfo::StringType; AttributeSize = DictTabInfo::an8Bit; // head + inline part [ attr precision ] diff --git a/ndb/include/ndbapi/NdbBlob.hpp b/ndb/include/ndbapi/NdbBlob.hpp index 16df56e230b..af4c049d4a7 100644 --- a/ndb/include/ndbapi/NdbBlob.hpp +++ b/ndb/include/ndbapi/NdbBlob.hpp @@ -50,24 +50,33 @@ class NdbColumnImpl; * - closed: after transaction commit * - invalid: after rollback or transaction close * - * NdbBlob supports 2 styles of data access: + * NdbBlob supports 3 styles of data access: * * - in prepare phase, NdbBlob methods getValue and setValue are used to - * prepare a read or write of a single blob value of known size + * prepare a read or write of a blob value of known size * - * - in active phase, NdbBlob methods readData and writeData are used to - * read or write blob data of undetermined size + * - in prepare phase, setActiveHook is used to define a routine which + * is invoked as soon as the handle becomes active + * + * - in active phase, readData and writeData are used to read or write + * blob data of arbitrary size + * + * The styles can be applied in combination (in above order). + * + * Blob operations take effect at next transaction execute. In some + * cases NdbBlob is forced to do implicit executes. To avoid this, + * operate on complete blob parts. + * + * Use NdbConnection::executePendingBlobOps to flush your reads and + * writes. It avoids execute penalty if nothing is pending. It is not + * needed after execute (obviously) or after next scan result. * * NdbBlob methods return -1 on error and 0 on success, and use output * parameters when necessary. * * Notes: * - table and its blob part tables are not created atomically - * - blob data operations take effect at next transaction execute - * - NdbBlob may need to do implicit executes on the transaction - * - read and write of complete parts is much more efficient * - scan must use the "new" interface NdbScanOperation - * - scan with blobs applies hold-read-lock (at minimum) * - to update a blob in a read op requires exclusive tuple lock * - update op in scan must do its own getBlobHandle * - delete creates implicit, not-accessible blob handles @@ -78,12 +87,16 @@ class NdbColumnImpl; * - scan must use exclusive locking for now * * Todo: - * - add scan method hold-read-lock-until-next + return-keyinfo - * - better check of keyinfo length when setting keys - * - better check of allowed blob op vs locking mode + * - add scan method hold-read-lock + return-keyinfo + * - check keyinfo length when setting keys + * - check allowed blob ops vs locking mode + * - overload control (too many pending ops) */ class NdbBlob { public: + /** + * State. + */ enum State { Idle = 0, Prepared = 1, @@ -92,9 +105,15 @@ public: Invalid = 9 }; State getState(); + /** + * Inline blob header. + */ + struct Head { + Uint64 length; + }; /** * Prepare to read blob value. The value is available after execute. - * Use isNull to check for NULL and getLength to get the real length + * Use getNull to check for NULL and getLength to get the real length * and to check for truncation. Sets current read/write position to * after the data read. */ @@ -106,6 +125,20 @@ public: * data to null pointer (0) to create a NULL value. */ int setValue(const void* data, Uint32 bytes); + /** + * Callback for setActiveHook. Invoked immediately when the prepared + * operation has been executed (but not committed). Any getValue or + * setValue is done first. The blob handle is active so readData or + * writeData etc can be used to manipulate blob value. A user-defined + * argument is passed along. Returns non-zero on error. + */ + typedef int ActiveHook(NdbBlob* me, void* arg); + /** + * Define callback for blob handle activation. The queue of prepared + * operations will be executed in no commit mode up to this point and + * then the callback is invoked. + */ + int setActiveHook(ActiveHook* activeHook, void* arg); /** * Check if blob is null. */ @@ -115,7 +148,7 @@ public: */ int setNull(); /** - * Get current length in bytes. Use isNull to distinguish between + * Get current length in bytes. Use getNull to distinguish between * length 0 blob and NULL blob. */ int getLength(Uint64& length); @@ -180,6 +213,13 @@ public: static const int ErrAbort = 4268; // "Unknown blob error" static const int ErrUnknown = 4269; + /** + * Return info about all blobs in this operation. + */ + // Get first blob in list + NdbBlob* blobsFirstBlob(); + // Get next blob in list after this one + NdbBlob* blobsNextBlob(); private: friend class Ndb; @@ -214,10 +254,11 @@ private: bool theSetFlag; const char* theSetBuf; Uint32 theGetSetBytes; - // head - struct Head { - Uint64 length; - }; + // pending ops + Uint8 thePendingBlobOps; + // activation callback + ActiveHook* theActiveHook; + void* theActiveHookArg; // buffers struct Buf { char* data; @@ -235,7 +276,6 @@ private: char* theInlineData; NdbRecAttr* theHeadInlineRecAttr; bool theHeadInlineUpdateFlag; - bool theNewPartFlag; // length and read/write position int theNullFlag; Uint64 theLength; @@ -276,6 +316,11 @@ private: int insertParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count); + // pending ops + int executePendingBlobReads(); + int executePendingBlobWrites(); + // callbacks + int invokeActiveHook(); // blob handle maintenance int atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int preExecute(ExecType anExecType, bool& batch); @@ -287,6 +332,7 @@ private: void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbConnection* aCon, bool invalidFlag = true); #ifdef VM_TRACE + int getOperationType() const; friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); #endif }; diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp index 5d73058cc24..4e0330e3fda 100644 --- a/ndb/include/ndbapi/NdbConnection.hpp +++ b/ndb/include/ndbapi/NdbConnection.hpp @@ -431,6 +431,15 @@ public: /** @} *********************************************************************/ + /** + * Execute the transaction in NoCommit mode if there are any not-yet + * executed blob part operations of given types. Otherwise do + * nothing. The flags argument is bitwise OR of (1 << optype) where + * optype comes from NdbOperation::OperationType. Only the basic PK + * ops are used (read, insert, update, delete). + */ + int executePendingBlobOps(Uint8 flags = 0xFF); + private: /** * Release completed operations @@ -642,6 +651,7 @@ private: Uint32 theBuddyConPtr; // optim: any blobs bool theBlobFlag; + Uint8 thePendingBlobOps; static void sendTC_COMMIT_ACK(NdbApiSignal *, Uint32 transId1, Uint32 transId2, @@ -869,6 +879,21 @@ NdbConnection::OpSent() theNoOfOpSent++; } +/****************************************************************************** +void executePendingBlobOps(); +******************************************************************************/ +#include +inline +int +NdbConnection::executePendingBlobOps(Uint8 flags) +{ + if (thePendingBlobOps & flags) { + // not executeNoBlobs because there can be new ops with blobs + return execute(NoCommit); + } + return 0; +} + inline Uint32 NdbConnection::ptr2int(){ @@ -876,5 +901,3 @@ NdbConnection::ptr2int(){ } #endif - - diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp index 3b38e33ec91..4a3adb61d9e 100644 --- a/ndb/include/ndbapi/NdbDictionary.hpp +++ b/ndb/include/ndbapi/NdbDictionary.hpp @@ -183,7 +183,7 @@ public: Datetime, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes ) Timespec, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes ) Blob, ///< Binary large object (see NdbBlob) - Clob ///< Text blob + Text ///< Text blob }; /** @@ -309,7 +309,8 @@ public: /** * For blob, set or get "part size" i.e. number of bytes to store in - * each tuple of the "blob table". Must be less than 64k. + * each tuple of the "blob table". Can be set to zero to omit parts + * and to allow only inline bytes ("tinyblob"). */ void setPartSize(int size) { setScale(size); } int getPartSize() const { return getScale(); } @@ -1060,6 +1061,6 @@ public: }; }; -class NdbOut& operator <<(class NdbOut& ndbout, const NdbDictionary::Column::Type type); +class NdbOut& operator <<(class NdbOut& out, const NdbDictionary::Column& col); #endif diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index 841da513d4a..78416fe9d01 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -80,7 +80,7 @@ public: Datetime, // Precision down to 1 sec (size 8 bytes) Timespec, // Precision down to 1 nsec (size 12 bytes) Blob, // Blob - Clob // Text blob + Text // Text blob }; Enum m_typeId; Cmp* m_cmp; // set to NULL if cmp not implemented @@ -125,7 +125,7 @@ private: static Cmp cmpDatetime; static Cmp cmpTimespec; static Cmp cmpBlob; - static Cmp cmpClob; + static Cmp cmpText; }; inline int @@ -344,17 +344,15 @@ NdbSqlUtil::cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full, break; case Type::Blob: // XXX fix break; - case Type::Clob: + case Type::Text: { - // skip blob head, the rest is varchar + // skip blob head, the rest is char const unsigned skip = NDB_BLOB_HEAD_SIZE; if (size >= skip + 1) { union { const Uint32* p; const char* v; } u1, u2; u1.p = p1 + skip; u2.p = p2 + skip; - // length in first 2 bytes - int k = strncmp(u1.v + 2, u2.v + 2, ((size - skip) << 2) - 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + // TODO } return CmpUnknown; } diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index e34d6d18539..f8d993f22f9 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -161,8 +161,8 @@ NdbSqlUtil::m_typeList[] = { NULL // cmpDatetime }, { - Type::Clob, - cmpClob + Type::Text, + cmpText } }; @@ -299,9 +299,9 @@ NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size } int -NdbSqlUtil::cmpClob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Clob, p1, p2, full, size); + return cmp(Type::Text, p1, p2, full, size); } #ifdef NDB_SQL_UTIL_TEST diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp index 638012b6a00..72990870bf8 100644 --- a/ndb/src/ndbapi/NdbBlob.cpp +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -28,10 +28,11 @@ do { \ static const char* p = getenv("NDB_BLOB_DEBUG"); \ if (p == 0 || *p == 0 || *p == '0') break; \ - const char* cname = theColumn == NULL ? "BLOB" : theColumn->m_name.c_str(); \ - ndbout << cname << " " << __LINE__ << " " << x << " " << *this << endl; \ + static char* prefix = "BLOB"; \ + const char* cname = theColumn == NULL ? "-" : theColumn->m_name.c_str(); \ + ndbout << prefix << " " << hex << (void*)this << " " << cname; \ + ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \ } while (0) -#define EXE() assert(theNdbCon->executeNoBlobs(NoCommit) == 0) #else #define DBG(x) #endif @@ -49,7 +50,7 @@ ndb_blob_debug(const Uint32* data, unsigned size) /* * Reading index table directly (as a table) is faster but there are - * bugs or limitations. Keep the code but make possible to choose. + * bugs or limitations. Keep the code and make possible to choose. */ static const bool g_ndb_blob_ok_to_read_index_table = false; @@ -116,7 +117,7 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm case NdbDictionary::Column::Blob: bc.setType(NdbDictionary::Column::Binary); break; - case NdbDictionary::Column::Clob: + case NdbDictionary::Column::Text: bc.setType(NdbDictionary::Column::Char); break; default: @@ -155,11 +156,13 @@ NdbBlob::init() theSetFlag = false; theSetBuf = NULL; theGetSetBytes = 0; + thePendingBlobOps = 0; + theActiveHook = NULL; + theActiveHookArg = NULL; theHead = NULL; theInlineData = NULL; theHeadInlineRecAttr = NULL; theHeadInlineUpdateFlag = false; - theNewPartFlag = false; theNullFlag = -1; theLength = 0; thePos = 0; @@ -270,7 +273,7 @@ NdbBlob::isScanOp() inline Uint32 NdbBlob::getPartNumber(Uint64 pos) { - assert(pos >= theInlineSize); + assert(thePartSize != 0 && pos >= theInlineSize); return (pos - theInlineSize) / thePartSize; } @@ -322,10 +325,10 @@ int NdbBlob::setTableKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theKeyBuf.data; + DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_sizeOfKeysInWords)); + const unsigned columns = theTable->m_columns.size(); unsigned pos = 0; - const unsigned size = theTable->m_columns.size(); - DBG("setTableKeyValue key=" << ndb_blob_debug(data, size)); - for (unsigned i = 0; i < size; i++) { + for (unsigned i = 0; i < columns; i++) { NdbColumnImpl* c = theTable->m_columns[i]; assert(c != NULL); if (c->m_pk) { @@ -345,10 +348,10 @@ int NdbBlob::setAccessKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theAccessKeyBuf.data; + DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_sizeOfKeysInWords)); + const unsigned columns = theAccessTable->m_columns.size(); unsigned pos = 0; - const unsigned size = theAccessTable->m_columns.size(); - DBG("setAccessKeyValue key=" << ndb_blob_debug(data, size)); - for (unsigned i = 0; i < size; i++) { + for (unsigned i = 0; i < columns; i++) { NdbColumnImpl* c = theAccessTable->m_columns[i]; assert(c != NULL); if (c->m_pk) { @@ -479,11 +482,27 @@ NdbBlob::setValue(const void* data, Uint32 bytes) return 0; } +// activation hook + +int +NdbBlob::setActiveHook(ActiveHook activeHook, void* arg) +{ + DBG("setActiveHook hook=" << hex << (void*)activeHook << " arg=" << hex << arg); + if (theState != Prepared) { + setErrorCode(ErrState); + return -1; + } + theActiveHook = activeHook; + theActiveHookArg = arg; + return 0; +} + // misc operations int NdbBlob::getNull(bool& isNull) { + DBG("getNull"); if (theState == Prepared && theSetFlag) { isNull = (theSetBuf == NULL); return 0; @@ -520,6 +539,7 @@ NdbBlob::setNull() int NdbBlob::getLength(Uint64& len) { + DBG("getLength"); if (theState == Prepared && theSetFlag) { len = theGetSetBytes; return 0; @@ -535,17 +555,17 @@ NdbBlob::getLength(Uint64& len) int NdbBlob::truncate(Uint64 length) { - DBG("truncate kength=" << length); + DBG("truncate length=" << length); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; } if (theLength > length) { - if (length >= theInlineSize) { - Uint32 part1 = getPartNumber(length); + if (length > theInlineSize) { + Uint32 part1 = getPartNumber(length - 1); Uint32 part2 = getPartNumber(theLength - 1); assert(part2 >= part1); - if (deleteParts(part1, part2 - part1) == -1) + if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1) return -1; } else { if (deleteParts(0, getPartCount()) == -1) @@ -560,6 +580,7 @@ NdbBlob::truncate(Uint64 length) int NdbBlob::getPos(Uint64& pos) { + DBG("getPos"); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; @@ -571,6 +592,7 @@ NdbBlob::getPos(Uint64& pos) int NdbBlob::setPos(Uint64 pos) { + DBG("setPos pos=" << pos); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; @@ -629,6 +651,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) len -= n; } } + if (len > 0 && thePartSize == 0) { + setErrorCode(ErrSeek); + return -1; + } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; @@ -638,11 +664,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } Uint32 n = thePartSize - off; if (n > len) n = len; @@ -673,11 +698,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } memcpy(buf, thePartBuf.data, len); Uint32 n = len; pos += n; @@ -736,29 +760,27 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) len -= n; } } + if (len > 0 && thePartSize == 0) { + setErrorCode(ErrSeek); + return -1; + } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; // partial first block if (off != 0) { DBG("partial first block pos=" << pos << " len=" << len); - if (theNewPartFlag) { - // must flush insert to guarantee read - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); - return -1; - } - theNewPartFlag = false; - } + // flush writes to guarantee correct read + DBG("execute pending part writes"); + if (executePendingBlobWrites() == -1) + return -1; Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reafs"); + if (executePendingBlobReads() == -1) return -1; - } Uint32 n = thePartSize - off; if (n > len) { memset(thePartBuf.data + off + len, theFillChar, n - len); @@ -799,22 +821,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); Uint32 part = (pos - theInlineSize) / thePartSize; if (theLength > pos + len) { - if (theNewPartFlag) { - // must flush insert to guarantee read - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); - return -1; - } - theNewPartFlag = false; - } + // flush writes to guarantee correct read + DBG("execute pending part writes"); + if (executePendingBlobWrites() == -1) + return -1; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } memcpy(thePartBuf.data, buf, len); if (updateParts(thePartBuf.data, part, 1) == -1) return -1; @@ -859,6 +875,8 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; + thePendingBlobOps |= (1 << NdbOperation::ReadRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest); } return 0; } @@ -879,7 +897,8 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; - theNewPartFlag = true; + thePendingBlobOps |= (1 << NdbOperation::InsertRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest); } return 0; } @@ -900,7 +919,8 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; - theNewPartFlag = true; + thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); } return 0; } @@ -919,6 +939,52 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) return -1; } n++; + thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); + } + return 0; +} + +// pending ops + +int +NdbBlob::executePendingBlobReads() +{ + Uint8 flags = (1 << NdbOperation::ReadRequest); + if (thePendingBlobOps & flags) { + if (theNdbCon->executeNoBlobs(NoCommit) == -1) + return -1; + thePendingBlobOps = 0; + theNdbCon->thePendingBlobOps = 0; + } + return 0; +} + +int +NdbBlob::executePendingBlobWrites() +{ + Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest); + if (thePendingBlobOps & flags) { + if (theNdbCon->executeNoBlobs(NoCommit) == -1) + return -1; + thePendingBlobOps = 0; + theNdbCon->thePendingBlobOps = 0; + } + return 0; +} + +// callbacks + +int +NdbBlob::invokeActiveHook() +{ + DBG("invokeActiveHook"); + assert(theState == Active && theActiveHook != NULL); + int ret = (*theActiveHook)(this, theActiveHookArg); + DBG("invokeActiveHook ret=" << ret); + if (ret != 0) { + // no error is set on blob level + return -1; } return 0; } @@ -948,7 +1014,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* partType = NdbDictionary::Column::Binary; theFillChar = 0x0; break; - case NdbDictionary::Column::Clob: + case NdbDictionary::Column::Text: partType = NdbDictionary::Column::Char; theFillChar = 0x20; break; @@ -960,22 +1026,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theInlineSize = theColumn->getInlineSize(); thePartSize = theColumn->getPartSize(); theStripeSize = theColumn->getStripeSize(); - // blob table sanity check + // sanity check assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); getBlobTableName(theBlobTableName, theTable, theColumn); const NdbDictionary::Table* bt; const NdbDictionary::Column* bc; - if (theInlineSize >= (1 << 16) || - thePartSize == 0 || - thePartSize >= (1 << 16) || - theStripeSize == 0 || - (bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL || - (bc = bt->getColumn("DATA")) == NULL || - bc->getType() != partType || - bc->getLength() != (int)thePartSize) { - setErrorCode(ErrTable); - return -1; + if (thePartSize > 0) { + if (theStripeSize == 0 || + (bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL || + (bc = bt->getColumn("DATA")) == NULL || + bc->getType() != partType || + bc->getLength() != (int)thePartSize) { + setErrorCode(ErrTable); + return -1; + } } // buffers theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); @@ -1061,7 +1126,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) Uint32 bytes = theGetSetBytes - theInlineSize; if (writeDataPrivate(pos, buf, bytes) == -1) return -1; - if (anExecType == Commit && theHeadInlineUpdateFlag) { + if (theHeadInlineUpdateFlag) { // add an operation to update head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); if (tOp == NULL || @@ -1129,6 +1194,10 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) batch = true; } } + if (theActiveHook != NULL) { + // need blob head for callback + batch = true; + } DBG("preExecute out batch=" << batch); return 0; } @@ -1145,8 +1214,11 @@ NdbBlob::postExecute(ExecType anExecType) DBG("postExecute type=" << anExecType); if (theState == Invalid) return -1; - if (theState == Active) + if (theState == Active) { + setState(anExecType == NoCommit ? Active : Closed); + DBG("postExecute skip"); return 0; + } assert(theState == Prepared); assert(isKeyOp()); if (isIndexOp()) { @@ -1200,8 +1272,12 @@ NdbBlob::postExecute(ExecType anExecType) if (deleteParts(0, getPartCount()) == -1) return -1; } - theNewPartFlag = false; setState(anExecType == NoCommit ? Active : Closed); + // activation callback + if (theActiveHook != NULL) { + if (invokeActiveHook() == -1) + return -1; + } DBG("postExecute out"); return 0; } @@ -1275,20 +1351,18 @@ NdbBlob::atNextResult() Uint32 bytes = theGetSetBytes - theInlineSize; if (readDataPrivate(pos, buf, bytes) == -1) return -1; - // must also execute them - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode((NdbOperation*)0); - return -1; - } } } setState(Active); + // activation callback + if (theActiveHook != NULL) { + if (invokeActiveHook() == -1) + return -1; + } DBG("atNextResult out"); return 0; } - // misc const NdbDictionary::Column* @@ -1304,6 +1378,9 @@ NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag) { DBG("setErrorCode code=" << anErrorCode); theError.code = anErrorCode; + // conditionally copy error to operation level + if (theNdbOp != NULL && theNdbOp->theError.code == 0) + theNdbOp->setErrorCode(theError.code); if (invalidFlag) setState(Invalid); } @@ -1336,11 +1413,34 @@ NdbBlob::setErrorCode(NdbConnection* aCon, bool invalidFlag) setErrorCode(code, invalidFlag); } +// info about all blobs in this operation + +NdbBlob* +NdbBlob::blobsFirstBlob() +{ + return theNdbOp->theBlobList; +} + +NdbBlob* +NdbBlob::blobsNextBlob() +{ + return theNext; +} + +// debug + #ifdef VM_TRACE +inline int +NdbBlob::getOperationType() const +{ + return theNdbOp != NULL ? theNdbOp->theOperationType : -1; +} + NdbOut& operator<<(NdbOut& out, const NdbBlob& blob) { - ndbout << dec << "s=" << blob.theState; + ndbout << dec << "o=" << blob.getOperationType(); + ndbout << dec << " s=" << blob.theState; ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " l=" << blob.theLength; ndbout << dec << " p=" << blob.thePos; diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 6f9dbd23372..db6201ee9bb 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -89,7 +89,8 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : // Scan operations theScanningOp(NULL), theBuddyConPtr(0xFFFFFFFF), - theBlobFlag(false) + theBlobFlag(false), + thePendingBlobOps(0) { theListState = NotInList; theError.code = 0; @@ -150,6 +151,7 @@ NdbConnection::init() theBuddyConPtr = 0xFFFFFFFF; // theBlobFlag = false; + thePendingBlobOps = 0; }//NdbConnection::init() /***************************************************************************** @@ -269,26 +271,34 @@ NdbConnection::execute(ExecType aTypeOfExec, if (! theBlobFlag) return executeNoBlobs(aTypeOfExec, abortOption, forceSend); - // execute prepared ops in batches, as requested by blobs + /* + * execute prepared ops in batches, as requested by blobs + * - blob error does not terminate execution + * - blob error sets error on operation + * - if error on operation skip blob calls + */ ExecType tExecType; NdbOperation* tPrepOp; + int ret = 0; do { tExecType = aTypeOfExec; tPrepOp = theFirstOpInList; while (tPrepOp != NULL) { - bool batch = false; - NdbBlob* tBlob = tPrepOp->theBlobList; - while (tBlob != NULL) { - if (tBlob->preExecute(tExecType, batch) == -1) - return -1; - tBlob = tBlob->theNext; - } - if (batch) { - // blob asked to execute all up to here now - tExecType = NoCommit; - break; + if (tPrepOp->theError.code == 0) { + bool batch = false; + NdbBlob* tBlob = tPrepOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preExecute(tExecType, batch) == -1) + ret = -1; + tBlob = tBlob->theNext; + } + if (batch) { + // blob asked to execute all up to here now + tExecType = NoCommit; + break; + } } tPrepOp = tPrepOp->next(); } @@ -304,26 +314,30 @@ NdbConnection::execute(ExecType aTypeOfExec, if (tExecType == Commit) { NdbOperation* tOp = theCompletedFirstOp; while (tOp != NULL) { - NdbBlob* tBlob = tOp->theBlobList; - while (tBlob != NULL) { - if (tBlob->preCommit() == -1) - return -1; - tBlob = tBlob->theNext; + if (tOp->theError.code == 0) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preCommit() == -1) + ret = -1; + tBlob = tBlob->theNext; + } } tOp = tOp->next(); } } if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) - return -1; + ret = -1; { NdbOperation* tOp = theCompletedFirstOp; while (tOp != NULL) { - NdbBlob* tBlob = tOp->theBlobList; - while (tBlob != NULL) { - // may add new operations if batch - if (tBlob->postExecute(tExecType) == -1) - return -1; - tBlob = tBlob->theNext; + if (tOp->theError.code == 0) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + // may add new operations if batch + if (tBlob->postExecute(tExecType) == -1) + ret = -1; + tBlob = tBlob->theNext; + } } tOp = tOp->next(); } @@ -338,7 +352,7 @@ NdbConnection::execute(ExecType aTypeOfExec, } } while (theFirstOpInList != NULL || tExecType != aTypeOfExec); - return 0; + return ret; } int @@ -397,6 +411,7 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec, break; } } + thePendingBlobOps = 0; return 0; }//NdbConnection::execute() diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index 413ad0745db..d5a16546071 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -806,73 +806,90 @@ NdbDictionary::Dictionary::getNdbError() const { return m_impl.getNdbError(); } -NdbOut& operator <<(NdbOut& ndbout, const NdbDictionary::Column::Type type) +// printers + +NdbOut& +operator<<(NdbOut& out, const NdbDictionary::Column& col) { - switch(type){ - case NdbDictionary::Column::Bigunsigned: - ndbout << "Bigunsigned"; - break; - case NdbDictionary::Column::Unsigned: - ndbout << "Unsigned"; - break; - case NdbDictionary::Column::Smallunsigned: - ndbout << "Smallunsigned"; + out << col.getName() << " "; + switch (col.getType()) { + case NdbDictionary::Column::Tinyint: + out << "Tinyint"; break; case NdbDictionary::Column::Tinyunsigned: - ndbout << "Tinyunsigned"; - break; - case NdbDictionary::Column::Bigint: - ndbout << "Bigint"; - break; - case NdbDictionary::Column::Int: - ndbout << "Int"; + out << "Tinyunsigned"; break; case NdbDictionary::Column::Smallint: - ndbout << "Smallint"; + out << "Smallint"; break; - case NdbDictionary::Column::Tinyint: - ndbout << "Tinyint"; - break; - case NdbDictionary::Column::Char: - ndbout << "Char"; - break; - case NdbDictionary::Column::Varchar: - ndbout << "Varchar"; - break; - case NdbDictionary::Column::Float: - ndbout << "Float"; - break; - case NdbDictionary::Column::Double: - ndbout << "Double"; + case NdbDictionary::Column::Smallunsigned: + out << "Smallunsigned"; break; case NdbDictionary::Column::Mediumint: - ndbout << "Mediumint"; + out << "Mediumint"; break; case NdbDictionary::Column::Mediumunsigned: - ndbout << "Mediumunsigend"; + out << "Mediumunsigned"; break; - case NdbDictionary::Column::Binary: - ndbout << "Binary"; + case NdbDictionary::Column::Int: + out << "Int"; break; - case NdbDictionary::Column::Varbinary: - ndbout << "Varbinary"; + case NdbDictionary::Column::Unsigned: + out << "Unsigned"; + break; + case NdbDictionary::Column::Bigint: + out << "Bigint"; + break; + case NdbDictionary::Column::Bigunsigned: + out << "Bigunsigned"; + break; + case NdbDictionary::Column::Float: + out << "Float"; + break; + case NdbDictionary::Column::Double: + out << "Double"; break; case NdbDictionary::Column::Decimal: - ndbout << "Decimal"; + out << "Decimal(" << col.getScale() << "," << col.getPrecision() << ")"; + break; + case NdbDictionary::Column::Char: + out << "Char(" << col.getLength() << ")"; + break; + case NdbDictionary::Column::Varchar: + out << "Varchar(" << col.getLength() << ")"; + break; + case NdbDictionary::Column::Binary: + out << "Binary(" << col.getLength() << ")"; + break; + case NdbDictionary::Column::Varbinary: + out << "Varbinary(" << col.getLength() << ")"; + break; + case NdbDictionary::Column::Datetime: + out << "Datetime"; break; case NdbDictionary::Column::Timespec: - ndbout << "Timespec"; + out << "Timespec"; break; case NdbDictionary::Column::Blob: - ndbout << "Blob"; + out << "Blob(" << col.getInlineSize() << "," << col.getPartSize() + << ";" << col.getStripeSize() << ")"; + break; + case NdbDictionary::Column::Text: + out << "Text(" << col.getInlineSize() << "," << col.getPartSize() + << ";" << col.getStripeSize() << ")"; break; case NdbDictionary::Column::Undefined: - ndbout << "Undefined"; + out << "Undefined"; break; default: - ndbout << "Unknown type=" << (Uint32)type; + out << "Type" << (Uint32)col.getType(); break; } - - return ndbout; + if (col.getPrimaryKey()) + out << " PRIMARY KEY"; + else if (! col.getNullable()) + out << " NOT NULL"; + else + out << " NULL"; + return out; } diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 71a51efde70..f1091ad5fb3 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -181,7 +181,7 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const case NdbDictionary::Column::Timespec: break; case NdbDictionary::Column::Blob: - case NdbDictionary::Column::Clob: + case NdbDictionary::Column::Text: if (m_precision != col.m_precision || m_scale != col.m_scale || m_length != col.m_length) { @@ -1088,7 +1088,7 @@ columnTypeMapping[] = { { DictTabInfo::ExtDatetime, NdbDictionary::Column::Datetime }, { DictTabInfo::ExtTimespec, NdbDictionary::Column::Timespec }, { DictTabInfo::ExtBlob, NdbDictionary::Column::Blob }, - { DictTabInfo::ExtClob, NdbDictionary::Column::Clob }, + { DictTabInfo::ExtText, NdbDictionary::Column::Text }, { -1, -1 } }; @@ -1253,7 +1253,7 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t) { for (unsigned i = 0; i < t.m_columns.size(); i++) { NdbColumnImpl & c = *t.m_columns[i]; - if (! c.getBlobType()) + if (! c.getBlobType() || c.getPartSize() == 0) continue; NdbTableImpl bt; NdbBlob::getBlobTable(bt, &t, &c); @@ -1622,7 +1622,7 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t) { for (unsigned i = 0; i < t.m_columns.size(); i++) { NdbColumnImpl & c = *t.m_columns[i]; - if (! c.getBlobType()) + if (! c.getBlobType() || c.getPartSize() == 0) continue; char btname[NdbBlob::BlobTableNameSize]; NdbBlob::getBlobTableName(btname, &t, &c); diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 5851c199893..85d334416ce 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -441,7 +441,7 @@ inline bool NdbColumnImpl::getBlobType() const { return (m_type == NdbDictionary::Column::Blob || - m_type == NdbDictionary::Column::Clob); + m_type == NdbDictionary::Column::Text); } inline diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp index 99a7c368af7..2e753f13006 100644 --- a/ndb/src/ndbapi/NdbRecAttr.cpp +++ b/ndb/src/ndbapi/NdbRecAttr.cpp @@ -29,6 +29,7 @@ Adjust: 971206 UABRONM First version #include #include #include +#include #include "NdbDictionaryImpl.hpp" #include @@ -147,78 +148,100 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){ return false; } -NdbOut& operator<<(NdbOut& ndbout, const NdbRecAttr &r) +NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r) { if (r.isNULL()) { - ndbout << "[NULL]"; - return ndbout; + out << "[NULL]"; + return out; } if (r.arraySize() > 1) - ndbout << "["; + out << "["; for (Uint32 j = 0; j < r.arraySize(); j++) { if (j > 0) - ndbout << " "; + out << " "; switch(r.getType()) { case NdbDictionary::Column::Bigunsigned: - ndbout << r.u_64_value(); + out << r.u_64_value(); break; case NdbDictionary::Column::Unsigned: - ndbout << r.u_32_value(); + out << r.u_32_value(); break; case NdbDictionary::Column::Smallunsigned: - ndbout << r.u_short_value(); + out << r.u_short_value(); break; case NdbDictionary::Column::Tinyunsigned: - ndbout << (unsigned) r.u_char_value(); + out << (unsigned) r.u_char_value(); break; case NdbDictionary::Column::Bigint: - ndbout << r.int64_value(); + out << r.int64_value(); break; case NdbDictionary::Column::Int: - ndbout << r.int32_value(); + out << r.int32_value(); break; case NdbDictionary::Column::Smallint: - ndbout << r.short_value(); + out << r.short_value(); break; case NdbDictionary::Column::Tinyint: - ndbout << (int) r.char_value(); + out << (int) r.char_value(); break; case NdbDictionary::Column::Char: - ndbout.print("%.*s", r.arraySize(), r.aRef()); + out.print("%.*s", r.arraySize(), r.aRef()); j = r.arraySize(); break; case NdbDictionary::Column::Varchar: { short len = ntohs(r.u_short_value()); - ndbout.print("%.*s", len, r.aRef()+2); + out.print("%.*s", len, r.aRef()+2); } j = r.arraySize(); break; case NdbDictionary::Column::Float: - ndbout << r.float_value(); + out << r.float_value(); break; case NdbDictionary::Column::Double: - ndbout << r.double_value(); + out << r.double_value(); break; + case NdbDictionary::Column::Blob: + { + const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef(); + out << h->length << ":"; + const unsigned char* p = (const unsigned char*)(h + 1); + unsigned n = r.arraySize() - sizeof(*h); + for (unsigned k = 0; k < n && k < h->length; k++) + out.print("%02X", (int)p[k]); + j = r.arraySize(); + } + break; + case NdbDictionary::Column::Text: + { + const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef(); + out << h->length << ":"; + const unsigned char* p = (const unsigned char*)(h + 1); + unsigned n = r.arraySize() - sizeof(*h); + for (unsigned k = 0; k < n && k < h->length; k++) + out.print("%c", (int)p[k]); + j = r.arraySize(); + } + break; default: /* no print functions for the rest, just print type */ - ndbout << r.getType(); + out << r.getType(); j = r.arraySize(); if (j > 1) - ndbout << " %u times" << j; + out << " " << j << " times"; break; } } if (r.arraySize() > 1) { - ndbout << "]"; + out << "]"; } - return ndbout; + return out; } diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp index b286c9fd7c9..f270584d227 100644 --- a/ndb/src/ndbapi/NdbResultSet.cpp +++ b/ndb/src/ndbapi/NdbResultSet.cpp @@ -55,6 +55,13 @@ int NdbResultSet::nextResult(bool fetchAllowed) return -1; tBlob = tBlob->theNext; } + /* + * Flush blob part ops on behalf of user because + * - nextResult is analogous to execute(NoCommit) + * - user is likely to want blob value before next execute + */ + if (m_operation->m_transConnection->executePendingBlobOps() == -1) + return -1; return 0; } return res; diff --git a/ndb/test/include/NDBT_Table.hpp b/ndb/test/include/NDBT_Table.hpp index eee76773106..bf44e1eb52b 100644 --- a/ndb/test/include/NDBT_Table.hpp +++ b/ndb/test/include/NDBT_Table.hpp @@ -23,7 +23,6 @@ #include class NDBT_Attribute : public NdbDictionary::Column { - friend class NdbOut& operator <<(class NdbOut&, const NDBT_Attribute &); public: NDBT_Attribute(const char* _name, Column::Type _type, diff --git a/ndb/test/ndbapi/testBlobs.cpp b/ndb/test/ndbapi/testBlobs.cpp index b880266f8de..6ffac3028b1 100644 --- a/ndb/test/ndbapi/testBlobs.cpp +++ b/ndb/test/ndbapi/testBlobs.cpp @@ -38,6 +38,7 @@ struct Bcol { }; struct Opt { + unsigned m_batch; bool m_core; bool m_dbg; bool m_dbgall; @@ -46,7 +47,8 @@ struct Opt { unsigned m_parts; unsigned m_rows; unsigned m_seed; - char m_skip[255]; + const char* m_skip; + const char* m_style; // metadata const char* m_tname; const char* m_x1name; // hash index @@ -60,6 +62,7 @@ struct Opt { int m_bug; int (*m_bugtest)(); Opt() : + m_batch(7), m_core(false), m_dbg(false), m_dbgall(false), @@ -68,6 +71,8 @@ struct Opt { m_parts(10), m_rows(100), m_seed(0), + m_skip(""), + m_style("012"), // metadata m_tname("TBLOB1"), m_x1name("TBLOB1X1"), @@ -80,7 +85,6 @@ struct Opt { // bugs m_bug(0), m_bugtest(0) { - memset(m_skip, false, sizeof(m_skip)); } }; @@ -92,6 +96,7 @@ printusage() Opt d; ndbout << "usage: testBlobs options [default/max]" << endl + << " -batch N number of pk ops in batch [" << d.m_batch << "]" << endl << " -core dump core on error" << endl << " -dbg print debug" << endl << " -dbgall print also NDB API debug (if compiled in)" << endl @@ -101,7 +106,8 @@ printusage() << " -parts N max parts in blob value [" << d.m_parts << "]" << endl << " -rows N number of rows [" << d.m_rows << "]" << endl << " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl - << " -skip xxx skip these tests (see list)" << endl + << " -skip xxx skip these tests (see list) [" << d.m_skip << endl + << " -style xxx access styles to test (see list) [" << d.m_style << "]" << endl << "metadata" << endl << " -pk2len N length of PK2 [" << d.m_pk2len << "/" << g_max_pk2len <<"]" << endl << " -oneblob only 1 blob attribute [default 2]" << endl @@ -111,8 +117,10 @@ printusage() << " s table scans" << endl << " r ordered index scans" << endl << " u update blob value" << endl - << " v getValue / setValue" << endl - << " w readData / writeData" << endl + << "access styles for -style" << endl + << " 0 getValue / setValue" << endl + << " 1 setActiveHook" << endl + << " 2 readData / writeData" << endl << "bug tests (no blob test)" << endl << " -bug 4088 ndb api hang with mixed ops on index table" << endl << " -bug 2222 delete + write gives 626" << endl @@ -122,11 +130,16 @@ printusage() static Opt g_opt; -static char& -skip(unsigned x) +static bool +skipcase(int x) { - assert(x < sizeof(g_opt.m_skip)); - return g_opt.m_skip[x]; + return strchr(g_opt.m_skip, x) != 0; +} + +static bool +skipstyle(int x) +{ + return strchr(g_opt.m_style, '0' + x) == 0; } static Ndb* g_ndb = 0; @@ -138,11 +151,12 @@ static NdbScanOperation* g_ops = 0; static NdbBlob* g_bh1 = 0; static NdbBlob* g_bh2 = 0; static bool g_printerror = true; +static unsigned g_loop = 0; static void printerror(int line, const char* msg) { - ndbout << "line " << line << ": " << msg << " failed" << endl; + ndbout << "line " << line << " FAIL " << msg << endl; if (! g_printerror) { return; } @@ -205,6 +219,7 @@ static int createTable() { NdbDictionary::Table tab(g_opt.m_tname); + tab.setLogging(false); // col PK1 - Uint32 { NdbDictionary::Column col("PK1"); col.setType(NdbDictionary::Column::Unsigned); @@ -228,11 +243,11 @@ createTable() col.setPrimaryKey(true); tab.addColumn(col); } - // col BL2 - Clob nullable + // col BL2 - Text nullable if (! g_opt.m_oneblob) { NdbDictionary::Column col("BL2"); const Bcol& b = g_opt.m_blob2; - col.setType(NdbDictionary::Column::Clob); + col.setType(NdbDictionary::Column::Text); col.setNullable(true); col.setInlineSize(b.m_inline); col.setPartSize(b.m_partsize); @@ -245,6 +260,7 @@ createTable() if (g_opt.m_pk2len != 0) { NdbDictionary::Index idx(g_opt.m_x1name); idx.setType(NdbDictionary::Index::UniqueHashIndex); + idx.setLogging(false); idx.setTable(g_opt.m_tname); idx.addColumnName("PK2"); CHK(g_dic->createIndex(idx) == 0); @@ -281,7 +297,7 @@ struct Bval { m_buf = new char [m_buflen]; trash(); } - void copy(const Bval& v) { + void copyfrom(const Bval& v) { m_len = v.m_len; delete [] m_val; if (v.m_val == 0) @@ -313,10 +329,10 @@ struct Tup { m_blob1.alloc(g_opt.m_blob1.m_inline + g_opt.m_blob1.m_partsize * g_opt.m_parts); m_blob2.alloc(g_opt.m_blob2.m_inline + g_opt.m_blob2.m_partsize * g_opt.m_parts); } - void copy(const Tup& tup) { + void copyfrom(const Tup& tup) { assert(m_pk1 == tup.m_pk1); - m_blob1.copy(tup.m_blob1); - m_blob2.copy(tup.m_blob2); + m_blob1.copyfrom(tup.m_blob1); + m_blob2.copyfrom(tup.m_blob2); } private: Tup(const Tup&); @@ -357,6 +373,14 @@ calcBval(const Bcol& b, Bval& v, bool keepsize) v.trash(); } +static void +calcBval(Tup& tup, bool keepsize) +{ + calcBval(g_opt.m_blob1, tup.m_blob1, keepsize); + if (! g_opt.m_oneblob) + calcBval(g_opt.m_blob2, tup.m_blob2, keepsize); +} + static void calcTups(bool keepsize) { @@ -371,14 +395,39 @@ calcTups(bool keepsize) tup.m_pk2[i] = 'a' + i % 26; } } - calcBval(g_opt.m_blob1, tup.m_blob1, keepsize); - if (! g_opt.m_oneblob) - calcBval(g_opt.m_blob2, tup.m_blob2, keepsize); + calcBval(tup, keepsize); } } // blob handle ops +static int +getBlobHandles(NdbOperation* opr) +{ + CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0); + if (! g_opt.m_oneblob) + CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0); + return 0; +} + +static int +getBlobHandles(NdbIndexOperation* opx) +{ + CHK((g_bh1 = opx->getBlobHandle("BL1")) != 0); + if (! g_opt.m_oneblob) + CHK((g_bh2 = opx->getBlobHandle("BL2")) != 0); + return 0; +} + +static int +getBlobHandles(NdbScanOperation* ops) +{ + CHK((g_bh1 = ops->getBlobHandle("BL1")) != 0); + if (! g_opt.m_oneblob) + CHK((g_bh2 = ops->getBlobHandle("BL2")) != 0); + return 0; +} + static int getBlobLength(NdbBlob* h, unsigned& len) { @@ -386,16 +435,19 @@ getBlobLength(NdbBlob* h, unsigned& len) CHK(h->getLength(len2) == 0); len = (unsigned)len2; assert(len == len2); + DBG("getBlobLength " << h->getColumn()->getName() << " len=" << len); return 0; } +// setValue / getValue + static int setBlobValue(NdbBlob* h, const Bval& v) { bool null = (v.m_val == 0); bool isNull; unsigned len; - DBG("set " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null); + DBG("setValue " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null); if (null) { CHK(h->setNull() == 0); isNull = false; @@ -409,11 +461,20 @@ setBlobValue(NdbBlob* h, const Bval& v) return 0; } +static int +setBlobValue(const Tup& tup) +{ + CHK(setBlobValue(g_bh1, tup.m_blob1) == 0); + if (! g_opt.m_oneblob) + CHK(setBlobValue(g_bh2, tup.m_blob2) == 0); + return 0; +} + static int getBlobValue(NdbBlob* h, const Bval& v) { bool null = (v.m_val == 0); - DBG("get " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null); + DBG("getValue " << h->getColumn()->getName() << " buflen=" << v.m_buflen); CHK(h->getValue(v.m_buf, v.m_buflen) == 0); return 0; } @@ -456,6 +517,8 @@ verifyBlobValue(const Tup& tup) return 0; } +// readData / writeData + static int writeBlobData(NdbBlob* h, const Bval& v) { @@ -469,6 +532,7 @@ writeBlobData(NdbBlob* h, const Bval& v) CHK(h->getNull(isNull) == 0 && isNull == true); CHK(getBlobLength(h, len) == 0 && len == 0); } else { + CHK(h->truncate(v.m_len) == 0); unsigned n = 0; do { unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1); @@ -486,6 +550,15 @@ writeBlobData(NdbBlob* h, const Bval& v) return 0; } +static int +writeBlobData(const Tup& tup) +{ + CHK(writeBlobData(g_bh1, tup.m_blob1) == 0); + if (! g_opt.m_oneblob) + CHK(writeBlobData(g_bh2, tup.m_blob2) == 0); + return 0; +} + static int readBlobData(NdbBlob* h, const Bval& v) { @@ -531,6 +604,71 @@ readBlobData(const Tup& tup) return 0; } +// hooks + +static NdbBlob::ActiveHook blobWriteHook; + +static int +blobWriteHook(NdbBlob* h, void* arg) +{ + DBG("blobWriteHook"); + Bval& v = *(Bval*)arg; + CHK(writeBlobData(h, v) == 0); + return 0; +} + +static int +setBlobWriteHook(NdbBlob* h, Bval& v) +{ + DBG("setBlobWriteHook"); + CHK(h->setActiveHook(blobWriteHook, &v) == 0); + return 0; +} + +static int +setBlobWriteHook(Tup& tup) +{ + CHK(setBlobWriteHook(g_bh1, tup.m_blob1) == 0); + if (! g_opt.m_oneblob) + CHK(setBlobWriteHook(g_bh2, tup.m_blob2) == 0); + return 0; +} + +static NdbBlob::ActiveHook blobReadHook; + +// no PK yet to identify tuple so just read the value +static int +blobReadHook(NdbBlob* h, void* arg) +{ + DBG("blobReadHook"); + Bval& v = *(Bval*)arg; + unsigned len; + CHK(getBlobLength(h, len) == 0); + v.alloc(len); + Uint32 maxlen = 0xffffffff; + CHK(h->readData(v.m_buf, maxlen) == 0); + DBG("read " << maxlen << " bytes"); + CHK(len == maxlen); + return 0; +} + +static int +setBlobReadHook(NdbBlob* h, Bval& v) +{ + DBG("setBlobReadHook"); + CHK(h->setActiveHook(blobReadHook, &v) == 0); + return 0; +} + +static int +setBlobReadHook(Tup& tup) +{ + CHK(setBlobReadHook(g_bh1, tup.m_blob1) == 0); + if (! g_opt.m_oneblob) + CHK(setBlobReadHook(g_bh2, tup.m_blob2) == 0); + return 0; +} + // verify blob data static int @@ -540,7 +678,11 @@ verifyHeadInline(const Bcol& c, const Bval& v, NdbRecAttr* ra) CHK(ra->isNULL() == 1); } else { CHK(ra->isNULL() == 0); - CHK(ra->u_64_value() == v.m_len); + const NdbBlob::Head* head = (const NdbBlob::Head*)ra->aRef(); + CHK(head->length == v.m_len); + const char* data = (const char*)(head + 1); + for (unsigned i = 0; i < head->length && i < c.m_inline; i++) + CHK(data[i] == v.m_val[i]); } return 0; } @@ -548,7 +690,7 @@ verifyHeadInline(const Bcol& c, const Bval& v, NdbRecAttr* ra) static int verifyHeadInline(const Tup& tup) { - DBG("verifyHeadInline pk1=" << tup.m_pk1); + DBG("verifyHeadInline pk1=" << hex << tup.m_pk1); CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); CHK(g_opr->readTuple() == 0); @@ -580,7 +722,7 @@ verifyHeadInline(const Tup& tup) static int verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists) { - DBG("verify " << b.m_btname << " pk1=" << pk1); + DBG("verify " << b.m_btname << " pk1=" << hex << pk1); NdbRecAttr* ra_pk; NdbRecAttr* ra_part; NdbRecAttr* ra_data; @@ -640,7 +782,7 @@ verifyBlob() { for (unsigned k = 0; k < g_opt.m_rows; k++) { const Tup& tup = g_tups[k]; - DBG("verifyBlob pk1=" << tup.m_pk1); + DBG("verifyBlob pk1=" << hex << tup.m_pk1); CHK(verifyHeadInline(tup) == 0); CHK(verifyBlobTable(tup) == 0); } @@ -649,135 +791,86 @@ verifyBlob() // operations +static const char* stylename[3] = { + "style=getValue/setValue", + "style=setActiveHook", + "style=readData/writeData" +}; + +// pk ops + static int -insertPk(bool rw) +insertPk(int style) { - DBG("--- insertPk ---"); + DBG("--- insertPk " << stylename[style] << " ---"); + unsigned n = 0; + CHK((g_con = g_ndb->startTransaction()) != 0); for (unsigned k = 0; k < g_opt.m_rows; k++) { Tup& tup = g_tups[k]; - DBG("insertPk pk1=" << tup.m_pk1); - CHK((g_con = g_ndb->startTransaction()) != 0); + DBG("insertPk pk1=" << hex << tup.m_pk1); CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); CHK(g_opr->insertTuple() == 0); CHK(g_opr->equal("PK1", tup.m_pk1) == 0); if (g_opt.m_pk2len != 0) CHK(g_opr->equal("PK2", tup.m_pk2) == 0); - CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0); - if (! rw) { - CHK(setBlobValue(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(setBlobValue(g_bh2, tup.m_blob2) == 0); + CHK(getBlobHandles(g_opr) == 0); + if (style == 0) { + CHK(setBlobValue(tup) == 0); + } else if (style == 1) { + // non-nullable must be set + CHK(g_bh1->setValue("", 0) == 0); + CHK(setBlobWriteHook(tup) == 0); } else { // non-nullable must be set CHK(g_bh1->setValue("", 0) == 0); CHK(g_con->execute(NoCommit) == 0); - CHK(writeBlobData(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(writeBlobData(g_bh2, tup.m_blob2) == 0); + CHK(writeBlobData(tup) == 0); } - CHK(g_con->execute(Commit) == 0); - g_ndb->closeTransaction(g_con); - g_opr = 0; - g_con = 0; - tup.m_exists = true; - } - return 0; -} - -static int -updatePk(bool rw) -{ - DBG("--- updatePk ---"); - for (unsigned k = 0; k < g_opt.m_rows; k++) { - Tup& tup = g_tups[k]; - DBG("updatePk pk1=" << tup.m_pk1); - CHK((g_con = g_ndb->startTransaction()) != 0); - CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); - CHK(g_opr->updateTuple() == 0); - CHK(g_opr->equal("PK1", tup.m_pk1) == 0); - if (g_opt.m_pk2len != 0) - CHK(g_opr->equal("PK2", tup.m_pk2) == 0); - CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0); - if (! rw) { - CHK(setBlobValue(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(setBlobValue(g_bh2, tup.m_blob2) == 0); - } else { + // just another trap + if (urandom(10) == 0) CHK(g_con->execute(NoCommit) == 0); - CHK(writeBlobData(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(writeBlobData(g_bh2, tup.m_blob2) == 0); + if (++n == g_opt.m_batch) { + CHK(g_con->execute(Commit) == 0); + g_ndb->closeTransaction(g_con); + CHK((g_con = g_ndb->startTransaction()) != 0); + n = 0; } - CHK(g_con->execute(Commit) == 0); - g_ndb->closeTransaction(g_con); g_opr = 0; - g_con = 0; tup.m_exists = true; } - return 0; -} - -static int -updateIdx(bool rw) -{ - DBG("--- updateIdx ---"); - for (unsigned k = 0; k < g_opt.m_rows; k++) { - Tup& tup = g_tups[k]; - DBG("updateIdx pk1=" << tup.m_pk1); - CHK((g_con = g_ndb->startTransaction()) != 0); - CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); - CHK(g_opx->updateTuple() == 0); - CHK(g_opx->equal("PK2", tup.m_pk2) == 0); - CHK((g_bh1 = g_opx->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_opx->getBlobHandle("BL2")) != 0); - if (! rw) { - CHK(setBlobValue(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(setBlobValue(g_bh2, tup.m_blob2) == 0); - } else { - CHK(g_con->execute(NoCommit) == 0); - CHK(writeBlobData(g_bh1, tup.m_blob1) == 0); - if (! g_opt.m_oneblob) - CHK(writeBlobData(g_bh2, tup.m_blob2) == 0); - } + if (n != 0) { CHK(g_con->execute(Commit) == 0); - g_ndb->closeTransaction(g_con); - g_opx = 0; - g_con = 0; - tup.m_exists = true; + n = 0; } + g_ndb->closeTransaction(g_con); + g_con = 0; return 0; } static int -readPk(bool rw) +readPk(int style) { - DBG("--- readPk ---"); + DBG("--- readPk " << stylename[style] << " ---"); for (unsigned k = 0; k < g_opt.m_rows; k++) { Tup& tup = g_tups[k]; - DBG("readPk pk1=" << tup.m_pk1); + DBG("readPk pk1=" << hex << tup.m_pk1); CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); CHK(g_opr->readTuple() == 0); CHK(g_opr->equal("PK1", tup.m_pk1) == 0); if (g_opt.m_pk2len != 0) CHK(g_opr->equal("PK2", tup.m_pk2) == 0); - CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0); - if (! rw) { + CHK(getBlobHandles(g_opr) == 0); + if (style == 0) { CHK(getBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobReadHook(tup) == 0); } else { CHK(g_con->execute(NoCommit) == 0); CHK(readBlobData(tup) == 0); } CHK(g_con->execute(Commit) == 0); - if (! rw) { + if (style == 0 || style == 1) { CHK(verifyBlobValue(tup) == 0); } g_ndb->closeTransaction(g_con); @@ -788,94 +881,43 @@ readPk(bool rw) } static int -readIdx(bool rw) +updatePk(int style) { - DBG("--- readIdx ---"); + DBG("--- updatePk " << stylename[style] << " ---"); for (unsigned k = 0; k < g_opt.m_rows; k++) { Tup& tup = g_tups[k]; - DBG("readIdx pk1=" << tup.m_pk1); + DBG("updatePk pk1=" << hex << tup.m_pk1); CHK((g_con = g_ndb->startTransaction()) != 0); - CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); - CHK(g_opx->readTuple() == 0); - CHK(g_opx->equal("PK2", tup.m_pk2) == 0); - CHK((g_bh1 = g_opx->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_opx->getBlobHandle("BL2")) != 0); - if (! rw) { - CHK(getBlobValue(tup) == 0); + CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); + CHK(g_opr->updateTuple() == 0); + CHK(g_opr->equal("PK1", tup.m_pk1) == 0); + if (g_opt.m_pk2len != 0) + CHK(g_opr->equal("PK2", tup.m_pk2) == 0); + CHK(getBlobHandles(g_opr) == 0); + if (style == 0) { + CHK(setBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobWriteHook(tup) == 0); } else { CHK(g_con->execute(NoCommit) == 0); - CHK(readBlobData(tup) == 0); + CHK(writeBlobData(tup) == 0); } CHK(g_con->execute(Commit) == 0); - if (! rw) { - CHK(verifyBlobValue(tup) == 0); - } g_ndb->closeTransaction(g_con); - g_opx = 0; + g_opr = 0; g_con = 0; + tup.m_exists = true; } return 0; } -static int -readScan(bool rw, bool idx) -{ - const char* func = ! idx ? "scan read table" : "scan read index"; - DBG("--- " << func << " ---"); - Tup tup; - tup.alloc(); // allocate buffers - NdbResultSet* rs; - CHK((g_con = g_ndb->startTransaction()) != 0); - if (! idx) { - CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0); - } else { - CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); - } - CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0); - CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); - if (g_opt.m_pk2len != 0) - CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); - CHK((g_bh1 = g_ops->getBlobHandle("BL1")) != 0); - if (! g_opt.m_oneblob) - CHK((g_bh2 = g_ops->getBlobHandle("BL2")) != 0); - if (! rw) { - CHK(getBlobValue(tup) == 0); - } - CHK(g_con->execute(NoCommit) == 0); - unsigned rows = 0; - while (1) { - int ret; - tup.m_pk1 = (Uint32)-1; - memset(tup.m_pk2, 'x', g_opt.m_pk2len); - CHK((ret = rs->nextResult(true)) == 0 || ret == 1); - if (ret == 1) - break; - DBG(func << " pk1=" << tup.m_pk1); - Uint32 k = tup.m_pk1 - g_opt.m_pk1off; - CHK(k < g_opt.m_rows && g_tups[k].m_exists); - tup.copy(g_tups[k]); - if (! rw) { - CHK(verifyBlobValue(tup) == 0); - } else { - CHK(readBlobData(tup) == 0); - } - rows++; - } - g_ndb->closeTransaction(g_con); - g_con = 0; - g_ops = 0; - CHK(g_opt.m_rows == rows); - return 0; -} - static int deletePk() { DBG("--- deletePk ---"); for (unsigned k = 0; k < g_opt.m_rows; k++) { Tup& tup = g_tups[k]; - DBG("deletePk pk1=" << tup.m_pk1); + DBG("deletePk pk1=" << hex << tup.m_pk1); CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); CHK(g_opr->deleteTuple() == 0); @@ -891,13 +933,75 @@ deletePk() return 0; } +// hash index ops + +static int +readIdx(int style) +{ + DBG("--- readIdx " << stylename[style] << " ---"); + for (unsigned k = 0; k < g_opt.m_rows; k++) { + Tup& tup = g_tups[k]; + DBG("readIdx pk1=" << hex << tup.m_pk1); + CHK((g_con = g_ndb->startTransaction()) != 0); + CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); + CHK(g_opx->readTuple() == 0); + CHK(g_opx->equal("PK2", tup.m_pk2) == 0); + CHK(getBlobHandles(g_opx) == 0); + if (style == 0) { + CHK(getBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobReadHook(tup) == 0); + } else { + CHK(g_con->execute(NoCommit) == 0); + CHK(readBlobData(tup) == 0); + } + CHK(g_con->execute(Commit) == 0); + if (style == 0 || style == 1) { + CHK(verifyBlobValue(tup) == 0); + } + g_ndb->closeTransaction(g_con); + g_opx = 0; + g_con = 0; + } + return 0; +} + +static int +updateIdx(int style) +{ + DBG("--- updateIdx " << stylename[style] << " ---"); + for (unsigned k = 0; k < g_opt.m_rows; k++) { + Tup& tup = g_tups[k]; + DBG("updateIdx pk1=" << hex << tup.m_pk1); + CHK((g_con = g_ndb->startTransaction()) != 0); + CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); + CHK(g_opx->updateTuple() == 0); + CHK(g_opx->equal("PK2", tup.m_pk2) == 0); + CHK(getBlobHandles(g_opx) == 0); + if (style == 0) { + CHK(setBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobWriteHook(tup) == 0); + } else { + CHK(g_con->execute(NoCommit) == 0); + CHK(writeBlobData(tup) == 0); + } + CHK(g_con->execute(Commit) == 0); + g_ndb->closeTransaction(g_con); + g_opx = 0; + g_con = 0; + tup.m_exists = true; + } + return 0; +} + static int deleteIdx() { DBG("--- deleteIdx ---"); for (unsigned k = 0; k < g_opt.m_rows; k++) { Tup& tup = g_tups[k]; - DBG("deleteIdx pk1=" << tup.m_pk1); + DBG("deleteIdx pk1=" << hex << tup.m_pk1); CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); CHK(g_opx->deleteTuple() == 0); @@ -911,11 +1015,119 @@ deleteIdx() return 0; } +// scan ops table and index + +static int +readScan(int style, bool idx) +{ + DBG("--- " << "readScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---"); + Tup tup; + tup.alloc(); // allocate buffers + NdbResultSet* rs; + CHK((g_con = g_ndb->startTransaction()) != 0); + if (! idx) { + CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0); + } else { + CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); + } + CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0); + CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); + if (g_opt.m_pk2len != 0) + CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); + CHK(getBlobHandles(g_ops) == 0); + if (style == 0) { + CHK(getBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobReadHook(tup) == 0); + } + CHK(g_con->execute(NoCommit) == 0); + unsigned rows = 0; + while (1) { + int ret; + tup.m_pk1 = (Uint32)-1; + memset(tup.m_pk2, 'x', g_opt.m_pk2len); + CHK((ret = rs->nextResult(true)) == 0 || ret == 1); + if (ret == 1) + break; + DBG("readScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1); + Uint32 k = tup.m_pk1 - g_opt.m_pk1off; + CHK(k < g_opt.m_rows && g_tups[k].m_exists); + tup.copyfrom(g_tups[k]); + if (style == 0) { + CHK(verifyBlobValue(tup) == 0); + } else if (style == 1) { + // execute ops generated by callbacks, if any + CHK(verifyBlobValue(tup) == 0); + } else { + CHK(readBlobData(tup) == 0); + } + rows++; + } + g_ndb->closeTransaction(g_con); + g_con = 0; + g_ops = 0; + CHK(g_opt.m_rows == rows); + return 0; +} + +static int +updateScan(int style, bool idx) +{ + DBG("--- " << "updateScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---"); + Tup tup; + tup.alloc(); // allocate buffers + NdbResultSet* rs; + CHK((g_con = g_ndb->startTransaction()) != 0); + if (! idx) { + CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0); + } else { + CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); + } + CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0); + CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); + if (g_opt.m_pk2len != 0) + CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); + CHK(g_con->execute(NoCommit) == 0); + unsigned rows = 0; + while (1) { + int ret; + tup.m_pk1 = (Uint32)-1; + memset(tup.m_pk2, 'x', g_opt.m_pk2len); + CHK((ret = rs->nextResult(true)) == 0 || ret == 1); + if (ret == 1) + break; + DBG("updateScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1); + Uint32 k = tup.m_pk1 - g_opt.m_pk1off; + CHK(k < g_opt.m_rows && g_tups[k].m_exists); + // calculate new blob values + calcBval(g_tups[k], false); + tup.copyfrom(g_tups[k]); + CHK((g_opr = rs->updateTuple()) != 0); + CHK(getBlobHandles(g_opr) == 0); + if (style == 0) { + CHK(setBlobValue(tup) == 0); + } else if (style == 1) { + CHK(setBlobWriteHook(tup) == 0); + } else { + CHK(g_con->execute(NoCommit) == 0); + CHK(writeBlobData(tup) == 0); + } + CHK(g_con->execute(NoCommit) == 0); + g_opr = 0; + rows++; + } + CHK(g_con->execute(Commit) == 0); + g_ndb->closeTransaction(g_con); + g_con = 0; + g_ops = 0; + CHK(g_opt.m_rows == rows); + return 0; +} + static int deleteScan(bool idx) { - const char* func = ! idx ? "scan delete table" : "scan delete index"; - DBG("--- " << func << " ---"); + DBG("--- " << "deleteScan" << (idx ? "Idx" : "") << " ---"); Tup tup; NdbResultSet* rs; CHK((g_con = g_ndb->startTransaction()) != 0); @@ -937,7 +1149,7 @@ deleteScan(bool idx) CHK((ret = rs->nextResult()) == 0 || ret == 1); if (ret == 1) break; - DBG(func << " pk1=" << tup.m_pk1); + DBG("deleteScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1); CHK(rs->deleteTuple() == 0); CHK(g_con->execute(NoCommit) == 0); Uint32 k = tup.m_pk1 - g_opt.m_pk1off; @@ -948,7 +1160,6 @@ deleteScan(bool idx) CHK(g_con->execute(Commit) == 0); g_ndb->closeTransaction(g_con); g_con = 0; - g_opr = 0; g_ops = 0; CHK(g_opt.m_rows == rows); return 0; @@ -981,69 +1192,75 @@ testmain() } if (g_opt.m_seed != 0) srandom(g_opt.m_seed); - for (unsigned loop = 0; g_opt.m_loop == 0 || loop < g_opt.m_loop; loop++) { - DBG("=== loop " << loop << " ==="); + for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) { + DBG("=== loop " << g_loop << " ==="); if (g_opt.m_seed == 0) - srandom(loop); - bool llim = skip('v') ? true : false; - bool ulim = skip('w') ? false : true; + srandom(g_loop); // pk - for (int rw = llim; rw <= ulim; rw++) { - if (skip('k')) + for (int style = 0; style <= 2; style++) { + if (skipcase('k') || skipstyle(style)) continue; - DBG("--- pk ops " << (! rw ? "get/set" : "read/write") << " ---"); + DBG("--- pk ops " << stylename[style] << " ---"); calcTups(false); - CHK(insertPk(rw) == 0); + CHK(insertPk(style) == 0); CHK(verifyBlob() == 0); - CHK(readPk(rw) == 0); - if (! skip('u')) { - calcTups(rw); - CHK(updatePk(rw) == 0); + CHK(readPk(style) == 0); + if (! skipcase('u')) { + calcTups(style); + CHK(updatePk(style) == 0); CHK(verifyBlob() == 0); } - CHK(readPk(rw) == 0); + CHK(readPk(style) == 0); CHK(deletePk() == 0); CHK(verifyBlob() == 0); } // hash index - for (int rw = llim; rw <= ulim; rw++) { - if (skip('i')) + for (int style = 0; style <= 2; style++) { + if (skipcase('i') || skipstyle(style)) continue; - DBG("--- idx ops " << (! rw ? "get/set" : "read/write") << " ---"); + DBG("--- idx ops " << stylename[style] << " ---"); calcTups(false); - CHK(insertPk(rw) == 0); + CHK(insertPk(style) == 0); CHK(verifyBlob() == 0); - CHK(readIdx(rw) == 0); - calcTups(rw); - if (! skip('u')) { - CHK(updateIdx(rw) == 0); + CHK(readIdx(style) == 0); + calcTups(style); + if (! skipcase('u')) { + CHK(updateIdx(style) == 0); CHK(verifyBlob() == 0); - CHK(readIdx(rw) == 0); + CHK(readIdx(style) == 0); } CHK(deleteIdx() == 0); CHK(verifyBlob() == 0); } // scan table - for (int rw = llim; rw <= ulim; rw++) { - if (skip('s')) + for (int style = 0; style <= 2; style++) { + if (skipcase('s') || skipstyle(style)) continue; - DBG("--- table scan " << (! rw ? "get/set" : "read/write") << " ---"); + DBG("--- table scan " << stylename[style] << " ---"); calcTups(false); - CHK(insertPk(rw) == 0); + CHK(insertPk(style) == 0); CHK(verifyBlob() == 0); - CHK(readScan(rw, false) == 0); + CHK(readScan(style, false) == 0); + if (! skipcase('u')) { + CHK(updateScan(style, false) == 0); + CHK(verifyBlob() == 0); + } CHK(deleteScan(false) == 0); CHK(verifyBlob() == 0); } // scan index - for (int rw = llim; rw <= ulim; rw++) { - if (skip('r')) + for (int style = 0; style <= 2; style++) { + if (skipcase('r') || skipstyle(style)) continue; - DBG("--- index scan " << (! rw ? "get/set" : "read/write") << " ---"); + DBG("--- index scan " << stylename[style] << " ---"); calcTups(false); - CHK(insertPk(rw) == 0); + CHK(insertPk(style) == 0); CHK(verifyBlob() == 0); - CHK(readScan(rw, true) == 0); + CHK(readScan(style, true) == 0); + if (! skipcase('u')) { + CHK(updateScan(style, true) == 0); + CHK(verifyBlob() == 0); + } CHK(deleteScan(true) == 0); CHK(verifyBlob() == 0); } @@ -1121,6 +1338,12 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) { while (++argv, --argc > 0) { const char* arg = argv[0]; + if (strcmp(arg, "-batch") == 0) { + if (++argv, --argc > 0) { + g_opt.m_batch = atoi(argv[0]); + continue; + } + } if (strcmp(arg, "-core") == 0) { g_opt.m_core = true; continue; @@ -1165,9 +1388,13 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) } if (strcmp(arg, "-skip") == 0) { if (++argv, --argc > 0) { - for (const char* p = argv[0]; *p != 0; p++) { - skip(*p) = true; - } + g_opt.m_skip = strdup(argv[0]); + continue; + } + } + if (strcmp(arg, "-style") == 0) { + if (++argv, --argc > 0) { + g_opt.m_style = strdup(argv[0]); continue; } } @@ -1175,10 +1402,6 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) if (strcmp(arg, "-pk2len") == 0) { if (++argv, --argc > 0) { g_opt.m_pk2len = atoi(argv[0]); - if (g_opt.m_pk2len == 0) { - skip('i') = true; - skip('r') = true; - } if (g_opt.m_pk2len <= g_max_pk2len) continue; } @@ -1205,7 +1428,15 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) printusage(); return NDBT_ProgramExit(NDBT_WRONGARGS); } + if (g_opt.m_pk2len == 0) { + char b[100]; + strcpy(b, g_opt.m_skip); + strcat(b, "i"); + strcat(b, "r"); + g_opt.m_skip = strdup(b); + } if (testmain() == -1) { + ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl; return NDBT_ProgramExit(NDBT_FAILED); } return NDBT_ProgramExit(NDBT_OK); diff --git a/ndb/test/src/NDBT_Table.cpp b/ndb/test/src/NDBT_Table.cpp index 485377e690a..d283cdf5912 100644 --- a/ndb/test/src/NDBT_Table.cpp +++ b/ndb/test/src/NDBT_Table.cpp @@ -18,35 +18,6 @@ #include #include -class NdbOut& -operator <<(class NdbOut& ndbout, const NDBT_Attribute & attr){ - - NdbDictionary::Column::Type type = attr.getType(); - - ndbout << attr.getName() << " " << type; - - switch(type){ - case NdbDictionary::Column::Decimal: - ndbout << "(" << attr.getScale() << ", " << attr.getPrecision() << ")"; - break; - default: - break; - } - - if(attr.getLength() != 1) - ndbout << "[" << attr.getLength() << "]"; - - if(attr.getNullable()) - ndbout << " NULL"; - else - ndbout << " NOT NULL"; - - if(attr.getPrimaryKey()) - ndbout << " PRIMARY KEY"; - - return ndbout; -} - class NdbOut& operator <<(class NdbOut& ndbout, const NDBT_Table & tab) { diff --git a/ndb/test/src/NDBT_Test.cpp b/ndb/test/src/NDBT_Test.cpp index af4e3ff3550..06eb3f4e9e2 100644 --- a/ndb/test/src/NDBT_Test.cpp +++ b/ndb/test/src/NDBT_Test.cpp @@ -830,7 +830,8 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab, if(pTab2 == 0 && pDict->createTable(* pTab) != 0){ numTestsFail++; numTestsExecuted++; - g_err << "ERROR1: Failed to create table " << pTab->getName() << endl; + g_err << "ERROR1: Failed to create table " << pTab->getName() + << pDict->getNdbError() << endl; tests[t]->saveTestResult(pTab, FAILED_TO_CREATE); continue; } diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index ce452222fb9..312bcbe5c2a 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -181,6 +181,45 @@ bool ha_ndbcluster::get_error_message(int error, } +/* + Check if type is supported by NDB. + TODO Use this once, not in every operation +*/ + +static inline bool ndb_supported_type(enum_field_types type) +{ + switch (type) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + return true; + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + break; + } + return false; +} + + /* Instruct NDB to set the value of the hidden primary key */ @@ -208,40 +247,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, pack_len)); DBUG_DUMP("key", (char*)field_ptr, pack_len); - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (ndb_supported_type(field->type())) + { + if (! (field->flags & BLOB_FLAG)) + // Common implementation for most field types + DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); } - DBUG_RETURN(3); + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); } @@ -259,63 +273,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, fieldnr, field->field_name, field->type(), pack_len, field->is_null()?"Y":"N")); DBUG_DUMP("value", (char*) field_ptr, pack_len); - - if (field->is_null()) + + if (ndb_supported_type(field->type())) { - // Set value to NULL - DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); - } - - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (! (field->flags & BLOB_FLAG)) + { + if (field->is_null()) + // Set value to NULL + DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + // Common implementation for most field types + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); + } + + // Blob type + NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr); + if (ndb_blob != NULL) + { + if (field->is_null()) + DBUG_RETURN(ndb_blob->setNull() != 0); + + Field_blob *field_blob= (Field_blob*)field; + + // Get length and pointer to data + uint32 blob_len= field_blob->get_length(field_ptr); + char* blob_ptr= NULL; + field_blob->get_ptr(&blob_ptr); + + // Looks like NULL blob can also be signaled in this way + if (blob_ptr == NULL) + DBUG_RETURN(ndb_blob->setNull() != 0); + + DBUG_PRINT("value", ("set blob ptr=%x len=%u", + (unsigned)blob_ptr, blob_len)); + DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26)); + + // No callback needed to write value + DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0); + } + DBUG_RETURN(1); } - DBUG_RETURN(3); + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); +} + + +/* + Callback to read all blob values. + - not done in unpack_record because unpack_record is valid + after execute(Commit) but reading blobs is not + - may only generate read operations; they have to be executed + somewhere before the data is available + - due to single buffer for all blobs, we let the last blob + process all blobs (last so that all are active) + - null bit is still set in unpack_record + - TODO allocate blob part aligned buffers +*/ + +NdbBlob::ActiveHook get_ndb_blobs_value; + +int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) +{ + DBUG_ENTER("get_ndb_blobs_value [callback]"); + if (ndb_blob->blobsNextBlob() != NULL) + DBUG_RETURN(0); + ha_ndbcluster *ha= (ha_ndbcluster *)arg; + DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); +} + +int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) +{ + DBUG_ENTER("get_ndb_blobs_value"); + + // Field has no field number so cannot use TABLE blob_field + // Loop twice, first only counting total buffer size + for (int loop= 0; loop <= 1; loop++) + { + uint32 offset= 0; + for (uint i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + NdbValue value= m_value[i]; + if (value.ptr != NULL && (field->flags & BLOB_FLAG)) + { + Field_blob *field_blob= (Field_blob *)field; + NdbBlob *ndb_blob= value.blob; + Uint64 blob_len= 0; + if (ndb_blob->getLength(blob_len) != 0) + DBUG_RETURN(-1); + // Align to Uint64 + uint32 blob_size= blob_len; + if (blob_size % 8 != 0) + blob_size+= 8 - blob_size % 8; + if (loop == 1) + { + char *buf= blobs_buffer + offset; + uint32 len= 0xffffffff; // Max uint32 + DBUG_PRINT("value", ("read blob ptr=%x len=%u", + (uint)buf, (uint)blob_len)); + if (ndb_blob->readData(buf, len) != 0) + DBUG_RETURN(-1); + DBUG_ASSERT(len == blob_len); + field_blob->set_ptr(len, buf); + } + offset+= blob_size; + } + } + if (loop == 0 && offset > blobs_buffer_size) + { + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer_size= 0; + DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); + blobs_buffer= my_malloc(offset, MYF(MY_WME)); + if (blobs_buffer == NULL) + DBUG_RETURN(-1); + blobs_buffer_size= offset; + } + } + DBUG_RETURN(0); } /* Instruct NDB to fetch one field - - data is read directly into buffer provided by field_ptr - if it's NULL, data is read into memory provided by NDBAPI + - data is read directly into buffer provided by field + if field is NULL, data is read into memory provided by NDBAPI */ -int ha_ndbcluster::get_ndb_value(NdbOperation *op, - uint field_no, byte *field_ptr) +int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field, + uint fieldnr) { DBUG_ENTER("get_ndb_value"); - DBUG_PRINT("enter", ("field_no: %d", field_no)); - m_value[field_no]= op->getValue(field_no, field_ptr); - DBUG_RETURN(m_value == NULL); + DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr, + (int)(field != NULL ? field->flags : 0))); + + if (field != NULL) + { + if (ndb_supported_type(field->type())) + { + DBUG_ASSERT(field->ptr != NULL); + if (! (field->flags & BLOB_FLAG)) + { + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr); + DBUG_RETURN(m_value[fieldnr].rec == NULL); + } + + // Blob type + NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr); + m_value[fieldnr].blob= ndb_blob; + if (ndb_blob != NULL) + { + // Set callback + void *arg= (void *)this; + DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0); + } + DBUG_RETURN(1); + } + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + + // Used for hidden key only + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL); + DBUG_RETURN(m_value[fieldnr].rec == NULL); +} + + +/* + Check if any set or get of blob value in current query. +*/ +bool ha_ndbcluster::uses_blob_value(bool all_fields) +{ + if (table->blob_fields == 0) + return false; + if (all_fields) + return true; + { + uint no_fields= table->fields; + int i; + THD *thd= current_thd; + // They always put blobs at the end.. + for (i= no_fields - 1; i >= 0; i--) + { + Field *field= table->field[i]; + if (thd->query_id == field->query_id) + { + return true; + } + } + } + return false; } @@ -462,10 +610,19 @@ void ha_ndbcluster::release_metadata() DBUG_VOID_RETURN; } -NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type) +int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { - return (type == TL_WRITE_ALLOW_WRITE) ? - NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_CommittedRead; + int lm; + if (type == TL_WRITE_ALLOW_WRITE) + lm = NdbScanOperation::LM_Exclusive; + else if (uses_blob_value(retrieve_all_fields)) + /* + TODO use a new scan mode to read + lock + keyinfo + */ + lm = NdbScanOperation::LM_Exclusive; + else + lm = NdbScanOperation::LM_CommittedRead; + return lm; } static const ulong index_type_flags[]= @@ -615,7 +772,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ERR_RETURN(trans->getNdbError()); // Read key at the same time, for future reference - if (get_ndb_value(op, no_fields, NULL)) + if (get_ndb_value(op, NULL, no_fields)) ERR_RETURN(trans->getNdbError()); } else @@ -632,13 +789,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((thd->query_id == field->query_id) || retrieve_all_fields) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(trans->getNdbError()); } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -747,13 +904,13 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((thd->query_id == field->query_id) || (field->flags & PRI_KEY_FLAG)) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -796,11 +953,22 @@ inline int ha_ndbcluster::next_result(byte *buf) bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; do { DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); + /* + We can only handle one tuple with blobs at a time. + */ + if (ops_pending && blobs_pending) + { + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + blobs_pending= false; + } check= cursor->nextResult(contact_ndb); if (check == 0) { // One more record found DBUG_PRINT("info", ("One more record found")); + unpack_record(buf); table->status= 0; DBUG_RETURN(0); @@ -914,8 +1082,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, index_name= get_index_name(active_index); if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0, - parallelism, sorted))) + + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism, sorted))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -975,7 +1145,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, if (!(op= trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -1044,7 +1216,9 @@ int ha_ndbcluster::full_table_scan(byte *buf) if (!(op=trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; DBUG_RETURN(define_read_attrs(buf, op)); @@ -1068,12 +1242,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) (field->flags & PRI_KEY_FLAG) || retrieve_all_fields) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -1087,7 +1261,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) if (!tab->getColumn(hidden_no)) DBUG_RETURN(1); #endif - if (get_ndb_value(op, hidden_no, NULL)) + if (get_ndb_value(op, NULL, hidden_no)) ERR_RETURN(op->getNdbError()); } @@ -1155,12 +1329,13 @@ int ha_ndbcluster::write_row(byte *record) */ rows_inserted++; if ((rows_inserted == rows_to_insert) || - ((rows_inserted % bulk_insert_rows) == 0)) + ((rows_inserted % bulk_insert_rows) == 0) || + uses_blob_value(false) != 0) { // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", - rows_inserted, bulk_insert_rows)); + (int)rows_inserted, (int)bulk_insert_rows)); if (trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); } @@ -1270,6 +1445,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (!(op= cursor->updateTuple())) ERR_RETURN(trans->getNdbError()); ops_pending++; + if (uses_blob_value(false)) + blobs_pending= true; } else { @@ -1285,7 +1462,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) // Require that the PK for this record has previously been // read into m_value uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec); DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); @@ -1360,7 +1537,7 @@ int ha_ndbcluster::delete_row(const byte *record) // This table has no primary key, use "hidden" primary key DBUG_PRINT("info", ("Using hidden key")); uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec != NULL); if (set_hidden_key(op, no_fields, rec->aRef())) @@ -1398,7 +1575,7 @@ void ha_ndbcluster::unpack_record(byte* buf) { uint row_offset= (uint) (buf - table->record[0]); Field **field, **end; - NdbRecAttr **value= m_value; + NdbValue *value= m_value; DBUG_ENTER("unpack_record"); // Set null flag(s) @@ -1407,8 +1584,23 @@ void ha_ndbcluster::unpack_record(byte* buf) field < end; field++, value++) { - if (*value && (*value)->isNULL()) - (*field)->set_null(row_offset); + if ((*value).ptr) + { + if (! ((*field)->flags & BLOB_FLAG)) + { + if ((*value).rec->isNULL()) + (*field)->set_null(row_offset); + } + else + { + NdbBlob* ndb_blob= (*value).blob; + bool isNull= true; + int ret= ndb_blob->getNull(isNull); + DBUG_ASSERT(ret == 0); + if (isNull) + (*field)->set_null(row_offset); + } + } } #ifndef DBUG_OFF @@ -1419,7 +1611,7 @@ void ha_ndbcluster::unpack_record(byte* buf) int hidden_no= table->fields; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; DBUG_ASSERT(rec); DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, hidden_col->getName(), rec->u_64_value())); @@ -1446,9 +1638,9 @@ void ha_ndbcluster::print_results() { Field *field; const NDBCOL *col; - NdbRecAttr *value; + NdbValue value; - if (!(value= m_value[f])) + if (!(value= m_value[f]).ptr) { fprintf(DBUG_FILE, "Field %d was not read\n", f); continue; @@ -1457,19 +1649,28 @@ void ha_ndbcluster::print_results() DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); col= tab->getColumn(f); fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); - - if (value->isNULL()) + + NdbBlob *ndb_blob= NULL; + if (! (field->flags & BLOB_FLAG)) { - fprintf(DBUG_FILE, "NULL\n"); - continue; + if (value.rec->isNULL()) + { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } + } + else + { + ndb_blob= value.blob; + bool isNull= true; + ndb_blob->getNull(isNull); + if (isNull) { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } } switch (col->getType()) { - case NdbDictionary::Column::Blob: - case NdbDictionary::Column::Clob: - case NdbDictionary::Column::Undefined: - fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); - break; case NdbDictionary::Column::Tinyint: { char value= *field->ptr; fprintf(DBUG_FILE, "Tinyint\t%d", value); @@ -1561,6 +1762,21 @@ void ha_ndbcluster::print_results() fprintf(DBUG_FILE, "Timespec\t%llu", value); break; } + case NdbDictionary::Column::Blob: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Text: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Undefined: + fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); + break; } fprintf(DBUG_FILE, "\n"); @@ -1806,7 +2022,7 @@ void ha_ndbcluster::position(const byte *record) // No primary key, get hidden key DBUG_PRINT("info", ("Getting hidden key")); int hidden_no= table->fields; - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); DBUG_ASSERT(hidden_col->getPrimaryKey() && @@ -1980,7 +2196,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) const NDBTAB *tab= (NDBTAB *) m_table; DBUG_ENTER("start_bulk_insert"); - DBUG_PRINT("enter", ("rows: %d", rows)); + DBUG_PRINT("enter", ("rows: %d", (int)rows)); rows_inserted= 0; rows_to_insert= rows; @@ -2017,7 +2233,7 @@ int ha_ndbcluster::end_bulk_insert() int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) { DBUG_ENTER("extra_opt"); - DBUG_PRINT("enter", ("cache_size: %d", cache_size)); + DBUG_PRINT("enter", ("cache_size: %lu", cache_size)); DBUG_RETURN(extra(operation)); } @@ -2238,7 +2454,7 @@ int ha_ndbcluster::start_stmt(THD *thd) NdbConnection *tablock_trans= (NdbConnection*)thd->transaction.all.ndb_tid; - DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans)); + DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans)); DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans); if (trans == NULL) ERR_RETURN(m_ndb->getNdbError()); @@ -2315,71 +2531,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction) /* - Map MySQL type to the corresponding NDB type + Define NDB column based on Field. + Returns 0 or mysql error code. + Not member of ha_ndbcluster because NDBCOL cannot be declared. */ -inline NdbDictionary::Column::Type -mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg) +static int create_ndb_column(NDBCOL &col, + Field *field, + HA_CREATE_INFO *info) { - switch(mysql_type) { + // Set name + col.setName(field->field_name); + // Set type and sizes + const enum enum_field_types mysql_type= field->real_type(); + switch (mysql_type) { + // Numeric types case MYSQL_TYPE_DECIMAL: - return NdbDictionary::Column::Char; + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; case MYSQL_TYPE_TINY: - return (unsigned_flg) ? - NdbDictionary::Column::Tinyunsigned : - NdbDictionary::Column::Tinyint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Tinyunsigned); + else + col.setType(NDBCOL::Tinyint); + col.setLength(1); + break; case MYSQL_TYPE_SHORT: - return (unsigned_flg) ? - NdbDictionary::Column::Smallunsigned : - NdbDictionary::Column::Smallint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Smallunsigned); + else + col.setType(NDBCOL::Smallint); + col.setLength(1); + break; case MYSQL_TYPE_LONG: - return (unsigned_flg) ? - NdbDictionary::Column::Unsigned : - NdbDictionary::Column::Int; - case MYSQL_TYPE_TIMESTAMP: - return NdbDictionary::Column::Unsigned; - case MYSQL_TYPE_LONGLONG: - return (unsigned_flg) ? - NdbDictionary::Column::Bigunsigned : - NdbDictionary::Column::Bigint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Unsigned); + else + col.setType(NDBCOL::Int); + col.setLength(1); + break; case MYSQL_TYPE_INT24: - return (unsigned_flg) ? - NdbDictionary::Column::Mediumunsigned : - NdbDictionary::Column::Mediumint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Mediumunsigned); + else + col.setType(NDBCOL::Mediumint); + col.setLength(1); + break; + case MYSQL_TYPE_LONGLONG: + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Bigunsigned); + else + col.setType(NDBCOL::Bigint); + col.setLength(1); break; case MYSQL_TYPE_FLOAT: - return NdbDictionary::Column::Float; + col.setType(NDBCOL::Float); + col.setLength(1); + break; case MYSQL_TYPE_DOUBLE: - return NdbDictionary::Column::Double; - case MYSQL_TYPE_DATETIME : - return NdbDictionary::Column::Datetime; - case MYSQL_TYPE_DATE : - case MYSQL_TYPE_NEWDATE : - case MYSQL_TYPE_TIME : - case MYSQL_TYPE_YEAR : - // Missing NDB data types, mapped to char - return NdbDictionary::Column::Char; - case MYSQL_TYPE_ENUM : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_SET : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_TINY_BLOB : - case MYSQL_TYPE_MEDIUM_BLOB : - case MYSQL_TYPE_LONG_BLOB : - case MYSQL_TYPE_BLOB : - return NdbDictionary::Column::Blob; - case MYSQL_TYPE_VAR_STRING : - return NdbDictionary::Column::Varchar; - case MYSQL_TYPE_STRING : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_NULL : - case MYSQL_TYPE_GEOMETRY : - return NdbDictionary::Column::Undefined; + col.setType(NDBCOL::Double); + col.setLength(1); + break; + // Date types + case MYSQL_TYPE_TIMESTAMP: + col.setType(NDBCOL::Unsigned); + col.setLength(1); + break; + case MYSQL_TYPE_DATETIME: + col.setType(NDBCOL::Datetime); + col.setLength(1); + break; + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_YEAR: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + // Char types + case MYSQL_TYPE_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Binary); + else + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_VAR_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Varbinary); + else + col.setType(NDBCOL::Varchar); + col.setLength(field->pack_length()); + break; + // Blob types (all come in as MYSQL_TYPE_BLOB) + mysql_type_tiny_blob: + case MYSQL_TYPE_TINY_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + // No parts + col.setPartSize(0); + col.setStripeSize(0); + break; + mysql_type_blob: + case MYSQL_TYPE_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + // Use "<=" even if "<" is the exact condition + if (field->max_length() <= (1 << 8)) + goto mysql_type_tiny_blob; + else if (field->max_length() <= (1 << 16)) + { + col.setInlineSize(256); + col.setPartSize(2000); + col.setStripeSize(16); + } + else if (field->max_length() <= (1 << 24)) + goto mysql_type_medium_blob; + else + goto mysql_type_long_blob; + break; + mysql_type_medium_blob: + case MYSQL_TYPE_MEDIUM_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(4000); + col.setStripeSize(8); + break; + mysql_type_long_blob: + case MYSQL_TYPE_LONG_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(8000); + col.setStripeSize(4); + break; + // Other types + case MYSQL_TYPE_ENUM: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_SET: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + goto mysql_type_unsupported; + mysql_type_unsupported: + default: + return HA_ERR_UNSUPPORTED; } - return NdbDictionary::Column::Undefined; + // Set nullable and pk + col.setNullable(field->maybe_null()); + col.setPrimaryKey(field->flags & PRI_KEY_FLAG); + // Set autoincrement + if (field->flags & AUTO_INCREMENT_FLAG) + { + col.setAutoIncrement(TRUE); + ulonglong value= info->auto_increment_value ? + info->auto_increment_value -1 : (ulonglong) 0; + DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value)); + col.setAutoIncrementInitialValue(value); + } + else + col.setAutoIncrement(false); + return 0; } - /* Create a table in NDB Cluster */ @@ -2389,7 +2718,6 @@ int ha_ndbcluster::create(const char *name, HA_CREATE_INFO *info) { NDBTAB tab; - NdbDictionary::Column::Type ndb_type; NDBCOL col; uint pack_length, length, i; const void *data, *pack_data; @@ -2420,31 +2748,11 @@ int ha_ndbcluster::create(const char *name, for (i= 0; i < form->fields; i++) { Field *field= form->field[i]; - ndb_type= mysql_to_ndb_type(field->real_type(), - field->flags & UNSIGNED_FLAG); DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", field->field_name, field->real_type(), field->pack_length())); - col.setName(field->field_name); - col.setType(ndb_type); - if ((ndb_type == NdbDictionary::Column::Char) || - (ndb_type == NdbDictionary::Column::Varchar)) - col.setLength(field->pack_length()); - else - col.setLength(1); - col.setNullable(field->maybe_null()); - col.setPrimaryKey(field->flags & PRI_KEY_FLAG); - if (field->flags & AUTO_INCREMENT_FLAG) - { - col.setAutoIncrement(TRUE); - ulonglong value= info->auto_increment_value ? - info->auto_increment_value -1 : (ulonglong) 0; - DBUG_PRINT("info", ("Autoincrement key, initial: %d", value)); - col.setAutoIncrementInitialValue(value); - } - else - col.setAutoIncrement(false); - + if (my_errno= create_ndb_column(col, field, info)) + DBUG_RETURN(my_errno); tab.addColumn(col); } @@ -2712,14 +3020,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_table(NULL), m_table_flags(HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | - HA_NO_PREFIX_CHAR_KEYS | - HA_NO_BLOBS), + HA_NO_PREFIX_CHAR_KEYS), m_use_write(false), retrieve_all_fields(FALSE), rows_to_insert(1), rows_inserted(0), bulk_insert_rows(1024), - ops_pending(0) + ops_pending(0), + blobs_buffer(0), + blobs_buffer_size(0) { int i; @@ -2752,6 +3061,8 @@ ha_ndbcluster::~ha_ndbcluster() DBUG_ENTER("~ha_ndbcluster"); release_metadata(); + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer= 0; // Check for open cursor/transaction DBUG_ASSERT(m_active_cursor == NULL); diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index a1cb1698067..661eb582786 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration class NdbResultSet; // Forward declaration class NdbScanOperation; class NdbIndexScanOperation; +class NdbBlob; typedef enum ndb_index_type { UNDEFINED_INDEX = 0, @@ -171,6 +172,7 @@ class ha_ndbcluster: public handler enum ha_rkey_function find_flag); int close_scan(); void unpack_record(byte *buf); + int get_ndb_lock_type(enum thr_lock_type type); void set_dbname(const char *pathname); void set_tabname(const char *pathname); @@ -181,7 +183,9 @@ class ha_ndbcluster: public handler int set_ndb_key(NdbOperation*, Field *field, uint fieldnr, const byte* field_ptr); int set_ndb_value(NdbOperation*, Field *field, uint fieldnr); - int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); + int get_ndb_value(NdbOperation*, Field *field, uint fieldnr); + friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg); + int get_ndb_blobs_value(NdbBlob *last_ndb_blob); int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); @@ -191,8 +195,8 @@ class ha_ndbcluster: public handler void print_results(); longlong get_auto_increment(); - int ndb_err(NdbConnection*); + bool uses_blob_value(bool all_fields); private: int check_ndb_connection(); @@ -209,13 +213,19 @@ class ha_ndbcluster: public handler NDB_SHARE *m_share; NDB_INDEX_TYPE m_indextype[MAX_KEY]; const char* m_unique_index_name[MAX_KEY]; - NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; + // NdbRecAttr has no reference to blob + typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; + NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; bool m_use_write; bool retrieve_all_fields; ha_rows rows_to_insert; ha_rows rows_inserted; ha_rows bulk_insert_rows; ha_rows ops_pending; + bool blobs_pending; + // memory for blobs in one tuple + char *blobs_buffer; + uint32 blobs_buffer_size; }; bool ndbcluster_init(void); @@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name, int ndbcluster_drop_database(const char* path); void ndbcluster_print_error(int error, const NdbOperation *error_op); - - - - - - -