Diff for /src/sys/kern/kern_mpipe.c between versions 1.2 and 1.3

version 1.2, 2004/01/20 05:04:06 version 1.3, 2004/03/29 14:06:31
Line 30 Line 30
 #include <sys/systm.h>  #include <sys/systm.h>
 #include <sys/kernel.h>  #include <sys/kernel.h>
 #include <sys/slaballoc.h>  #include <sys/slaballoc.h>
 #include <sys/mpipe.h>  
 #include <sys/mbuf.h>  #include <sys/mbuf.h>
 #include <sys/vmmeter.h>  #include <sys/vmmeter.h>
 #include <sys/lock.h>  #include <sys/lock.h>
 #include <sys/thread.h>  #include <sys/thread.h>
 #include <sys/globaldata.h>  #include <sys/globaldata.h>
   #include <sys/mpipe.h>
   
 #include <sys/thread2.h>  #include <sys/thread2.h>
   
#define arysize(ary)    (sizeof(ary)/sizeof((ary)[0]))void    mpipe_rebalance(malloc_pipe_t mpipe);
   
 typedef struct mpipe_buf {  typedef struct mpipe_buf {
         TAILQ_ENTRY(mpipe_buf)  entry;          TAILQ_ENTRY(mpipe_buf)  entry;
Line 52  typedef struct mpipe_buf { Line 52  typedef struct mpipe_buf {
  */   */
 void  void
 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,  mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
        int nnow, int nmax)           int global_nnom, int global_nmax, int cpu_nmax,
            int mpflags)
 {  {
       struct mpipe_buf *buf;
       int i;
       int mflags;
   
     if (bytes < sizeof(struct mpipe_buf))      if (bytes < sizeof(struct mpipe_buf))
         bytes = sizeof(struct mpipe_buf);          bytes = sizeof(struct mpipe_buf);
   
       if (global_nnom < cpu_nmax * ncpus)
           global_nnom = cpu_nmax * ncpus;
       if (global_nnom > global_nmax)
           global_nmax = global_nnom;
   
     bzero(mpipe, sizeof(struct malloc_pipe));      bzero(mpipe, sizeof(struct malloc_pipe));
     TAILQ_INIT(&mpipe->queue);  
     mpipe->type = type;      mpipe->type = type;
     mpipe->bytes = bytes;      mpipe->bytes = bytes;
    mpipe->max_count = nmax;    mpipe->max_count = global_nmax;
    if (nnow > 0) {    mpipe->cpu_max = cpu_nmax;
        void *buf;    mpipe->mpflags = mpflags;
        buf = malloc(bytes, mpipe->type, M_WAITOK);    mflags = M_WAITOK;
        KKASSERT(buf != NULL);    if ((mpflags & MPF_NO_ZERO) == 0)
        ++mpipe->total_count;        mflags |= M_ZERO;
        mpipe_free(mpipe, buf);
        while (--nnow > 0) {    for (i = 0; i <= SMP_MAXCPU; i++)
            buf = malloc(bytes, mpipe->type, M_SYSNOWAIT);        TAILQ_INIT(&mpipe->queue[i]);
            if (buf == NULL)
                break;    for (i = 1; i <= ncpus; i++) {      
         while (mpipe->queue_len[i] < mpipe->cpu_max) {
             buf = malloc(mpipe->bytes, mpipe->type, mflags);
             TAILQ_INSERT_TAIL(&mpipe->queue[i], buf, entry);
             ++mpipe->total_count;              ++mpipe->total_count;
            mpipe_free(mpipe, buf);            ++mpipe->queue_len[i];
             --global_nnom;
         }          }
     }      }
    if (mpipe->max_count < mpipe->total_count)
        mpipe->max_count = mpipe->total_count;    TAILQ_INIT(&mpipe->queue[0]);
     while (--global_nnom >= 0) {
         buf = malloc(mpipe->bytes, mpipe->type, mflags);
         TAILQ_INSERT_TAIL(&mpipe->queue[0], buf, entry);
         ++mpipe->total_count;
         ++mpipe->queue_len[0];
     }
 }  }
   
 void  void
 mpipe_done(malloc_pipe_t mpipe)  mpipe_done(malloc_pipe_t mpipe)
 {  {
    struct mpipe_buf *buf;    mpipe_buf_t buf;
     lwkt_tokref ilock;
     int i;
   
    KKASSERT(mpipe->free_count == mpipe->total_count);    lwkt_gettoken(&ilock, &mpipe->mpipe_token);
    while (mpipe->free_count) {    KKASSERT(mpipe->queue_len[0] == mpipe->total_count);
        buf = TAILQ_FIRST(&mpipe->queue);    for (i = 0; i < SMP_MAXCPU; i++) {
        KKASSERT(buf != NULL);        while(! TAILQ_EMPTY(&mpipe->queue[i])) {
        TAILQ_REMOVE(&mpipe->queue, buf, entry);            buf = TAILQ_FIRST(&mpipe->queue[i]);
        --mpipe->free_count;            KKASSERT(buf != NULL);
        --mpipe->total_count;            TAILQ_REMOVE(&mpipe->queue[i], buf, entry);
        free(buf, mpipe->type);            --mpipe->queue_len[i];
             --mpipe->total_count;
             free(buf, mpipe->type);
         }
         KKASSERT(mpipe->queue_len[i] == 0);
     }      }
    KKASSERT(TAILQ_EMPTY(&mpipe->queue));    KKASSERT(mpipe->total_count == 0);
 }  }
   
 /*  /*
 * Allocate an entry.  flags can be M_RNOWAIT which tells us not to block. * Allocation from MPIPE that can wait. Only drain the global queue.
 * Unlike a normal malloc, if we block in mpipe_alloc() no deadlock will occur 
 * because it will unblock the moment an existing in-use buffer is freed. 
  */   */
 void *  void *
mpipe_alloc(malloc_pipe_t mpipe, int flags)mpipe_alloc_waitok(malloc_pipe_t mpipe)
 {  {
    mpipe_buf_t buf;    mpipe_buf_t buf = NULL;
     lwkt_tokref ilock;
     int mflags = M_WAITOK;
 
     lwkt_gettoken(&ilock, &mpipe->mpipe_token);
     for (;;) {
         crit_enter();
 
         if (mpipe->queue_len[0] > 0) {
             buf = TAILQ_FIRST(&mpipe->queue[0]);
             KKASSERT(buf != NULL);
             TAILQ_REMOVE(&mpipe->queue[0], buf, entry);
             --mpipe->queue_len[0];
             if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
                 bzero(buf, mpipe->bytes);
             crit_exit();
             lwkt_reltoken(&ilock);
             mpipe_rebalance(mpipe);
             return(buf);
         }
   
     crit_enter();  
     while (mpipe->free_count == 0) {  
         if (mpipe->total_count < mpipe->max_count) {          if (mpipe->total_count < mpipe->max_count) {
            ++mpipe->total_count;            if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
            if ((buf = malloc(mpipe->bytes, mpipe->type, flags)) != NULL) {                mflags |= M_ZERO;
                crit_exit();
                return(buf);            mpipe->total_count++;
            } 
            --mpipe->total_count; 
        } else if (flags & M_RNOWAIT) { 
             crit_exit();              crit_exit();
            return(NULL);            lwkt_reltoken(&ilock);
        } else {            buf = malloc(mpipe->bytes, mpipe->type, mflags);
            mpipe->pending = 1;            KKASSERT(buf != NULL);
            tsleep(mpipe, 0, "mpipe", 0); 
         }          }
           mpipe->pending = 1;
           tsleep(mpipe, 0, "mpipe", 0);
       }
   }
   
   /*
    * Allocation from MPIPE that can't wait. Try to drain the
    * local cpu queue first, if that is empty, drain the global
    * CPU
    */
   
   void *
   mpipe_alloc_nowait(malloc_pipe_t mpipe)
   {
       globaldata_t gd = mycpu;
       mpipe_buf_t buf = NULL;
       lwkt_tokref ilock;
       int my_queue = gd->gd_cpuid + 1;
       int mflags = M_NOWAIT;
   
       /* First check the local CPU queue to avoid token acquisation. */
       crit_enter();
       if (mpipe->queue_len[my_queue] > 0) {
           buf = TAILQ_FIRST(&mpipe->queue[my_queue]);
           KKASSERT(buf != NULL);
           TAILQ_REMOVE(&mpipe->queue[my_queue], buf, entry);
           --mpipe->queue_len[my_queue];
           if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
               bzero(buf, mpipe->bytes);
           mpipe_rebalance(mpipe);
           crit_exit();
           return(buf);
       }
       /* We have to acquire the token, unblock interrupts and get it. */
       crit_exit();
   
       lwkt_gettoken(&ilock, &mpipe->mpipe_token);
       crit_enter();
   
       if (mpipe->queue_len[0] > 0) {
           buf = TAILQ_FIRST(&mpipe->queue[0]);
           KKASSERT(buf != NULL);
           TAILQ_REMOVE(&mpipe->queue[0], buf, entry);
           --mpipe->queue_len[0];
           if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
               bzero(buf, mpipe->bytes);
           crit_exit();
           lwkt_reltoken(&ilock);
           return(buf);
     }      }
    buf = TAILQ_FIRST(&mpipe->queue);
    KKASSERT(buf != NULL);    /* Recheck the local CPU queue again in case an interrupt freed something*/
    TAILQ_REMOVE(&mpipe->queue, buf, entry);    if (mpipe->queue_len[my_queue] > 0) {
    --mpipe->free_count;        buf = TAILQ_FIRST(&mpipe->queue[my_queue]);
         KKASSERT(buf != NULL);
         TAILQ_REMOVE(&mpipe->queue[my_queue], buf, entry);
         --mpipe->queue_len[my_queue];
         if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
             bzero(buf, mpipe->bytes);
         crit_exit();
         lwkt_reltoken(&ilock);
         return(buf);
     }
 
     if (mpipe->total_count < mpipe->max_count) {
         if ((mpipe->mpflags & MPF_NO_ZERO) == 0)
             mflags |= M_ZERO;
 
         buf = malloc(mpipe->bytes, mpipe->type, mflags);
         if (buf)
             mpipe->total_count++;
     }
 
     crit_exit();      crit_exit();
    if (flags & M_ZERO)    lwkt_reltoken(&ilock);
        bzero(buf, mpipe->bytes); 
     return(buf);      return(buf);
 }  }
   
Line 140  mpipe_alloc(malloc_pipe_t mpipe, int fla Line 244  mpipe_alloc(malloc_pipe_t mpipe, int fla
 void  void
 mpipe_free(malloc_pipe_t mpipe, void *vbuf)  mpipe_free(malloc_pipe_t mpipe, void *vbuf)
 {  {
    struct mpipe_buf *buf;    globaldata_t gd = mycpu;
     mpipe_buf_t buf = NULL;
     lwkt_tokref ilock;
     int my_queue = gd->gd_cpuid + 1;
   
    if ((buf = vbuf) != NULL) {    if (vbuf == NULL)
        crit_enter();        return;
        if (mpipe->total_count > mpipe->max_count) {
            --mpipe->total_count;    lwkt_gettoken(&ilock, &mpipe->mpipe_token);
            crit_exit();    crit_enter();
            free(buf, mpipe->type);
        } else {    /* first try to refill the current CPU queue */
            TAILQ_INSERT_TAIL(&mpipe->queue, buf, entry);    if (mpipe->queue_len[my_queue] < mpipe->cpu_max) {
            ++mpipe->free_count;        TAILQ_INSERT_TAIL(&mpipe->queue[my_queue], buf, entry);
            crit_exit();        ++mpipe->queue_len[my_queue];
            if (mpipe->free_count >= (mpipe->total_count >> 2) + 1) {        crit_exit();
                if (mpipe->trigger) {        if (mpipe->pending) {
                    mpipe->trigger(mpipe->trigger_data);            mpipe->pending = 0;
                }            wakeup(mpipe);
                if (mpipe->pending) { 
                    mpipe->pending = 0; 
                    wakeup(mpipe); 
                } 
            } 
         }          }
           lwkt_reltoken(&ilock);
           mpipe_rebalance(mpipe);
           return;
     }      }
   
       if (mpipe->total_count < mpipe->max_count) {
           TAILQ_INSERT_TAIL(&mpipe->queue[0], buf, entry);
           ++mpipe->queue_len[0];
           crit_exit();
           if (mpipe->pending) {
               mpipe->pending = 0;
               wakeup(mpipe);
           }
           lwkt_reltoken(&ilock);
           mpipe_rebalance(mpipe);
           return;
       }
   
       --mpipe->total_count;
       crit_exit();
       lwkt_reltoken(&ilock);
       free(buf, mpipe->type);    
 }  }
   
   /*
    * Rebalance local CPU queue by trying to size it to max_cpu entries
    */
   
   void
   mpipe_rebalance(malloc_pipe_t mpipe)
   {
       globaldata_t gd = mycpu;
       mpipe_buf_t buf;
       lwkt_tokref ilock;
       int my_queue = gd->gd_cpuid + 1;
   
       lwkt_gettoken(&ilock, &mpipe->mpipe_token);
       crit_enter();
       while (mpipe->queue_len[my_queue] < mpipe->cpu_max &&
              mpipe->queue_len[0] > 0) {
           buf = TAILQ_FIRST(&mpipe->queue[0]);
           TAILQ_REMOVE(&mpipe->queue[0], buf, entry);
           TAILQ_INSERT_TAIL(&mpipe->queue[my_queue], buf, entry);
           ++mpipe->queue_len[my_queue];
           --mpipe->queue_len[0];
       }
       while (mpipe->queue_len[my_queue] > mpipe->cpu_max) {
           buf = TAILQ_FIRST(&mpipe->queue[my_queue]);
           TAILQ_REMOVE(&mpipe->queue[my_queue], buf, entry);
           TAILQ_INSERT_TAIL(&mpipe->queue[0], buf, entry);
           ++mpipe->queue_len[0];
           --mpipe->queue_len[my_queue];
       }
       crit_exit();
       lwkt_reltoken(&ilock);
   }

Removed from v.1.2  
changed lines
  Added in v.1.3