--- src/sys/kern/vfs_journal.c 2005/03/05 05:08:27 1.11 +++ src/sys/kern/vfs_journal.c 2005/03/22 22:13:28 1.12 @@ -106,12 +106,14 @@ static int journal_install_vfs_journal(s const struct mountctl_install_journal *info); static int journal_remove_vfs_journal(struct mount *mp, const struct mountctl_remove_journal *info); +static int journal_destroy(struct mount *mp, struct journal *jo, int flags); static int journal_resync_vfs_journal(struct mount *mp, const void *ctl); static int journal_status_vfs_journal(struct mount *mp, const struct mountctl_status_journal *info, struct mountctl_journal_ret_status *rstat, int buflen, int *res); -static void journal_thread(void *info); +static void journal_wthread(void *info); +static void journal_rthread(void *info); static void *journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp, @@ -279,7 +281,8 @@ journal_install_vfs_journal(struct mount jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO); bcopy(info->id, jo->id, sizeof(jo->id)); - jo->flags = info->flags & ~(MC_JOURNAL_ACTIVE | MC_JOURNAL_STOP_REQ); + jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE | + MC_JOURNAL_STOP_REQ); /* * Memory FIFO size, round to nearest power of 2 @@ -327,12 +330,19 @@ journal_install_vfs_journal(struct mount free(jo, M_JOURNAL); } else { fhold(fp); - jo->flags |= MC_JOURNAL_ACTIVE; - lwkt_create(journal_thread, jo, NULL, &jo->thread, - TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id); - lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON); - lwkt_schedule(&jo->thread); - + jo->flags |= MC_JOURNAL_WACTIVE; + lwkt_create(journal_wthread, jo, NULL, &jo->wthread, + TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id); + lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON); + lwkt_schedule(&jo->wthread); + + if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) { + jo->flags |= MC_JOURNAL_RACTIVE; + lwkt_create(journal_rthread, jo, NULL, &jo->rthread, + TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id); + 
lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON); + lwkt_schedule(&jo->rthread); + } jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT); jrecord_write(&jrec, JTYPE_ASSOCIATE, 0); jrecord_done(&jrec, 0); @@ -351,36 +361,63 @@ journal_remove_vfs_journal(struct mount const struct mountctl_remove_journal *info) { struct journal *jo; - struct jrecord jrec; int error; TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) { if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0) break; } - if (jo) { - error = 0; - TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry); + if (jo) + error = journal_destroy(mp, jo, info->flags); + else + error = EINVAL; + return (error); +} - jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT); - jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0); - jrecord_done(&jrec, 0); +/* + * Remove all journals associated with a mount point. Usually called + * by the umount code. + */ +void +journal_remove_all_journals(struct mount *mp, int flags) +{ + struct journal *jo; - jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM); - wakeup(&jo->fifo); - while (jo->flags & MC_JOURNAL_ACTIVE) { - tsleep(jo, 0, "jwait", 0); - } - lwkt_free_thread(&jo->thread); /* XXX SMP */ - if (jo->fp) - fdrop(jo->fp, curthread); - if (jo->fifo.membase) - free(jo->fifo.membase, M_JFIFO); - free(jo, M_JOURNAL); - } else { - error = EINVAL; + while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) { + journal_destroy(mp, jo, flags); } - return (error); +} + +/* + * Unlink a journal from its mount point, write the final DISASSOCIATE + * record, wait for the worker thread(s) to exit, then release the + * thread(s), file reference, memory FIFO and journal. Always returns 0. 
+ */ +static int +journal_destroy(struct mount *mp, struct journal *jo, int flags) +{ + struct jrecord jrec; + + TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry); + + jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT); + jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0); + jrecord_done(&jrec, 0); + + jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM); + wakeup(&jo->fifo); + while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) { + tsleep(jo, 0, "jwait", 0); + } + lwkt_free_thread(&jo->wthread); /* XXX SMP */ + if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) + lwkt_free_thread(&jo->rthread); /* XXX SMP */ + if (jo->fp) + fdrop(jo->fp, curthread); + if 
(jo->fifo.membase) + free(jo->fifo.membase, M_JFIFO); + free(jo, M_JOURNAL); + return(0); } static int @@ -433,12 +470,13 @@ journal_status_vfs_journal(struct mount } return(error); } + /* * The per-journal worker thread is responsible for writing out the * journal's FIFO to the target stream. */ static void -journal_thread(void *info) +journal_wthread(void *info) { struct journal *jo = info; struct journal_rawrecbeg *rawp; @@ -483,7 +521,7 @@ journal_thread(void *info) * help it. * * If xindex is caught up to rindex it gets incremented along with - * rindex. XXX + * rindex. XXX SMP */ if (rawp->streamid == JREC_STREAMID_PAD) { if (jo->fifo.rindex == jo->fifo.xindex) @@ -535,20 +573,127 @@ journal_thread(void *info) } /* - * Advance rindex. XXX for now also advance xindex, which will - * eventually be advanced only when the target acknowledges the - * sequence space. + * Advance rindex. If the journal stream is not full duplex we also + * advance xindex, otherwise the rjournal thread is responsible for + * advancing xindex. */ jo->fifo.rindex += bytes; - jo->fifo.xindex += bytes; + if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) + jo->fifo.xindex += bytes; jo->total_acked += bytes; KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0); - if (jo->flags & MC_JOURNAL_WWAIT) { - jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */ - wakeup(&jo->fifo.windex); + if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) { + if (jo->flags & MC_JOURNAL_WWAIT) { + jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */ + wakeup(&jo->fifo.windex); + } + } + } + jo->flags &= ~MC_JOURNAL_WACTIVE; + wakeup(jo); + wakeup(&jo->fifo.windex); +} + +/* + * A second per-journal worker thread is created for two-way journaling + * streams to deal with the return acknowledgement stream. 
+ */ +static void +journal_rthread(void *info) +{ + struct journal_rawrecbeg *rawp; + struct journal_ackrecord ack; + struct journal *jo = info; + int64_t transid; + int error; + int count; + int bytes; + int index; + + transid = 0; + error = 0; + + for (;;) { + /* + * We have been asked to stop + */ + if (jo->flags & MC_JOURNAL_STOP_REQ) + break; + + /* + * If we have no active transaction id, get one from the return + * stream. Short reads resume at the current offset in the record. + */ + if (transid == 0) { + for (index = 0; index < sizeof(ack); index += count) { + error = fp_read(jo->fp, (char *)&ack + index, sizeof(ack) - index, &count); + if (error) + break; + if (count == 0) + tsleep(&jo->fifo.xindex, 0, "jread", hz); + } + if (error) { + printf("read error %d on receive stream\n", error); + break; + } + if (ack.rbeg.begmagic != JREC_BEGMAGIC || + ack.rend.endmagic != JREC_ENDMAGIC + ) { + printf("bad begmagic or endmagic on receive stream\n"); + break; + } + transid = ack.rbeg.transid; + } + + /* + * Calculate the number of unacknowledged bytes. If there are no + * unacknowledged bytes then unsent data was acknowledged, report, + * sleep a bit, and loop in that case. This should not happen + * normally. The ack record is thrown away. + */ + bytes = jo->fifo.rindex - jo->fifo.xindex; + + if (bytes == 0) { + printf("warning: unsent data acknowledged\n"); + tsleep(&jo->fifo.xindex, 0, "jrseq", hz); + transid = 0; + continue; + } + + /* + * Since rindex has advanced, the record pointed to by xindex + * must be a valid record. + */ + rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask)); + KKASSERT(rawp->begmagic == JREC_BEGMAGIC); + KKASSERT(rawp->recsize <= bytes); + + /* + * The target can acknowledge several records at once. 
+ */ + if (rawp->transid < transid) { + printf("ackskip %08llx/%08llx\n", rawp->transid, transid); + jo->fifo.xindex += (rawp->recsize + 15) & ~15; + if (jo->flags & MC_JOURNAL_WWAIT) { + jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */ + wakeup(&jo->fifo.windex); + } + continue; } + if (rawp->transid == transid) { + printf("ackskip %08llx/%08llx\n", rawp->transid, transid); + jo->fifo.xindex += (rawp->recsize + 15) & ~15; + if (jo->flags & MC_JOURNAL_WWAIT) { + jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */ + wakeup(&jo->fifo.windex); + } + transid = 0; + continue; + } + printf("warning: unsent data(2) acknowledged\n"); + transid = 0; } - jo->flags &= ~MC_JOURNAL_ACTIVE; + jo->flags &= ~MC_JOURNAL_RACTIVE; wakeup(jo); wakeup(&jo->fifo.windex); } @@ -557,10 +702,13 @@ journal_thread(void *info) * This builds a pad record which the journaling thread will skip over. Pad * records are required when we are unable to reserve sufficient stream space * due to insufficient space at the end of the physical memory fifo. + * + * Even though the record is not transmitted, a normal transid must be + * assigned to it so link recovery operations after a failure work properly. */ static void -journal_build_pad(struct journal_rawrecbeg *rawp, int recsize) +journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid) { struct journal_rawrecend *rendp; @@ -568,7 +716,7 @@ journal_build_pad(struct journal_rawrecb rawp->streamid = JREC_STREAMID_PAD; rawp->recsize = recsize; /* must be 16-byte aligned */ - rawp->seqno = 0; + rawp->transid = transid; /* * WARNING, rendp may overlap rawp->seqno. This is necessary to * allow PAD records to fit in 16 bytes. Use cpu_mb1() to @@ -610,7 +758,7 @@ journal_commit_wakeup(struct journal *jo * specified amount of payload space. *rawpp will be set to point to the * base of the new stream record and a pointer to the base of the payload * space will be returned. 
*rawpp does not need to be pre-NULLd prior to - * making this call. + * making this call. The raw record header will be partially initialized. * * A stream can be extended, aborted, or committed by other API calls * below. This may result in a sequence of potentially disconnected @@ -693,16 +841,20 @@ journal_reserve(struct journal *jo, stru * journaling code must also be aware the reserved sections occuring * after this one will also not be written out even if completed * until this one is completed. + * + * The transaction id must accommodate real and potential pad creation. */ rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask)); if (req != bytes) { - journal_build_pad(rawp, availtoend); + journal_build_pad(rawp, availtoend, jo->transid); + ++jo->transid; rawp = (void *)jo->fifo.membase; } rawp->begmagic = JREC_INCOMPLETEMAGIC; /* updated by abort/commit */ rawp->recsize = bytes; /* (unaligned size) */ rawp->streamid = streamid | JREC_STREAMCTL_BEGIN; - rawp->seqno = 0; /* set by caller */ + rawp->transid = jo->transid; + jo->transid += 2; /* * Issue a memory barrier to guarentee that the record data has been @@ -882,7 +1034,8 @@ journal_commit(struct journal *jo, struc jo->fifo.windex -= osize - nsize; } else { /* we cannot backindex the fifo, emplace a pad in the dead space */ - journal_build_pad((void *)((char *)rawp + nsize), osize - nsize); + journal_build_pad((void *)((char *)rawp + nsize), osize - nsize, + rawp->transid + 1); } } @@ -1352,9 +1505,9 @@ jrecord_write_vattr(struct jrecord *jrec save = jrecord_push(jrec, JTYPE_VATTR); if (vat->va_type != VNON) - jrecord_leaf(jrec, JLEAF_UID, &vat->va_type, sizeof(vat->va_type)); - if (vat->va_uid != VNOVAL) - jrecord_leaf(jrec, JLEAF_UID, &vat->va_mode, sizeof(vat->va_mode)); + jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type)); + if (vat->va_mode != (mode_t)VNOVAL) + jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode)); if (vat->va_nlink != VNOVAL) jrecord_leaf(jrec, 
JLEAF_NLINK, &vat->va_nlink, 
sizeof(vat->va_nlink)); if (vat->va_uid != VNOVAL)