diff --git a/newbrt/kv-pair.h b/newbrt/kv-pair.h index 31042f5dcc8..d7f6f1b5ea2 100644 --- a/newbrt/kv-pair.h +++ b/newbrt/kv-pair.h @@ -53,6 +53,28 @@ static inline int kv_pair_vallen(struct kv_pair *pair) { return pair->vallen; } +/* use the low bit to indicate an inuse pair that is deleted */ + +static inline int kv_pair_inuse(struct kv_pair *pair) { + return pair != 0; +} + +static inline int kv_pair_deleted(struct kv_pair *pair) { + return ((long) pair & 1) != 0; +} + +static inline int kv_pair_valid(struct kv_pair *pair) { + return kv_pair_inuse(pair) && !kv_pair_deleted(pair); +} + +static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) { + return (struct kv_pair *) ((long) pair | 1); +} + +static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) { + return (struct kv_pair *) ((long) pair & ~1); +} + struct kv_pair_tag { struct kv_pair *pair; int oldtag, newtag; diff --git a/newbrt/pma-internal.h b/newbrt/pma-internal.h index 977f4a6e30b..f9f40212168 100644 --- a/newbrt/pma-internal.h +++ b/newbrt/pma-internal.h @@ -60,6 +60,17 @@ void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n); */ void __pma_delete_at(PMA pma, int here); +/* + * finish a deletion from the pma. called when there are no cursor references + * to the kv pair. + */ +void __pma_delete_finish(PMA pma, int here); + +/* + * count the number of cursors that reference a pma pair + */ +int __pma_count_cursor_refs(PMA pma, int here); + /* density thresholds */ #define PMA_LDT_HIGH 0.25 #define PMA_LDT_LOW 0.40 diff --git a/newbrt/pma-test.c b/newbrt/pma-test.c index 263d69a9d8f..7c288750b2f 100644 --- a/newbrt/pma-test.c +++ b/newbrt/pma-test.c @@ -609,12 +609,94 @@ void test_pma_cursor_4 (void) { assert(error == 0); } +void test_pma_cursor_delete(int n) { + printf("test_pma_cursor_delete:%d\n", n); + + PMA pma; + int error; + + error = pma_create(&pma, default_compare_fun); + assert(error == 0); + + /* insert 1 -> 42 */ + DBT key, val; int k, v; + int i; + for (i=0; i /* Only needed for testing. */ #include - #include "list.h" #include "kv-pair.h" #include "pma-internal.h" @@ -29,14 +28,14 @@ int pma_index_limit (PMA pma) { int pmanode_valid (PMA pma, int i) { assert(0<=i); assert(ipairs[i] != 0; + return kv_pair_inuse(pma->pairs[i]); } bytevec pmanode_key (PMA pma, int i) { struct kv_pair *pair; assert(0<=i); assert(ipairs[i]; - assert(pair); + assert(kv_pair_valid(pair)); return kv_pair_key(pair); } @@ -44,7 +43,7 @@ ITEMLEN pmanode_keylen (PMA pma, int i) { struct kv_pair *pair; assert(0<=i); assert(ipairs[i]; - assert(pair); + assert(kv_pair_valid(pair)); return kv_pair_keylen(pair); } @@ -52,7 +51,7 @@ bytevec pmanode_val (PMA pma, int i) { struct kv_pair *pair; assert(0<=i); assert(ipairs[i]; - assert(pair); + assert(kv_pair_valid(pair)); return kv_pair_val(pair); } @@ -60,7 +59,7 @@ ITEMLEN pmanode_vallen (PMA pma, int i) { struct kv_pair *pair; assert(0<=i); assert(ipairs[i]; - assert(pair); + assert(kv_pair_valid(pair)); return kv_pair_vallen(pair); } @@ -72,7 +71,7 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE /* For now a simple implementation where we simply start at the beginning and look. */ for (i=0; ipairs[i]; - if (pair) { + if (kv_pair_valid(pair)) { *key = kv_pair_key(pair); *keylen = kv_pair_keylen(pair); *val = kv_pair_val(pair); @@ -90,7 +89,7 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE for (i=0; ipairs[ir]; - if (pair) { + if (kv_pair_valid(pair)) { *key = kv_pair_key(pair); *keylen = kv_pair_keylen(pair); *val = kv_pair_val(pair); @@ -125,10 +124,12 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) { int mid; // Scan forward looking for a non-null value. for (mid=(lo+hi)/2; midpairs[mid]!=0) { + struct kv_pair *kv = pma->pairs[mid]; + if (kv_pair_inuse(kv)) { // Found one. + kv = kv_pair_ptr(kv); DBT k2; - int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid]->key, pma->pairs[mid]->keylen)); + int cmp = pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen)); if (cmp==0) return mid; else if (cmp<0) { /* key is smaller than the midpoint, so look in the low half. */ @@ -175,8 +176,8 @@ int pmainternal_printpairs (struct kv_pair *pairs[], int N) { printf("{"); for (i=0; ikey); + if (kv_pair_valid(pairs[i])) { + printf("%s", (char*)kv_pair_key(pairs[i])); count++; } else printf("_"); @@ -224,7 +225,7 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base int i; int n_present=0; for (i=0; ipma; c->position=pma->N-1; - while (c->pma->pairs[c->position]==0) { + while (!kv_pair_valid(c->pma->pairs[c->position])) { if (c->position>0) c->position--; else return DB_NOTFOUND; } @@ -388,8 +389,11 @@ int pma_cursor_set_position_prev (PMA_CURSOR c) { int old_position = c->position; c->position--; while (c->position >= 0) { - if (pma->pairs[c->position] != 0) + if (kv_pair_valid(pma->pairs[c->position])) { + if (old_position >= 0 && kv_pair_deleted(pma->pairs[old_position]) &&__pma_count_cursor_refs(pma, old_position) == 0) + __pma_delete_finish(pma, old_position); return 0; + } c->position--; } c->position = old_position; @@ -399,7 +403,7 @@ int pma_cursor_set_position_prev (PMA_CURSOR c) { int pma_cursor_set_position_first (PMA_CURSOR c) { PMA pma = c->pma; c->position=0; - while (c->pma->pairs[c->position]==0) { + while (!kv_pair_valid(c->pma->pairs[c->position])) { if (c->position+1N) c->position++; else return DB_NOTFOUND; } @@ -411,17 +415,24 @@ int pma_cursor_set_position_next (PMA_CURSOR c) { int old_position=c->position; c->position++; while (c->positionN) { - if (c->pma->pairs[c->position]!=0) return 0; - c->position++; + if (kv_pair_valid(c->pma->pairs[c->position])) { + if (old_position >= 0 && kv_pair_deleted(pma->pairs[old_position]) && __pma_count_cursor_refs(pma, old_position) == 0) + __pma_delete_finish(pma, old_position); + return 0; + } + c->position++; } c->position=old_position; return DB_NOTFOUND; } int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val) { + if (c->position == -1) + return DB_NOTFOUND; PMA pma = c->pma; struct kv_pair *pair = pma->pairs[c->position]; - if (pair==0) return BRT_KEYEMPTY; + if (!kv_pair_valid(pair)) + return BRT_KEYEMPTY; ybt_set_value(key, pair->key, pair->keylen, &c->skey); ybt_set_value(val, pair->key + pair->keylen, pair->vallen, &c->sval); return 0; @@ -445,7 +456,12 @@ int pma_cget_first (PMA_CURSOR c, YBT *key, YBT *val) { int pma_cursor_free (PMA_CURSOR *cursp) { PMA_CURSOR curs=*cursp; + PMA pma = curs->pma; list_remove(&curs->next); + if (curs->position >= 0 && kv_pair_deleted(pma->pairs[curs->position]) && + __pma_count_cursor_refs(pma, curs->position) == 0) { + __pma_delete_finish(pma, curs->position); + } if (curs->skey) toku_free(curs->skey); if (curs->sval) toku_free(curs->sval); toku_free(curs); @@ -524,7 +540,7 @@ enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) { assert(0<=l ); assert(l<=pma_index_limit(pma)); if (l==pma_index_limit(pma)) return DB_NOTFOUND; pair = pma->pairs[l]; - if (pair!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) { + if (kv_pair_valid(pair) && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) { return ybt_set_value(v, pair->key + pair->keylen, pair->vallen, &pma->sval); } else { return DB_NOTFOUND; @@ -541,8 +557,9 @@ int pma_free (PMA *pmap) { if (pma->n_pairs_present > 0) { for (i=0; i < pma->N; i++) { - if (pma->pairs[i]) { - kv_pair_free(pma->pairs[i]); + struct kv_pair *kv = pma->pairs[i]; + if (kv_pair_inuse(kv)) { + kv_pair_free(kv_pair_ptr(kv)); pma->pairs[i] = 0; pma->n_pairs_present--; } @@ -562,14 +579,19 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) { int idx = pmainternal_find(pma, k, db); if (idx < pma_index_limit(pma) && pma->pairs[idx]) { DBT k2; - if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx]->key, pma->pairs[idx]->keylen))) { - return BRT_ALREADY_THERE; /* It is already here. Return an error. */ + struct kv_pair *kv = kv_pair_ptr(pma->pairs[idx]); + if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) { + if (kv_pair_deleted(pma->pairs[idx])) { + pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size); + return BRT_OK; + } else + return BRT_ALREADY_THERE; /* It is already here. Return an error. */ } } - if (pma->pairs[idx]) { + if (kv_pair_inuse(pma->pairs[idx])) { idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */ } - assert(!pma->pairs[idx]); + assert(!kv_pair_inuse(pma->pairs[idx])); pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size); assert(pma->pairs[idx]); pma->n_pairs_present++; @@ -580,18 +602,27 @@ int pma_delete (PMA pma, DBT *k, DB *db) { int l; l = pmainternal_find(pma, k, db); - struct kv_pair *pair = pma->pairs[l]; - if (pair==0) { + struct kv_pair *kv = pma->pairs[l]; + if (!kv_pair_inuse(kv)) { printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND); return DB_NOTFOUND; } - kv_pair_free(pair); - pma->pairs[l] = 0; - pma->n_pairs_present--; - __pma_delete_at(pma, l); + pma->pairs[l] = kv_pair_set_deleted(kv); + if (__pma_count_cursor_refs(pma, l) == 0) + __pma_delete_finish(pma, l); return BRT_OK; } +void __pma_delete_finish(PMA pma, int here) { + struct kv_pair *kv = pma->pairs[here]; + if (!kv_pair_inuse(kv)) + return; + kv_pair_free(kv_pair_ptr(kv)); + pma->pairs[here] = 0; + pma->n_pairs_present--; + __pma_delete_at(pma, here); +} + void __pma_delete_at(PMA pma, int here) { int size; int count; @@ -666,19 +697,21 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db, ) { //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); int idx = pmainternal_find(pma, k, db); - struct kv_pair *pair; - if (idx < pma_index_limit(pma) && (pair=pma->pairs[idx])) { + struct kv_pair *kv; + if (idx < pma_index_limit(pma) && (kv = pma->pairs[idx])) { DBT k2; - if (0==pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))) { - *replaced_v_size = pair->vallen; - pma->pairs[idx] = kv_pair_realloc_same_key(pair, v->data, v->size); - return BRT_OK; /* It is already here. Return an error. */ + kv = kv_pair_ptr(kv); + if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) { + if (!kv_pair_deleted(pma->pairs[idx])) + *replaced_v_size = kv->vallen; + pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size); + return BRT_OK; } } - if (pma->pairs[idx]) { + if (kv_pair_inuse(pma->pairs[idx])) { idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */ } - assert(!pma->pairs[idx]); + assert(!kv_pair_inuse(pma->pairs[idx])); //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size); assert(pma->pairs[idx]); @@ -698,6 +731,21 @@ void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi } } +int __pma_count_cursor_refs(PMA pma, int here) { + int refs = 0; + struct list *list; + struct pma_cursor *cursor; + + list = list_head(&pma->cursors); + while (list != &pma->cursors) { + cursor = list_struct(list, struct pma_cursor, next); + if (cursor->position == here) + refs += 1; + list = list->next; + } + return refs; +} + void __pma_update_cursors_position(PMA pma, struct list *cursor_set, int oldposition, int newposition) { struct list *list, *nextlist; struct pma_cursor *cursor;