--- src/sys/kern/kern_syslink.c 2007/04/22 00:59:25 1.9 +++ src/sys/kern/kern_syslink.c 2007/04/26 02:10:59 1.10 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2006 The DragonFly Project. All rights reserved. + * Copyright (c) 2006-2007 The DragonFly Project. All rights reserved. * * This code is derived from software contributed to The DragonFly Project * by Matthew Dillon @@ -37,6 +37,10 @@ * This module implements the syslink() system call and protocol which * is used to glue clusters together as well as to interface userland * devices and filesystems to the kernel. + * + * We implement the management node concept in this module. A management + * node is basically a router node with additional features that take much + * of the protocol burden away from connecting terminal nodes. */ #include @@ -62,6 +66,8 @@ #include +#include "opt_syslink.h" + /* * Red-Black trees organizing the syslink 'router' nodes and connections * to router nodes. @@ -110,12 +116,13 @@ struct sldata { RB_ENTRY(sldata) rbnode; struct slrouter *router; /* organizing router */ struct file *xfp; /* external file pointer */ - struct socket *xso; /* external socket */ struct lock rlock; /* synchronizing lock */ struct lock wlock; /* synchronizing lock */ struct thread *rthread; /* helper thread */ struct thread *wthread; /* helper thread */ - struct sockbuf sio; /* accumulate mbufs */ + struct sockbuf sior; /* accumulate incoming mbufs */ + struct sockbuf siow; /* accumulate outgoing mbufs */ + struct sockaddr sa; /* used w/SLIF_SUBNET mode */ int bindex; /* broadcast index */ int flags; /* connection flags */ int linkid; @@ -513,11 +520,13 @@ syslink_add(struct slrouter *slrouter, s * Complete initialization of the physical route node. Setting * sldata->router activates the node. */ - sbinit(&sldata->sio, SYSLINK_SIOBUFSIZE); + sbinit(&sldata->sior, SYSLINK_SIOBUFSIZE); + sbinit(&sldata->siow, SYSLINK_SIOBUFSIZE); sldata->bindex = slrouter->bbuf.windex; sldata->flags = info->flags & SLIF_USERFLAGS; lockinit(&sldata->rlock, "slread", 0, 0); lockinit(&sldata->wlock, "slwrite", 0, 0); + bcopy(&info->u.sa, &sldata->sa, sizeof(sldata->sa)); if (info->fd < 0) { /* @@ -544,7 +553,6 @@ syslink_add(struct slrouter *slrouter, s /* two refs: reader thread and writer thread */ sldata->refs += 2; if (sldata->xfp->f_type == DTYPE_SOCKET) { - sldata->xso = (void *)sldata->xfp->f_data; lwkt_create(syslink_rthread_so, sldata, &sldata->rthread, NULL, 0, -1, "syslink_r"); @@ -589,34 +597,74 @@ syslink_rem(struct slrouter *slrouter, s } /* - * This thread reads from an external descriptor into rbuf, then parses and - * dispatches syslink messages from rbuf. + * Read syslink messages from an external socket and route them. */ static void syslink_rthread_so(void *arg) { struct sldata *sldata = arg; + struct socket *so; struct sockaddr *sa; struct mbuf *m; int soflags; int linkid; int error; + int needsa; + + so = (void *)sldata->xfp->f_data; + sa = NULL; + + /* + * Calculate whether we need to get the peer address or not. + * We need to obtain the peer address for packet-mode sockets + * representing subnets (rather then single connections). + */ + needsa = (sldata->bits && (sldata->flags & SLIF_PACKET)); while ((sldata->flags & SLIF_RQUIT) == 0) { /* - * Read some data. This is easy if data is packetized, + * Read some data. This is easy if the data is packetized, * otherwise we can still obtain an mbuf chain but we have * to parse out the syslink messages. */ soflags = 0; - sa = NULL; - error = so_pru_soreceive(sldata->xso, - (sldata->bits ? &sa : NULL), - NULL, &sldata->sio, + error = so_pru_soreceive(so, + (needsa ? &sa : NULL), + NULL, &sldata->sior, NULL, &soflags); + + /* + * The target is responsible for adjusting the src address + * field in the syslink_msg. We may need subnet information + * from the sockaddr to accomplish this. + * + * For streams representing subnets the originator is + * responsible for tagging its subnet bits in the src + * address but we have to renormalize + */ linkid = sldata->linkid; - if (sldata->bits && sa) { + if (sldata->flags & SLIF_PACKET) { + if (sldata->bits) { + linkid += syslink_getsubnet(sa) & + ((1 << sldata->bits) - 1); + } + if ((m = sldata->sior.sb_mb) != NULL) { + sbinit(&sldata->sior, SYSLINK_SIOBUFSIZE); + syslink_route(sldata->router, linkid, m); + } + } else { + while ((m = syslink_parse_stream(&sldata->sior)) != NULL) { + syslink_route(sldata->router, linkid, m); + } + } + + + + /* + * + */ + if ((sldata->flags & SLIF_SUBNET) && sldata->bits && sa) { linkid += syslink_getsubnet(sa) & ((1 << sldata->bits) - 1); FREE(sa, M_SONAME); @@ -624,16 +672,24 @@ syslink_rthread_so(void *arg) if (error) break; + /* + * Note: Incoming syslink messages must have their headers + * adjusted to reflect the origination address. This will + * be handled by syslink_route. + */ if (sldata->flags & SLIF_PACKET) { /* - * Packetized data + * Packetized data can just be directly routed. */ - m = sldata->sio.sb_mb; - sbinit(&sldata->sio, SYSLINK_SIOBUFSIZE); - if (m) + if ((m = sldata->sior.sb_mb) != NULL) { + sbinit(&sldata->sior, SYSLINK_SIOBUFSIZE); syslink_route(sldata->router, linkid, m); + } } else { - while ((m = syslink_parse_stream(&sldata->sio)) != NULL) { + /* + * Stream data has to be parsed out. + */ + while ((m = syslink_parse_stream(&sldata->sior)) != NULL) { syslink_route(sldata->router, linkid, m); } } @@ -643,8 +699,9 @@ syslink_rthread_so(void *arg) * Mark us as done and deref sldata. Tell the writer to terminate as * well. */ - sbflush(&sldata->sio); sldata->flags |= SLIF_RDONE; + sbflush(&sldata->sior); + sbflush(&sldata->siow); if ((sldata->flags & SLIF_WDONE) == 0) { sldata->flags |= SLIF_WQUIT; wakeup(&sldata->wthread); @@ -654,13 +711,36 @@ syslink_rthread_so(void *arg) sldata_rels(sldata); } +/* + * Read syslink messages from an external descriptor and route them. Used + * when no socket interface is available. + */ static void syslink_rthread_fp(void *arg) { struct sldata *sldata = arg; +#if 0 + /* + * Loop until told otherwise + */ + while ((sldata->flags & SLIF_RQUIT) == 0) { + error = fp_read(slink->xfp, + slbuf->buf + + (slbuf->windex & slbuf->bufmask + ), + count, &count, 0, UIO_SYSSPACE); + } +#endif + + /* + * Mark us as done and deref sldata. Tell the writer to terminate as + * well. + */ sldata->flags |= SLIF_RDONE; + sbflush(&sldata->sior); + sbflush(&sldata->siow); if ((sldata->flags & SLIF_WDONE) == 0) { sldata->flags |= SLIF_WQUIT; wakeup(&sldata->wthread); @@ -795,12 +875,82 @@ void syslink_wthread_so(void *arg) { struct sldata *sldata = arg; -#if 0 - struct slbuf *slbuf = &sldata->wbuf; + struct slrouter *slrouter; struct syslink_msg *head; + struct sockaddr *sa; + struct socket *so; + struct iovec aiov; + struct uio auio; int error; + int avail; + int bytes; + +#if 0 + so = (void *)sldata->xfp->f_data; + slrouter = sldata->router; while ((sldata->flags & SLIF_WQUIT) == 0) { + /* + * Deal with any broadcast data sitting in the route node's + * broadcast buffer. If we have fallen too far behind the + * data may no longer be valid. + * + * avail -- available data in broadcast buffer and + * bytes -- available contiguous data in broadcast buffer + */ + if (slrouter->bbuf.rindex - sldata->bindex > 0) + sldata->bindex = slrouter->bbuf.rindex; + if ((avail = slrouter->bbuf.windex - sldata->bindex) > 0) { + bytes = slrouter->bbuf.bufsize - + (sldata->bindex & slrouter->bbuf.bufmask); + if (bytes > avail) + bytes = avail; + head = (void *)(slrouter->bbuf.buf + + (sldata->bindex & slrouter->bbuf.bufmask)); + /* + * Break into packets if necessary, else just write + * it all in one fell swoop. + */ + aiov.iov_base = (void *)head; + aiov.iov_len = bytes; + auio.uio_iov = &aiov; + auio.uio_iovcnt = 1; + auio.uio_offset = 0; + auio.uio_resid = bytes; + auio.uio_segflg = UIO_SYSSPACE; + auio.uio_rw = UIO_WRITE; + auio.uio_td = curthread; + if (sldata->flags & SLIF_PACKET) { + if (head->sm_bytes < SL_MIN_MESSAGE_SIZE) { + kprintf("syslink_msg too small, terminating\n"); + break; + } + if (head->sm_bytes > bytes) { + kprintf("syslink_msg not FIFO aligned, terminating\n"); + break; + } + bytes = SLMSG_ALIGN(head->sm_bytes); + so_pru_sosend(so, sa, &auio, NULL, NULL, 0, curthread); + } else { + so_pru_sosend(so, sa, &auio, NULL, NULL, 0, curthread); + } + continue; + } + + /* + * Deal with mbuf records waiting to be output + */ + if (sldata->siow.sb_mb != NULL) { + + } + + /* + * Block waiting for something to do. + */ + tsleep(&sldata->wthread, 0, "wait", 0); + } + + error = 0; for (;;) { int aligned_reclen;