--- src/sbin/jscan/jstream.c 2005/07/06 06:06:44 1.4 +++ src/sbin/jscan/jstream.c 2005/09/06 06:42:44 1.5 @@ -38,175 +38,10 @@ static struct jhash *JHashAry[JHASH_SIZE]; -static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js); static void jnormalize(struct jstream *js); /* - * Locate the next (or previous) complete virtual stream transaction given a - * file descriptor and direction. Keep track of partial stream records as - * a side effect. - * - * Note that a transaction might represent a huge I/O operation, resulting - * in an overall node structure that spans gigabytes, but individual - * subrecord leaf nodes are limited in size and we depend on this to simplify - * the handling of leaf records. - * - * A transaction may cover several raw records. The jstream collection for - * a transaction is only returned when the entire transaction has been - * successfully scanned. Due to the interleaving of transactions the ordering - * of returned JS's may be different (not exactly reversed) when scanning a - * journal backwards verses forwards. Since parallel operations are - * theoretically non-conflicting, this should not present a problem. - */ -struct jstream * -jscan_stream(struct jfile *jf) -{ - struct journal_rawrecbeg head; - struct journal_rawrecend tail; - struct journal_ackrecord ack; - int recsize; - int search; - int error; - struct jstream *js; - - /* - * Get the current offset and make sure it is 16-byte aligned. If it - * isn't, align it and enter search mode. - */ - if (jf->jf_pos & 15) { - jf_warn(jf, "realigning bad offset and entering search mode"); - jalign(jf); - search = 1; - } else { - search = 0; - } - - error = 0; - js = NULL; - - if (jf->jf_direction == JF_FORWARDS) { - /* - * Scan the journal forwards. Note that the file pointer might not - * be seekable. - */ - while ((error = jread(jf, &head, sizeof(head))) == 0) { - if (head.begmagic != JREC_BEGMAGIC) { - if (search == 0) - jf_warn(jf, "bad beginmagic, searching for new record"); - search = 1; - jalign(jf); - continue; - } - recsize = (head.recsize + 15) & ~15; - if (recsize <= 0) { - jf_warn(jf, "bad recordsize: %d\n", recsize); - search = 1; - jalign(jf); - continue; - } - jset(jf); - js = malloc(offsetof(struct jstream, js_data[recsize])); - bzero(js, sizeof(struct jstream)); - bcopy(&head, js->js_data, sizeof(head)); - error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head)); - if (error) { - jf_warn(jf, "Incomplete stream record\n"); - jreturn(jf); - free(js); - js = NULL; - break; - } - - /* - * XXX if the stream is full duplex send the ack back now. This - * really needs to be delayed until the transaction is committed, - * but there are stalling issues if the transaction being - * collected exceeds to the size of the FIFO. So for now this - * is just for testing. - */ - if (jf->jf_flags & JF_FULL_DUPLEX) { - bzero(&ack, sizeof(ack)); - ack.rbeg.begmagic = JREC_BEGMAGIC; - ack.rbeg.streamid = JREC_STREAMID_ACK; - ack.rbeg.transid = head.transid; - ack.rbeg.recsize = sizeof(ack); - ack.rend.endmagic = JREC_ENDMAGIC; - ack.rend.recsize = sizeof(ack); - jwrite(jf, &ack, sizeof(ack)); - } - - /* - * note: recsize is aligned (the actual record size), - * head.recsize is unaligned (the actual payload size). - */ - js->js_size = head.recsize; - bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail)); - if (tail.endmagic != JREC_ENDMAGIC) { - jf_warn(jf, "bad endmagic, searching for new record"); - search = 1; - jreturn(jf); - free(js); - js = NULL; - continue; - } - jflush(jf); - if ((js = jaddrecord(jf, js)) != NULL) - break; - } - } else { - /* - * Scan the journal backwards. Note that jread()'s reverse-seek and - * read. The data read will be forward ordered, however. - */ - while ((error = jread(jf, &tail, sizeof(tail))) == 0) { - if (tail.endmagic != JREC_ENDMAGIC) { - if (search == 0) - jf_warn(jf, "bad endmagic, searching for new record"); - search = 1; - jalign(jf); - continue; - } - recsize = (tail.recsize + 15) & ~15; - if (recsize <= 0) { - jf_warn(jf, "bad recordsize: %d\n", recsize); - search = 1; - jalign(jf); - continue; - } - jset(jf); - js = malloc(offsetof(struct jstream, js_data[recsize])); - bzero(js, sizeof(struct jstream)); - bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail)); - error = jread(jf, js->js_data, recsize - sizeof(tail)); - - if (error) { - jf_warn(jf, "Incomplete stream record\n"); - jreturn(jf); - free(js); - js = NULL; - break; - } - js->js_size = tail.recsize; - bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail)); - bcopy(js->js_data, &head, sizeof(head)); - if (head.begmagic != JREC_BEGMAGIC) { - jf_warn(jf, "bad begmagic, searching for new record"); - search = 1; - jreturn(jf); - free(js); - continue; - } - jflush(jf); - if ((js = jaddrecord(jf, js)) != NULL) - break; - } - } - jf->jf_error = error; - return(js); -} - -/* - * Integrate a jstream record. Deal with the transaction begin and end flags + * Integrate a raw record. Deal with the transaction begin and end flags * to create a forward-referenced collection of jstream records. If we are * able to complete a transaction, the first js associated with that * transaction is returned. @@ -214,13 +49,21 @@ jscan_stream(struct jfile *jf) * XXX we need to store the data for very large multi-record transactions * separately since it might not fit into memory. */ -static struct jstream * -jaddrecord(struct jfile *jf, struct jstream *js) +struct jstream * +jaddrecord(struct jfile *jf, struct jdata *jd) { - struct journal_rawrecbeg *head = (void *)js->js_data; + struct journal_rawrecbeg *head; + struct jstream *js; struct jhash *jh; struct jhash **jhp; + js = malloc(sizeof(struct jstream)); + bzero(js, sizeof(struct jstream)); + js->js_jdata = jref(jd); + js->js_head = (void *)jd->jd_data; + js->js_jfile = jf; + head = js->js_head; + /* * Check for a completely self-contained transaction, just return the * js if possible. @@ -256,7 +99,7 @@ jaddrecord(struct jfile *jf, struct jstr * Emplace the stream segment */ jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK; - if (jf->jf_direction == JF_FORWARDS) { + if (jf->jf_direction == JD_FORWARDS) { jh->jh_last->js_next = js; jh->jh_last = js; } else { @@ -294,15 +137,15 @@ jnormalize(struct jstream *js) off_t off; js->js_normalized_off = 0; - js->js_normalized_base = js->js_data; - js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend); + js->js_normalized_base = (void *)js->js_head; + js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend); js->js_normalized_total = js->js_normalized_size; off = js->js_normalized_size; for (jscan = js->js_next; jscan; jscan = jscan->js_next) { jscan->js_normalized_off = off; - jscan->js_normalized_base = jscan->js_data + + jscan->js_normalized_base = (char *)jscan->js_head + sizeof(struct journal_rawrecbeg); - jscan->js_normalized_size = jscan->js_size - + jscan->js_normalized_size = jscan->js_head->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend); off += jscan->js_normalized_size; @@ -323,6 +166,8 @@ jscan_dispose(struct jstream *js) while (js) { jnext = js->js_next; + jfree(js->js_jfile, js->js_jdata); + js->js_jdata = NULL; free(js); js = jnext; }