/* $Id: queue.c,v 1.13 2004/03/23 06:42:53 jmuelmen Exp $ */

/*
 * Bounded ring buffer of queue elements, protected by a single mutex and
 * two condition variables.  enqueue() adds an element at the tail,
 * queue_process() runs and removes the element at the head, and queue()
 * calls queue_process() in an endless loop.
 */

#include "scheduler.h"
#include "parse.h"
#include "queue.h"

/*
 * NOTE(review): the copy of this file under review had eight system
 * #include directives whose header names were missing.  The list below is
 * reconstructed from what the code (including its disabled parts) uses --
 * confirm against the repository version.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

/* The one queue instance managed by this file. */
static queue_t _q;

/*
 * Print "<what>: <reason>" to stderr for a failed pthread call.
 *
 * pthread functions return the error number instead of setting errno, so
 * perror() would print an unrelated reason for them.
 */
static void report_pthread_error (const char *what, int err)
{
    fprintf(stderr, "%s: %s\n", what, strerror(err));
}

/* Acquire the queue mutex. */
void queue_lock (void)
{
    pthread_mutex_lock(&_q.mut);
}

/* Release the queue mutex. */
void queue_unlock (void)
{
    pthread_mutex_unlock(&_q.mut);
}

/*
 * Reset the queue to empty and initialize its mutex and its "not full" /
 * "not empty" condition variables with default attributes.  A failing
 * initialization is reported on stderr; the function still returns
 * normally.
 */
void queue_init (void)
{
    int err;

    _q.n_l = 0;
    _q.head = 0;
    _q.tail = 0;

    err = pthread_mutex_init(&_q.mut, NULL);
    if (err != 0) {
        report_pthread_error("Initializing queue mutex", err);
    }
    err = pthread_cond_init(&_q.not_full, NULL);
    if (err != 0) {
        report_pthread_error("Initializing queue not-full condition", err);
    }
    err = pthread_cond_init(&_q.not_empty, NULL);
    if (err != 0) {
        report_pthread_error("Initializing queue not-empty condition", err);
    }
}

/*
 * Build a queue element from (f, argv, argc, ret) and append it at the
 * tail of the queue, blocking while the queue already holds N_Q_L
 * elements.
 *
 * f, argv, argc, ret are passed through unchanged to queue_el_create().
 *
 * Returns 0 (always).
 *
 * NOTE(review): the result of queue_el_create() is stored unchecked;
 * whether it can be NULL is not visible from this file.
 */
int enqueue (func_t f, value_t *argv, int argc, value_t *ret)
{
    int err;
    int locked_here;

    /* make a queue element */
    queue_el_t *l = queue_el_create(f, argv, argc, ret);

    /* A disabled earlier step locked ret->mut at this point so that the
       return value of the function call could not be used before the
       function had been called. */

    /* if we're in an atomic section, the queue is already locked for us;
       otherwise we'll have to lock it ourselves.  The decision is taken
       once so that lock and unlock are always paired, even if
       get_atomic() were to report something different later on. */
    locked_here = !get_atomic();
    if (locked_here) {
        queue_lock();
    }

    /* if the queue is full, we'll have to wait until it's not */
    while (_q.n_l == N_Q_L) {
#ifdef WINDOWS_IS_DOGSHIT
        if (usleep(WINDOWS_IS_DOGSHIT)) {
            perror("Sleeping on queue space");
        }
#endif
        err = pthread_cond_wait(&_q.not_full, &_q.mut);
        if (err != 0) {
            report_pthread_error("Waiting for queue space", err);
        }
    }

    /* stick the new element in the queue; the loop above only exits once
       _q.n_l != N_Q_L, so no separate out-of-space check is needed */
    _q.q[_q.tail] = l;
    _q.tail++;
    _q.tail %= N_Q_L;
    _q.n_l++;

    if (locked_here) {
        queue_unlock();
    }

    /* signal that we're not empty */
    pthread_cond_signal(&_q.not_empty);
    return 0;
}

/*
 * Run the element at the head of the queue and remove it, blocking while
 * the queue is empty.  The queue mutex is held for the whole call,
 * including the call to queue_el_process().
 */
void queue_process (void)
{
    int err;

    queue_lock();

    /* don't do anything while the queue is empty */
    while (!_q.n_l) {
#ifdef WINDOWS_IS_DOGSHIT
        if (usleep(WINDOWS_IS_DOGSHIT)) {
            perror("Sleeping on queue contents");
        }
#endif
        err = pthread_cond_wait(&_q.not_empty, &_q.mut);
        if (err != 0) {
            report_pthread_error("Waiting for queue contents", err);
        }
    }

    /* do something with the head element */
    queue_el_process(_q.q[_q.head]);

    /* then remove it from the queue (per the original author's note,
       queue_el_process will take care of freeing) */
    _q.head++;
    _q.head %= N_Q_L;
    _q.n_l--;

    queue_unlock();

    /* a slot has been released: wake a producer waiting in enqueue() */
    pthread_cond_signal(&_q.not_full);
}

/*
 * Endless consumer loop: calls queue_process() forever and never returns.
 * The signature matches a pthread start routine; arg is not used.
 *
 * An earlier, disabled version slept between iterations until the next
 * QUEUE_INT boundary (computed from gettimeofday()); now the only waiting
 * is the blocking done inside queue_process().
 */
void *queue (void *arg)
{
    (void) arg;

    /* go through the queue and run queue elements */
    while (1) {
        queue_process();
    }

    return NULL;
}