Initial import
[samba] / source / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23    
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifdef STANDALONE
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <fcntl.h>
57 #include <unistd.h>
58 #include <string.h>
59 #include <fcntl.h>
60 #include <errno.h>
61 #include <sys/mman.h>
62 #include <sys/stat.h>
63 #include <signal.h>
64 #include "tdb.h"
65 #include "spinlock.h"
66 #else
67 #include "includes.h"
68
69 #if defined(PARANOID_MALLOC_CHECKER)
70 #ifdef malloc
71 #undef malloc
72 #endif
73
74 #ifdef realloc
75 #undef realloc
76 #endif
77
78 #ifdef calloc
79 #undef calloc
80 #endif
81
82 #ifdef strdup
83 #undef strdup
84 #endif
85
86 #ifdef strndup
87 #undef strndup
88 #endif
89
90 #endif
91
92 #endif
93
94 #define TDB_MAGIC_FOOD "TDB file\n"
95 #define TDB_VERSION (0x26011967 + 6)
96 #define TDB_MAGIC (0x26011999U)
97 #define TDB_FREE_MAGIC (~TDB_MAGIC)
98 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
99 #define TDB_ALIGNMENT 4
100 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
101 #define DEFAULT_HASH_SIZE 131
102 #define TDB_PAGE_SIZE 0x2000
103 #define FREELIST_TOP (sizeof(struct tdb_header))
104 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
105 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
106 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
107 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
108 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
109 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
110
111
112 /* NB assumes there is a local variable called "tdb" that is the
113  * current context, also takes doubly-parenthesized print-style
114  * argument. */
115 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
116
117 /* lock offsets */
118 #define GLOBAL_LOCK 0
119 #define ACTIVE_LOCK 4
120
121 #ifndef MAP_FILE
122 #define MAP_FILE 0
123 #endif
124
125 #ifndef MAP_FAILED
126 #define MAP_FAILED ((void *)-1)
127 #endif
128
129 /* free memory if the pointer is valid and zero the pointer */
130 #ifndef SAFE_FREE
131 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
132 #endif
133
134 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
135 TDB_DATA tdb_null;
136
137 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
138 static TDB_CONTEXT *tdbs = NULL;
139
140 static int tdb_munmap(TDB_CONTEXT *tdb)
141 {
142         if (tdb->flags & TDB_INTERNAL)
143                 return 0;
144
145 #ifdef HAVE_MMAP
146         if (tdb->map_ptr) {
147                 int ret = munmap(tdb->map_ptr, tdb->map_size);
148                 if (ret != 0)
149                         return ret;
150         }
151 #endif
152         tdb->map_ptr = NULL;
153         return 0;
154 }
155
156 static void tdb_mmap(TDB_CONTEXT *tdb)
157 {
158         if (tdb->flags & TDB_INTERNAL)
159                 return;
160
161 #ifdef HAVE_MMAP
162         if (!(tdb->flags & TDB_NOMMAP)) {
163                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
164                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
165                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
166
167                 /*
168                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
169                  */
170
171                 if (tdb->map_ptr == MAP_FAILED) {
172                         tdb->map_ptr = NULL;
173                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
174                                  tdb->map_size, strerror(errno)));
175                 }
176         } else {
177                 tdb->map_ptr = NULL;
178         }
179 #else
180         tdb->map_ptr = NULL;
181 #endif
182 }
183
184 /* Endian conversion: we only ever deal with 4 byte quantities */
185 static void *convert(void *buf, u32 size)
186 {
187         u32 i, *p = buf;
188         for (i = 0; i < size / 4; i++)
189                 p[i] = TDB_BYTEREV(p[i]);
190         return buf;
191 }
192 #define DOCONV() (tdb->flags & TDB_CONVERT)
193 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
194
195 /* the body of the database is made of one list_struct for the free space
196    plus a separate data list for each hash value */
197 struct list_struct {
198         tdb_off next; /* offset of the next record in the list */
199         tdb_len rec_len; /* total byte length of record */
200         tdb_len key_len; /* byte length of key */
201         tdb_len data_len; /* byte length of data */
202         u32 full_hash; /* the full 32 bit hash of the key */
203         u32 magic;   /* try to catch errors */
204         /* the following union is implied:
205                 union {
206                         char record[rec_len];
207                         struct {
208                                 char key[key_len];
209                                 char data[data_len];
210                         }
211                         u32 totalsize; (tailer)
212                 }
213         */
214 };
215
216 /***************************************************************
217  Allow a caller to set a "alarm" flag that tdb can check to abort
218  a blocking lock on SIGALRM.
219 ***************************************************************/
220
221 static sig_atomic_t *palarm_fired;
222
223 void tdb_set_lock_alarm(sig_atomic_t *palarm)
224 {
225         palarm_fired = palarm;
226 }
227
228 /* a byte range locking function - return 0 on success
229    this functions locks/unlocks 1 byte at the specified offset.
230
231    On error, errno is also set so that errors are passed back properly
232    through tdb_open(). */
233 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
234                       int rw_type, int lck_type, int probe)
235 {
236         struct flock fl;
237         int ret;
238
239         if (tdb->flags & TDB_NOLOCK)
240                 return 0;
241         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
242                 errno = EACCES;
243                 return -1;
244         }
245
246         fl.l_type = rw_type;
247         fl.l_whence = SEEK_SET;
248         fl.l_start = offset;
249         fl.l_len = 1;
250         fl.l_pid = 0;
251
252         do {
253                 ret = fcntl(tdb->fd,lck_type,&fl);
254                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
255                         break;
256         } while (ret == -1 && errno == EINTR);
257
258         if (ret == -1) {
259                 if (!probe && lck_type != F_SETLK) {
260                         /* Ensure error code is set for log fun to examine. */
261                         if (errno == EINTR && palarm_fired && *palarm_fired)
262                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
263                         else
264                                 tdb->ecode = TDB_ERR_LOCK;
265                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
266                                  tdb->fd, offset, rw_type, lck_type));
267                 }
268                 /* Was it an alarm timeout ? */
269                 if (errno == EINTR && palarm_fired && *palarm_fired) {
270                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
271                                  tdb->fd, offset, rw_type, lck_type));
272                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
273                 }
274                 /* Otherwise - generic lock error. errno set by fcntl.
275                  * EAGAIN is an expected return from non-blocking
276                  * locks. */
277                 if (errno != EAGAIN) {
278                         TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
279                                  tdb->fd, offset, rw_type, lck_type, 
280                                  strerror(errno)));
281                 }
282                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
283         }
284         return 0;
285 }
286
287 /* lock a list in the database. list -1 is the alloc list */
288 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
289 {
290         if (list < -1 || list >= (int)tdb->header.hash_size) {
291                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
292                            list, ltype));
293                 return -1;
294         }
295         if (tdb->flags & TDB_NOLOCK)
296                 return 0;
297
298         /* Since fcntl locks don't nest, we do a lock for the first one,
299            and simply bump the count for future ones */
300         if (tdb->locked[list+1].count == 0) {
301                 if (!tdb->read_only && tdb->header.rwlocks) {
302                         if (tdb_spinlock(tdb, list, ltype)) {
303                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
304                                            list, ltype));
305                                 return -1;
306                         }
307                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
308                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
309                                            list, ltype, strerror(errno)));
310                         return -1;
311                 }
312                 tdb->locked[list+1].ltype = ltype;
313         }
314         tdb->locked[list+1].count++;
315         return 0;
316 }
317
318 /* unlock the database: returns void because it's too late for errors. */
319         /* changed to return int it may be interesting to know there
320            has been an error  --simo */
321 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
322 {
323         int ret = -1;
324
325         if (tdb->flags & TDB_NOLOCK)
326                 return 0;
327
328         /* Sanity checks */
329         if (list < -1 || list >= (int)tdb->header.hash_size) {
330                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
331                 return ret;
332         }
333
334         if (tdb->locked[list+1].count==0) {
335                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
336                 return ret;
337         }
338
339         if (tdb->locked[list+1].count == 1) {
340                 /* Down to last nested lock: unlock underneath */
341                 if (!tdb->read_only && tdb->header.rwlocks) {
342                         ret = tdb_spinunlock(tdb, list, ltype);
343                 } else {
344                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
345                 }
346         } else {
347                 ret = 0;
348         }
349         tdb->locked[list+1].count--;
350
351         if (ret)
352                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
353         return ret;
354 }
355
356 /* check for an out of bounds access - if it is out of bounds then
357    see if the database has been expanded by someone else and expand
358    if necessary 
359    note that "len" is the minimum length needed for the db
360 */
361 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
362 {
363         struct stat st;
364         if (len <= tdb->map_size)
365                 return 0;
366         if (tdb->flags & TDB_INTERNAL) {
367                 if (!probe) {
368                         /* Ensure ecode is set for log fn. */
369                         tdb->ecode = TDB_ERR_IO;
370                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
371                                  (int)len, (int)tdb->map_size));
372                 }
373                 return TDB_ERRCODE(TDB_ERR_IO, -1);
374         }
375
376         if (fstat(tdb->fd, &st) == -1)
377                 return TDB_ERRCODE(TDB_ERR_IO, -1);
378
379         if (st.st_size < (size_t)len) {
380                 if (!probe) {
381                         /* Ensure ecode is set for log fn. */
382                         tdb->ecode = TDB_ERR_IO;
383                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
384                                  (int)len, (int)st.st_size));
385                 }
386                 return TDB_ERRCODE(TDB_ERR_IO, -1);
387         }
388
389         /* Unmap, update size, remap */
390         if (tdb_munmap(tdb) == -1)
391                 return TDB_ERRCODE(TDB_ERR_IO, -1);
392         tdb->map_size = st.st_size;
393         tdb_mmap(tdb);
394         return 0;
395 }
396
397 /* write a lump of data at a specified offset */
398 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
399 {
400         if (tdb_oob(tdb, off + len, 0) != 0)
401                 return -1;
402
403         if (tdb->map_ptr)
404                 memcpy(off + (char *)tdb->map_ptr, buf, len);
405 #ifdef HAVE_PWRITE
406         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
407 #else
408         else if (lseek(tdb->fd, off, SEEK_SET) != off
409                  || write(tdb->fd, buf, len) != (ssize_t)len) {
410 #endif
411                 /* Ensure ecode is set for log fn. */
412                 tdb->ecode = TDB_ERR_IO;
413                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
414                            off, len, strerror(errno)));
415                 return TDB_ERRCODE(TDB_ERR_IO, -1);
416         }
417         return 0;
418 }
419
420 /* read a lump of data at a specified offset, maybe convert */
421 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
422 {
423         if (tdb_oob(tdb, off + len, 0) != 0)
424                 return -1;
425
426         if (tdb->map_ptr)
427                 memcpy(buf, off + (char *)tdb->map_ptr, len);
428 #ifdef HAVE_PREAD
429         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
430 #else
431         else if (lseek(tdb->fd, off, SEEK_SET) != off
432                  || read(tdb->fd, buf, len) != (ssize_t)len) {
433 #endif
434                 /* Ensure ecode is set for log fn. */
435                 tdb->ecode = TDB_ERR_IO;
436                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
437                            off, len, strerror(errno)));
438                 return TDB_ERRCODE(TDB_ERR_IO, -1);
439         }
440         if (cv)
441                 convert(buf, len);
442         return 0;
443 }
444
445 /* read a lump of data, allocating the space for it */
446 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
447 {
448         char *buf;
449
450         if (!(buf = malloc(len))) {
451                 /* Ensure ecode is set for log fn. */
452                 tdb->ecode = TDB_ERR_OOM;
453                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
454                            len, strerror(errno)));
455                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
456         }
457         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
458                 SAFE_FREE(buf);
459                 return NULL;
460         }
461         return buf;
462 }
463
464 /* read/write a tdb_off */
465 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
466 {
467         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
468 }
469 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
470 {
471         tdb_off off = *d;
472         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
473 }
474
475 /* read/write a record */
476 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
477 {
478         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
479                 return -1;
480         if (TDB_BAD_MAGIC(rec)) {
481                 /* Ensure ecode is set for log fn. */
482                 tdb->ecode = TDB_ERR_CORRUPT;
483                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
484                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
485         }
486         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
487 }
488 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
489 {
490         struct list_struct r = *rec;
491         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
492 }
493
494 /* read a freelist record and check for simple errors */
495 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
496 {
497         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
498                 return -1;
499
500         if (rec->magic == TDB_MAGIC) {
501                 /* this happens when a app is showdown while deleting a record - we should
502                    not completely fail when this happens */
503                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
504                          rec->magic, off));
505                 rec->magic = TDB_FREE_MAGIC;
506                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
507                         return -1;
508         }
509
510         if (rec->magic != TDB_FREE_MAGIC) {
511                 /* Ensure ecode is set for log fn. */
512                 tdb->ecode = TDB_ERR_CORRUPT;
513                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
514                            rec->magic, off));
515                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
516         }
517         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
518                 return -1;
519         return 0;
520 }
521
522 /* update a record tailer (must hold allocation lock) */
523 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
524                          const struct list_struct *rec)
525 {
526         tdb_off totalsize;
527
528         /* Offset of tailer from record header */
529         totalsize = sizeof(*rec) + rec->rec_len;
530         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
531                          &totalsize);
532 }
533
534 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
535 {
536         struct list_struct rec;
537         tdb_off tailer_ofs, tailer;
538
539         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
540                 printf("ERROR: failed to read record at %u\n", offset);
541                 return 0;
542         }
543
544         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
545                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
546
547         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
548         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
549                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
550                 return rec.next;
551         }
552
553         if (tailer != rec.rec_len + sizeof(rec)) {
554                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
555                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
556         }
557         return rec.next;
558 }
559
560 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
561 {
562         tdb_off rec_ptr, top;
563         int hash_length = 0;
564
565         top = TDB_HASH_TOP(i);
566
567         if (tdb_lock(tdb, i, F_WRLCK) != 0)
568                 return -1;
569
570         if (ofs_read(tdb, top, &rec_ptr) == -1)
571                 return tdb_unlock(tdb, i, F_WRLCK);
572
573         if (rec_ptr)
574                 printf("hash=%d\n", i);
575
576         while (rec_ptr) {
577                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
578                 hash_length += 1;
579         }
580
581         printf("chain %d length %d\n", i, hash_length);
582
583         return tdb_unlock(tdb, i, F_WRLCK);
584 }
585
586 void tdb_dump_all(TDB_CONTEXT *tdb)
587 {
588         int i;
589         for (i=0;i<tdb->header.hash_size;i++) {
590                 tdb_dump_chain(tdb, i);
591         }
592         tdb_printfreelist(tdb);
593 }
594
595 int tdb_printfreelist(TDB_CONTEXT *tdb)
596 {
597         int ret;
598         long total_free = 0;
599         tdb_off offset, rec_ptr;
600         struct list_struct rec;
601
602         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
603                 return ret;
604
605         offset = FREELIST_TOP;
606
607         /* read in the freelist top */
608         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
609                 tdb_unlock(tdb, -1, F_WRLCK);
610                 return 0;
611         }
612
613         printf("freelist top=[0x%08x]\n", rec_ptr );
614         while (rec_ptr) {
615                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
616                         tdb_unlock(tdb, -1, F_WRLCK);
617                         return -1;
618                 }
619
620                 if (rec.magic != TDB_FREE_MAGIC) {
621                         printf("bad magic 0x%08x in free list\n", rec.magic);
622                         tdb_unlock(tdb, -1, F_WRLCK);
623                         return -1;
624                 }
625
626                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
627                 total_free += rec.rec_len;
628
629                 /* move to the next record */
630                 rec_ptr = rec.next;
631         }
632         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
633                (int)total_free);
634
635         return tdb_unlock(tdb, -1, F_WRLCK);
636 }
637
638 /* Remove an element from the freelist.  Must have alloc lock. */
639 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
640 {
641         tdb_off last_ptr, i;
642
643         /* read in the freelist top */
644         last_ptr = FREELIST_TOP;
645         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
646                 if (i == off) {
647                         /* We've found it! */
648                         return ofs_write(tdb, last_ptr, &next);
649                 }
650                 /* Follow chain (next offset is at start of record) */
651                 last_ptr = i;
652         }
653         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
654         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
655 }
656
657 /* Add an element into the freelist. Merge adjacent records if
658    neccessary. */
659 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
660 {
661         tdb_off right, left;
662
663         /* Allocation and tailer lock */
664         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
665                 return -1;
666
667         /* set an initial tailer, so if we fail we don't leave a bogus record */
668         if (update_tailer(tdb, offset, rec) != 0) {
669                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
670                 goto fail;
671         }
672
673         /* Look right first (I'm an Australian, dammit) */
674         right = offset + sizeof(*rec) + rec->rec_len;
675         if (right + sizeof(*rec) <= tdb->map_size) {
676                 struct list_struct r;
677
678                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
679                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
680                         goto left;
681                 }
682
683                 /* If it's free, expand to include it. */
684                 if (r.magic == TDB_FREE_MAGIC) {
685                         if (remove_from_freelist(tdb, right, r.next) == -1) {
686                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
687                                 goto left;
688                         }
689                         rec->rec_len += sizeof(r) + r.rec_len;
690                 }
691         }
692
693 left:
694         /* Look left */
695         left = offset - sizeof(tdb_off);
696         if (left > TDB_DATA_START(tdb->header.hash_size)) {
697                 struct list_struct l;
698                 tdb_off leftsize;
699                 
700                 /* Read in tailer and jump back to header */
701                 if (ofs_read(tdb, left, &leftsize) == -1) {
702                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
703                         goto update;
704                 }
705                 left = offset - leftsize;
706
707                 /* Now read in record */
708                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
709                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
710                         goto update;
711                 }
712
713                 /* If it's free, expand to include it. */
714                 if (l.magic == TDB_FREE_MAGIC) {
715                         if (remove_from_freelist(tdb, left, l.next) == -1) {
716                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
717                                 goto update;
718                         } else {
719                                 offset = left;
720                                 rec->rec_len += leftsize;
721                         }
722                 }
723         }
724
725 update:
726         if (update_tailer(tdb, offset, rec) == -1) {
727                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
728                 goto fail;
729         }
730
731         /* Now, prepend to free list */
732         rec->magic = TDB_FREE_MAGIC;
733
734         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
735             rec_write(tdb, offset, rec) == -1 ||
736             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
737                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
738                 goto fail;
739         }
740
741         /* And we're done. */
742         tdb_unlock(tdb, -1, F_WRLCK);
743         return 0;
744
745  fail:
746         tdb_unlock(tdb, -1, F_WRLCK);
747         return -1;
748 }
749
750
751 /* expand a file.  we prefer to use ftruncate, as that is what posix
752   says to use for mmap expansion */
753 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
754 {
755         char buf[1024];
756 #if HAVE_FTRUNCATE_EXTEND
757         if (ftruncate(tdb->fd, size+addition) != 0) {
758                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
759                            size+addition, strerror(errno)));
760                 return -1;
761         }
762 #else
763         char b = 0;
764
765 #ifdef HAVE_PWRITE
766         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
767 #else
768         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
769             write(tdb->fd, &b, 1) != 1) {
770 #endif
771                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
772                            size+addition, strerror(errno)));
773                 return -1;
774         }
775 #endif
776
777         /* now fill the file with something. This ensures that the file isn't sparse, which would be
778            very bad if we ran out of disk. This must be done with write, not via mmap */
779         memset(buf, 0x42, sizeof(buf));
780         while (addition) {
781                 int n = addition>sizeof(buf)?sizeof(buf):addition;
782 #ifdef HAVE_PWRITE
783                 int ret = pwrite(tdb->fd, buf, n, size);
784 #else
785                 int ret;
786                 if (lseek(tdb->fd, size, SEEK_SET) != size)
787                         return -1;
788                 ret = write(tdb->fd, buf, n);
789 #endif
790                 if (ret != n) {
791                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
792                                    n, strerror(errno)));
793                         return -1;
794                 }
795                 addition -= n;
796                 size += n;
797         }
798         return 0;
799 }
800
801
802 /* expand the database at least size bytes by expanding the underlying
803    file and doing the mmap again if necessary */
804 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
805 {
806         struct list_struct rec;
807         tdb_off offset;
808
809         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
810                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
811                 return -1;
812         }
813
814         /* must know about any previous expansions by another process */
815         tdb_oob(tdb, tdb->map_size + 1, 1);
816
817         /* always make room for at least 10 more records, and round
818            the database up to a multiple of TDB_PAGE_SIZE */
819         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
820
821         if (!(tdb->flags & TDB_INTERNAL))
822                 tdb_munmap(tdb);
823
824         /*
825          * We must ensure the file is unmapped before doing this
826          * to ensure consistency with systems like OpenBSD where
827          * writes and mmaps are not consistent.
828          */
829
830         /* expand the file itself */
831         if (!(tdb->flags & TDB_INTERNAL)) {
832                 if (expand_file(tdb, tdb->map_size, size) != 0)
833                         goto fail;
834         }
835
836         tdb->map_size += size;
837
838         if (tdb->flags & TDB_INTERNAL) {
839                 char *new_map_ptr = realloc(tdb->map_ptr, tdb->map_size);
840                 if (!new_map_ptr) {
841                         tdb->map_size -= size;
842                         goto fail;
843                 }
844                 tdb->map_ptr = new_map_ptr;
845         } else {
846                 /*
847                  * We must ensure the file is remapped before adding the space
848                  * to ensure consistency with systems like OpenBSD where
849                  * writes and mmaps are not consistent.
850                  */
851
852                 /* We're ok if the mmap fails as we'll fallback to read/write */
853                 tdb_mmap(tdb);
854         }
855
856         /* form a new freelist record */
857         memset(&rec,'\0',sizeof(rec));
858         rec.rec_len = size - sizeof(rec);
859
860         /* link it into the free list */
861         offset = tdb->map_size - size;
862         if (tdb_free(tdb, offset, &rec) == -1)
863                 goto fail;
864
865         tdb_unlock(tdb, -1, F_WRLCK);
866         return 0;
867  fail:
868         tdb_unlock(tdb, -1, F_WRLCK);
869         return -1;
870 }
871
872 /* allocate some space from the free list. The offset returned points
873    to a unconnected list_struct within the database with room for at
874    least length bytes of total data
875
876    0 is returned if the space could not be allocated
877  */
878 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
879                             struct list_struct *rec)
880 {
881         tdb_off rec_ptr, last_ptr, newrec_ptr;
882         struct list_struct newrec;
883
884         memset(&newrec, '\0', sizeof(newrec));
885
886         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
887                 return 0;
888
889         /* Extra bytes required for tailer */
890         length += sizeof(tdb_off);
891
892  again:
893         last_ptr = FREELIST_TOP;
894
895         /* read in the freelist top */
896         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
897                 goto fail;
898
899         /* keep looking until we find a freelist record big enough */
900         while (rec_ptr) {
901                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
902                         goto fail;
903
904                 if (rec->rec_len >= length) {
905                         /* found it - now possibly split it up  */
906                         if (rec->rec_len > length + MIN_REC_SIZE) {
907                                 /* Length of left piece */
908                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
909
910                                 /* Right piece to go on free list */
911                                 newrec.rec_len = rec->rec_len
912                                         - (sizeof(*rec) + length);
913                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
914
915                                 /* And left record is shortened */
916                                 rec->rec_len = length;
917                         } else
918                                 newrec_ptr = 0;
919
920                         /* Remove allocated record from the free list */
921                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
922                                 goto fail;
923
924                         /* Update header: do this before we drop alloc
925                            lock, otherwise tdb_free() might try to
926                            merge with us, thinking we're free.
927                            (Thanks Jeremy Allison). */
928                         rec->magic = TDB_MAGIC;
929                         if (rec_write(tdb, rec_ptr, rec) == -1)
930                                 goto fail;
931
932                         /* Did we create new block? */
933                         if (newrec_ptr) {
934                                 /* Update allocated record tailer (we
935                                    shortened it). */
936                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
937                                         goto fail;
938
939                                 /* Free new record */
940                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
941                                         goto fail;
942                         }
943
944                         /* all done - return the new record offset */
945                         tdb_unlock(tdb, -1, F_WRLCK);
946                         return rec_ptr;
947                 }
948                 /* move to the next record */
949                 last_ptr = rec_ptr;
950                 rec_ptr = rec->next;
951         }
952         /* we didn't find enough space. See if we can expand the
953            database and if we can then try again */
954         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
955                 goto again;
956  fail:
957         tdb_unlock(tdb, -1, F_WRLCK);
958         return 0;
959 }
960
961 /* initialise a new database with a specified hash size */
962 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
963 {
964         struct tdb_header *newdb;
965         int size, ret = -1;
966
967         /* We make it up in memory, then write it out if not internal */
968         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
969         if (!(newdb = calloc(size, 1)))
970                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
971
972         /* Fill in the header */
973         newdb->version = TDB_VERSION;
974         newdb->hash_size = hash_size;
975 #ifdef USE_SPINLOCKS
976         newdb->rwlocks = size;
977 #endif
978         if (tdb->flags & TDB_INTERNAL) {
979                 tdb->map_size = size;
980                 tdb->map_ptr = (char *)newdb;
981                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
982                 /* Convert the `ondisk' version if asked. */
983                 CONVERT(*newdb);
984                 return 0;
985         }
986         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
987                 goto fail;
988
989         if (ftruncate(tdb->fd, 0) == -1)
990                 goto fail;
991
992         /* This creates an endian-converted header, as if read from disk */
993         CONVERT(*newdb);
994         memcpy(&tdb->header, newdb, sizeof(tdb->header));
995         /* Don't endian-convert the magic food! */
996         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
997         if (write(tdb->fd, newdb, size) != size)
998                 ret = -1;
999         else
1000                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
1001
1002   fail:
1003         SAFE_FREE(newdb);
1004         return ret;
1005 }
1006
1007 /* Returns 0 on fail.  On success, return offset of record, and fills
1008    in rec */
1009 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1010                         struct list_struct *r)
1011 {
1012         tdb_off rec_ptr;
1013         
1014         /* read in the hash top */
1015         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1016                 return 0;
1017
1018         /* keep looking until we find the right record */
1019         while (rec_ptr) {
1020                 if (rec_read(tdb, rec_ptr, r) == -1)
1021                         return 0;
1022
1023                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1024                         char *k;
1025                         /* a very likely hit - read the key */
1026                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1027                                            r->key_len);
1028                         if (!k)
1029                                 return 0;
1030
1031                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1032                                 SAFE_FREE(k);
1033                                 return rec_ptr;
1034                         }
1035                         SAFE_FREE(k);
1036                 }
1037                 rec_ptr = r->next;
1038         }
1039         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1040 }
1041
1042 /* As tdb_find, but if you succeed, keep the lock */
1043 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1044                              struct list_struct *rec)
1045 {
1046         u32 rec_ptr;
1047
1048         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1049                 return 0;
1050         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1051                 tdb_unlock(tdb, BUCKET(hash), locktype);
1052         return rec_ptr;
1053 }
1054
1055 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1056 {
1057         return tdb->ecode;
1058 }
1059
1060 static struct tdb_errname {
1061         enum TDB_ERROR ecode; const char *estring;
1062 } emap[] = { {TDB_SUCCESS, "Success"},
1063              {TDB_ERR_CORRUPT, "Corrupt database"},
1064              {TDB_ERR_IO, "IO Error"},
1065              {TDB_ERR_LOCK, "Locking error"},
1066              {TDB_ERR_OOM, "Out of memory"},
1067              {TDB_ERR_EXISTS, "Record exists"},
1068              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1069              {TDB_ERR_NOEXIST, "Record does not exist"} };
1070
1071 /* Error string for the last tdb error */
1072 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1073 {
1074         u32 i;
1075         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1076                 if (tdb->ecode == emap[i].ecode)
1077                         return emap[i].estring;
1078         return "Invalid error code";
1079 }
1080
1081 /* update an entry in place - this only works if the new data size
1082    is <= the old data size and the key exists.
1083    on failure return -1.
1084 */
1085
1086 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1087 {
1088         struct list_struct rec;
1089         tdb_off rec_ptr;
1090
1091         /* find entry */
1092         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1093                 return -1;
1094
1095         /* must be long enough key, data and tailer */
1096         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1097                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1098                 return -1;
1099         }
1100
1101         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1102                       dbuf.dptr, dbuf.dsize) == -1)
1103                 return -1;
1104
1105         if (dbuf.dsize != rec.data_len) {
1106                 /* update size */
1107                 rec.data_len = dbuf.dsize;
1108                 return rec_write(tdb, rec_ptr, &rec);
1109         }
1110  
1111         return 0;
1112 }
1113
1114 /* find an entry in the database given a key */
1115 /* If an entry doesn't exist tdb_err will be set to
1116  * TDB_ERR_NOEXIST. If a key has no data attached
1117  * tdb_err will not be set. Both will return a
1118  * zero pptr and zero dsize.
1119  */
1120
1121 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1122 {
1123         tdb_off rec_ptr;
1124         struct list_struct rec;
1125         TDB_DATA ret;
1126         u32 hash;
1127
1128         /* find which hash bucket it is in */
1129         hash = tdb->hash_fn(&key);
1130         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1131                 return tdb_null;
1132
1133         if (rec.data_len)
1134                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1135                                           rec.data_len);
1136         else
1137                 ret.dptr = NULL;
1138         ret.dsize = rec.data_len;
1139         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1140         return ret;
1141 }
1142
1143 /* check if an entry in the database exists 
1144
1145    note that 1 is returned if the key is found and 0 is returned if not found
1146    this doesn't match the conventions in the rest of this module, but is
1147    compatible with gdbm
1148 */
1149 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1150 {
1151         struct list_struct rec;
1152         
1153         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1154                 return 0;
1155         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1156         return 1;
1157 }
1158
1159 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1160 {
1161         u32 hash = tdb->hash_fn(&key);
1162         return tdb_exists_hash(tdb, key, hash);
1163 }
1164
1165 /* record lock stops delete underneath */
1166 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1167 {
1168         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1169 }
1170 /*
1171   Write locks override our own fcntl readlocks, so check it here.
1172   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1173   an error to fail to get the lock here.
1174 */
1175  
1176 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1177 {
1178         struct tdb_traverse_lock *i;
1179         for (i = &tdb->travlocks; i; i = i->next)
1180                 if (i->off == off)
1181                         return -1;
1182         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1183 }
1184
1185 /*
1186   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1187   an error to fail to get the lock here.
1188 */
1189
1190 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1191 {
1192         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1193 }
1194 /* fcntl locks don't stack: avoid unlocking someone else's */
1195 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1196 {
1197         struct tdb_traverse_lock *i;
1198         u32 count = 0;
1199
1200         if (off == 0)
1201                 return 0;
1202         for (i = &tdb->travlocks; i; i = i->next)
1203                 if (i->off == off)
1204                         count++;
1205         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1206 }
1207
1208 /* actually delete an entry in the database given the offset */
1209 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1210 {
1211         tdb_off last_ptr, i;
1212         struct list_struct lastrec;
1213
1214         if (tdb->read_only) return -1;
1215
1216         if (write_lock_record(tdb, rec_ptr) == -1) {
1217                 /* Someone traversing here: mark it as dead */
1218                 rec->magic = TDB_DEAD_MAGIC;
1219                 return rec_write(tdb, rec_ptr, rec);
1220         }
1221         if (write_unlock_record(tdb, rec_ptr) != 0)
1222                 return -1;
1223
1224         /* find previous record in hash chain */
1225         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1226                 return -1;
1227         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1228                 if (rec_read(tdb, i, &lastrec) == -1)
1229                         return -1;
1230
1231         /* unlink it: next ptr is at start of record. */
1232         if (last_ptr == 0)
1233                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1234         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1235                 return -1;
1236
1237         /* recover the space */
1238         if (tdb_free(tdb, rec_ptr, rec) == -1)
1239                 return -1;
1240         return 0;
1241 }
1242
1243 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1244 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1245                          struct list_struct *rec)
1246 {
1247         int want_next = (tlock->off != 0);
1248
1249         /* Lock each chain from the start one. */
1250         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1251
1252                 /* this is an optimisation for the common case where
1253                    the hash chain is empty, which is particularly
1254                    common for the use of tdb with ldb, where large
1255                    hashes are used. In that case we spend most of our
1256                    time in tdb_brlock(), locking empty hash chains.
1257
1258                    To avoid this, we do an unlocked pre-check to see
1259                    if the hash chain is empty before starting to look
1260                    inside it. If it is empty then we can avoid that
1261                    hash chain. If it isn't empty then we can't believe
1262                    the value we get back, as we read it without a
1263                    lock, so instead we get the lock and re-fetch the
1264                    value below.
1265
1266                    Notice that not doing this optimisation on the
1267                    first hash chain is critical. We must guarantee
1268                    that we have done at least one fcntl lock at the
1269                    start of a search to guarantee that memory is
1270                    coherent on SMP systems. If records are added by
1271                    others during the search then thats OK, and we
1272                    could possibly miss those with this trick, but we
1273                    could miss them anyway without this trick, so the
1274                    semantics don't change.
1275
1276                    With a non-indexed ldb search this trick gains us a
1277                    factor of around 80 in speed on a linux 2.6.x
1278                    system (testing using ldbtest).
1279                  */
1280                 if (!tlock->off && tlock->hash != 0) {
1281                         u32 off;
1282                         if (tdb->map_ptr) {
1283                                 for (;tlock->hash < tdb->header.hash_size;tlock->hash++) {
1284                                         if (0 != *(u32 *)(TDB_HASH_TOP(tlock->hash) + (unsigned char *)tdb->map_ptr)) {
1285                                                 break;
1286                                         }
1287                                 }
1288                                 if (tlock->hash == tdb->header.hash_size) {
1289                                         continue;
1290                                 }
1291                         } else {
1292                                 if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash), &off) == 0 &&
1293                                     off == 0) {
1294                                         continue;
1295                                 }
1296                         }
1297                 }
1298
1299                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1300                         return -1;
1301
1302                 /* No previous record?  Start at top of chain. */
1303                 if (!tlock->off) {
1304                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1305                                      &tlock->off) == -1)
1306                                 goto fail;
1307                 } else {
1308                         /* Otherwise unlock the previous record. */
1309                         if (unlock_record(tdb, tlock->off) != 0)
1310                                 goto fail;
1311                 }
1312
1313                 if (want_next) {
1314                         /* We have offset of old record: grab next */
1315                         if (rec_read(tdb, tlock->off, rec) == -1)
1316                                 goto fail;
1317                         tlock->off = rec->next;
1318                 }
1319
1320                 /* Iterate through chain */
1321                 while( tlock->off) {
1322                         tdb_off current;
1323                         if (rec_read(tdb, tlock->off, rec) == -1)
1324                                 goto fail;
1325
1326                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
1327                         if (tlock->off == rec->next) {
1328                                 TDB_LOG((tdb, 0, "tdb_next_lock: loop detected.\n"));
1329                                 goto fail;
1330                         }
1331
1332                         if (!TDB_DEAD(rec)) {
1333                                 /* Woohoo: we found one! */
1334                                 if (lock_record(tdb, tlock->off) != 0)
1335                                         goto fail;
1336                                 return tlock->off;
1337                         }
1338
1339                         /* Try to clean dead ones from old traverses */
1340                         current = tlock->off;
1341                         tlock->off = rec->next;
1342                         if (!tdb->read_only && 
1343                             do_delete(tdb, current, rec) != 0)
1344                                 goto fail;
1345                 }
1346                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1347                 want_next = 0;
1348         }
1349         /* We finished iteration without finding anything */
1350         return TDB_ERRCODE(TDB_SUCCESS, 0);
1351
1352  fail:
1353         tlock->off = 0;
1354         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1355                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1356         return -1;
1357 }
1358
1359 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1360    return -1 on error or the record count traversed
1361    if fn is NULL then it is not called
1362    a non-zero return value from fn() indicates that the traversal should stop
1363   */
1364 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private_val)
1365 {
1366         TDB_DATA key, dbuf;
1367         struct list_struct rec;
1368         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1369         int ret, count = 0;
1370
1371         /* This was in the initializaton, above, but the IRIX compiler
1372          * did not like it.  crh
1373          */
1374         tl.next = tdb->travlocks.next;
1375
1376         /* fcntl locks don't stack: beware traverse inside traverse */
1377         tdb->travlocks.next = &tl;
1378
1379         /* tdb_next_lock places locks on the record returned, and its chain */
1380         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1381                 count++;
1382                 /* now read the full record */
1383                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1384                                           rec.key_len + rec.data_len);
1385                 if (!key.dptr) {
1386                         ret = -1;
1387                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1388                                 goto out;
1389                         if (unlock_record(tdb, tl.off) != 0)
1390                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1391                         goto out;
1392                 }
1393                 key.dsize = rec.key_len;
1394                 dbuf.dptr = key.dptr + rec.key_len;
1395                 dbuf.dsize = rec.data_len;
1396
1397                 /* Drop chain lock, call out */
1398                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1399                         ret = -1;
1400                         goto out;
1401                 }
1402                 if (fn && fn(tdb, key, dbuf, private_val)) {
1403                         /* They want us to terminate traversal */
1404                         ret = count;
1405                         if (unlock_record(tdb, tl.off) != 0) {
1406                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1407                                 ret = -1;
1408                         }
1409                         tdb->travlocks.next = tl.next;
1410                         SAFE_FREE(key.dptr);
1411                         return count;
1412                 }
1413                 SAFE_FREE(key.dptr);
1414         }
1415 out:
1416         tdb->travlocks.next = tl.next;
1417         if (ret < 0)
1418                 return -1;
1419         else
1420                 return count;
1421 }
1422
1423 /* find the first entry in the database and return its key */
1424 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1425 {
1426         TDB_DATA key;
1427         struct list_struct rec;
1428
1429         /* release any old lock */
1430         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1431                 return tdb_null;
1432         tdb->travlocks.off = tdb->travlocks.hash = 0;
1433
1434         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1435                 return tdb_null;
1436         /* now read the key */
1437         key.dsize = rec.key_len;
1438         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1439         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1440                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1441         return key;
1442 }
1443
1444 /* find the next entry in the database, returning its key */
1445 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1446 {
1447         u32 oldhash;
1448         TDB_DATA key = tdb_null;
1449         struct list_struct rec;
1450         char *k = NULL;
1451
1452         /* Is locked key the old key?  If so, traverse will be reliable. */
1453         if (tdb->travlocks.off) {
1454                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1455                         return tdb_null;
1456                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1457                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1458                                             rec.key_len))
1459                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1460                         /* No, it wasn't: unlock it and start from scratch */
1461                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1462                                 return tdb_null;
1463                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1464                                 return tdb_null;
1465                         tdb->travlocks.off = 0;
1466                 }
1467
1468                 SAFE_FREE(k);
1469         }
1470
1471         if (!tdb->travlocks.off) {
1472                 /* No previous element: do normal find, and lock record */
1473                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
1474                 if (!tdb->travlocks.off)
1475                         return tdb_null;
1476                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1477                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1478                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1479                         return tdb_null;
1480                 }
1481         }
1482         oldhash = tdb->travlocks.hash;
1483
1484         /* Grab next record: locks chain and returned record,
1485            unlocks old record */
1486         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1487                 key.dsize = rec.key_len;
1488                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1489                                           key.dsize);
1490                 /* Unlock the chain of this new record */
1491                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1492                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1493         }
1494         /* Unlock the chain of old record */
1495         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1496                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1497         return key;
1498 }
1499
1500 /* delete an entry in the database given a key */
1501 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1502 {
1503         tdb_off rec_ptr;
1504         struct list_struct rec;
1505         int ret;
1506
1507         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1508                 return -1;
1509         ret = do_delete(tdb, rec_ptr, &rec);
1510         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1511                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1512         return ret;
1513 }
1514
1515 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1516 {
1517         u32 hash = tdb->hash_fn(&key);
1518         return tdb_delete_hash(tdb, key, hash);
1519 }
1520
1521 /* store an element in the database, replacing any existing element
1522    with the same key 
1523
1524    return 0 on success, -1 on failure
1525 */
1526 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1527 {
1528         struct list_struct rec;
1529         u32 hash;
1530         tdb_off rec_ptr;
1531         char *p = NULL;
1532         int ret = 0;
1533
1534         /* find which hash bucket it is in */
1535         hash = tdb->hash_fn(&key);
1536         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1537                 return -1;
1538
1539         /* check for it existing, on insert. */
1540         if (flag == TDB_INSERT) {
1541                 if (tdb_exists_hash(tdb, key, hash)) {
1542                         tdb->ecode = TDB_ERR_EXISTS;
1543                         goto fail;
1544                 }
1545         } else {
1546                 /* first try in-place update, on modify or replace. */
1547                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1548                         goto out;
1549                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1550                     flag == TDB_MODIFY) {
1551                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1552                          we should fail the store */
1553                         goto fail;
1554         }
1555         }
1556         /* reset the error code potentially set by the tdb_update() */
1557         tdb->ecode = TDB_SUCCESS;
1558
1559         /* delete any existing record - if it doesn't exist we don't
1560            care.  Doing this first reduces fragmentation, and avoids
1561            coalescing with `allocated' block before it's updated. */
1562         if (flag != TDB_INSERT)
1563                 tdb_delete_hash(tdb, key, hash);
1564
1565         /* Copy key+value *before* allocating free space in case malloc
1566            fails and we are left with a dead spot in the tdb. */
1567
1568         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1569                 tdb->ecode = TDB_ERR_OOM;
1570                 goto fail;
1571         }
1572
1573         memcpy(p, key.dptr, key.dsize);
1574         if (dbuf.dsize)
1575                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1576
1577         /* we have to allocate some space */
1578         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1579                 goto fail;
1580
1581         /* Read hash top into next ptr */
1582         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1583                 goto fail;
1584
1585         rec.key_len = key.dsize;
1586         rec.data_len = dbuf.dsize;
1587         rec.full_hash = hash;
1588         rec.magic = TDB_MAGIC;
1589
1590         /* write out and point the top of the hash chain at it */
1591         if (rec_write(tdb, rec_ptr, &rec) == -1
1592             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1593             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1594                 /* Need to tdb_unallocate() here */
1595                 goto fail;
1596         }
1597  out:
1598         SAFE_FREE(p); 
1599         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1600         return ret;
1601 fail:
1602         ret = -1;
1603         goto out;
1604 }
1605
1606 /* Attempt to append data to an entry in place - this only works if the new data size
1607    is <= the old data size and the key exists.
1608    on failure return -1. Record must be locked before calling.
1609 */
1610 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1611 {
1612         struct list_struct rec;
1613         tdb_off rec_ptr;
1614
1615         /* find entry */
1616         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1617                 return -1;
1618
1619         /* Append of 0 is always ok. */
1620         if (new_dbuf.dsize == 0)
1621                 return 0;
1622
1623         /* must be long enough for key, old data + new data and tailer */
1624         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1625                 /* No room. */
1626                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1627                 return -1;
1628         }
1629
1630         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1631                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1632                 return -1;
1633
1634         /* update size */
1635         rec.data_len += new_dbuf.dsize;
1636         return rec_write(tdb, rec_ptr, &rec);
1637 }
1638
1639 /* Append to an entry. Create if not exist. */
1640
1641 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1642 {
1643         struct list_struct rec;
1644         u32 hash;
1645         tdb_off rec_ptr;
1646         char *p = NULL;
1647         int ret = 0;
1648         size_t new_data_size = 0;
1649
1650         /* find which hash bucket it is in */
1651         hash = tdb->hash_fn(&key);
1652         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1653                 return -1;
1654
1655         /* first try in-place. */
1656         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1657                 goto out;
1658
1659         /* reset the error code potentially set by the tdb_append_inplace() */
1660         tdb->ecode = TDB_SUCCESS;
1661
1662         /* find entry */
1663         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1664                 if (tdb->ecode != TDB_ERR_NOEXIST)
1665                         goto fail;
1666
1667                 /* Not found - create. */
1668
1669                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1670                 goto out;
1671         }
1672
1673         new_data_size = rec.data_len + new_dbuf.dsize;
1674
1675         /* Copy key+old_value+value *before* allocating free space in case malloc
1676            fails and we are left with a dead spot in the tdb. */
1677
1678         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1679                 tdb->ecode = TDB_ERR_OOM;
1680                 goto fail;
1681         }
1682
1683         /* Copy the key in place. */
1684         memcpy(p, key.dptr, key.dsize);
1685
1686         /* Now read the old data into place. */
1687         if (rec.data_len &&
1688                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1689                         goto fail;
1690
1691         /* Finally append the new data. */
1692         if (new_dbuf.dsize)
1693                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1694
1695         /* delete any existing record - if it doesn't exist we don't
1696            care.  Doing this first reduces fragmentation, and avoids
1697            coalescing with `allocated' block before it's updated. */
1698
1699         tdb_delete_hash(tdb, key, hash);
1700
1701         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1702                 goto fail;
1703
1704         /* Read hash top into next ptr */
1705         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1706                 goto fail;
1707
1708         rec.key_len = key.dsize;
1709         rec.data_len = new_data_size;
1710         rec.full_hash = hash;
1711         rec.magic = TDB_MAGIC;
1712
1713         /* write out and point the top of the hash chain at it */
1714         if (rec_write(tdb, rec_ptr, &rec) == -1
1715             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1716             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1717                 /* Need to tdb_unallocate() here */
1718                 goto fail;
1719         }
1720
1721  out:
1722         SAFE_FREE(p); 
1723         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1724         return ret;
1725
1726 fail:
1727         ret = -1;
1728         goto out;
1729 }
1730
1731 static int tdb_already_open(dev_t device,
1732                             ino_t ino)
1733 {
1734         TDB_CONTEXT *i;
1735         
1736         for (i = tdbs; i; i = i->next) {
1737                 if (i->device == device && i->inode == ino) {
1738                         return 1;
1739                 }
1740         }
1741
1742         return 0;
1743 }
1744
1745 /* This is based on the hash algorithm from gdbm */
1746 static u32 default_tdb_hash(TDB_DATA *key)
1747 {
1748         u32 value;      /* Used to compute the hash value.  */
1749         u32   i;        /* Used to cycle through random values. */
1750
1751         /* Set the initial value from the key size. */
1752         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
1753                 value = (value + (key->dptr[i] << (i*5 % 24)));
1754
1755         return (1103515243 * value + 12345);  
1756 }
1757
1758 /* open the database, creating it if necessary 
1759
1760    The open_flags and mode are passed straight to the open call on the
1761    database file. A flags value of O_WRONLY is invalid. The hash size
1762    is advisory, use zero for a default value.
1763
1764    Return is NULL on error, in which case errno is also set.  Don't 
1765    try to call tdb_error or tdb_errname, just do strerror(errno).
1766
1767    @param name may be NULL for internal databases. */
1768 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1769                       int open_flags, mode_t mode)
1770 {
1771         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
1772 }
1773
1774
1775 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1776                          int open_flags, mode_t mode,
1777                          tdb_log_func log_fn,
1778                          tdb_hash_func hash_fn)
1779 {
1780         TDB_CONTEXT *tdb;
1781         struct stat st;
1782         int rev = 0, locked = 0;
1783         unsigned char *vp;
1784         u32 vertest;
1785
1786         if (!(tdb = calloc(1, sizeof *tdb))) {
1787                 /* Can't log this */
1788                 errno = ENOMEM;
1789                 goto fail;
1790         }
1791         tdb->fd = -1;
1792         tdb->name = NULL;
1793         tdb->map_ptr = NULL;
1794         tdb->flags = tdb_flags;
1795         tdb->open_flags = open_flags;
1796         tdb->log_fn = log_fn;
1797         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
1798
1799         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1800                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1801                          name));
1802                 errno = EINVAL;
1803                 goto fail;
1804         }
1805         
1806         if (hash_size == 0)
1807                 hash_size = DEFAULT_HASH_SIZE;
1808         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1809                 tdb->read_only = 1;
1810                 /* read only databases don't do locking or clear if first */
1811                 tdb->flags |= TDB_NOLOCK;
1812                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1813         }
1814
1815         /* internal databases don't mmap or lock, and start off cleared */
1816         if (tdb->flags & TDB_INTERNAL) {
1817                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1818                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1819                 if (tdb_new_database(tdb, hash_size) != 0) {
1820                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1821                         goto fail;
1822                 }
1823                 goto internal;
1824         }
1825
1826         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1827                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1828                          name, strerror(errno)));
1829                 goto fail;      /* errno set by open(2) */
1830         }
1831
1832         /* ensure there is only one process initialising at once */
1833         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1834                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1835                          name, strerror(errno)));
1836                 goto fail;      /* errno set by tdb_brlock */
1837         }
1838
1839         /* we need to zero database if we are the only one with it open */
1840         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1841                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1842                 open_flags |= O_CREAT;
1843                 if (ftruncate(tdb->fd, 0) == -1) {
1844                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1845                                  "failed to truncate %s: %s\n",
1846                                  name, strerror(errno)));
1847                         goto fail; /* errno set by ftruncate */
1848                 }
1849         }
1850
1851         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1852             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1853             || (tdb->header.version != TDB_VERSION
1854                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1855                 /* its not a valid database - possibly initialise it */
1856                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1857                         errno = EIO; /* ie bad format or something */
1858                         goto fail;
1859                 }
1860                 rev = (tdb->flags & TDB_CONVERT);
1861         }
1862         vp = (unsigned char *)&tdb->header.version;
1863         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1864                   (((u32)vp[2]) << 8) | (u32)vp[3];
1865         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1866         if (!rev)
1867                 tdb->flags &= ~TDB_CONVERT;
1868         else {
1869                 tdb->flags |= TDB_CONVERT;
1870                 convert(&tdb->header, sizeof(tdb->header));
1871         }
1872         if (fstat(tdb->fd, &st) == -1)
1873                 goto fail;
1874
1875         /* Is it already in the open list?  If so, fail. */
1876         if (tdb_already_open(st.st_dev, st.st_ino)) {
1877                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1878                          "%s (%d,%d) is already open in this process\n",
1879                          name, (int)st.st_dev, (int)st.st_ino));
1880                 errno = EBUSY;
1881                 goto fail;
1882         }
1883
1884         if (!(tdb->name = (char *)strdup(name))) {
1885                 errno = ENOMEM;
1886                 goto fail;
1887         }
1888
1889         tdb->map_size = st.st_size;
1890         tdb->device = st.st_dev;
1891         tdb->inode = st.st_ino;
1892         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1893         if (!tdb->locked) {
1894                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1895                          "failed to allocate lock structure for %s\n",
1896                          name));
1897                 errno = ENOMEM;
1898                 goto fail;
1899         }
1900         tdb_mmap(tdb);
1901         if (locked) {
1902                 if (!tdb->read_only)
1903                         if (tdb_clear_spinlocks(tdb) != 0) {
1904                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1905                                 "failed to clear spinlock\n"));
1906                                 goto fail;
1907                         }
1908                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1909                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1910                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1911                                  name, strerror(errno)));
1912                         goto fail;
1913                 }
1914
1915         }
1916
1917         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1918            we didn't get the initial exclusive lock as we need to let all other
1919            users know we're using it. */
1920
1921         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1922                 /* leave this lock in place to indicate it's in use */
1923                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1924                         goto fail;
1925         }
1926
1927
1928  internal:
1929         /* Internal (memory-only) databases skip all the code above to
1930          * do with disk files, and resume here by releasing their
1931          * global lock and hooking into the active list. */
1932         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1933                 goto fail;
1934         tdb->next = tdbs;
1935         tdbs = tdb;
1936         return tdb;
1937
1938  fail:
1939         { int save_errno = errno;
1940
1941         if (!tdb)
1942                 return NULL;
1943         
1944         if (tdb->map_ptr) {
1945                 if (tdb->flags & TDB_INTERNAL)
1946                         SAFE_FREE(tdb->map_ptr);
1947                 else
1948                         tdb_munmap(tdb);
1949         }
1950         SAFE_FREE(tdb->name);
1951         if (tdb->fd != -1)
1952                 if (close(tdb->fd) != 0)
1953                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1954         SAFE_FREE(tdb->locked);
1955         SAFE_FREE(tdb);
1956         errno = save_errno;
1957         return NULL;
1958         }
1959 }
1960
1961 /**
1962  * Close a database.
1963  *
1964  * @returns -1 for error; 0 for success.
1965  **/
1966 int tdb_close(TDB_CONTEXT *tdb)
1967 {
1968         TDB_CONTEXT **i;
1969         int ret = 0;
1970
1971         if (tdb->map_ptr) {
1972                 if (tdb->flags & TDB_INTERNAL)
1973                         SAFE_FREE(tdb->map_ptr);
1974                 else
1975                         tdb_munmap(tdb);
1976         }
1977         SAFE_FREE(tdb->name);
1978         if (tdb->fd != -1)
1979                 ret = close(tdb->fd);
1980         SAFE_FREE(tdb->locked);
1981
1982         /* Remove from contexts list */
1983         for (i = &tdbs; *i; i = &(*i)->next) {
1984                 if (*i == tdb) {
1985                         *i = tdb->next;
1986                         break;
1987                 }
1988         }
1989
1990         memset(tdb, 0, sizeof(*tdb));
1991         SAFE_FREE(tdb);
1992
1993         return ret;
1994 }
1995
1996 /* lock/unlock entire database */
1997 int tdb_lockall(TDB_CONTEXT *tdb)
1998 {
1999         u32 i;
2000
2001         /* There are no locks on read-only dbs */
2002         if (tdb->read_only)
2003                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
2004         for (i = 0; i < tdb->header.hash_size; i++) 
2005                 if (tdb_lock(tdb, i, F_WRLCK))
2006                         break;
2007
2008         /* If error, release locks we have... */
2009         if (i < tdb->header.hash_size) {
2010                 u32 j;
2011
2012                 for ( j = 0; j < i; j++)
2013                         tdb_unlock(tdb, j, F_WRLCK);
2014                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
2015         }
2016
2017         return 0;
2018 }
2019 void tdb_unlockall(TDB_CONTEXT *tdb)
2020 {
2021         u32 i;
2022         for (i=0; i < tdb->header.hash_size; i++)
2023                 tdb_unlock(tdb, i, F_WRLCK);
2024 }
2025
2026 /* lock/unlock one hash chain. This is meant to be used to reduce
2027    contention - it cannot guarantee how many records will be locked */
2028 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
2029 {
2030         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
2031 }
2032
2033 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
2034 {
2035         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
2036 }
2037
2038 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2039 {
2040         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
2041 }
2042
2043 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2044 {
2045         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
2046 }
2047
2048
2049 /* register a loging function */
2050 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2051 {
2052         tdb->log_fn = fn;
2053 }
2054
2055 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
2056    seek pointer from our parent and to re-establish locks */
2057 int tdb_reopen(TDB_CONTEXT *tdb)
2058 {
2059         struct stat st;
2060
2061         if (tdb->flags & TDB_INTERNAL)
2062                 return 0; /* Nothing to do. */
2063         if (tdb_munmap(tdb) != 0) {
2064                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2065                 goto fail;
2066         }
2067         if (close(tdb->fd) != 0)
2068                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2069         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2070         if (tdb->fd == -1) {
2071                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2072                 goto fail;
2073         }
2074         if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
2075                         (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2076                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2077                 goto fail;
2078         }
2079         if (fstat(tdb->fd, &st) != 0) {
2080                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2081                 goto fail;
2082         }
2083         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2084                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2085                 goto fail;
2086         }
2087         tdb_mmap(tdb);
2088
2089         return 0;
2090
2091 fail:
2092         tdb_close(tdb);
2093         return -1;
2094 }
2095
2096 /* reopen all tdb's */
2097 int tdb_reopen_all(void)
2098 {
2099         TDB_CONTEXT *tdb;
2100
2101         for (tdb=tdbs; tdb; tdb = tdb->next) {
2102                 if (tdb_reopen(tdb) != 0)
2103                         return -1;
2104         }
2105
2106         return 0;
2107 }