--- src/sbin/jscan/jfile.c 2005/07/06 06:06:44 1.5 +++ src/sbin/jscan/jfile.c 2005/09/06 06:42:44 1.6 @@ -35,140 +35,602 @@ */ #include "jscan.h" +#include + +static void jalign(struct jfile *jf); +static int jreadbuf(struct jfile *jf, void *buf, int bytes); /* - * Open a journal for directional scanning + * Open a file descriptor for journal record access. + * + * NOTE: only seekable descriptors are supported for backwards scans. + * + * Returns a newly allocated jfile wrapping fd. For JD_BACKWARDS, jf_pos + * is initialized to the end-of-file offset reported by lseek(). + * + * NOTE(review): the malloc() result is passed to bzero() without a NULL + * check. */ struct jfile * -jopen_stream(const char *path, enum jdirection jdir, int flags) +jopen_fd(int fd, enum jdirection direction) { - FILE *fp; struct jfile *jf; - if ((fp = fopen(path, "r")) == NULL) - return (NULL); - if ((jf = jopen_fp(fp, jdir, flags)) == NULL) - fclose (fp); + jf = malloc(sizeof(struct jfile)); + bzero(jf, sizeof(struct jfile)); + jf->jf_fd = fd; + jf->jf_open_flags = O_RDONLY; + if (direction == JD_BACKWARDS) { + jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END); + } + jf->jf_direction = direction; return(jf); } +/* + * Open a prefix set. prefix.nnnnnnnnn files or a prefix.transid file + * must exist to succeed. No file descriptor is actually opened but + * the sequence number is initialized to the beginning or end of the set. + * + * Returns NULL when neither kind of file is found. + * + * NOTE(review): the directory scan parses the numeric suffix with + * strtoul(..., 10) and skips a suffix of 0, while the file names built + * here and in jread() use the %08x format; confirm the intended radix. + */ struct jfile * -jopen_fp(FILE *fp, enum jdirection jdir, int flags) +jopen_prefix(const char *prefix, enum jdirection direction, int rw) { struct jfile *jf; + unsigned int seq_beg = -1; + unsigned int seq_end = -1; + unsigned int seq; + struct stat st; + const char *dirname; + struct dirent *den; + DIR *dir; + char *basename; + char *data; + char *ptr; + int hastransid; + int baselen; + int fd; + + dirname = data = strdup(prefix); + if ((basename = strrchr(dirname, '/')) != NULL) { + *basename++ = 0; + } else { + basename = data; + dirname = "./"; + } + baselen = strlen(basename); + if ((dir = opendir(dirname)) != NULL) { + while ((den = readdir(dir)) != NULL) { + if (strncmp(den->d_name, basename, baselen) == 0 && + den->d_name[baselen] == '.' 
+ ) { + seq = strtoul(den->d_name + baselen + 1, &ptr, 10); + if (*ptr == 0 && seq > 0) { + if (seq_beg == (unsigned int)-1 || seq_beg > seq) + seq_beg = seq; + if (seq_end == (unsigned int)-1 || seq_end < seq) + seq_end = seq; + } + } + } + closedir(dir); + } + free(data); - jf = malloc(sizeof(struct jfile)); - bzero(jf, sizeof(struct jfile)); - jf->jf_fp = fp; - jf->jf_direction = jdir; - jf->jf_setpt = -1; - jf->jf_flags = flags; - if (jdir == JF_BACKWARDS) { - fseeko(jf->jf_fp, 0L, SEEK_END); - jf->jf_pos = ftello(jf->jf_fp); + hastransid = 0; + asprintf(&data, "%s.transid", prefix); + if (stat(data, &st) == 0) + hastransid = 1; + free(data); + + if (seq_beg != (unsigned int)-1 || hastransid) { + if (seq_beg == (unsigned int)-1) { + seq_beg = 0; + seq_end = 0; + if (rw) { + asprintf(&data, "%s.%08x", prefix, 0); + if ((fd = open(data, O_RDWR|O_CREAT, 0666)) >= 0) + close(fd); + free(data); + } + } + jf = malloc(sizeof(struct jfile)); + bzero(jf, sizeof(struct jfile)); + jf->jf_fd = -1; + jf->jf_prefix = strdup(prefix); + jf->jf_seq_beg = seq_beg; + jf->jf_seq_end = seq_end; + jf->jf_pos = -1; + if (direction == JD_BACKWARDS) { + jf->jf_seq = jf->jf_seq_end; + } else { + jf->jf_seq = jf->jf_seq_beg; + } + jf->jf_direction = direction; + jf->jf_open_flags = rw ? (O_RDWR|O_CREAT) : O_RDONLY; + } else { + jf = NULL; } return(jf); } /* - * Close a previously opened journal, clean up any side allocations. + * Get a prefix set ready for append. + * + * Returns 0 once the prefix.transid file has been opened (seeding it with + * a zero transid if it is empty), or -1 if that file cannot be opened. */ -void -jclose_stream(struct jfile *jf) +int +jrecord_init(const char *prefix) { - struct jdata *jd; + struct jfile *jf; + struct stat st; + char *data; + int hasseqspace; + int fd; + + /* + * Determine whether we already have a prefix set or whether we need + * to create one. 
+ */ + jf = jopen_prefix(prefix, 0, 0); + hasseqspace = 0; + if (jf) { + if (jf->jf_seq_beg != (unsigned int)-1) + hasseqspace = 1; + jclose(jf); + } + asprintf(&data, "%s.transid", prefix); - fclose(jf->jf_fp); - jf->jf_fp = NULL; - while ((jd = jf->jf_saved) != NULL) { - jf->jf_saved = jd->jd_next; - free(jd); + /* + * If the sequence exists the transid file must ALREADY exist for us + * to be able to safely 'append' to the space. Locked-down sequence + * spaces do not have a transid file. + */ + if (hasseqspace) { + fd = open(data, O_RDWR, 0666); + } else { + fd = open(data, O_RDWR|O_CREAT, 0666); } - free(jf); + free(data); + if (fd < 0) + return(-1); + if (fstat(fd, &st) == 0 && st.st_size == 0) + write(fd, "0000000000000000\n", 17); /* starting transid in hex */ + close(fd); + return(0); } /* - * Align us to the next 16 byte boundary. If scanning forwards we align - * forwards if not already aligned. If scanning backwards we align - * backwards if not already aligned. + * Close a previously opened journal, clean up any side allocations. + * + * NOTE(review): only jf_fd and the jfile itself are released here; the + * jf_prefix string duplicated by jopen_prefix() and any records still + * linked on jf_data are not freed. */ void -jalign(struct jfile *jf) +jclose(struct jfile *jf) { - if (jf->jf_direction == JF_FORWARDS) { - jf->jf_pos = (jf->jf_pos + 15) & ~15; - fseeko(jf->jf_fp, jf->jf_pos, SEEK_SET); - } else { - jf->jf_pos = jf->jf_pos & ~15; - } + close(jf->jf_fd); + jf->jf_fd = -1; + free(jf); } /* - * Read data from a journal forwards or backwards. Note that the file - * pointer's actual seek position does not match jf_pos in the reverse - * scan case. Callers should never access jf_fp directly. + * Locate the next (or previous) complete virtual stream transaction given a + * file descriptor and direction. Keep track of partial stream records as + * a side effect. 
+ * + * Note that a transaction might represent a huge I/O operation, resulting + * in an overall node structure that spans gigabytes, but individual + * subrecord leaf nodes are limited in size and we depend on this to simplify + * the handling of leaf records. 
+ * + * A transaction may cover several raw records. The jstream collection for + * a transaction is only returned when the entire transaction has been + * successfully scanned. Due to the interleaving of transactions the ordering + * of returned JS's may be different (not exactly reversed) when scanning a + * journal backwards versus forwards. Since parallel operations are + * theoretically non-conflicting, this should not present a problem. + * + * On success one raw record is linked onto jf_data, stored in *jdp, and 0 + * is returned. When no further records are available *jdp is set to NULL + * and -1 is returned. */ int -jread(struct jfile *jf, void *buf, int bytes) +jread(struct jfile *jf, struct jdata **jdp, enum jdirection direction) { + struct journal_rawrecbeg head; + struct journal_rawrecbeg *headp; + struct journal_rawrecend tail; + struct journal_rawrecend *tailp; + struct jdata *jd; + char *filename; + int allocsize; + int recsize; + int search; int n; - - if (jf->jf_direction == JF_FORWARDS) { - while (bytes) { - n = fread(buf, 1, bytes, jf->jf_fp); - if (n <= 0) - break; - assert(n <= bytes); - jf->jf_pos += n; - buf = (char *)buf + n; - bytes -= n; + + /* + * If changing direction on an open descriptor we have to fixup jf_pos. + * When reading backwards the actual file seek position does not match + * jf_pos. + * + * If you read forwards then read backwards, or read backwards then + * read forwards, you will get the same record. + */ + if (jf->jf_direction != direction) { + if (jf->jf_fd >= 0) { + if (direction == JD_FORWARDS) { + lseek(jf->jf_fd, jf->jf_pos, 0); + } } - if (bytes == 0) { - return (0); - } else { - fseeko(jf->jf_fp, jf->jf_pos, SEEK_SET); - return (errno ? errno : ENOENT); + jf->jf_direction = direction; + } + +top: + /* + * If reading in prefix mode and we have no descriptor, open + * a new descriptor based on the current sequence number. If + * this fails we will fall all the way through to the end which will + * setup the next sequence number and loop. 
+ */ + if (jf->jf_fd == -1 && jf->jf_prefix) { + asprintf(&filename, "%s.%08x", jf->jf_prefix, jf->jf_seq); + if ((jf->jf_fd = open(filename, O_RDONLY)) >= 0) { + if (jf->jf_direction == JD_FORWARDS) + jf->jf_pos = 0; + else + jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END); + search = 0; } + fprintf(stderr, "Open %s fd %d\n", filename, jf->jf_fd); + free(filename); + } + + /* + * Get the current offset and make sure it is 16-byte aligned. If it + * isn't, align it and enter search mode. + */ + if (jf->jf_pos & 15) { + jf_warn(jf, "realigning bad offset and entering search mode"); + jalign(jf); + search = 1; } else { - if (bytes > jf->jf_pos) - return (ENOENT); - jf->jf_pos -= bytes; - fseeko(jf->jf_fp, jf->jf_pos, SEEK_SET); - if (fread(buf, bytes, 1, jf->jf_fp) == 1) { - return (0); + search = 0; + } + + if (jf->jf_direction == JD_FORWARDS) { + /* + * Scan the journal forwards. Note that the file pointer might not + * be seekable. + */ + while (jreadbuf(jf, &head, sizeof(head)) == sizeof(head)) { + if (head.begmagic != JREC_BEGMAGIC) { + if (search == 0) + jf_warn(jf, "bad beginmagic, searching for new record"); + search = 1; + jalign(jf); + continue; + } + + /* + * The actual record is 16-byte aligned. head.recsize contains + * the unaligned record size. 
+ */ + recsize = (head.recsize + 15) & ~15; + if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) { + if (search == 0) + jf_warn(jf, "bad recordsize: %d\n", recsize); + search = 1; + jalign(jf); + continue; + } + allocsize = offsetof(struct jdata, jd_data[recsize]); + allocsize = (allocsize + 255) & ~255; + jd = malloc(allocsize); + bzero(jd, offsetof(struct jdata, jd_data[0])); + bcopy(&head, jd->jd_data, sizeof(head)); + n = jreadbuf(jf, jd->jd_data + sizeof(head), + recsize - sizeof(head)); + if (n != (int)(recsize - sizeof(head))) { + if (search == 0) + jf_warn(jf, "Incomplete stream record\n"); + search = 1; + jalign(jf); + free(jd); + continue; + } + + tailp = (void *)(jd->jd_data + recsize - sizeof(*tailp)); + if (tailp->endmagic != JREC_ENDMAGIC) { + if (search == 0) + jf_warn(jf, "bad endmagic, searching for new record"); + search = 1; + jalign(jf); + free(jd); + continue; + } + + /* + * note: recsize is aligned (the actual record size), + * head.recsize is unaligned (the actual payload size). + */ + jd->jd_transid = head.transid; + jd->jd_alloc = allocsize; + jd->jd_size = recsize; + jd->jd_refs = 1; + jd->jd_next = jf->jf_data; + jf->jf_data = jd; + *jdp = jd; + return(0); + } + } else { + /* + * Scan the journal backwards. Note that jreadbuf() reverse-seeks + * and then reads. The data read will be forward ordered, however. + */ + while (jreadbuf(jf, &tail, sizeof(tail)) == sizeof(tail)) { + if (tail.endmagic != JREC_ENDMAGIC) { + if (search == 0) + jf_warn(jf, "bad endmagic, searching for new record"); + search = 1; + jalign(jf); + continue; + } + + /* + * The actual record is 16-byte aligned. tail.recsize contains + * the unaligned record size. 
+ */ + recsize = (tail.recsize + 15) & ~15; + if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) { + if (search == 0) + jf_warn(jf, "bad recordsize: %d\n", recsize); + search = 1; + jalign(jf); + continue; + } + allocsize = offsetof(struct jdata, jd_data[recsize]); + allocsize = (allocsize + 255) & ~255; + jd = malloc(allocsize); + bzero(jd, offsetof(struct jdata, jd_data[0])); + bcopy(&tail, jd->jd_data + recsize - sizeof(tail), sizeof(tail)); + n = jreadbuf(jf, jd->jd_data, recsize - sizeof(tail)); + if (n != (int)(recsize - sizeof(tail))) { + if (search == 0) + jf_warn(jf, "Incomplete stream record\n"); + search = 1; + jalign(jf); + free(jd); + continue; + } + + headp = (void *)jd->jd_data; + if (headp->begmagic != JREC_BEGMAGIC) { + if (search == 0) + jf_warn(jf, "bad begmagic, searching for new record"); + search = 1; + jalign(jf); + free(jd); + continue; + } + + /* + * note: recsize is aligned (the actual record size), + * tail.recsize is unaligned (the actual payload size). + */ + jd->jd_transid = headp->transid; + jd->jd_alloc = allocsize; + jd->jd_size = recsize; + jd->jd_refs = 1; + jd->jd_next = jf->jf_data; + jf->jf_data = jd; + *jdp = jd; + return(0); + } + } + + /* + * If reading in prefix mode and there is no more data, close the + * current descriptor, adjust the sequence number, and loop. + */ + if (jf->jf_prefix) { + close(jf->jf_fd); + jf->jf_fd = -1; + if (jf->jf_direction == JD_FORWARDS) { + if (jf->jf_seq < jf->jf_seq_end) { + ++jf->jf_seq; + goto top; + } } else { - jf->jf_pos += bytes; - return (errno); + if (jf->jf_seq > jf->jf_seq_beg) { + --jf->jf_seq; + goto top; + } } } + + /* + * Otherwise there are no more records and we are done. + */ + *jdp = NULL; + return(-1); } +/* + * Write a record out. If this is a prefix set and the file would + * exceed record_size, we rotate into a new sequence number. + * + * NOTE(review): no rotation is performed by the code below; it issues a + * single write() of jd_size bytes to jf_fd and returns write()'s result. 
+ */ int -jwrite(struct jfile *jf, void *buf, int bytes) +jwrite(struct jfile *jf, struct jdata *jd) { int n; - n = write(fileno(jf->jf_fp), buf, bytes); + n = write(jf->jf_fd, jd->jd_data, jd->jd_size); return(n); } -void -jset(struct jfile *jf) +/* + * Reset the direction and seek us to the beginning or end + * of the current file. In prefix mode we might as well + * just let jread() do it since it might have to do it + * anyway. + */ +static void +jreset(struct jfile *jf, unsigned int seq, enum jdirection direction) { - jf->jf_setpt = jf->jf_pos; + if (jf->jf_prefix) { + if (jf->jf_fd >= 0) { + close(jf->jf_fd); + jf->jf_fd = -1; + } + jf->jf_pos = -1; + jf->jf_seq = seq; + } else { + if (direction) { + jf->jf_pos = lseek(jf->jf_fd, 0L, 0); + } else { + jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END); + } + } + jf->jf_direction = direction; } +/* + * Position the file such that the next jread() in the specified + * direction will read the record for the specified transaction id. + * If the transaction id does not exist the jseek will position the + * file at the next higher (if reading forwards) or lower (if reading + * backwards) transaction id. + * + * jseek is not required to be exact. It is allowed to position the + * file at any point <= the transid (forwards) or >= the transid + * (backwards). However, the more off jseek is, the more scanning + * the code will have to do to position itself properly. + * + * NOTE(review): seq is assigned only inside the jf_prefix branch yet is + * passed to jreset() afterwards, and the unsigned countdown loop cannot + * end via its own condition when jf_seq_beg is 0. + */ void -jreturn(struct jfile *jf) +jseek(struct jfile *jf, int64_t transid, enum jdirection direction) +{ + int64_t transid_beg; + int64_t transid_end; + unsigned int seq; + struct jdata *jd; + + /* + * If we have a prefix set search the sequence space backwards until + * we find the file most likely to contain the transaction id. 
+ */ + if (jf->jf_prefix) { + for (seq = jf->jf_seq_end; seq >= jf->jf_seq_beg; --seq) { + jreset(jf, seq, JD_FORWARDS); + if (jread(jf, &jd, JD_FORWARDS) == 0) { + transid_beg = jd->jd_transid; + jfree(jf, jd); + if (transid_beg == transid) { + jreset(jf, seq, JD_FORWARDS); + break; + } + if (transid_beg < transid) + break; + } + } + } + + /* + * Position us within the current file. + */ + jreset(jf, seq, JD_BACKWARDS); + while (jread(jf, &jd, JD_BACKWARDS) == 0) { + transid_end = jd->jd_transid; + jfree(jf, jd); + + /* + * If we are at the sequence number the next forward read + * will re-read the record since we were going backwards. If + * the caller wants to go backwards we have to go forwards one + * record so the caller gets the transid record when it does + * its first backwards read. Confused yet? + * + * If we are at a smaller sequence number we need to read forwards + * by one so the next forwards read gets the first record > transid, + * or the next backwards read gets the first record < transid. + */ + if (transid_end == transid) { + if (direction == JD_BACKWARDS) { + if (jread(jf, &jd, JD_FORWARDS) == 0) + jfree(jf, jd); + } + break; + } + if (transid_end < transid) { + if (jread(jf, &jd, JD_FORWARDS) == 0) + jfree(jf, jd); + } + } +} + +/* + * Data returned by jread() is persistent until released. + * + * jref() adds a reference. jfree() drops one and, when the count reaches + * zero, unlinks the record from jf_data and frees it. + */ +struct jdata * +jref(struct jdata *jd) { - jf->jf_pos = jf->jf_setpt; - jf->jf_setpt = -1; - fseeko(jf->jf_fp, jf->jf_pos, SEEK_SET); + ++jd->jd_refs; + return(jd); } void -jflush(struct jfile *jf) +jfree(struct jfile *jf, struct jdata *jd) +{ + struct jdata **jdp; + + if (--jd->jd_refs == 0){ + for (jdp = &jf->jf_data; *jdp != jd; jdp = &(*jdp)->jd_next) { + assert(*jdp != NULL); + } + *jdp = jd->jd_next; + free(jd); + } +} + +/* + * Align us to the next 16 byte boundary. If scanning forwards we align + * forwards if not already aligned. If scanning backwards we align + * backwards if not already aligned. 
We only have to synchronize the + * seek position with the file seek position for forward scans. + * + * NOTE(review): jreadbuf() does not update jf_pos for forward reads, so + * the byte count it returns is added to jf_pos here. + */ +static void +jalign(struct jfile *jf) +{ + char dummy[16]; + int bytes; + + if ((int)jf->jf_pos & 15) { + if (jf->jf_direction == JD_FORWARDS) { + bytes = 16 - ((int)jf->jf_pos & 15); + jf->jf_pos += jreadbuf(jf, dummy, bytes); + } else { + jf->jf_pos = jf->jf_pos & ~(off_t)15; + } + } +} + +/* + * Read up to the requested number of bytes into buf, forwards or + * backwards, and return the number of bytes actually read. Note that + * the file pointer's actual seek position does + * not match jf_pos in the reverse direction case. + * + * Returns 0 without reading if no descriptor is open or, for a backwards + * read, if fewer than the requested bytes precede jf_pos. + */ +static int +jreadbuf(struct jfile *jf, void *buf, int bytes) { - jf->jf_setpt = -1; + int ttl = 0; + int n; + + if (jf->jf_fd < 0) + return(0); + + if (jf->jf_direction == JD_FORWARDS) { + while (ttl != bytes) { + n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl); + if (n <= 0) + break; + ttl += n; + } + } else { + if (jf->jf_pos >= bytes) { + jf->jf_pos -= bytes; + lseek(jf->jf_fd, jf->jf_pos, 0); + while (ttl != bytes) { + n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl); + if (n <= 0) + break; + ttl += n; + } + } + } + return(ttl); }