From 4d50d691e3fa80c9b9aef5aa0498a0a92654b324 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 23 Sep 2015 16:09:16 +0200 Subject: [PATCH] bio.c: new API bioWaitStepOfType(). --- src/bio.c | 35 +++++++++++++++++++++++++++++++---- src/bio.h | 2 +- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/bio.c b/src/bio.c index a4962e00..135b8f3b 100644 --- a/src/bio.c +++ b/src/bio.c @@ -63,7 +63,8 @@ static pthread_t bio_threads[BIO_NUM_OPS]; static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; -static pthread_cond_t bio_condvar[BIO_NUM_OPS]; +static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; +static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; static list *bio_jobs[BIO_NUM_OPS]; /* The following array is used to hold the number of pending jobs for every * OP type. This allows us to export the bioPendingJobsOfType() API that is @@ -98,7 +99,8 @@ void bioInit(void) { /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); - pthread_cond_init(&bio_condvar[j],NULL); + pthread_cond_init(&bio_newjob_cond[j],NULL); + pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } @@ -133,7 +135,7 @@ void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; - pthread_cond_signal(&bio_condvar[type]); + pthread_cond_signal(&bio_newjob_cond[type]); pthread_mutex_unlock(&bio_mutex[type]); } @@ -168,7 +170,7 @@ void *bioProcessBackgroundJobs(void *arg) { /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { - pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); + pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ @@ -188,6 +190,9 @@ void *bioProcessBackgroundJobs(void *arg) { } zfree(job); + /* Unblock threads blocked on bioWaitStepOfType() if any. */ + pthread_cond_broadcast(&bio_step_cond[type]); + /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); @@ -205,6 +210,28 @@ unsigned long long bioPendingJobsOfType(int type) { return val; } +/* If there are pending jobs for the specified type, the function blocks + * and waits that the next job was processed. Otherwise the function + * does not block and returns ASAP. + * + * The function returns the number of jobs still to process of the + * requested type. + * + * This function is useful when from another thread, we want to wait + * a bio.c thread to do more work in a blocking way. + */ +unsigned long long bioWaitStepOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex[type]); + val = bio_pending[type]; + if (val != 0) { + pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); + val = bio_pending[type]; + } + pthread_mutex_unlock(&bio_mutex[type]); + return val; +} + /* Kill the running bio threads in an unclean way. This function should be * used only when it's critical to stop the threads for some reason. * Currently Redis does this only on crash (for instance on SIGSEGV) in order diff --git a/src/bio.h b/src/bio.h index dc284e77..eb3a2f5d 100644 --- a/src/bio.h +++ b/src/bio.h @@ -31,7 +31,7 @@ void bioInit(void); void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); unsigned long long bioPendingJobsOfType(int type); -void bioWaitPendingJobsLE(int type, unsigned long long num); +unsigned long long bioWaitStepOfType(int type); time_t bioOlderJobOfType(int type); void bioKillThreads(void);