--- src/sys/kern/lwkt_ipiq.c 2004/07/16 05:51:10 1.8 +++ src/sys/kern/lwkt_ipiq.c 2005/04/13 04:00:50 1.9 @@ -92,9 +92,12 @@ #endif #ifdef SMP -static __int64_t ipiq_count; -static __int64_t ipiq_fifofull; -static __int64_t ipiq_cscount; +static __int64_t ipiq_count; /* total calls to lwkt_send_ipiq*() */ +static __int64_t ipiq_fifofull; /* number of fifo full conditions detected */ +static __int64_t ipiq_avoided; /* interlock with target avoids cpu ipi */ +static __int64_t ipiq_passive; /* passive IPI messages */ +static __int64_t ipiq_cscount; /* number of cpu synchronizations */ +static int ipiq_optimized = 1; /* XXX temporary sysctl */ #endif #ifdef _KERNEL @@ -102,7 +105,10 @@ static __int64_t ipiq_cscount; #ifdef SMP SYSCTL_QUAD(_lwkt, OID_AUTO, ipiq_count, CTLFLAG_RW, &ipiq_count, 0, ""); SYSCTL_QUAD(_lwkt, OID_AUTO, ipiq_fifofull, CTLFLAG_RW, &ipiq_fifofull, 0, ""); +SYSCTL_QUAD(_lwkt, OID_AUTO, ipiq_avoided, CTLFLAG_RW, &ipiq_avoided, 0, ""); +SYSCTL_QUAD(_lwkt, OID_AUTO, ipiq_passive, CTLFLAG_RW, &ipiq_passive, 0, ""); SYSCTL_QUAD(_lwkt, OID_AUTO, ipiq_cscount, CTLFLAG_RW, &ipiq_cscount, 0, ""); +SYSCTL_INT(_lwkt, OID_AUTO, ipiq_optimized, CTLFLAG_RW, &ipiq_optimized, 0, ""); #endif #endif @@ -118,14 +124,18 @@ static void lwkt_cpusync_remote2(lwkt_cp * on the cpu<->cpu ipiq matrix. Each cpu owns a unique ipiq FIFO for every * possible target cpu. The FIFO can be written. * - * YYY If the FIFO fills up we have to enable interrupts and process the - * IPIQ while waiting for it to empty or we may deadlock with another cpu. - * Create a CPU_*() function to do this! + * If the FIFO fills up we have to enable interrupts to avoid an APIC + * deadlock and process pending IPIQs while waiting for it to empty. + * Otherwise we may soft-deadlock with another cpu whos FIFO is also full. * * We can safely bump gd_intr_nesting_level because our crit_exit() at the * end will take care of any pending interrupts. * - * Must be called from a critical section. + * The actual hardware IPI is avoided if the target cpu is already processing + * the queue from a prior IPI. It is possible to pipeline IPI messages + * very quickly between cpus due to the FIFO hysteresis. + * + * Need not be called from a critical section. */ int lwkt_send_ipiq(globaldata_t target, ipifunc_t func, void *arg) @@ -149,18 +159,86 @@ lwkt_send_ipiq(globaldata_t target, ipif ip = &gd->gd_ipiq[target->gd_cpuid]; /* - * We always drain before the FIFO becomes full so it should never - * become full. We need to leave enough entries to deal with - * reentrancy. + * Do not allow the FIFO to become full. Interrupts must be physically + * enabled while we liveloop to avoid deadlocking the APIC. + */ + if (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 2) { + unsigned int eflags = read_eflags(); + + if (atomic_poll_acquire_int(&ip->ip_npoll) || ipiq_optimized == 0) + cpu_send_ipiq(target->gd_cpuid); + cpu_enable_intr(); + ++ipiq_fifofull; + while (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 4) { + KKASSERT(ip->ip_windex - ip->ip_rindex != MAXCPUFIFO - 1); + lwkt_process_ipiq(); + } + write_eflags(eflags); + } + + /* + * Queue the new message */ - KKASSERT(ip->ip_windex - ip->ip_rindex != MAXCPUFIFO); windex = ip->ip_windex & MAXCPUFIFO_MASK; ip->ip_func[windex] = (ipifunc2_t)func; ip->ip_arg[windex] = arg; cpu_mb1(); ++ip->ip_windex; + --gd->gd_intr_nesting_level; + + /* + * signal the target cpu that there is work pending. + */ + if (atomic_poll_acquire_int(&ip->ip_npoll)) { + cpu_send_ipiq(target->gd_cpuid); + } else { + if (ipiq_optimized == 0) + cpu_send_ipiq(target->gd_cpuid); + ++ipiq_avoided; + } + crit_exit(); + return(ip->ip_windex); +} + +/* + * Similar to lwkt_send_ipiq() but this function does not actually initiate + * the IPI to the target cpu unless the FIFO has become too full, so it is + * very fast. + * + * This function is used for non-critical IPI messages, such as memory + * deallocations. The queue will typically be flushed by the target cpu at + * the next clock interrupt. + * + * Need not be called from a critical section. + */ +int +lwkt_send_ipiq_passive(globaldata_t target, ipifunc_t func, void *arg) +{ + lwkt_ipiq_t ip; + int windex; + struct globaldata *gd = mycpu; + + KKASSERT(target != gd); + crit_enter(); + ++gd->gd_intr_nesting_level; +#ifdef INVARIANTS + if (gd->gd_intr_nesting_level > 20) + panic("lwkt_send_ipiq: TOO HEAVILY NESTED!"); +#endif + KKASSERT(curthread->td_pri >= TDPRI_CRIT); + ++ipiq_count; + ++ipiq_passive; + ip = &gd->gd_ipiq[target->gd_cpuid]; + + /* + * Do not allow the FIFO to become full. Interrupts must be physically + * enabled while we liveloop to avoid deadlocking the APIC. + */ if (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 2) { unsigned int eflags = read_eflags(); + + if (atomic_poll_acquire_int(&ip->ip_npoll) || ipiq_optimized == 0) + cpu_send_ipiq(target->gd_cpuid); cpu_enable_intr(); ++ipiq_fifofull; while (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 4) { @@ -169,22 +247,33 @@ lwkt_send_ipiq(globaldata_t target, ipif } write_eflags(eflags); } + + /* + * Queue the new message + */ + windex = ip->ip_windex & MAXCPUFIFO_MASK; + ip->ip_func[windex] = (ipifunc2_t)func; + ip->ip_arg[windex] = arg; + cpu_mb1(); + ++ip->ip_windex; --gd->gd_intr_nesting_level; - cpu_send_ipiq(target->gd_cpuid); /* issues mem barrier if appropriate */ + + /* + * Do not signal the target cpu, it will pick up the IPI when it next + * polls (typically on the next tick). + */ crit_exit(); return(ip->ip_windex); } /* - * Send an IPI request passively, return 0 on success and ENOENT on failure. - * This routine does not recursive through lwkt_process_ipiq() nor does it - * block trying to queue the actual IPI. If we successfully queue the - * message but fail to queue the IPI, we still count it as a success. - * The occassional small race against a target cpu HLT is recovered at - * the next clock interrupt. + * Send an IPI request without blocking, return 0 on success, ENOENT on + * failure. The actual queueing of the hardware IPI may still force us + * to spin and process incoming IPIs but that will eventually go away + * when we've gotten rid of the other general IPIs. */ int -lwkt_send_ipiq_passive(globaldata_t target, ipifunc_t func, void *arg) +lwkt_send_ipiq_nowait(globaldata_t target, ipifunc_t func, void *arg) { lwkt_ipiq_t ip; int windex; @@ -198,22 +287,24 @@ lwkt_send_ipiq_passive(globaldata_t targ ++ipiq_count; ip = &gd->gd_ipiq[target->gd_cpuid]; - if (ip->ip_windex - ip->ip_rindex >= MAXCPUFIFO - 1) { + if (ip->ip_windex - ip->ip_rindex >= MAXCPUFIFO * 3 / 2) return(ENOENT); - } windex = ip->ip_windex & MAXCPUFIFO_MASK; ip->ip_func[windex] = (ipifunc2_t)func; ip->ip_arg[windex] = arg; cpu_mb1(); ++ip->ip_windex; + /* - * passive mode doesn't work yet :-( + * This isn't a passive IPI, we still have to signal the target cpu. */ -#if 1 - cpu_send_ipiq(target->gd_cpuid); -#else - cpu_send_ipiq_passive(target->gd_cpuid); -#endif + if (atomic_poll_acquire_int(&ip->ip_npoll)) { + cpu_send_ipiq(target->gd_cpuid); + } else { + if (ipiq_optimized == 0) + cpu_send_ipiq(target->gd_cpuid); + ++ipiq_avoided; + } return(0); } @@ -372,6 +463,14 @@ lwkt_process_ipiq1(lwkt_ipiq_t ip, struc /* YYY memory barrier */ ip->ip_xindex = ip->ip_rindex; } + + /* + * Return non-zero if there are more IPI messages pending on this + * ipiq. ip_npoll is left set as long as possible to reduce the + * number of IPIs queued by the originating cpu, but must be cleared + * *BEFORE* checking windex. + */ + atomic_poll_release_int(&ip->ip_npoll); return(wi != ip->ip_windex); }