--- src/sys/kern/lwkt_msgport.c 2008/11/01 12:30:23 1.51 +++ src/sys/kern/lwkt_msgport.c 2008/11/09 09:20:09 1.52 @@ -188,6 +188,7 @@ static int lwkt_thread_putport(lwkt_port static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); static void *lwkt_thread_waitport(lwkt_port_t port, int flags); static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); +static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); static void *lwkt_spin_getport(lwkt_port_t port); static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); @@ -207,6 +208,7 @@ static int lwkt_panic_putport(lwkt_port_ static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); static void *lwkt_panic_waitport(lwkt_port_t port, int flags); static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); +static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); /* * Core port initialization (internal) @@ -218,15 +220,18 @@ _lwkt_initport(lwkt_port_t port, int (*pportfn)(lwkt_port_t, lwkt_msg_t), int (*wmsgfn)(lwkt_msg_t, int), void *(*wportfn)(lwkt_port_t, int), - void (*rportfn)(lwkt_port_t, lwkt_msg_t)) + void (*rportfn)(lwkt_port_t, lwkt_msg_t), + void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) { bzero(port, sizeof(*port)); TAILQ_INIT(&port->mp_msgq); + TAILQ_INIT(&port->mp_msgq_prio); port->mp_getport = gportfn; port->mp_putport = pportfn; port->mp_waitmsg = wmsgfn; port->mp_waitport = wportfn; port->mp_replyport = rportfn; + port->mp_dropmsg = dmsgfn; } /* @@ -261,7 +266,8 @@ lwkt_initport_thread(lwkt_port_t port, t lwkt_thread_putport, lwkt_thread_waitmsg, lwkt_thread_waitport, - lwkt_thread_replyport); + lwkt_thread_replyport, + lwkt_thread_dropmsg); port->mpu_td = td; } @@ -280,7 +286,8 @@ lwkt_initport_spin(lwkt_port_t port) lwkt_spin_putport, lwkt_spin_waitmsg, lwkt_spin_waitport, - lwkt_spin_replyport); + lwkt_spin_replyport, + lwkt_panic_dropmsg); spin_init(&port->mpu_spin); } @@ -299,7 +306,8 @@ lwkt_initport_serialize(lwkt_port_t port lwkt_serialize_putport, lwkt_serialize_waitmsg, lwkt_serialize_waitport, - lwkt_serialize_replyport); + lwkt_serialize_replyport, + lwkt_panic_dropmsg); port->mpu_serialize = slz; } @@ -315,7 +323,8 @@ lwkt_initport_replyonly_null(lwkt_port_t lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_null_replyport); + lwkt_null_replyport, + lwkt_panic_dropmsg); } /* @@ -328,7 +337,7 @@ lwkt_initport_replyonly(lwkt_port_t port { _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - rportfn); + rportfn, lwkt_panic_dropmsg); } void @@ -337,7 +346,7 @@ lwkt_initport_putonly(lwkt_port_t port, { _lwkt_initport(port, lwkt_panic_getport, pportfn, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_panic_replyport); + lwkt_panic_replyport, lwkt_panic_dropmsg); } void @@ -346,17 +355,23 @@ lwkt_initport_panic(lwkt_port_t port) _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_panic_replyport); + lwkt_panic_replyport, lwkt_panic_dropmsg); } static __inline void _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) { + lwkt_msg_queue *queue; + /* * normal case, remove and return the message. */ - TAILQ_REMOVE(&port->mp_msgq, msg, ms_node); + if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) + queue = &port->mp_msgq_prio; + else + queue = &port->mp_msgq; + TAILQ_REMOVE(queue, msg, ms_node); msg->ms_flags &= ~MSGF_QUEUED; } @@ -364,16 +379,38 @@ static __inline void _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) { + lwkt_msg_queue *queue; + msg->ms_flags |= MSGF_QUEUED; - TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); + if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) + queue = &port->mp_msgq_prio; + else + queue = &port->mp_msgq; + TAILQ_INSERT_TAIL(queue, msg, ms_node); +} + +static __inline +lwkt_msg_t +_lwkt_pollmsg(lwkt_port_t port) +{ + lwkt_msg_t msg; + + msg = TAILQ_FIRST(&port->mp_msgq_prio); + if (__predict_false(msg != NULL)) + return msg; + + /* + * Priority queue has no message, fallback to non-priority queue. + */ + return TAILQ_FIRST(&port->mp_msgq); } static __inline void _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) { - TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); - msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; + _lwkt_pushmsg(port, msg); + msg->ms_flags |= MSGF_REPLY | MSGF_DONE; } /************************************************************************ @@ -501,6 +538,24 @@ lwkt_thread_replyport(lwkt_port_t port, } /* + * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() + * + * This function could _only_ be used when caller is in the same thread + * as the message's target port owner thread. + */ +static void +lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) +{ + KASSERT(port->mpu_td == curthread, + ("message could only be dropped in the same thread " + "as the message target port thread\n")); + crit_enter_quick(port->mpu_td); + _lwkt_pullmsg(port, msg); + msg->ms_flags |= MSGF_DONE; + crit_exit_quick(port->mpu_td); +} + +/* * lwkt_thread_putport() - Backend to lwkt_beginmsg() * * Called with the target port as an argument but in the context of the @@ -583,7 +638,7 @@ lwkt_thread_getport(lwkt_port_t port) KKASSERT(port->mpu_td == curthread); crit_enter_quick(port->mpu_td); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); crit_exit_quick(port->mpu_td); return(msg); @@ -600,6 +655,9 @@ static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) { + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { /* * If the done bit was not set we have to block until it is. @@ -654,7 +712,7 @@ lwkt_thread_waitport(lwkt_port_t port, i KKASSERT(port->mpu_td == td); crit_enter_quick(td); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = lwkt_sleep("waitport", flags); port->mp_flags &= ~MSGPORTF_WAITING; @@ -691,7 +749,7 @@ lwkt_spin_getport(lwkt_port_t port) lwkt_msg_t msg; spin_lock_wr(&port->mpu_spin); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); spin_unlock_wr(&port->mpu_spin); return(msg); @@ -727,6 +785,9 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int fl int sentabort; int error; + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { port = msg->ms_reply_port; sentabort = 0; @@ -790,7 +851,7 @@ lwkt_spin_waitport(lwkt_port_t port, int int error; spin_lock_wr(&port->mpu_spin); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = msleep(port, &port->mpu_spin, flags, "waitport", 0); /* see note at the top on the MSGPORTF_WAITING flag */ @@ -857,7 +918,7 @@ lwkt_serialize_getport(lwkt_port_t port) ASSERT_SERIALIZED(port->mpu_serialize); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); return(msg); } @@ -886,6 +947,9 @@ lwkt_serialize_waitmsg(lwkt_msg_t msg, i int sentabort; int error; + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { port = msg->ms_reply_port; @@ -953,7 +1017,7 @@ lwkt_serialize_waitport(lwkt_port_t port ASSERT_SERIALIZED(port->mpu_serialize); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = serialize_sleep(port, port->mpu_serialize, flags, "waitport", 0); @@ -1043,3 +1107,9 @@ lwkt_panic_replyport(lwkt_port_t port, l panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); } +static +void +lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) +{ + panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); +}