Update lmdb ddd0a773e (2024-06-04)

This commit is contained in:
2024-06-25 06:41:34 +09:00
parent 473a302dad
commit 5fdfc6c160
7 changed files with 720 additions and 50 deletions

View File

@@ -79,6 +79,7 @@ mtest3: mtest3.o liblmdb.a
mtest4: mtest4.o liblmdb.a
mtest5: mtest5.o liblmdb.a
mtest6: mtest6.o liblmdb.a
mplay: mplay.o liblmdb.a
mdb.o: mdb.c lmdb.h midl.h
$(CC) $(CFLAGS) $(CPPFLAGS) -c mdb.c

View File

@@ -578,18 +578,26 @@ typedef MDB_ID txnid_t;
#define MDB_DEBUG 0
#endif
#define MDB_DBG_INFO 1
#define MDB_DBG_TRACE 2
#if MDB_DEBUG
static int mdb_debug;
static int mdb_debug = MDB_DBG_TRACE;
static txnid_t mdb_debug_start;
/** Print a debug message with printf formatting.
* Requires double parenthesis around 2 or more args.
*/
# define DPRINTF(args) ((void) ((mdb_debug) && DPRINTF0 args))
# define DPRINTF(args) ((void) ((mdb_debug & MDB_DBG_INFO) && DPRINTF0 args))
# define DPRINTF0(fmt, ...) \
fprintf(stderr, "%s:%d " fmt "\n", mdb_func_, __LINE__, __VA_ARGS__)
/** Trace info for replaying */
# define MDB_TRACE(args) ((void) ((mdb_debug & MDB_DBG_TRACE) && DPRINTF1 args))
# define DPRINTF1(fmt, ...) \
fprintf(stderr, ">%d:%s: " fmt "\n", getpid(), mdb_func_, __VA_ARGS__)
#else
# define DPRINTF(args) ((void) 0)
# define MDB_TRACE(args) ((void) 0)
#endif
/** Print a debug string.
* The string is printed literally, with no format processing.
@@ -690,6 +698,11 @@ static txnid_t mdb_debug_start;
* This is used for printing a hex dump of a key's contents.
*/
#define DKBUF char kbuf[DKBUF_MAXKEYSIZE*2+1]
/** A data value buffer.
* @ingroup debug
* This is used for printing a hex dump of a #MDB_DUPSORT value's contents.
*/
#define DDBUF char dbuf[DKBUF_MAXKEYSIZE*2+1+2]
/** Display a key in hex.
* @ingroup debug
* Invoke a function to display a key in hex.
@@ -697,6 +710,7 @@ static txnid_t mdb_debug_start;
#define DKEY(x) mdb_dkey(x, kbuf)
#else
#define DKBUF
#define DDBUF
#define DKEY(x) 0
#endif
@@ -1652,6 +1666,9 @@ static int mdb_update_key(MDB_cursor *mc, MDB_val *key);
static void mdb_cursor_pop(MDB_cursor *mc);
static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
static int _mdb_cursor_del(MDB_cursor *mc, unsigned int flags);
static int _mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, unsigned int flags);
static int mdb_cursor_del0(MDB_cursor *mc);
static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned flags);
static int mdb_cursor_sibling(MDB_cursor *mc, int move_right);
@@ -1773,7 +1790,7 @@ mdb_strerror(int err)
buf[0] = 0;
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, err, 0, ptr, MSGSIZE, (va_list *)buf+MSGSIZE);
NULL, err, 0, ptr, MSGSIZE, NULL);
return ptr;
#else
if (err < 0)
@@ -1849,6 +1866,18 @@ mdb_dkey(MDB_val *key, char *buf)
return buf;
}
static char *
mdb_dval(MDB_txn *txn, MDB_dbi dbi, MDB_val *data, char *buf)
{
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
mdb_dkey(data, buf+1);
*buf = '[';
strcpy(buf + data->mv_size * 2 + 1, "]");
} else
*buf = '\0';
return buf;
}
static const char *
mdb_leafnode_type(MDB_node *n)
{
@@ -3084,9 +3113,14 @@ mdb_txn_renew0(MDB_txn *txn)
do /* LY: Retry on a race, ITS#7970. */
r->mr_txnid = ti->mti_txnid;
while(r->mr_txnid != ti->mti_txnid);
if (!r->mr_txnid && (env->me_flags & MDB_RDONLY)) {
meta = mdb_env_pick_meta(env);
r->mr_txnid = meta->mm_txnid;
} else {
meta = env->me_metas[r->mr_txnid & 1];
}
txn->mt_txnid = r->mr_txnid;
txn->mt_u.reader = r;
meta = env->me_metas[txn->mt_txnid & 1];
}
} else {
@@ -3103,7 +3137,7 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_txnid++;
#if MDB_DEBUG
if (txn->mt_txnid == mdb_debug_start)
mdb_debug = 1;
mdb_debug = MDB_DBG_INFO;
#endif
txn->mt_child = NULL;
txn->mt_loose_pgs = NULL;
@@ -3283,6 +3317,7 @@ renew:
txn->mt_txnid, (flags & MDB_RDONLY) ? 'r' : 'w',
(void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root));
}
MDB_TRACE(("%p, %p, %u = %p", env, parent, flags, txn));
return rc;
}
@@ -3442,18 +3477,25 @@ mdb_txn_reset(MDB_txn *txn)
mdb_txn_end(txn, MDB_END_RESET);
}
void
mdb_txn_abort(MDB_txn *txn)
static void
_mdb_txn_abort(MDB_txn *txn)
{
if (txn == NULL)
return;
if (txn->mt_child)
mdb_txn_abort(txn->mt_child);
_mdb_txn_abort(txn->mt_child);
mdb_txn_end(txn, MDB_END_ABORT|MDB_END_SLOT|MDB_END_FREE);
}
void
mdb_txn_abort(MDB_txn *txn)
{
MDB_TRACE(("%p", txn));
_mdb_txn_abort(txn);
}
/** Save the freelist as of this transaction to the freeDB.
* This changes the freelist. Keep trying until it stabilizes.
*
@@ -3547,7 +3589,7 @@ mdb_freelist_save(MDB_txn *txn)
pglast = head_id = *(txnid_t *)key.mv_data;
total_room = head_room = 0;
mdb_tassert(txn, pglast <= env->me_pglast);
rc = mdb_cursor_del(&mc, 0);
rc = _mdb_cursor_del(&mc, 0);
if (rc)
return rc;
}
@@ -3567,7 +3609,7 @@ mdb_freelist_save(MDB_txn *txn)
do {
freecnt = free_pgs[0];
data.mv_size = MDB_IDL_SIZEOF(free_pgs);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
rc = _mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
return rc;
/* Retry if mt_free_pgs[] grew during the Put() */
@@ -3616,7 +3658,7 @@ mdb_freelist_save(MDB_txn *txn)
key.mv_size = sizeof(head_id);
key.mv_data = &head_id;
data.mv_size = (head_room + 1) * sizeof(pgno_t);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
rc = _mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
return rc;
/* IDL is initially empty, zero out at least the length */
@@ -3671,7 +3713,7 @@ mdb_freelist_save(MDB_txn *txn)
data.mv_data = mop -= len;
save = mop[0];
mop[0] = len;
rc = mdb_cursor_put(&mc, &key, &data, MDB_CURRENT);
rc = _mdb_cursor_put(&mc, &key, &data, MDB_CURRENT);
mop[0] = save;
if (rc || !(mop_len -= len))
break;
@@ -3916,8 +3958,8 @@ done:
static int ESECT mdb_env_share_locks(MDB_env *env, int *excl);
int
mdb_txn_commit(MDB_txn *txn)
static int
_mdb_txn_commit(MDB_txn *txn)
{
int rc;
unsigned int i, end_mode;
@@ -3930,7 +3972,7 @@ mdb_txn_commit(MDB_txn *txn)
end_mode = MDB_END_EMPTY_COMMIT|MDB_END_UPDATE|MDB_END_SLOT|MDB_END_FREE;
if (txn->mt_child) {
rc = mdb_txn_commit(txn->mt_child);
rc = _mdb_txn_commit(txn->mt_child);
if (rc)
goto fail;
}
@@ -4110,7 +4152,7 @@ mdb_txn_commit(MDB_txn *txn)
goto fail;
}
data.mv_data = &txn->mt_dbs[i];
rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data,
rc = _mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data,
F_SUBDATA);
if (rc)
goto fail;
@@ -4153,10 +4195,17 @@ done:
return MDB_SUCCESS;
fail:
mdb_txn_abort(txn);
_mdb_txn_abort(txn);
return rc;
}
int
mdb_txn_commit(MDB_txn *txn)
{
MDB_TRACE(("%p", txn));
return _mdb_txn_commit(txn);
}
/** Read the environment parameters of a DB environment before
* mapping it into memory.
* @param[in] env the environment handle
@@ -4457,6 +4506,7 @@ mdb_env_create(MDB_env **env)
GET_PAGESIZE(e->me_os_psize);
VGMEMP_CREATE(e,0,0);
*env = e;
MDB_TRACE(("%p", e));
return MDB_SUCCESS;
}
@@ -4622,6 +4672,7 @@ mdb_env_set_mapsize(MDB_env *env, mdb_size_t size)
env->me_mapsize = size;
if (env->me_psize)
env->me_maxpg = env->me_mapsize / env->me_psize;
MDB_TRACE(("%p, %"Yu"", env, size));
return MDB_SUCCESS;
}
@@ -4631,6 +4682,7 @@ mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs)
if (env->me_map)
return EINVAL;
env->me_maxdbs = dbs + CORE_DBS;
MDB_TRACE(("%p, %u", env, dbs));
return MDB_SUCCESS;
}
@@ -4640,6 +4692,7 @@ mdb_env_set_maxreaders(MDB_env *env, unsigned int readers)
if (env->me_map || readers < 1)
return EINVAL;
env->me_maxreaders = readers;
MDB_TRACE(("%p, %u", env, readers));
return MDB_SUCCESS;
}
@@ -5703,6 +5756,7 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
}
leave:
MDB_TRACE(("%p, %s, %u, %04o", env, path, flags & (CHANGEABLE|CHANGELESS), mode));
if (rc) {
mdb_env_close0(env, excl);
}
@@ -5824,17 +5878,6 @@ mdb_env_close0(MDB_env *env, int excl)
if (excl > 0)
semctl(env->me_rmutex->semid, 0, IPC_RMID);
}
#elif defined(MDB_ROBUST_SUPPORTED)
/* If we have the filelock: If we are the
* only remaining user, clean up robust
* mutexes.
*/
if (excl == 0)
mdb_env_excl_lock(env, &excl);
if (excl > 0) {
pthread_mutex_destroy(env->me_txns->mti_rmutex);
pthread_mutex_destroy(env->me_txns->mti_wmutex);
}
#endif
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
}
@@ -5869,6 +5912,7 @@ mdb_env_close(MDB_env *env)
if (env == NULL)
return;
MDB_TRACE(("%p", env));
VGMEMP_DESTROY(env);
while ((dp = env->me_dpages) != NULL) {
VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
@@ -6631,7 +6675,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
MDB_node *leaf = mdb_node_search(&mc2,
&mc->mc_dbx->md_name, &exact);
if (!exact)
return MDB_NOTFOUND;
return MDB_BAD_DBI;
if ((leaf->mn_flags & (F_DUPDATA|F_SUBDATA)) != F_SUBDATA)
return MDB_INCOMPATIBLE; /* not a named DB */
rc = mdb_node_read(&mc2, leaf, &data);
@@ -7583,8 +7627,8 @@ mdb_cursor_touch(MDB_cursor *mc)
/** Do not spill pages to disk if txn is getting full, may fail instead */
#define MDB_NOSPILL 0x8000
int
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
static int
_mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags)
{
MDB_env *env;
@@ -7946,7 +7990,7 @@ current:
* Copy end of page, adjusting alignment so
* compiler may copy words instead of bytes.
*/
off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
off = (PAGEHDRSZ + data->mv_size) & -(int)sizeof(size_t);
memcpy((size_t *)((char *)np + off),
(size_t *)((char *)omp + off), sz - off);
sz = PAGEHDRSZ;
@@ -7974,11 +8018,14 @@ current:
else if (!(mc->mc_flags & C_SUB))
memcpy(olddata.mv_data, data->mv_data, data->mv_size);
else {
if (key->mv_size != NODEKSZ(leaf))
goto new_ksize;
memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto fix_parent;
}
return MDB_SUCCESS;
}
new_ksize:
mdb_node_del(mc, 0);
}
@@ -8042,7 +8089,7 @@ put_sub:
new_dupdata = (int)dkey.mv_size;
/* converted, write the original data first */
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
rc = _mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
goto bad_sub;
/* we've done our job */
@@ -8070,7 +8117,7 @@ put_sub:
ecount = mc->mc_xcursor->mx_db.md_entries;
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
rc = _mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
@@ -8111,7 +8158,20 @@ bad_sub:
}
int
mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags)
{
DKBUF;
DDBUF;
int rc = _mdb_cursor_put(mc, key, data, flags);
MDB_TRACE(("%p, %"Z"u[%s], %"Z"u%s, %u",
mc, key ? key->mv_size:0, DKEY(key), data ? data->mv_size:0,
data ? mdb_dval(mc->mc_txn, mc->mc_dbi, data, dbuf):"", flags));
return rc;
}
static int
_mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
{
MDB_node *leaf;
MDB_page *mp;
@@ -8149,7 +8209,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
}
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
rc = _mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
if (rc)
return rc;
/* If sub-DB still has entries, we're done */
@@ -8213,6 +8273,14 @@ fail:
return rc;
}
int
mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
{
MDB_TRACE(("%p, %u",
mc, flags));
return _mdb_cursor_del(mc, flags);
}
/** Allocate and initialize new pages for a database.
* Set #MDB_TXN_ERROR on failure.
* @param[in] mc a cursor on the database being added to.
@@ -8706,6 +8774,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
return ENOMEM;
}
MDB_TRACE(("%p, %u = %p", txn, dbi, mc));
*ret = mc;
return MDB_SUCCESS;
@@ -8769,6 +8838,7 @@ mdb_cursor_count(MDB_cursor *mc, mdb_size_t *countp)
void
mdb_cursor_close(MDB_cursor *mc)
{
MDB_TRACE(("%p", mc));
if (mc) {
MDB_CURSOR_UNREF(mc, 0);
}
@@ -9585,6 +9655,8 @@ int
mdb_del(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data)
{
DKBUF;
DDBUF;
if (!key || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL;
@@ -9596,6 +9668,9 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
data = NULL;
}
MDB_TRACE(("%p, %u, %"Z"u[%s], %"Z"u%s",
txn, dbi, key ? key->mv_size:0, DKEY(key), data ? data->mv_size:0,
data ? mdb_dval(txn, dbi, data, dbuf):""));
return mdb_del0(txn, dbi, key, data, 0);
}
@@ -9635,7 +9710,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
*/
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, flags);
rc = _mdb_cursor_del(&mc, flags);
txn->mt_cursors[dbi] = mc.mc_next;
}
return rc;
@@ -10079,6 +10154,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
MDB_cursor mc;
MDB_xcursor mx;
int rc;
DKBUF;
DDBUF;
if (!key || !data || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL;
@@ -10089,10 +10166,12 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED))
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
MDB_TRACE(("%p, %u, %"Z"u[%s], %"Z"u%s, %u",
txn, dbi, key ? key->mv_size:0, DKEY(key), data->mv_size, mdb_dval(txn, dbi, data, dbuf), flags));
mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_put(&mc, key, data, flags);
rc = _mdb_cursor_put(&mc, key, data, flags);
txn->mt_cursors[dbi] = mc.mc_next;
return rc;
}
@@ -10496,7 +10575,7 @@ finish:
my.mc_error = rc;
mdb_env_cthr_toggle(&my, 1 | MDB_EOF);
rc = THREAD_FINISH(thr);
mdb_txn_abort(txn);
_mdb_txn_abort(txn);
done:
#ifdef _WIN32
@@ -10608,7 +10687,7 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
}
leave:
mdb_txn_abort(txn);
_mdb_txn_abort(txn);
return rc;
}
@@ -10823,6 +10902,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
}
}
mdb_default_cmp(txn, MAIN_DBI);
MDB_TRACE(("%p, (null), %u = %u", txn, flags, MAIN_DBI));
return MDB_SUCCESS;
}
@@ -10884,7 +10964,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
dummy.md_root = P_INVALID;
dummy.md_flags = flags & PERSISTENT_FLAGS;
WITH_CURSOR_TRACKING(mc,
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA));
rc = _mdb_cursor_put(&mc, &key, &data, F_SUBDATA));
dbflag |= DB_DIRTY;
}
@@ -10909,6 +10989,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
if (!unused) {
txn->mt_numdbs++;
}
MDB_TRACE(("%p, %s, %u = %u", txn, name, flags, slot));
}
return rc;
@@ -10940,6 +11021,7 @@ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi)
ptr = env->me_dbxs[dbi].md_name.mv_data;
/* If there was no name, this was already closed */
if (ptr) {
MDB_TRACE(("%p, %u", env, dbi));
env->me_dbxs[dbi].md_name.mv_data = NULL;
env->me_dbxs[dbi].md_name.mv_size = 0;
env->me_dbflags[dbi] = 0;
@@ -11081,6 +11163,7 @@ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del)
if (rc)
return rc;
MDB_TRACE(("%u, %d", dbi, del));
rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT);
/* Invalidate the dropped DB's cursors */
for (m2 = txn->mt_cursors[dbi]; m2; m2 = m2->mc_next)

View File

@@ -67,14 +67,14 @@ Exit status is zero if no errors occur.
Errors result in a non-zero exit status and
a diagnostic message being written to standard error.
Dumping and reloading databases that use user-defined comparison functions
will result in new databases that use the default comparison functions.
\fBIn this case it is quite likely that the reloaded database will be
damaged beyond repair permitting neither record storage nor retrieval.\fP
Dumping databases that use user-defined comparison functions will output
records with the ordering imposed by those comparison functions. If
.B mdb_load
is invoked without including the
.B -a
option when reloading those records, the new databases will likely be
damaged beyond repair, permitting neither record storage nor retrieval.
The only available workaround is to modify the source for the
.BR mdb_load (1)
utility to load the database using the correct comparison functions.
.SH "SEE ALSO"
.BR mdb_load (1)
.SH AUTHOR

View File

@@ -8,6 +8,8 @@ mdb_load \- LMDB environment import tool
[\c
.BR \-V ]
[\c
.BR \-a ]
[\c
.BI \-f \ file\fR]
[\c
.BR \-n ]

View File

@@ -445,7 +445,7 @@ int main(int argc, char *argv[])
if (rc == MDB_KEYEXIST && putflags)
continue;
if (rc) {
fprintf(stderr, "mdb_cursor_put failed, error %d %s\n", rc, mdb_strerror(rc));
fprintf(stderr, "%s: line %"Yu": mdb_cursor_put failed, error %d %s\n", prog, lineno, rc, mdb_strerror(rc));
goto txn_abort;
}
batch++;
@@ -466,9 +466,11 @@ int main(int argc, char *argv[])
fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc));
goto txn_abort;
}
if (appflag & MDB_APPENDDUP) {
if (append) {
MDB_val k, d;
mdb_cursor_get(mc, &k, &d, MDB_LAST);
memcpy(prevk.mv_data, k.mv_data, k.mv_size);
prevk.mv_size = k.mv_size;
}
batch = 0;
}

View File

@@ -56,7 +56,9 @@ typedef MDB_ID *MDB_IDL;
/* IDL sizes - likely should be even bigger
* limiting factors: sizeof(ID), thread stack size
*/
#ifndef MDB_IDL_LOGN
#define MDB_IDL_LOGN 16 /* DB_SIZE is 2^16, UM_SIZE is 2^17 */
#endif
#define MDB_IDL_DB_SIZE (1<<MDB_IDL_LOGN)
#define MDB_IDL_UM_SIZE (1<<(MDB_IDL_LOGN+1))

580
kiss-db/liblmdb/mplay.c Normal file
View File

@@ -0,0 +1,580 @@
/* mplay.c - memory-mapped database log replay */
/*
* Copyright 2011-2023 Howard Chu, Symas Corp.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in the file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "lmdb.h"
#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
"%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))
#define SCMP(s) s, (sizeof(s)-1)
char inbuf[8192];
char *dbuf, *kbuf;
size_t dbufsize;
int maxkey;
#define SOFF(s) (sizeof(s)+1)
#define MAXENVS 16
#define MAXTXNS 16
#define MAXCRSS 16
#define MAXPIDS 16
typedef struct crspair {
void *tcrs; /* scanned text pointer */
MDB_cursor *rcrs;
} crspair;
typedef struct txnpair {
void *ttxn; /* scanned text pointer */
MDB_txn *rtxn;
crspair cursors[MAXCRSS];
int ncursors;
} txnpair;
typedef struct envpair {
void *tenv;
MDB_env *renv;
txnpair txns[MAXTXNS];
int ntxns;
} envpair;
envpair envs[MAXENVS];
int nenvs;
envpair *lastenv;
txnpair *lasttxn;
crspair *lastcrs;
typedef struct pidpair {
int tpid;
pid_t rpid;
int fdout, fdin;
} pidpair;
pidpair *lastpid;
pidpair pids[MAXPIDS];
int npids;
unsigned long lcount;
static int unhex(unsigned char *c2)
{
int x, c;
x = *c2++ & 0x4f;
if (x & 0x40)
x -= 55;
c = x << 4;
x = *c2 & 0x4f;
if (x & 0x40)
x -= 55;
c |= x;
return c;
}
int inhex(char *in, char *out)
{
char *c2 = out;
while (isxdigit(*in)) {
*c2++ = unhex((unsigned char *)in);
in += 2;
}
return c2 - out;
}
static void addenv(void *tenv, MDB_env *renv)
{
assert(nenvs < MAXENVS);
envs[nenvs].tenv = tenv;
envs[nenvs].renv = renv;
envs[nenvs].ntxns = 0;
lastenv = envs+nenvs;
nenvs++;
}
static envpair *findenv(void *tenv)
{
int i;
if (!lastenv || lastenv->tenv != tenv) {
for (i=0; i<nenvs; i++)
if (envs[i].tenv == tenv)
break;
assert(i < nenvs);
lastenv = &envs[i];
}
return lastenv;
}
static void delenv(envpair *ep)
{
int i = ep - envs;
for (; i<nenvs-1; i++)
envs[i] = envs[i+1];
nenvs--;
lastenv = NULL;
}
static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn)
{
envpair *ep;
txnpair *tp;
ep = findenv(tenv);
assert(ep->ntxns < MAXTXNS);
tp = ep->txns+ep->ntxns;
tp->ttxn = ttxn;
tp->rtxn = rtxn;
tp->ncursors = 0;
ep->ntxns++;
lasttxn = tp;
}
static txnpair *findtxn(void *ttxn)
{
int i, j;
if (lasttxn && lasttxn->ttxn == ttxn)
return lasttxn;
if (lastenv) {
for (i=0; i<lastenv->ntxns; i++) {
if (lastenv->txns[i].ttxn == ttxn) {
lasttxn = lastenv->txns+i;
return lasttxn;
}
}
}
for (i=0; i<nenvs; i++) {
if (envs+i == lastenv) continue;
for (j=0; j<envs[i].ntxns; j++) {
if (envs[i].txns[j].ttxn == ttxn) {
lastenv = envs+i;
lasttxn = envs[i].txns+j;
return lasttxn;
}
}
}
assert(0); /* should have found it */
}
static void deltxn(txnpair *tp)
{
int i = tp - lastenv->txns;
for (; i<lastenv->ntxns-1; i++)
lastenv->txns[i] = lastenv->txns[i+1];
lastenv->ntxns--;
lasttxn = NULL;
}
static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs)
{
int j = tp->ncursors;
assert(tp->ncursors < MAXCRSS);
tp->cursors[j].tcrs = tcrs;
tp->cursors[j].rcrs = rcrs;
tp->ncursors++;
lastcrs = tp->cursors+j;
}
static crspair *findcrs(void *tcrs)
{
int i, j, k;
envpair *ep;
txnpair *tp;
crspair *cp;
if (lastcrs && lastcrs->tcrs == tcrs)
return lastcrs;
if (lasttxn) {
for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
return lastcrs;
}
}
}
if (lastenv) {
for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){
if (tp == lasttxn)
continue;
for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
lasttxn = tp;
return lastcrs;
}
}
}
}
for (i=0, ep=envs; i<nenvs; i++, ep++) {
for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) {
if (tp == lasttxn)
continue;
for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
lasttxn = tp;
lastenv = ep;
return lastcrs;
}
}
}
}
assert(0); /* should have found it already */
}
static void delcrs(void *tcrs)
{
int i;
crspair *cp = findcrs(tcrs);
mdb_cursor_close(cp->rcrs);
for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++)
lasttxn->cursors[i] = lasttxn->cursors[i+1];
lasttxn->ncursors--;
lastcrs = NULL;
}
void child()
{
int rc;
MDB_val key, data;
char *ptr;
while (fgets(inbuf, sizeof(inbuf), stdin)) {
ptr = inbuf;
if (!strncmp(ptr, SCMP("exit")))
break;
if (!strncmp(ptr, SCMP("mdb_env_create"))) {
void *tenv;
MDB_env *renv;
sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv);
E(mdb_env_create(&renv));
addenv(tenv, renv);
} else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) {
void *tenv;
envpair *ep;
unsigned int maxdbs;
sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs);
ep = findenv(tenv);
E(mdb_env_set_maxdbs(ep->renv, maxdbs));
} else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) {
void *tenv;
envpair *ep;
mdb_size_t mapsize;
sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize);
ep = findenv(tenv);
E(mdb_env_set_mapsize(ep->renv, mapsize));
} else if (!strncmp(ptr, SCMP("mdb_env_open"))) {
void *tenv;
envpair *ep;
char *path;
int len;
unsigned int flags, mode;
sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len);
path = ptr+SOFF("mdb_env_open")+len;
ptr = strchr(path, ',');
*ptr++ = '\0';
sscanf(ptr, "%u, %o", &flags, &mode);
ep = findenv(tenv);
E(mdb_env_open(ep->renv, path, flags, mode));
if (!maxkey) {
maxkey = mdb_env_get_maxkeysize(ep->renv);
kbuf = malloc(maxkey+2);
dbuf = malloc(maxkey+2);
dbufsize = maxkey;
}
} else if (!strncmp(ptr, SCMP("mdb_env_close"))) {
void *tenv;
envpair *ep;
sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv);
ep = findenv(tenv);
mdb_env_close(ep->renv);
delenv(ep);
if (!nenvs) /* if no other envs left, this process is done */
break;
} else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) {
unsigned int flags;
void *tenv, *ttxn;
envpair *ep;
MDB_txn *rtxn;
sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn);
ep = findenv(tenv);
E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn));
addtxn(tenv, ttxn, rtxn);
} else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) {
void *ttxn;
txnpair *tp;
sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn);
tp = findtxn(ttxn);
E(mdb_txn_commit(tp->rtxn));
deltxn(tp);
} else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) {
void *ttxn;
txnpair *tp;
sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn);
tp = findtxn(ttxn);
mdb_txn_abort(tp->rtxn);
deltxn(tp);
} else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) {
void *ttxn;
txnpair *tp;
char *dbname;
unsigned int flags;
unsigned int tdbi;
MDB_dbi dbi;
sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn);
dbname = strchr(ptr+SOFF("mdb_dbi_open"), ',');
dbname += 2;
ptr = strchr(dbname, ',');
*ptr++ = '\0';
if (!strcmp(dbname, "(null)"))
dbname = NULL;
sscanf(ptr, "%u = %u", &flags, &tdbi);
tp = findtxn(ttxn);
E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi));
} else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) {
void *tenv;
envpair *ep;
unsigned int tdbi;
sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi);
ep = findenv(tenv);
mdb_dbi_close(ep->renv, tdbi);
} else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) {
void *ttxn, *tcrs;
txnpair *tp;
MDB_cursor *rcrs;
unsigned int tdbi;
sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs);
tp = findtxn(ttxn);
E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs));
addcrs(tp, tcrs, rcrs);
} else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) {
void *tcrs;
sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs);
delcrs(tcrs);
} else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) {
void *tcrs;
crspair *cp;
unsigned int flags;
int len;
sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs);
cp = findcrs(tcrs);
ptr = strchr(ptr+SOFF("mdb_cursor_put"), ',');
sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size);
if (key.mv_size) {
ptr = strchr(ptr, '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
}
ptr = strchr(ptr+1, ',');
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
data.mv_data = dbuf;
ptr += data.mv_size * 2 + 2;
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn));
}
sscanf(ptr+1, "%u", &flags);
E(mdb_cursor_put(cp->rcrs, &key, &data, flags));
} else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) {
void *tcrs;
crspair *cp;
unsigned int flags;
sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags);
cp = findcrs(tcrs);
E(mdb_cursor_del(cp->rcrs, flags));
} else if (!strncmp(ptr, SCMP("mdb_put"))) {
void *ttxn;
txnpair *tp;
unsigned int tdbi, flags;
int len;
sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
tp = findtxn(ttxn);
ptr = strchr(ptr+SOFF("mdb_put"), '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
ptr += data.mv_size * 2 + 2;
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
}
data.mv_data = dbuf;
sscanf(ptr+1, "%u", &flags);
RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags));
} else if (!strncmp(ptr, SCMP("mdb_del"))) {
void *ttxn;
txnpair *tp;
unsigned int tdbi;
int len;
sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
tp = findtxn(ttxn);
ptr = strchr(ptr+SOFF("mdb_del"), '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
}
data.mv_data = dbuf;
RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data));
}
write(1, "\n", 1);
}
exit(0);
}
static pidpair *addpid(int tpid)
{
int fdout[2], fdin[2];
pid_t pid;
assert(npids < MAXPIDS);
pids[npids].tpid = tpid;
pipe(fdout);
pipe(fdin);
if ((pid = fork()) == 0) {
/* child */
fclose(stdin);
fclose(stdout);
dup2(fdout[0], 0);
dup2(fdin[1], 1);
stdin = fdopen(0, "r");
stdout = fdopen(1, "w");
child();
return 0; /* NOTREACHED */
} else {
pids[npids].rpid = pid;
pids[npids].fdout = fdout[1];
pids[npids].fdin = fdin[0];
lastpid = pids+npids;
npids++;
return lastpid;
}
}
static pidpair *findpid(int tpid)
{
int i;
if (!lastpid || lastpid->tpid != tpid) {
for (i=0; i<npids; i++)
if (pids[i].tpid == tpid)
break;
if (i == npids)
return NULL;
lastpid = &pids[i];
}
return lastpid;
}
volatile pid_t killpid;
static void delpid(int tpid)
{
pidpair *pp = findpid(tpid);
if (pp) {
pid_t kpid = pp->rpid;
killpid = kpid;
write(pp->fdout, "exit\n", sizeof("exit"));
while (killpid == kpid)
usleep(10000);
}
}
static void reaper(int sig)
{
int status, i;
pid_t pid = waitpid(-1, &status, 0);
if (pid > 0) {
fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid);
for (i=0; i<npids; i++)
if (pids[i].rpid == pid)
break;
assert(i < npids);
close(pids[i].fdout);
close(pids[i].fdin);
for (;i<npids-1; i++)
pids[i] = pids[i+1];
npids--;
killpid = 0;
}
}
int main(int argc,char * argv[])
{
signal(SIGCHLD, reaper);
while (fgets(inbuf, sizeof(inbuf), stdin)) {
pidpair *pp;
int tpid, len;
char c, *ptr;
lcount++;
if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) {
sscanf(inbuf+SOFF("killed"),"%d", &tpid);
delpid(tpid);
continue;
}
if (inbuf[0] != '>')
continue;
ptr = inbuf+1;
sscanf(ptr, "%d:%n", &tpid, &len);
pp = findpid(tpid);
if (!pp)
pp = addpid(tpid); /* new process */
ptr = inbuf+len+1;
len = strlen(ptr);
write(pp->fdout, ptr, len); /* send command and wait for ack */
read(pp->fdin, &c, 1);
}
while (npids)
delpid(pids[0].tpid);
}