From 0f737b6ba6559901235d6e9ec36967d354e0bb0e Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Fri, 21 Aug 2020 21:03:16 +0200 Subject: [PATCH 01/13] multithreading: implement mutexes and condition variables using OS primitives Replace slow homegrown mutex implementation by standard OS functions. We try our best to be interrupt safe, however a completely safe implementation is impossible (unless one completely removes the ability to interrupt a thread waiting on a mutex). There is always a window after the OS specific function has returned, but before we can set the owner, in which interrupts will see an inconsistent state of the mutex with regards to owner and count. Condition variables are now based on OS functions as well. Timed waiting on condition variables has also been implemented. --- src/c/Makefile.in | 5 +- src/c/alloc_2.d | 36 ++- src/c/error.d | 19 ++ src/c/threads/condition_variable.d | 159 +++++++--- src/c/threads/mutex.d | 219 ++++++------- src/c/threads/queue.d | 2 +- src/c/unixint.d | 2 - src/h/external.h | 2 + src/h/object.h | 21 +- src/h/threads.h | 472 +++++++++++++++++++++++++++++ src/lsp/mp.lsp | 35 +-- src/util/emacs.el | 1 + 12 files changed, 774 insertions(+), 199 deletions(-) create mode 100644 src/h/threads.h diff --git a/src/c/Makefile.in b/src/c/Makefile.in index ef6889afe..e0f20dc17 100644 --- a/src/c/Makefile.in +++ b/src/c/Makefile.in @@ -46,8 +46,9 @@ HFILES = $(HDIR)/config.h $(HDIR)/ecl.h $(HDIR)/ecl-cmp.h \ $(HDIR)/number.h $(HDIR)/page.h $(HDIR)/bytecodes.h \ $(HDIR)/cache.h $(HDIR)/config-internal.h $(HDIR)/ecl_atomics.h \ $(HDIR)/ecl-inl.h $(HDIR)/internal.h $(HDIR)/stack-resize.h \ - $(HDIR)/impl/math_dispatch2.h $(HDIR)/impl/math_dispatch.h \ - $(HDIR)/impl/math_fenv.h $(HDIR)/impl/math_fenv_msvc.h + $(HDIR)/threads.h $(HDIR)/impl/math_dispatch2.h \ + $(HDIR)/impl/math_dispatch.h $(HDIR)/impl/math_fenv.h \ + $(HDIR)/impl/math_fenv_msvc.h CLOS_OBJS = clos/cache.o clos/accessor.o clos/instance.o clos/gfun.o OBJS = main.o symbol.o package.o cons.o list.o apply.o eval.o \ interpreter.o compiler.o disassembler.o $(CLOS_OBJS) \ diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index f164482f6..8957586b3 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -446,15 +446,10 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl ecl_mark_env(o->process.env); break; case t_lock: - MAYBE_MARK(o->lock.queue_list); - MAYBE_MARK(o->lock.queue_spinlock); MAYBE_MARK(o->lock.owner); MAYBE_MARK(o->lock.name); break; case t_condition_variable: - MAYBE_MARK(o->condition_variable.queue_spinlock); - MAYBE_MARK(o->condition_variable.queue_list); - MAYBE_MARK(o->condition_variable.lock); break; case t_rwlock: MAYBE_MARK(o->rwlock.name); @@ -1020,9 +1015,7 @@ init_alloc(void) to_bitmap(&o, &(o.process.queue_record)); type_info[t_lock].descriptor = to_bitmap(&o, &(o.lock.name)) | - to_bitmap(&o, &(o.lock.owner)) | - to_bitmap(&o, &(o.lock.queue_spinlock)) | - to_bitmap(&o, &(o.lock.queue_list)); + to_bitmap(&o, &(o.lock.owner)); # ifdef ECL_RWLOCK type_info[t_rwlock].descriptor = to_bitmap(&o, &(o.rwlock.name)); @@ -1031,10 +1024,7 @@ init_alloc(void) to_bitmap(&o, &(o.rwlock.name)) | to_bitmap(&o, &(o.rwlock.mutex)); # endif - type_info[t_condition_variable].descriptor = - to_bitmap(&o, &(o.condition_variable.lock)) | - to_bitmap(&o, &(o.condition_variable.queue_list)) | - to_bitmap(&o, &(o.condition_variable.queue_spinlock)); + type_info[t_condition_variable].descriptor = 0; type_info[t_semaphore].descriptor = to_bitmap(&o, 
&(o.semaphore.name)) | to_bitmap(&o, &(o.semaphore.queue_list)) | @@ -1123,6 +1113,20 @@ standard_finalizer(cl_object o) GC_unregister_disappearing_link((void**)&(o->weak.value)); break; #ifdef ECL_THREADS + case t_lock: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->lock.mutex); + ecl_enable_interrupts_env(the_env); + break; + } + case t_condition_variable: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_cond_var_destroy(&o->condition_variable.cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1169,9 +1173,13 @@ register_finalizer(cl_object o, void *finalized_object, case t_codeblock: #endif case t_stream: -#if defined(ECL_THREADS) && defined(ECL_RWLOCK) +#if defined(ECL_THREADS) + case t_lock: + case t_condition_variable: +# if defined(ECL_RWLOCK) case t_rwlock: -#endif +# endif +#endif /* ECL_THREADS */ /* Don't delete the standard finalizer. */ if (fn == NULL) { fn = (GC_finalization_proc)wrapped_finalizer; diff --git a/src/c/error.d b/src/c/error.d index bf97b8fca..6ae680e22 100644 --- a/src/c/error.d +++ b/src/c/error.d @@ -463,6 +463,25 @@ FEinvalid_function_name(cl_object fname) @':datum', fname); } +#ifdef ECL_THREADS +void +FEerror_not_owned(cl_object lock) +{ + FEerror("Attempted to give up lock ~S that is not owned by process ~S", + 2, lock, mp_current_process()); +} + +void +FEunknown_lock_error(cl_object lock) +{ +#ifdef ECL_WINDOWS_THREADS + FEwin32_error("When acting on lock ~A, got an unexpected error.", 1, lock); +#else + FEerror("When acting on lock ~A, got an unexpected error.", 1, lock); +#endif +} +#endif + /* bootstrap version */ static int recursive_error = 0; diff --git a/src/c/threads/condition_variable.d b/src/c/threads/condition_variable.d index d1dc0cda3..b8aab505f 100644 --- a/src/c/threads/condition_variable.d +++ b/src/c/threads/condition_variable.d @@ -2,16 +2,27 @@ /* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ /* - * condition_variable.d - condition variables for native threads + * condition_variable.d - condition variables * - * Copyright (c) 2003 Juan Jose Garcia Ripoll + * Copyright (c) 2003, Juan Jose Garcia Ripoll + * Copyright (c) 2020, Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ +#ifndef __sun__ /* See unixinit.d for this */ +#define _XOPEN_SOURCE 600 /* For pthread mutex attributes */ +#endif +#define ECL_INCLUDE_MATH_H #include +#ifdef ECL_WINDOWS_THREADS +# include +#else +# include +#endif #include +#include /*---------------------------------------------------------------------- * CONDITION VARIABLES @@ -20,81 +31,133 @@ cl_object mp_make_condition_variable(void) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_condition_variable); - output->condition_variable.queue_list = ECL_NIL; - output->condition_variable.queue_spinlock = ECL_NIL; - output->condition_variable.lock = ECL_NIL; - @(return output); -} - -static cl_object -condition_variable_wait(cl_env_ptr env, cl_object cv) -{ - cl_object lock = cv->condition_variable.lock; - cl_object own_process = env->own_process; - /* We have entered the queue and still own the mutex? */ - print_lock("cv lock %p is %p =? 
%p", cv, lock, lock->lock.owner, own_process); - if (lock->lock.owner == own_process) { - mp_giveup_lock(lock); - } - /* We always return when we have been explicitly awaken */ - return own_process->process.woken_up; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->condition_variable.cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); + @(return output) } cl_object mp_condition_variable_wait(cl_object cv, cl_object lock) { cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(cv) != t_condition_variable) { + int rc; + cl_fixnum counter; + cl_object owner; + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { FEwrong_type_nth_arg(@[mp::condition-variable-wait], 1, cv, @[mp::condition-variable]); } - unlikely_if (ecl_t_of(lock) != t_lock) { + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { FEwrong_type_nth_arg(@[mp::condition-variable-wait], 2, lock, @[mp::lock]); } - unlikely_if (cv->condition_variable.lock != ECL_NIL && - cv->condition_variable.lock != lock) { - FEerror("Attempt to associate lock ~A~%with condition variable ~A," - "~%which is already associated to lock ~A", 2, lock, - cv, cv->condition_variable.lock); - } - unlikely_if (lock->lock.owner != own_process) { - FEerror("Attempt to wait on a condition variable using lock~%~S" - "~%which is not owned by process~%~S", 2, lock, own_process); - } - unlikely_if (lock->lock.recursive) { + if (ecl_unlikely(lock->lock.recursive)) { FEerror("mp:condition-variable-wait can not be used with recursive" " locks:~%~S", 1, lock); } - print_lock("waiting cv %p", cv, cv); - cv->condition_variable.lock = lock; - ecl_wait_on(env, condition_variable_wait, cv); - mp_get_lock_wait(lock); - @(return ECL_T); + if (ecl_unlikely(lock->lock.owner != env->own_process)) { + FEerror("Attempt to wait on a condition variable using lock~%~S" + "~%which is not owned by process~%~S", 2, lock, env->own_process); + } + ecl_disable_interrupts_env(env); + counter = lock->lock.counter; + owner = lock->lock.owner; + lock->lock.counter = 0; + lock->lock.owner = ECL_NIL; + ecl_enable_interrupts_env(env); + rc = ecl_cond_var_wait(&cv->condition_variable.cv, &lock->lock.mutex); + ecl_disable_interrupts_env(env); + lock->lock.owner = owner; + lock->lock.counter = counter; + ecl_enable_interrupts_env(env); + if (ecl_unlikely(rc != ECL_MUTEX_SUCCESS)) { + if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } + } + @(return ECL_T) } cl_object mp_condition_variable_timedwait(cl_object cv, cl_object lock, cl_object seconds) { - FEerror("Timed condition variables are not supported.", 0); + cl_env_ptr env = ecl_process_env(); + int rc; + cl_fixnum counter; + cl_object owner; + + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_nth_arg(@[mp::condition-variable-timedwait], + 1, cv, @[mp::condition-variable]); + } + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::condition-variable-timedwait], + 2, lock, @[mp::lock]); + } + if (ecl_unlikely(lock->lock.recursive)) { + FEerror("mp:condition-variable-timedwait can not be used with recursive" + " locks:~%~S", 1, lock); + } + if (ecl_unlikely(lock->lock.owner != env->own_process)) { + FEerror("Attempt to wait on a condition variable using lock~%~S" + "~%which is not owned by process~%~S", 2, lock, env->own_process); + } + /* INV: ecl_minusp() makes sure `seconds' is real */ + if (ecl_unlikely(ecl_minusp(seconds))) { + cl_error(9, 
@'simple-type-error', @':format-control', + make_constant_base_string("Not a non-negative number ~S"), + @':format-arguments', cl_list(1, seconds), + @':expected-type', @'real', @':datum', seconds); + } + ecl_disable_interrupts_env(env); + counter = lock->lock.counter; + owner = lock->lock.owner; + lock->lock.counter = 0; + lock->lock.owner = ECL_NIL; + ecl_enable_interrupts_env(env); + + rc = ecl_cond_var_timedwait(&cv->condition_variable.cv, &lock->lock.mutex, ecl_to_double(seconds)); + + ecl_disable_interrupts_env(env); + lock->lock.owner = owner; + lock->lock.counter = counter; + ecl_enable_interrupts_env(env); + + if (ecl_unlikely(rc != ECL_MUTEX_SUCCESS && rc != ECL_MUTEX_TIMEOUT)) { + if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } + } + @(return (rc == ECL_MUTEX_SUCCESS ? ECL_T : ECL_NIL)) } cl_object mp_condition_variable_signal(cl_object cv) { - print_lock("signal cv %p", cv, cv); - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ONE | ECL_WAKEUP_DELETE); - @(return ECL_T); + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_only_arg(@[mp::condition-variable-signal], + cv, @[mp::condition-variable]); + } + ecl_cond_var_signal(&cv->condition_variable.cv); + @(return ECL_T) } cl_object mp_condition_variable_broadcast(cl_object cv) { - print_lock("broadcast cv %p", cv); - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL | ECL_WAKEUP_DELETE); - @(return ECL_T); + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_only_arg(@[mp::condition-variable-broadcast], + cv, @[mp::condition-variable]); + } + ecl_cond_var_broadcast(&cv->condition_variable.cv); + @(return ECL_T) } + diff --git a/src/c/threads/mutex.d b/src/c/threads/mutex.d index 62bde1b7b..4a28a827c 100755 --- a/src/c/threads/mutex.d +++ b/src/c/threads/mutex.d @@ -2,27 +2,50 @@ /* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ /* - * mutex.d - mutually exclusive locks + * mutex.d - mutually exclusive locks. * - * Copyright (c) 2003 Juan Jose Garcia Ripoll + * Copyright (c) 2003, Juan Jose Garcia Ripoll + * Copyright (c) 2020, Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ +#ifndef __sun__ /* See unixinit.d for this */ +#define _XOPEN_SOURCE 600 /* For pthread mutex attributes */ +#endif +#include #include #include - /*---------------------------------------------------------------------- * LOCKS or MUTEX */ -static void -FEerror_not_a_lock(cl_object lock) -{ - FEwrong_type_argument(@'mp::lock', lock); -} + +/* THREAD SAFETY + * + * mp:lock-owner, mp:holding-lock-p and mp:lock-count will return + * wrong values in the following scenarios: + * 1. Another thread is in the process of locking/unlocking the mutex. + * This in unavoidable since count and owner cannot both be stored + * atomically. + * 2. A call to mp:get-lock-wait is interrupted after the mutex has + * been locked but before count and owner have been set. In this + * case, the mutex will appear to be unlocked even though it is + * already locked. If the interrupting code performs a nonlocal + * jump up the call stack, this will persist even after the + * interrupt. However, the mutex can still be unlocked by + * mp:giveup-lock since the check whether the mutex is locked or + * not is done by OS level functions. + * 3. 
A call to mp:condition-variable-(timed)wait is interrupted after + * the mutex has been unlocked/relocked but after/before count and + * owner have been set. The consequences are equivalent to scenario 2. + * In summary, owner can be nil and count 0 even though the mutex is + * locked but the converse (owner != nil and count != 0 when the mutex + * is unlocked) cannot happen. + * + */ static void FEerror_not_a_recursive_lock(cl_object lock) @@ -31,76 +54,73 @@ FEerror_not_a_recursive_lock(cl_object lock) 2, lock, lock->lock.owner); } -static void -FEerror_not_owned(cl_object lock) -{ - FEerror("Attempted to give up lock ~S that is not owned by process ~S", - 2, lock, mp_current_process()); -} - cl_object ecl_make_lock(cl_object name, bool recursive) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_lock); output->lock.name = name; output->lock.owner = ECL_NIL; output->lock.counter = 0; output->lock.recursive = recursive; - output->lock.queue_list = ECL_NIL; - output->lock.queue_spinlock = ECL_NIL; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->lock.mutex, recursive); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @(defun mp::make-lock (&key name (recursive ECL_NIL)) @ - @(return ecl_make_lock(name, !Null(recursive))); + @(return ecl_make_lock(name, !Null(recursive))) @) cl_object mp_recursive_lock_p(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::recursive-lock-p], lock, @[mp::lock]); + } ecl_return1(env, lock->lock.recursive? ECL_T : ECL_NIL); } cl_object -mp_holding_lock_p(cl_object lock) +mp_lock_name(cl_object lock) { cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(lock) != t_lock) - FEerror_not_a_lock(lock); - ecl_return1(env, (lock->lock.owner == own_process) ? ECL_T : ECL_NIL); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-name], lock, @[mp::lock]); + } + ecl_return1(env, lock->lock.name); } cl_object -mp_lock_name(cl_object lock) +mp_lock_owner(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-owner], lock, @[mp::lock]); } - ecl_return1(env, lock->lock.name); + ecl_return1(env, lock->lock.owner); } cl_object -mp_lock_owner(cl_object lock) +mp_holding_lock_p(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::holding-lock-p], lock, @[mp::lock]); } - ecl_return1(env, lock->lock.owner); + ecl_return1(env, (lock->lock.owner == mp_current_process())? ECL_T : ECL_NIL); } cl_object mp_lock_count(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-count], lock, @[mp::lock]); } ecl_return1(env, ecl_make_fixnum(lock->lock.counter)); } @@ -108,103 +128,96 @@ mp_lock_count(cl_object lock) cl_object mp_giveup_lock(cl_object lock) { - /* Must be called with interrupts disabled. 
*/ cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); - } - unlikely_if (lock->lock.owner != own_process) { - FEerror_not_owned(lock); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::giveup-lock], lock, @[mp::lock]); } - if (--lock->lock.counter == 0) { - cl_object first = ecl_waiter_pop(env, lock);; - if (first == ECL_NIL) { - lock->lock.owner = ECL_NIL; - } else { - lock->lock.counter = 1; - lock->lock.owner = first; - ecl_wakeup_process(first); - } - } - ecl_return1(env, ECL_T); -} - -static cl_object -get_lock_inner(cl_env_ptr env, cl_object lock) -{ - cl_object output; - cl_object own_process = env->own_process; ecl_disable_interrupts_env(env); - if (AO_compare_and_swap_full((AO_t*)&(lock->lock.owner), - (AO_t)ECL_NIL, (AO_t)own_process)) { - lock->lock.counter = 1; - output = ECL_T; - print_lock("acquired %p\t", lock, lock); - } else if (lock->lock.owner == own_process) { - unlikely_if (!lock->lock.recursive) { - FEerror_not_a_recursive_lock(lock); - } - ++lock->lock.counter; - output = ECL_T; - } else { - print_lock("failed acquiring %p for %d\t", lock, lock, - lock->lock.owner); - output = ECL_NIL; + if ((lock->lock.counter > 0 ? --lock->lock.counter : 0) == 0) { + lock->lock.owner = ECL_NIL; } + rc = ecl_mutex_unlock(&lock->lock.mutex); ecl_enable_interrupts_env(env); - return output; + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } } cl_object mp_get_lock_nowait(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); - } - ecl_return1(env, get_lock_inner(env, lock)); -} - -static cl_object -own_or_get_lock(cl_env_ptr env, cl_object lock) -{ - cl_object output; cl_object own_process = env->own_process; + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); + } +#endif ecl_disable_interrupts_env(env); - if (AO_compare_and_swap_full((AO_t*)&(lock->lock.owner), - (AO_t)ECL_NIL, (AO_t)own_process)) { - lock->lock.counter = 1; - output = ECL_T; - print_lock("acquired %p\t", lock, lock); - } else if (lock->lock.owner == own_process) { - output = ECL_T; - } else { - output = ECL_NIL; + if ((rc = ecl_mutex_trylock(&lock->lock.mutex)) == ECL_MUTEX_SUCCESS) { + lock->lock.counter++; + lock->lock.owner = own_process; } ecl_enable_interrupts_env(env); - return output; + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env,lock); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env,ECL_NIL); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { + FEunknown_lock_error(lock); + } } cl_object mp_get_lock_wait(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + cl_object own_process = env->own_process; + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && 
!lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); } - if (get_lock_inner(env, lock) == ECL_NIL) { - ecl_wait_on(env, own_or_get_lock, lock); +#endif + rc = ecl_mutex_lock(&lock->lock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + lock->lock.counter++; + lock->lock.owner = own_process; + ecl_enable_interrupts_env(env); + ecl_return1(env, lock); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { + ecl_enable_interrupts_env(env); + FEunknown_lock_error(lock); } - @(return ECL_T); } @(defun mp::get-lock (lock &optional (wait ECL_T)) @ if (Null(wait)) { return mp_get_lock_nowait(lock); - } - else { + } else { return mp_get_lock_wait(lock); } @) diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d index 09237c583..218e89bda 100755 --- a/src/c/threads/queue.d +++ b/src/c/threads/queue.d @@ -387,7 +387,7 @@ print_lock(char *prefix, cl_object l, ...) printf("\n%ld\t", ecl_fixnum(env->own_process->process.name)); vprintf(prefix, args); if (l != ECL_NIL) { - cl_object p = l->lock.queue_list; + cl_object p = l->semaphore.queue_list; while (p != ECL_NIL) { printf(" %lx", ecl_fixnum(ECL_CONS_CAR(p)->process.name)); p = ECL_CONS_CDR(p); diff --git a/src/c/unixint.d b/src/c/unixint.d index 373adbf8f..6bbb42e26 100644 --- a/src/c/unixint.d +++ b/src/c/unixint.d @@ -75,8 +75,6 @@ #include #include #include -/* To get APCProc calls */ -#define _WIN32_WINNT 0x400 #include #if defined(_MSC_VER) || defined(__MINGW32__) diff --git a/src/h/external.h b/src/h/external.h index 549d48661..cfec5f24d 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -598,6 +598,8 @@ extern ECL_API void FEinvalid_function(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEinvalid_function_name(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEprint_not_readable(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEtimeout() ecl_attr_noreturn; +extern ECL_API void FEerror_not_owned(cl_object lock) ecl_attr_noreturn; +extern ECL_API void FEunknown_lock_error(cl_object lock) ecl_attr_noreturn; extern ECL_API cl_object CEerror(cl_object c, const char *err_str, int narg, ...); extern ECL_API void FElibc_error(const char *msg, int narg, ...) 
ecl_attr_noreturn; #if defined(ECL_MS_WINDOWS_HOST) || defined(cygwin) diff --git a/src/h/object.h b/src/h/object.h index 4b2ebd3e1..11fe20fc2 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -928,12 +928,26 @@ struct ecl_dummy { }; #ifdef ECL_THREADS + +#ifdef ECL_WINDOWS_THREADS +typedef HANDLE ecl_mutex_t; +typedef struct ecl_cond_var_t { + HANDLE broadcast_event; + HANDLE signal_event; + cl_index state; +} ecl_cond_var_t; +#else +typedef pthread_mutex_t ecl_mutex_t; +typedef pthread_cond_t ecl_cond_var_t; +#endif + enum { ECL_PROCESS_INACTIVE = 0, ECL_PROCESS_BOOTING, ECL_PROCESS_ACTIVE, ECL_PROCESS_EXITING }; + struct ecl_process { _ECL_HDR; cl_object name; @@ -990,8 +1004,7 @@ struct ecl_barrier { struct ecl_lock { _ECL_HDR1(recursive); - cl_object queue_list; - cl_object queue_spinlock; + ecl_mutex_t mutex; cl_object owner; /* thread holding the lock or NIL */ cl_object name; cl_fixnum counter; @@ -1020,9 +1033,7 @@ struct ecl_rwlock { struct ecl_condition_variable { _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; - cl_object lock; + ecl_cond_var_t cv; }; #endif /* ECL_THREADS */ diff --git a/src/h/threads.h b/src/h/threads.h new file mode 100644 index 000000000..07d6a9ee7 --- /dev/null +++ b/src/h/threads.h @@ -0,0 +1,472 @@ +/* -*- Mode: C; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ + +/* + * threads.h - wrapper for mutex and condition variable operating + * system primitives + * + * Copyright (c) 2020, Marius Gerbershagen + * + * See file 'LICENSE' for the copyright details. + * + */ + +#ifdef ECL_THREADS + +#ifndef ECL_MUTEX_H +#define ECL_MUTEX_H + +#include +#ifdef ECL_WINDOWS_THREADS +# include +# include +#else +# include +#endif +#include +#ifdef HAVE_GETTIMEOFDAY +# include +#endif +#include + +#if !defined(ECL_WINDOWS_THREADS) + +#define ECL_MUTEX_SUCCESS 0 +#define ECL_MUTEX_LOCKED EBUSY +#define ECL_MUTEX_NOT_OWNED EPERM +#define ECL_MUTEX_TIMEOUT ETIMEDOUT +#define ECL_MUTEX_DEADLOCK EDEADLK + +/* MUTEX */ + +/* Non-recursive locks are only provided as an optimization; on + * Windows locks are always recursive. If ECL_MUTEX_DEADLOCK is + * undefined, recursive locks are not available. 
*/ + +static inline void +ecl_mutex_init(ecl_mutex_t *mutex, bool recursive) +{ + pthread_mutexattr_t mutexattr[1]; + pthread_mutexattr_init(mutexattr); + if (recursive) { + pthread_mutexattr_settype(mutexattr, PTHREAD_MUTEX_RECURSIVE); + } else { + pthread_mutexattr_settype(mutexattr, PTHREAD_MUTEX_ERRORCHECK); + } + pthread_mutex_init(mutex, mutexattr); +} + +static inline void +ecl_mutex_destroy(ecl_mutex_t *mutex) +{ + pthread_mutex_destroy(mutex); +} + +static inline int +ecl_mutex_unlock(ecl_mutex_t *mutex) +{ + return pthread_mutex_unlock(mutex); +} + +static inline int +ecl_mutex_trylock(ecl_mutex_t *mutex) +{ + return pthread_mutex_trylock(mutex); +} + +static inline int +ecl_mutex_lock(ecl_mutex_t *mutex) +{ + return pthread_mutex_lock(mutex); +} + +/* CONDITION VARIABLE */ + +static inline void +add_timeout_delta(struct timespec *ts, double seconds) +{ + struct timeval tp; + + gettimeofday(&tp, NULL); + /* Convert from timeval to timespec */ + ts->tv_sec = tp.tv_sec; + ts->tv_nsec = tp.tv_usec * 1000; + + /* Add `seconds' delta */ + ts->tv_sec += (time_t)floor(seconds); + ts->tv_nsec += (long)((seconds - floor(seconds)) * 1e9); + if (ts->tv_nsec >= 1e9) { + ts->tv_nsec -= 1e9; + ts->tv_sec++; + } +} + + +static inline void +ecl_cond_var_init(ecl_cond_var_t *cv) +{ + pthread_cond_init(cv, NULL); +} + +static inline void +ecl_cond_var_destroy(ecl_cond_var_t *cv) +{ + pthread_cond_destroy(cv); +} + +static inline int +ecl_cond_var_wait(ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + return pthread_cond_wait(cv, mutex); +} + +static inline int +ecl_cond_var_timedwait(ecl_cond_var_t *cv, ecl_mutex_t *mutex, double seconds) +{ + struct timespec ts; + add_timeout_delta(&ts, seconds); + return pthread_cond_timedwait(cv, mutex, &ts); +} + +static inline int +ecl_cond_var_signal(ecl_cond_var_t *cv) +{ + return pthread_cond_signal(cv); +} + +static inline int +ecl_cond_var_broadcast(ecl_cond_var_t *cv) +{ + return pthread_cond_broadcast(cv); +} + +#else /* ECL_WINDOWS_THREADS */ + +/* To allow for timed wait operations on locks and for interrupting + * deadlocked threads, we use Windows mutexes instead of critical + * section objects (which would serve the same purpose and be slightly + * faster). This also requires implementing our own version of + * condition variables, since the ones provided by Windows only work + * together with critical sections. */ + +#define ECL_MUTEX_SUCCESS 0 +#define ECL_MUTEX_LOCKED -1 +#define ECL_MUTEX_NOT_OWNED ERROR_NOT_OWNER +#define ECL_MUTEX_TIMEOUT ERROR_TIMEOUT +#undef ECL_MUTEX_DEADLOCK + +/* MUTEX */ + +static inline void +ecl_mutex_init(ecl_mutex_t *mutex, bool recursive) +{ + *mutex = CreateMutex(NULL, FALSE, NULL); +} + +static inline void +ecl_mutex_destroy(ecl_mutex_t *mutex) +{ + CloseHandle(*mutex); +} + +static inline int +ecl_mutex_unlock(ecl_mutex_t *mutex) +{ + return ReleaseMutex(*mutex) ? 
ECL_MUTEX_SUCCESS : GetLastError(); +} + +static inline int +ecl_mutex_trylock(ecl_mutex_t *mutex) +{ + switch (WaitForSingleObject(*mutex, 0)) { + case WAIT_OBJECT_0: + case WAIT_ABANDONED: + return ECL_MUTEX_SUCCESS; + case WAIT_TIMEOUT: + return ECL_MUTEX_LOCKED; + default: + return GetLastError(); + } +} + +static inline int +ecl_mutex_lock(ecl_mutex_t *mutex) +{ + AGAIN: + switch (WaitForSingleObjectEx(*mutex, INFINITE, TRUE)) { + case WAIT_OBJECT_0: + case WAIT_ABANDONED: + return ECL_MUTEX_SUCCESS; + case WAIT_IO_COMPLETION: + goto AGAIN; + case WAIT_TIMEOUT: + return ECL_MUTEX_LOCKED; + default: + return GetLastError(); + } +} + +static inline DWORD +remaining_milliseconds(double seconds, DWORD start_ticks) +{ + /* compute in double so that sub-second timeouts and elapsed times past the deadline are handled correctly */ + double ret = (seconds * 1000.0) - (GetTickCount() - start_ticks); + return (ret < 0) ? 0 : (DWORD) ret; +} + +/* CONDITION VARIABLE */ + +/* IMPLEMENTATION + * -------------- + * Condition variables are implemented on top of the event + * synchronization objects provided by the Windows operating + * system. + * + * Event objects allow a thread to wait until a certain event is + * signaled. If the event has already been signaled before the thread + * checks for the event, no wait is performed and WaitForSingleObject, + * WaitForMultipleObjects returns immediately. Event objects can be of + * the manual reset flavor which stays signaled until explicitly + * reset and auto reset which wakes up only one thread and then resets + * itself automatically. We use a manual reset event for broadcasting + * (waking up all threads waiting on the condition variable) and an + * auto reset event for signaling (waking up only a single thread). + * + * The complete state of the condition variable (how many threads are + * waiting and whether we are waiting, waking up threads or resetting + * the broadcasting event) is stored in an integer which is + * manipulated using atomic operations. + * + * RACE CONDITION SAFETY + * --------------------- + * To prevent wakeup signals from getting lost, condition variables + * are required to atomically unlock the associated mutex when + * starting a wait operation. While it is not possible to atomically + * unlock a mutex and wait for an event on Windows, the implementation + * is nevertheless race condition free in this regard. + * + * Consider two threads, thread A which starts a wait operation on a + * condition variable and thread B which wakes up the same condition + * variable. A increases the wait count, releases the mutex but + * doesn't wait for a wakeup event yet. Thread B acquires the mutex, + * and signals an event (broadcast or signal). Since the event is + * reset only after a waiting thread has been released, at the time + * that thread A starts waiting the event is still signaled and so the + * wakeup event is not lost. Note that for this to work, it is crucial + * that the wait count is increased before the mutex is released in + * order for thread B to know that a thread is about to start waiting. + * + * Further race conditions between multiple broadcasting/signaling + * threads are prevented by allowing only one operation + * (signaling or broadcasting) at a time to happen. The case in which + * threads start waiting during a wakeup operation is treated as if + * the waiting thread had arrived before the wakeup. Race conditions + * between threads which start waiting while the broadcast event is + * reset are prevented by letting the about to be waiting threads spin + * until the reset has finished. 
+ * + * INTERRUPT SAFETY + * ---------------- + * INV: On Windows, interrupts happen only when accessing the thread + * local environment or when entering an alertable wait state. In the + * latter case, an arriving interrupt causes the wait function to + * return WAIT_IO_COMPLETION. + * + * This allows us to provide an interrupt safe implementation by + * queueing the interrupt, fixing up the wait count of the condition + * variable and then executing the interrupt. If the interrupt doesn't + * execute a non-local jump, we resume waiting again. + * + * Note that while this prevents the state of the condition variable + * from getting corrupted, it doesn't prevent wakeup events which + * arrive while the interrupt is executing from getting lost. It is up + * to the user to prevent this type of logical bug. + */ + +#define ECL_COND_VAR_BROADCAST 1 +#define ECL_COND_VAR_SIGNAL 2 +#define ECL_COND_VAR_RESET 3 + +static inline cl_index +ecl_cond_var_wait_count(cl_index state) +{ + return state >> 2; +} + +static inline cl_index +ecl_cond_var_status(cl_index state) +{ + return state & 3; +} + +static inline cl_index +ecl_cond_var_make_state(cl_index wait_count, cl_index status) +{ + return (wait_count << 2) | status; +} + +static inline void +ecl_cond_var_increment_wait_count(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + new_state = ecl_cond_var_make_state(ecl_cond_var_wait_count(old_state) + 1, + ecl_cond_var_status(old_state)); + } while (ecl_cond_var_status(old_state) == ECL_COND_VAR_RESET || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); +} + +static inline void +ecl_cond_var_decrement_wait_count(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + new_state = ecl_cond_var_make_state(ecl_cond_var_wait_count(old_state) - 1, + ecl_cond_var_status(old_state)); + } while (!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); +} + +static inline int +ecl_cond_var_handle_event(DWORD rc, ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + cl_index old_state, new_state, wait_count; + switch (rc) { + case WAIT_OBJECT_0: + case WAIT_ABANDONED: + /* broadcast event */ + do { + /* INV: ecl_cond_var_status(old_state) == ECL_COND_VAR_BROADCAST */ + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state) - 1; + if (wait_count == 0) { + new_state = ecl_cond_var_make_state(0, ECL_COND_VAR_RESET); + while (!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + ResetEvent(cv->broadcast_event); + AO_store_release((AO_t*)&cv->state, 0); + break; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_BROADCAST); + } while(!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return ecl_mutex_lock(mutex); + case WAIT_OBJECT_0 + 1: + case WAIT_ABANDONED + 1: + /* signal event */ + do { + /* INV: ecl_cond_var_status(old_state) == ECL_COND_VAR_SIGNAL */ + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state) - 1; + new_state = ecl_cond_var_make_state(wait_count, 0); + } while(!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return ecl_mutex_lock(mutex); + case WAIT_TIMEOUT: + ecl_cond_var_decrement_wait_count(cv); + rc = ecl_mutex_lock(mutex); + return (rc == ECL_MUTEX_SUCCESS) ? 
ECL_MUTEX_TIMEOUT : rc; + default: + ecl_cond_var_decrement_wait_count(cv); + return GetLastError(); + } +} + +static inline void +ecl_cond_var_init(ecl_cond_var_t *cv) +{ + cv->broadcast_event = CreateEvent(NULL, TRUE, FALSE, NULL); /* manual reset event */ + cv->signal_event = CreateEvent(NULL, FALSE, FALSE, NULL); /* auto reset event */ + cv->state = 0; +} + +static inline void +ecl_cond_var_destroy(ecl_cond_var_t *cv) +{ + CloseHandle(cv->broadcast_event); + CloseHandle(cv->signal_event); +} + +static inline int +ecl_cond_var_wait(ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + cl_env_ptr env = ecl_process_env(); + DWORD rc; + HANDLE events[2]; + events[0] = cv->broadcast_event; + events[1] = cv->signal_event; + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + if (!ReleaseMutex(*mutex)) { + return GetLastError(); + } + while ((rc = WaitForMultipleObjectsEx(2, events, FALSE, INFINITE, TRUE)) == WAIT_IO_COMPLETION) { + /* we got an interrupt; first fix up the state of the condition variable, ... */ + ecl_cond_var_decrement_wait_count(cv); + /* ... then handle the interrupt ... */ + ecl_enable_interrupts_env(env); + ecl_check_pending_interrupts(env); + ecl_disable_interrupts_env(env); + /* ... and start waiting again ... */ + ecl_cond_var_increment_wait_count(cv); + } + return ecl_cond_var_handle_event(rc, cv, mutex); +} + +static inline int +ecl_cond_var_timedwait(ecl_cond_var_t *cv, ecl_mutex_t *mutex, double seconds) +{ + cl_env_ptr env = ecl_process_env(); + DWORD rc; + DWORD start_ticks = GetTickCount(); + HANDLE events[2]; + events[0] = cv->broadcast_event; + events[1] = cv->signal_event; + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + if (!ReleaseMutex(*mutex)) { + return GetLastError(); + } + while ((rc = WaitForMultipleObjectsEx(2, events, FALSE, remaining_milliseconds(seconds, start_ticks), TRUE)) == WAIT_IO_COMPLETION) { + ecl_cond_var_decrement_wait_count(cv); + ecl_enable_interrupts_env(env); + ecl_check_pending_interrupts(env); + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + } + return ecl_cond_var_handle_event(rc, cv, mutex); +} + +static inline int +ecl_cond_var_signal(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state, wait_count, status; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state); + status = ecl_cond_var_status(old_state); + if (wait_count == 0 || status == ECL_COND_VAR_BROADCAST) { + return ECL_MUTEX_SUCCESS; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_SIGNAL); + } while(status != 0 || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return SetEvent(cv->signal_event) ? ECL_MUTEX_SUCCESS : GetLastError(); +} + +static inline int +ecl_cond_var_broadcast(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state, wait_count, status; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state); + status = ecl_cond_var_status(old_state); + if (wait_count == 0 || status == ECL_COND_VAR_BROADCAST) { + return ECL_MUTEX_SUCCESS; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_BROADCAST); + } while(status != 0 || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return SetEvent(cv->broadcast_event) ? 
ECL_MUTEX_SUCCESS : GetLastError(); +} +#endif /* ECL_MUTEX_H */ + +#endif /* ECL_THREADS */ diff --git a/src/lsp/mp.lsp b/src/lsp/mp.lsp index ab3d1da04..6cbefadff 100644 --- a/src/lsp/mp.lsp +++ b/src/lsp/mp.lsp @@ -115,33 +115,20 @@ by ALLOW-WITH-INTERRUPTS." (defmacro with-lock ((lock-form &rest options) &body body) #-threads `(progn ,@body) - ;; Why do we need %count? Even if get-lock succeeeds, an interrupt may - ;; happen between the end of get-lock and when we save the output of - ;; the function. That means we lose the information and ignore that - ;; the lock was actually acquired. Furthermore, a lock can be recursive - ;; and mp:lock-holder is also not reliable. - ;; - ;; Next notice how we need to disable interrupts around the body and - ;; the get-lock statement, to ensure that the unlocking is done with - ;; interrupts disabled. + ;; We do our best to make sure that the lock is released if we are + ;; interrupted and jump up the call stack, however there is an + ;; unavoidable race condition if the interrupt happens after the + ;; mutex is locked but before we store the return value of + ;; mp:get-lock. #+threads - (ext:with-unique-names (lock owner count process) - `(let* ((,lock ,lock-form) - (,owner (mp:lock-owner ,lock)) - (,count (mp:lock-count ,lock))) - (declare (type fixnum ,count)) - (without-interrupts + (ext:with-unique-names (lock) + `(let ((,lock ,lock-form)) + (when (mp:get-lock ,lock) + (without-interrupts (unwind-protect (with-restored-interrupts - (mp::get-lock ,lock) - (locally ,@body)) - (let ((,process mp:*current-process*)) - (declare (optimize (speed 3) (safety 0) (debug 0))) - (when (and (eq ,process (mp:lock-owner ,lock)) - (or (not (eq ,owner ,process)) - (> (the fixnum (mp:lock-count ,lock)) - (the fixnum ,count)))) - (mp::giveup-lock ,lock)))))))) + (locally ,@body)) + (mp:giveup-lock ,lock))))))) #+ecl-read-write-lock (defmacro with-rwlock ((lock op) &body body) diff --git a/src/util/emacs.el b/src/util/emacs.el index 1e11c74ac..38c46c249 100644 --- a/src/util/emacs.el +++ b/src/util/emacs.el @@ -237,6 +237,7 @@ "h/object.h" "h/page.h" "h/stacks.h" +"h/threads.h" "lsp/arraylib.lsp" "lsp/assert.lsp" "lsp/autoload.lsp" -- GitLab From a5762b4a765b6741b56a6db0de4090cb8fddd3b0 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 22 Aug 2020 18:00:40 +0200 Subject: [PATCH 02/13] tests: 2am-ecl: implement timeouts for tests Add new macro test-with-timeout, refactor various global variables for test statistics into a struct. 
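The macro accepts either a bare test name, in which case the timeout
defaults to *default-timeout*, or a (name timeout) list. A minimal usage
sketch (the test names below are invented for illustration only):

    (test-with-timeout mp.example.default-timeout        ; uses *default-timeout*
      (is (= (+ 1 1) 2)))

    (test-with-timeout (mp.example.explicit-timeout 5.0) ; 5 second timeout
      (is (eql 42 (* 6 7))))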
--- src/tests/2am.lisp | 129 ++++++++++++++++++++++++++++++++------------- 1 file changed, 92 insertions(+), 37 deletions(-) diff --git a/src/tests/2am.lisp b/src/tests/2am.lisp index 7543da6b0..b99ea2a03 100644 --- a/src/tests/2am.lisp +++ b/src/tests/2am.lisp @@ -24,21 +24,26 @@ #| to avoid conflict with the library name package 2am-ecl |# (defpackage #:2am-ecl (:use #:cl) - (:export #:test #:is #:signals #:finishes #:run #:suite)) + (:export #:test #:test-with-timeout #:is #:signals #:finishes + #:run #:suite)) (in-package #:2am-ecl) (defvar *tests* nil "A name of the default tests suite.") (defvar *suites* (make-hash-table) "A collection of test suites.") (defvar *hierarchy* (make-hash-table) "A hierarchy of test suites.") -(defvar *failures* nil) -(defvar *crashes* nil) -(defvar *test-name* nil) -(defvar *test-count* nil) -(defvar *pass-count* nil) -(defvar *fail-count* nil) +(defvar *stats* nil "Collection of test statistics.") (defvar *running* nil) +(defvar *test-name* nil) (defvar *last-fail* nil) +(defvar *default-timeout* 60.0 "Default timeout in seconds.") + +(defstruct test-stats + (failures (make-hash-table)) + (crashes 0) + (test-count 0) + (pass-count 0) + (fail-count 0)) (define-condition test-failure (simple-condition) ((test-name :initarg :name @@ -71,7 +76,12 @@ (defun shuffle (sequence) (%shuffle (map 'vector #'identity sequence))) -(defun report (test-count pass-count fail-count crashes) +(defun report (stats &aux + (test-count (test-stats-test-count stats)) + (pass-count (test-stats-pass-count stats)) + (fail-count (test-stats-fail-count stats)) + (crashes (test-stats-crashes stats)) + (failures (test-stats-failures stats))) (let ((num-check (+ pass-count fail-count))) (if *running* (format t "~&Did ~s test~:p (~s crashed), ~s check~:p.~%" test-count crashes num-check) @@ -92,16 +102,12 @@ (format t " CRASH [~A]: " (type-of fail))) (format t "~A~%" fail)) (format t "~&--------------------------------~%")) - *failures*))) + failures))) (defun %run (fn) - (let ((*test-count* 0) - (*pass-count* 0) - (*fail-count* 0) - (*failures* (make-hash-table)) - (*crashes* 0)) + (let ((*stats* (make-test-stats))) (multiple-value-prog1 (funcall fn) - (report *test-count* *pass-count* *fail-count* *crashes*)))) + (report *stats*)))) (defun %run-suite (name) (let ((visited nil) @@ -131,41 +137,90 @@ (map nil #'funcall (shuffle tests))))))) (values)) -(defun call-test (fn) - (format t "~&Running test ~s " *test-name*) - (finish-output) - (if *running* - (handler-case - (progn (incf *test-count*) - (funcall fn)) - (serious-condition (c) - (write-char #\X) - (incf *crashes*) - (push c (gethash *test-name* *failures*)))) - (%run fn)) - (values)) +(defun call-test (name fn) + (let ((*test-name* name)) + (format t "~&Running test ~s " *test-name*) + (finish-output) + (if *running* + (handler-case + (progn (incf (test-stats-test-count *stats*)) + (funcall fn)) + (serious-condition (c) + (write-char #\X) + (incf (test-stats-crashes *stats*)) + (push c (gethash *test-name* (test-stats-failures *stats*))))) + (%run fn)) + (values))) (defmacro test (name &body body) - "Define a test function and add it to `*tests*'." `(progn (defun ,name () - (let ((*test-name* ',name)) - (call-test (lambda () ,@body)))) + (call-test ',name (lambda () ,@body))) (pushnew ',name (gethash *tests* *suites*)) ',name)) +(defun kill-processes (process-list &optional original) + "Kills a list of processes, which may be the difference between two lists." 
+ (let ((process-list (set-difference process-list original))) + (when (member mp:*current-process* process-list) + (error "Found myself in the kill list")) + (mapc #'mp:process-kill process-list) + process-list)) + +#+threads +(defun call-test-with-timeout (name timeout fn) + (let* ((all-processes (mp:all-processes)) + (finished nil) + (runner (mp:process-run-function + "runner" + #'(lambda (stats running) + (let ((*stats* stats) + (*running* running)) + (call-test name fn) + (setf finished t))) + *stats* *running*))) + (loop with *test-name* = name + with timestep = 0.2 + for time from 0.0 upto timeout by timestep + do (if finished + (return) + (sleep timestep)) + finally (mp:process-kill runner) + (failed (make-condition 'test-failure + :name name + :format-control "Timeout after ~A seconds" + :format-arguments (list timeout))) + (return-from call-test-with-timeout)) + (mp:process-join runner) + (let ((leftovers (kill-processes (mp:all-processes) all-processes))) + (when leftovers + (format t "~%;;; Stray processes: ~A~%" leftovers))))) + +#+threads +(defmacro test-with-timeout (name-and-timeout &body body) + (let (name timeout) + (if (listp name-and-timeout) + (setf name (first name-and-timeout) + timeout (second name-and-timeout)) + (setf name name-and-timeout + timeout '*default-timeout*)) + `(progn + (defun ,name () + (call-test-with-timeout ',name ,timeout (lambda () ,@body))) + (pushnew ',name (gethash *tests* *suites*)) + ',name))) + (defun passed () (write-char #\.) - (when *pass-count* - (incf *pass-count*)) + (when *stats* + (incf (test-stats-pass-count *stats*))) T) (defun failed (c) (write-char #\f) - (when *fail-count* - (incf *fail-count*)) - (when *failures* - (push c (gethash *test-name* *failures*))) + (when *stats* + (incf (test-stats-fail-count *stats*)) + (push c (gethash *test-name* (test-stats-failures *stats*)))) (setf *last-fail* c) nil) -- GitLab From 43befa3b59dc86b09dc5d69bae0dc1412ae6abc0 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 22 Aug 2020 20:28:33 +0200 Subject: [PATCH 03/13] tests: multiprocessing: major overhaul Changes include: - consistent naming - replaced unnecessary use of (sleep) by better synchronization mechanisms - tests are run with timeouts - clean up stray threads which would otherwise wait for all eternity - better error messages in case of test failures --- src/tests/normal-tests/multiprocessing.lsp | 936 ++++++++++----------- 1 file changed, 436 insertions(+), 500 deletions(-) diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index 0f3b8506f..d0d356075 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -9,42 +9,16 @@ (suite 'mp) - -;; Auxiliary routines for multithreaded tests - -(defun kill-and-wait (process-list &optional original wait) - "Kills a list of processes, which may be the difference between two lists, -waiting for all processes to finish. Currently it has no timeout, meaning -it may block hard the lisp image." 
- (let ((process-list (set-difference process-list original))) - (when (member mp:*current-process* process-list) - (error "Found myself in the kill list")) - (mapc #'mp:process-kill process-list) - (when wait - (loop for i in process-list - do (mp:process-join i))) - process-list)) - -(defun mp-test-run (closure) - (let* ((all-processes (mp:all-processes)) - (output (multiple-value-list (funcall closure)))) - (sleep 0.2) ; time to exit some processes - (let ((leftovers (kill-and-wait (mp:all-processes) all-processes))) - (cond (leftovers - (format t "~%;;; Stray processes: ~A" leftovers)) - (t - (values-list output)))))) - -(defmacro def-mp-test (name body expected-value) - "Runs some test code and only returns the output when the code exited without -creating stray processes." - (let ((all-processes (gensym)) - (output (gensym)) - (leftover (gensym))) - `(test ,name - (is-equal - (mp-test-run #'(lambda () ,body)) - ,expected-value)))) +;;; Important Note: +;;; +;;; Testing multithreading primitives such as locks or semaphores +;;; often requires synchronizing multiple threads. To keep the tests +;;; as simple as possible, this is synchronization is often done by +;;; using busy waits. As a consequence, test failures may manifest as +;;; timeouts. Some tests for semaphores and barriers also use mutexes +;;; for synchronization purposes and will fail if mutexes don't work +;;; correctly. Nearly all tests also assume that creating, killing or +;;; joining threads works properly, which is not tested separately. ;; Locks @@ -57,13 +31,13 @@ creating stray processes." ;;; When a WITH-LOCK is interrupted, it is not able to release ;;; the resulting lock and an error is signaled. ;;; -(test mp-0001-with-lock - (let ((flag t) - (lock (mp:make-lock :name "mp-0001-with-lock" :recursive nil))) +(test-with-timeout mp.mutex.with-lock + (let ((flag 0) + (lock (mp:make-lock :name "mutex.with-lock" :recursive nil))) (mp:with-lock (lock) (let ((background-process (mp:process-run-function - "mp-0001-with-lock" + "mutex.with-lock" #'(lambda () (handler-case (progn @@ -73,234 +47,320 @@ creating stray processes." (error (c) (princ c)(terpri) (setf flag c))) - (setf flag 2))))) + (setf flag 3))))) ;; The background process should not be able to get ;; the lock, and will simply wait. Now we interrupt it ;; and the process should gracefully quit, without ;; signalling any serious condition - (and (progn (sleep 1) - (is (mp:process-kill background-process))) - (progn (sleep 1) - (is (not (mp:process-active-p background-process)))) - (is (eq flag 1))))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-kill background-process)) + (mp:process-join background-process) + (is (= flag 1)))))) + +;;; Date: 12/04/2012 +;;; Non-recursive mutexes should signal an error when they +;;; cannot be relocked. +(test-with-timeout mp.mutex.recursive-error + (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) + (is (mp:get-lock mutex)) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (signals error (mp:get-lock mutex)) + (is (mp:giveup-lock mutex)) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) + +;;; Date: 12/04/2012 +;;; Recursive locks increase the counter. 
+(test-with-timeout mp.mutex.recursive-count + (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) + (is (loop for i from 1 upto 10 + always (and (mp:get-lock mutex) + (= (mp:lock-count mutex) i) + (eq (mp:lock-owner mutex) mp:*current-process*)))) + (is (loop for i from 9 downto 0 + always (and (eq (mp:lock-owner mutex) mp:*current-process*) + (mp:giveup-lock mutex) + (= (mp:lock-count mutex) i)))) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) + + +;;; Date: 12/04/2012 +;;; When multiple threads compete for a mutex, they should +;;; all get the same chance of accessing the resource +;;; +;;; Disabled since underlying OS functions don't guarantee fairness -- +;;; mg 2020-08-22 +#+(or) +(test-with-timeout mp.mutex.fairness + (let* ((mutex (mp:make-lock :name "mutex.fairness")) + (nthreads 10) + (count 10) + (counter (* nthreads count)) + (array (make-array count :element-type 'fixnum :initial-element 0))) + (flet ((slave (n) + (loop with continue = t + for i from 1 by 1 + while continue do + (mp:get-lock mutex) + (cond ((plusp counter) + (decf counter) + (setf (aref array n) i)) + (t + (setf continue nil))) + (mp:giveup-lock mutex)))) + ;; Launch all agents. They will be locked + (let ((all-processes + (mp:with-lock (mutex) + (loop for n from 0 below nthreads + collect (mp:process-run-function n #'slave n) + ;; ... and give them some time to block on this mutex + finally (sleep 1))))) + ;; Now they are released and operate. They should all have + ;; the same share of counts. + (loop for p in all-processes + do (mp:process-join p)) + (loop for i from 0 below nthreads + (is (= (aref array i) count))))))) + +;;; Date: 12/04/2012 +;;; It is possible to kill processes waiting for a lock. +;;; +(test-with-timeout mp.mutex.interruptible + (let ((mutex (mp:make-lock :name "mutex.interruptible")) + (flag 0)) + (mp:get-lock mutex) + (let ((sleeper-thread + (mp:process-run-function + "mutex.interruptible" + #'(lambda () + (setf flag 1) + (mp:with-lock (mutex) + (setf flag 2)))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-active-p sleeper-thread)) + (mp:process-kill sleeper-thread) + (mp:process-join sleeper-thread) + (is (= flag 1)) + (is (eq (mp:lock-owner mutex) mp:*current-process*))) + (mp:giveup-lock mutex))) ;; Semaphores ;;; Date: 14/04/2012 ;;; Ensure that at creation name and counter are set -(test sem-make-and-counter - (is (loop with name = "sem-make-and-counter" - for count from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (eq (mp:semaphore-name sem) name) - (= (mp:semaphore-count sem) count) - (zerop (mp:semaphore-wait-count sem)))))) +(test mp.sem.make-and-counter + (loop with name = "sem.make-and-counter" + for count from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (eq (mp:semaphore-name sem) name)) + (is (= (mp:semaphore-count sem) count)) + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; Ensure that signal changes the counter by the specified amount -(test sem-signal-semaphore-count - (is - (loop with name = "sem-signal-semaphore-count" - for count from 0 to 10 - always (loop for delta from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (= (mp:semaphore-count sem) count) - (null (mp:signal-semaphore sem delta)) - (= (mp:semaphore-count sem ) (+ count delta))))))) +(test-with-timeout mp.sem.signal-semaphore-count + (loop with name = "sem.signal-semaphore-count" + for count from 0 to 10 + do (loop for 
delta from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (= (mp:semaphore-count sem) count)) + (is (null (mp:signal-semaphore sem delta))) + (is (= (mp:semaphore-count sem ) (+ count delta)))))) ;;; Date: 14/04/2012 ;;; A semaphore with a count of zero blocks a process -(def-mp-test sem-signal-one-process - (let* ((flag nil) - (sem (mp:make-semaphore :name "sem-signal-one")) - (a-process (mp:process-run-function - "sem-signal-one-process" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t))))) - (and (null flag) - (mp:process-active-p a-process) - (progn (mp:signal-semaphore sem) (sleep 0.2) flag) - (= (mp:semaphore-count sem) 0))) - t) +(test-with-timeout mp.sem.signal-one-process + (let* ((flag nil) + (sem (mp:make-semaphore :name "sem.signal-one")) + (a-process (mp:process-run-function + "sem.signal-one-process" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t))))) + (is (null flag)) + (is (mp:process-active-p a-process)) + (mp:signal-semaphore sem) + (mp:process-join a-process) + (is flag) + (is (= (mp:semaphore-count sem) 0)))) ;;; Date: 14/04/2012 ;;; We can signal multiple processes -(test sem-signal-n-processes +(test-with-timeout mp.sem.signal-n-processes (loop for count from 1 upto 10 always (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes (loop for i from 1 upto count collect (mp:process-run-function - "sem-signal-n-processes" + "sem.signal-n-processes" #'(lambda () (mp:wait-on-semaphore sem) (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) count) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - count (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem count) - (sleep 0.2) - (= counter count)) - "Counter should be ~s (but is ~s)." count counter) - (is (= (mp:semaphore-count sem) 0)))))) + (loop until (= (mp:semaphore-wait-count sem) count)) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem count) + (mapc #'mp:process-join all-processes) + (is (= counter count) + "Counter should be ~s (but is ~s)." count counter) + (is (= (mp:semaphore-count sem) 0))))) ;;; Date: 14/04/2012 ;;; When we signal N processes and N+M are waiting, only N awake -(test sem-signal-only-n-processes - (loop for m from 1 upto 3 always - (loop for n from 1 upto 4 always +(test-with-timeout mp.sem.signal-only-n-processes + (loop for m from 1 upto 3 do + (loop for n from 1 upto 4 do (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes (loop for i from 1 upto (+ n m) collect (mp:process-run-function - "sem-signal-n-processes" + "sem.signal-n-processes" #'(lambda () (mp:wait-on-semaphore sem) (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) (+ m n)) - "Number of threads waiting on semaphore should be ~s (but is ~s)." 
- (+ m n) (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem n) - (sleep 0.02) - (= counter n))) - (is (= (mp:semaphore-wait-count sem) m) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - m (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem m) - (sleep 0.02) - (= counter (+ n m))) - "Counter should be ~s (but is ~s)." (+ n m) counter)))))) + (loop until (= (mp:semaphore-wait-count sem) (+ m n))) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem n) + (loop until (= (count-if #'mp:process-active-p all-processes) m)) + (is (= counter n)) + (is (= (mp:semaphore-wait-count sem) m) + "Number of threads waiting on semaphore should be ~s (but is ~s)." + m (mp:semaphore-wait-count sem)) + (mp:signal-semaphore sem m) + (mapc #'mp:process-join all-processes) + (is (= counter (+ n m)) + "Counter should be ~s (but is ~s)." + (+ n m) counter))))) ;;; Date: 14/04/2012 ;;; It is possible to kill processes waiting for a semaphore. ;;; -(def-mp-test sem-interruptible - (loop with sem = (mp:make-semaphore :name "sem-interruptible") - with flag = nil - for count from 1 to 10 - for all-processes = (loop for i from 1 upto count - collect (mp:process-run-function - "sem-interruptible" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t)))) - always (and (progn (sleep 0.2) (null flag)) - (every #'mp:process-active-p all-processes) - (= (mp:semaphore-wait-count sem) count) - (mapc #'mp:process-kill all-processes) - (progn (sleep 0.2) (notany #'mp:process-active-p all-processes)) - (null flag) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) +(test-with-timeout mp.sem.interruptible + (loop with sem = (mp:make-semaphore :name "sem.interruptible") + with flag = nil + for count from 1 to 10 + for all-processes = (loop for i from 1 upto count + collect (mp:process-run-function + "sem.interruptible" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t)))) + do (loop until (= (mp:semaphore-wait-count sem) count)) + (is (null flag)) + (is (every #'mp:process-active-p all-processes)) + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes) + (is (null flag)) + ;; Usually, the wait count should be zero at this point. We may + ;; get higher values since the interrupt doesn't lock the mutex + ;; associated to the semaphore and thus multiple threads may write + ;; the wait count at the same time. However, interrupts are provided + ;; only for debugging purposes, for which this behaviour is acceptable. + (is (<= (mp:semaphore-wait-count sem) count)))) ;;; Date: 14/04/2012 ;;; When we kill a process, it is removed from the wait queue. 
;;; -(def-mp-test sem-interrupt-updates-queue - (let* ((sem (mp:make-semaphore :name "sem-interrupt-updates-queue")) +(test-with-timeout mp.sem.interrupt-updates-queue + (let* ((sem (mp:make-semaphore :name "sem.interrupt-updates-queue")) (process (mp:process-run-function - "sem-interrupt-updates-queue" + "sem.interrupt-updates-queue" #'(lambda () (mp:wait-on-semaphore sem))))) - (sleep 0.2) - (and (= (mp:semaphore-wait-count sem) 1) - (mp:process-active-p process) - (progn (mp:process-kill process) - (sleep 0.2) - (not (mp:process-active-p process))) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) + (loop until (= (mp:semaphore-wait-count sem) 1)) + (is (mp:process-active-p process)) + (mp:process-kill process) + (mp:process-join process) + ;; In contrast to the previous test, if we interrupt only a single thread + ;; the wait count must be correct, since only a single thread is writing. + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; When we kill a process, it signals another one. This is tricky, ;;; because we need the awake signal to arrive _after_ the process is ;;; killed, but the process must still be in the queue for the semaphore -;;; to awake it. The way we solve this is by intercepting the kill signal. +;;; to awake it. ;;; -(test sem-interrupted-resignals - (let* ((sem (mp:make-semaphore :name "sem-interrupted-resignals")) +(test-with-timeout mp.sem.interrupted-resignals + (let* ((sem (mp:make-semaphore :name "sem.interrupted-resignals")) (flag1 nil) (flag2 nil) (process1 (mp:process-run-function - "sem-interrupted-resignals" + "sem.interrupted-resignals-1" #'(lambda () (unwind-protect (mp:wait-on-semaphore sem) - (sleep 4) - (setf flag1 t) - )))) + (loop repeat (* 60 100) do (sleep 1/100)) + (setf flag1 t))))) (process2 (mp:process-run-function - "sem-interrupted-resignals" + "sem.interrupted-resignals-2" #'(lambda () (mp:wait-on-semaphore sem) (setf flag2 t))))) - (sleep 0.2) - (and (is (= (mp:semaphore-wait-count sem) 2)) - (is (mp:process-active-p process1)) - (is (mp:process-active-p process2)) - ;; We kill the process but ensure it is still running - (is (progn (mp:process-kill process1) - (mp:process-active-p process1))) - (is (null flag1)) - ;; ... and in the queue - (is (= (mp:semaphore-wait-count sem) 2)) - ;; We awake it and it should awake the other one - (is (progn (format t "~%;;; Signaling semaphore") - (mp:signal-semaphore sem) - (sleep 1) - (zerop (mp:semaphore-wait-count sem)))) - (is flag2)))) + (loop until (= (mp:semaphore-wait-count sem) 2)) + (is (mp:process-active-p process1)) + (is (mp:process-active-p process2)) + ;; We kill the process but ensure it is still running + (mp:process-kill process1) + (is (mp:process-active-p process1)) + (is (null flag1)) + ;; Wait until the process is no longer waiting for the semaphore + (loop until (= (mp:semaphore-wait-count sem) 1)) + ;; ... then awake it and the other process should start working + (mp:signal-semaphore sem) + (mp:process-join process2) + (is (zerop (mp:semaphore-wait-count sem))) + (is flag2) + ;; Finally we kill the first process (which will by this time be + ;; stuck in the unwind-protect call) again. + (mp:process-kill process1) + (mp:process-join process1) + (is (null flag1)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumers, non-blocking, because the initial count ;;; is larger than the consumed data. 
-(def-mp-test sem-1-to-n-non-blocking +(test-with-timeout mp.sem.1-to-n-non-blocking (loop with counter = 0 - with lock = (mp:make-lock :name "sem-1-to-n-communication") + with lock = (mp:make-lock :name "sem.1-to-n-communication") for n from 1 to 10 for m = (round 128 n) for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count length) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count length) for producers = (progn (setf counter 0) (loop for i from 0 below n collect (mp:process-run-function - "sem-1-to-n-consumer" + "sem.1-to-n-consumer" #'(lambda () (loop for i from 0 below m do (mp:wait-on-semaphore sem) do (mp:with-lock (lock) (incf counter))))))) do (mapc #'mp:process-join producers) - always (and (= counter length) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) + (is (= counter length)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumers, blocking due to a slow producer. -(def-mp-test sem-1-to-n-blocking - (loop with lock = (mp:make-lock :name "sem-1-to-n-communication") +(test-with-timeout mp.sem.1-to-n-blocking + (loop with lock = (mp:make-lock :name "sem.1-to-n-communication") for n from 1 to 10 for m = (round 10000 n) for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count 0) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count 0) for counter = 0 for producers = (loop for i from 0 below n collect (mp:process-run-function - "sem-1-to-n-consumer" + "sem.1-to-n-consumer" #'(lambda () (loop for i from 0 below m do (mp:wait-on-semaphore sem)) @@ -308,156 +368,50 @@ creating stray processes." do (loop for i from 0 below length do (mp:signal-semaphore sem)) do (mapc #'mp:process-join producers) - always (and (= counter n) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) - - -;; Mutexes -;;; Date: 12/04/2012 -;;; Non-recursive mutexes should signal an error when they -;;; cannot be relocked. -(test mutex-001-recursive-error - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) - (and - (mp:get-lock mutex) - (eq (mp:lock-owner mutex) mp:*current-process*) - (handler-case - (progn (mp:get-lock mutex) nil) - (error (c) t)) - (mp:giveup-lock mutex) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) - -;;; Date: 12/04/2012 -;;; Recursive locks increase the counter. 
-(test mutex-002-recursive-count - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) - (and - (loop for i from 1 upto 10 - always (and (mp:get-lock mutex) - (= (mp:lock-count mutex) i) - (eq (mp:lock-owner mutex) mp:*current-process*))) - (loop for i from 9 downto 0 - always (and (eq (mp:lock-owner mutex) mp:*current-process*) - (mp:giveup-lock mutex) - (= (mp:lock-count mutex) i))) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) - - -;;; Date: 12/04/2012 -;;; When multiple threads compete for a mutex, they should -;;; all get the same chance of accessing the resource -;;; -(def-mp-test mutex-003-fairness - (let* ((mutex (mp:make-lock :name 'mutex-001-fairness)) - (nthreads 10) - (count 10) - (counter (* nthreads count)) - (array (make-array count :element-type 'fixnum :initial-element 0))) - (flet ((slave (n) - (loop with continue = t - for i from 1 by 1 - while continue do - (mp:get-lock mutex) - (cond ((plusp counter) - (decf counter) - (setf (aref array n) i)) - (t - (setf continue nil))) - (mp:giveup-lock mutex)))) - ;; Launch all agents. They will be locked - (let ((all-processes - (mp:with-lock (mutex) - (loop for n from 0 below nthreads - collect (mp:process-run-function n #'slave n) - ;; ... and give them some time to block on this mutex - finally (sleep 1))))) - ;; Now they are released and operate. They should all have - ;; the same share of counts. - (loop for p in all-processes - do (mp:process-join p)) - (loop for i from 0 below nthreads - always (= (aref array i) count))))) - t) - -;;; Date: 12/04/2012 -;;; It is possible to kill processes waiting for a lock. We launch a lot of -;;; processes, 50% of which are zombies: they acquire the lock and do not -;;; do anything. These processes are then killed, resulting in the others -;;; doing their job. -;;; -(def-mp-test mutex-004-interruptible - (let* ((mutex (mp:make-lock :name "mutex-003-fairness")) - (nprocesses 20) - (counter 0)) - (flet ((normal-thread () - (mp:with-lock (mutex) - (incf counter))) - (zombie-thread () - (mp:with-lock (mutex) - (loop (sleep 10))))) - (let* ((all-processes (loop for i from 0 below nprocesses - for zombie = (zerop (mod i 2)) - for fn = (if zombie #'zombie-thread #'normal-thread) - collect (cons zombie - (mp:process-run-function - "mutex-003-fairness" - fn)))) - (zombies (mapcar #'cdr (remove-if-not #'car all-processes)))) - (and (zerop counter) ; No proces works because the first one is a zombie - (kill-and-wait zombies) - (progn (sleep 0.2) (= counter (/ nprocesses 2))) - (not (mp:lock-owner mutex)) - t)))) - t) + (is (= counter n)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) ;; Mailbox ;;; Date: 14/04/2012 ;;; Ensure that at creation name and counter are set, and mailbox is empty. 
-(test mailbox-make-and-counter - (is - (loop with name = "mbox-make-and-counter" - for count from 4 to 63 - for mbox = (mp:make-mailbox :name name :count count) - always (and (eq (mp:mailbox-name mbox) name) - (>= (mp:mailbox-count mbox) count) - (mp:mailbox-empty-p mbox))))) +(test mp.mbox.make-and-counter + (loop with name = "mbox.make-and-counter" + for count from 4 to 63 + for mbox = (mp:make-mailbox :name name :count count) + do (is (eq (mp:mailbox-name mbox) name)) + (is (>= (mp:mailbox-count mbox) count)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; Ensure that the mailbox works in a nonblocking fashion (when the ;;; number of messages < mailbox size in a single producer and single ;;; consumer setting. We do not need to create new threads for this. -(test mbox-mailbox-nonblocking-io-1-to-1 - (is - (loop with count = 30 - with name = "mbox-mailbox-nonblocking-io-1-to-1" - with mbox = (mp:make-mailbox :name name :count count) - for l from 1 to 10 - for messages = (loop for i from 1 to l - do (mp:mailbox-send mbox i) - collect i) - always - (and (not (mp:mailbox-empty-p mbox)) - (equalp (loop for i from 1 to l - collect (mp:mailbox-read mbox)) - messages) - (mp:mailbox-empty-p mbox))))) +(test-with-timeout mp.mbox.nonblocking-io-1-to-1 + (loop with count = 30 + with name = "mbox.nonblocking-io-1-to-1" + with mbox = (mp:make-mailbox :name name :count count) + for l from 1 to 10 + for messages = (loop for i from 1 to l + do (mp:mailbox-send mbox i) + collect i) + do + (is (not (mp:mailbox-empty-p mbox))) + (is (equalp (loop for i from 1 to l + collect (mp:mailbox-read mbox)) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; The mailbox blocks a process when it saturates the write queue. -(def-mp-test mbox-blocks-1-to-1 +(test-with-timeout mp.mbox.blocks-1-to-1 (let* ((flag nil) - (mbox (mp:make-mailbox :name "mbox-signal-one" :count 32)) + (mbox (mp:make-mailbox :name "mbox.blocks-1-to-1" :count 32)) (size (mp:mailbox-count mbox)) (a-process (mp:process-run-function - "mbox-signal-one-process" + "mbox.blocks-1-to-1" #'(lambda () ;; This does not block (loop for i from 1 to size @@ -467,85 +421,83 @@ creating stray processes." 
(mp:mailbox-send mbox (1+ size)) ;; Now we unblock (setf flag nil))))) - (sleep 0.2) ; give time for all messages to arrive - (and (not (mp:mailbox-empty-p mbox)) ; the queue has messages - (mp:process-active-p a-process) ; the process is active - flag ; and it is blocked - (loop for i from 1 to (1+ size) ; messages arrive in order - always (= i (mp:mailbox-read mbox))) - (null flag) ; and process unblocked - (mp:mailbox-empty-p mbox) - t)) - t) + (sleep 0.2) + (is (not (mp:mailbox-empty-p mbox))) ; the queue has messages + (is (mp:process-active-p a-process)) ; the process is active + (is flag) ; and it is blocked + (is (loop for i from 1 to (1+ size) ; messages arrive in order + always (= i (mp:mailbox-read mbox)))) + (mp:process-join a-process) + (is (null flag)) ; and process unblocked + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; N producers and 1 consumer -(def-mp-test mbox-n-to-1-communication +(test-with-timeout mp.mbox.n-to-1-communication (loop with length = 10000 - with mbox = (mp:make-mailbox :name "mbox-n-to-1-communication" :count 128) - for n from 1 to 10 - for m = (round length n) - for messages = (loop for i from 0 below (* n m) collect i) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-n-to-1-producer" - (let ((proc-no i)) - #'(lambda () - (loop for i from 0 below m - for msg = (+ i (* proc-no m)) - do (mp:mailbox-send mbox msg)))))) - always (and (equalp - (sort (loop for i from 1 to (* n m) - collect (mp:mailbox-read mbox)) - #'<) - messages) - (mp:mailbox-empty-p mbox))) - t) + with mbox = (mp:make-mailbox :name "mbox.n-to-1-communication" :count 128) + for n from 1 to 10 + for m = (round length n) + for messages = (loop for i from 0 below (* n m) collect i) + for producers = (loop for i from 0 below n + do (mp:process-run-function + "mbox.n-to-1-producer" + (let ((proc-no i)) + #'(lambda () + (loop for i from 0 below m + for msg = (+ i (* proc-no m)) + do (mp:mailbox-send mbox msg)))))) + do (is (equalp + (sort (loop for i from 1 to (* n m) + collect (mp:mailbox-read mbox)) + #'<) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumer, but they do not block, because the ;;; queue is large enough and pre-filled with messages -(test mbox-1-to-n-non-blocking +(test-with-timeout mp.mbox.1-to-n-non-blocking (loop - for n from 1 to 10 - for m = (round 128 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for aux = (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for n from 1 to 10 + for m = (round 128 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mbox.1-to-n-non-blocking" :count length) + for flags = (make-array length :initial-element nil) + for aux = (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) 
;;; Date: 14/04/2012 ;;; 1 producer and N consumers, which block, because the producer ;;; is started _after_ them and is slower. -(test mbox-1-to-n-blocking +(test-with-timeout mp.mbox.1-to-n-blocking (loop for n from 1 to 10 - for m = (round 10000 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for m = (round 10000 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mp.mbox.1-to-n-blocking" :count length) + for flags = (make-array length :initial-element nil) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 2016-11-10 @@ -561,19 +513,22 @@ creating stray processes." ;;; HOLDING-LOCK-P verifies, if the current process holds the ;;; lock. ;;; -(test mp-holding-lock-p - (let ((lock (mp:make-lock :name "mp-holding-lock-p" :recursive nil))) +(test-with-timeout mp.mutex.holding-lock-p + (let ((lock (mp:make-lock :name "mutex.holding-lock-p" :recursive nil))) (is-false (mp:holding-lock-p lock)) (mp:with-lock (lock) (is-true (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock))))) (is-false (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock)))))) + +;; Atomics + (ext:with-clean-symbols (test-struct test-class *x*) (defstruct (test-struct :atomic-accessors) (slot1 0) @@ -582,14 +537,13 @@ creating stray processes." ((slot1 :initform 0) (slot2))) (defvar *x*) - ;;; Date: 2018-09-21 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-update works correctly. ;;; - (test atomic-update + (test-with-timeout mp.atomics.atomic-update (let* ((n-threads 100) (n-updates 1000) (n-total (* n-threads n-updates))) @@ -614,8 +568,7 @@ creating stray processes." (list (first plist) (1+ (second plist))))) (mp:atomic-update (slot-value object 'slot1) #'1+) - (mp:atomic-update (test-struct-slot1 struct) #'1+) - (sleep 0.00001)))))) + (mp:atomic-update (test-struct-slot1 struct) #'1+)))))) (is (car cons) n-total) (is (cdr cons) n-total) (is (svref vector 1) n-total) @@ -627,14 +580,14 @@ creating stray processes." (eq (slot-value object 'slot2) 1))) (signals error (eval '(mp:compare-and-swap (test-struct-slot2 struct) 1 2)))))) - + ;;; Date: 2018-09-22 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-push and atomic-pop work correctly. ;;; - (test atomic-push/pop + (test-with-timeout mp.atomics.atomic-push/pop (let* ((n-threads 100) (n-updates 1000)) (setf *x* nil) @@ -644,21 +597,18 @@ creating stray processes." 
"" (lambda () (loop for i below n-updates do - (mp:atomic-push i (symbol-value '*x*)) - (sleep 0.00001)) + (mp:atomic-push i (symbol-value '*x*))) (loop repeat (1- n-updates) do - (mp:atomic-pop (symbol-value '*x*)) - (sleep 0.00001)))))) + (mp:atomic-pop (symbol-value '*x*))))))) (is (length *x*) n-threads))) - ;;; Date: 2018-09-29 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-incf and atomic-decf work correctly. ;;; - (test atomic-incf/decf + (test-with-timeout mp.atomics.atomic-incf/decf (let* ((n-threads 100) (n-updates 1000) (increment (1+ (random most-positive-fixnum))) @@ -680,8 +630,7 @@ creating stray processes." (mp:atomic-incf (car cons) increment) (mp:atomic-incf (svref vector 1) increment) (mp:atomic-incf (symbol-value '*x*) increment) - (mp:atomic-incf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-incf (slot-value object 'slot1) increment)))))) (is (car cons) final-value) (is (cdr cons) final-value) (is (svref vector 1) final-value) @@ -697,8 +646,7 @@ creating stray processes." (mp:atomic-decf (car cons) increment) (mp:atomic-decf (svref vector 1) increment) (mp:atomic-decf (symbol-value '*x*) increment) - (mp:atomic-decf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-decf (slot-value object 'slot1) increment)))))) (is (car cons) 0) (is (cdr cons) 0) (is (svref vector 1) 0) @@ -712,7 +660,7 @@ creating stray processes." ;;; Verifies that CAS expansion may be removed. ;;; (ext:with-clean-symbols (*obj* foo) - (test defcas/remcas + (test mp.atomics.defcas/remcas (mp:defcas foo (lambda (object old new) (assert (consp object)) (setf (car object) old @@ -730,13 +678,15 @@ creating stray processes." ;;; ;;; Verifies that CAS modifications honor the package locks. ;;; -(test cas-locked-package +(test mp.atomics.cas-locked-package (signals package-error (mp:defcas cl:car (lambda (obj old new) nil))) (signals package-error (mp:remcas 'cl:car)) (finishes (mp:defcas cor (lambda (obj old new) nil))) (finishes (mp:remcas 'cor))) +;; Barriers + ;;; Date: 2020-08-14 ;;; From: Daniel Kochmański ;;; Description: @@ -750,105 +700,91 @@ creating stray processes." 
(is (= 3 (mp:barrier-count barrier))) (is (= 0 (mp:barrier-arrivers-count barrier))))) -(test mp.barrier.blocking - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 3 0) - (check-barrier 4 3 1) - (check-barrier 5 3 2) - (check-barrier 6 6 0)))) - -(test mp.barrier.unblock-1 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier)))) - (wake-barrier () - (mp:barrier-unblock barrier :kill-waiting nil)) - (kill-barrier () - (mp:barrier-unblock barrier :kill-waiting t))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (wake-barrier) - (sleep 0.01) - (check-barrier 3 2 1) - (check-barrier 4 2 2) - (kill-barrier) - (sleep 0.01) - (check-barrier 5 2 1)))) - -(test mp.barrier.unblock-2 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :disable t) - (check-barrier 1 1 0) - (check-barrier 2 2 0) - (check-barrier 3 3 0) - (check-barrier 4 4 0)))) - -(test mp.barrier.unblock-3 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function +(let (barrier before-barrier after-barrier all-processes mutex) + (labels ((try-barrier () + (push (mp:process-run-function "try-barrier" (lambda () - (incf before-barrier) + (mp:with-lock (mutex) + (incf before-barrier)) (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :reset-count 4) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 0 3) - (check-barrier 4 4 0) - (check-barrier 5 4 1) - (check-barrier 6 4 2)))) + (mp:with-lock (mutex) + (incf after-barrier)))) + all-processes)) + (check-barrier (before after arrivers) + (try-barrier) + (loop until (= before before-barrier)) + (loop until (= after after-barrier)) + (loop until (= arrivers (mp:barrier-arrivers-count barrier)))) + (wake-barrier () + (mp:barrier-unblock barrier :kill-waiting nil)) + (kill-barrier () + (mp:barrier-unblock barrier :kill-waiting t))) + + (test-with-timeout mp.barrier.blocking + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 3 0) 
+ (check-barrier 4 3 1) + (check-barrier 5 3 2) + (check-barrier 6 6 0) + ;; clean up + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-1 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (wake-barrier) + (mapc #'mp:process-join all-processes) + (check-barrier 3 2 1) + (check-barrier 4 2 2) + (kill-barrier) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 2 1) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-2 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :disable t) + (check-barrier 1 1 0) + (check-barrier 2 2 0) + (check-barrier 3 3 0) + (check-barrier 4 4 0) + ;; clean up + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-3 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :reset-count 4) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 0 3) + (check-barrier 4 4 0) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 4 1) + (check-barrier 6 4 2) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes)))) -- GitLab From 61098a10095236b15821d631953c645b23a72c22 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sun, 23 Aug 2020 18:33:20 +0200 Subject: [PATCH 04/13] tests: multiprocessing: add tests for condition variables --- src/tests/normal-tests/multiprocessing.lsp | 135 +++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index d0d356075..4a34b9c41 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -788,3 +788,138 @@ ;; clean up (mapc #'mp:process-kill all-processes) (mapc #'mp:process-join all-processes)))) + + +;; Condition variables +(test-with-timeout mp.cv.wait-and-signal + (let* ((mutex (mp:make-lock :name "cv.wait-and-signal")) + (cv (mp:make-condition-variable)) + (counter-before 0) + (counter-after 0) + (n-threads 10) + (waiting-processes + (loop repeat n-threads collect + (mp:process-run-function + "cv.wait-and-signal" + (lambda () + (mp:get-lock mutex) + (incf counter-before) + (mp:condition-variable-wait cv mutex) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (incf counter-after) + (mp:giveup-lock mutex)))))) + (loop until (mp:with-lock (mutex) (= counter-before n-threads))) + ;; signal wakes up _at least_ one thread each time it is called (for + ;; efficiency reasons, the posix specification explicitely allows waking + ;; up more than one thread) + (loop with n-signaled = 0 ; minimum number of threads that have woken up + until (= n-signaled n-threads) + do (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (mp:condition-variable-signal cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (loop until (mp:with-lock (mutex) + (and (> counter-after n-signaled) + (setf n-signaled counter-after))))))) + +(test-with-timeout mp.cv.wait-and-broadcast + (let* ((mutex (mp:make-lock :name "cv.wait-and-broadcast")) + 
(cv (mp:make-condition-variable)) + (counter-before 0) + (counter-after 0) + (wakeup nil) + (n-threads 10) + (waiting-processes + (loop repeat n-threads collect + (mp:process-run-function + "cv.wait-and-broadcast" + (lambda () + (mp:get-lock mutex) + (incf counter-before) + (loop until wakeup ; ignore spurious wakeups (allowed by posix) + do (mp:condition-variable-wait cv mutex)) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (incf counter-after) + (mp:giveup-lock mutex)))))) + (loop until (mp:with-lock (mutex) (= counter-before n-threads))) + ;; broadcast wakes up all threads + (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (setf wakeup t) + (mp:condition-variable-broadcast cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (mapc #'mp:process-join waiting-processes) + (is (= counter-after n-threads)))) + +(test-with-timeout (mp.cv.timedwait-timeout 30) ; whole test times out after 30 seconds + (let* ((mutex (mp:make-lock :name "cv.timedwait-timeout")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.timedwait-timeout" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + ;; condition variable times out after 1 second, before + ;; whole test times out + (mp:condition-variable-timedwait cv mutex 1) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 2) + (mp:giveup-lock mutex))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (mp:process-join waiting-process) + (is (null (mp:lock-owner mutex))) + (is (= flag 2)))) + +(test-with-timeout (mp.cv.timedwait-signal 30) ; whole test times out after 30 seconds + (let* ((mutex (mp:make-lock :name "cv.timedwait-signal")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.timedwait-signal" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + ;; condition variable times out after 60 seconds, after + ;; whole test times out + (mp:condition-variable-timedwait cv mutex 60) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 2) + (mp:giveup-lock mutex))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (mp:condition-variable-signal cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (mp:process-join waiting-process) + (is (= flag 2)))) + +(test-with-timeout mp.cv.interruptible + (let* ((mutex (mp:make-lock :name "cv.interruptible")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.interruptible" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + (mp:condition-variable-wait cv mutex) + (setf flag 2) + (error "We shouldn't have gotten to this point"))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (sleep 0.1) + (is (mp:process-kill waiting-process)) + (mp:process-join waiting-process) + (is (null (mp:lock-owner mutex))) + (is (= flag 1)))) + -- GitLab From 2dce0dabdb37124aa816c80944275fb24f9e29dd Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Mon, 24 Aug 2020 20:51:51 +0200 Subject: [PATCH 05/13] doc: multithreading: clarify restrictions for mutex functions --- src/doc/manual/extensions/mp_ref_cv.txi | 29 +++++++++++++++----- src/doc/manual/extensions/mp_ref_mutex.txi | 18 +++++++----- src/doc/manual/extensions/mp_ref_process.txi | 6 ++++ 3 files 
changed, 39 insertions(+), 14 deletions(-)

diff --git a/src/doc/manual/extensions/mp_ref_cv.txi b/src/doc/manual/extensions/mp_ref_cv.txi
index 659c8e0e4..0618e688f 100644
--- a/src/doc/manual/extensions/mp_ref_cv.txi
+++ b/src/doc/manual/extensions/mp_ref_cv.txi
@@ -26,9 +26,23 @@ Creates a condition variable.
 @end deftypefun
 
 @defun mp:condition-variable-wait cv lock
-Release @var{lock} and suspend thread until condition
-@coderef{mp:condition-variable-signal} is called on @var{cv}. When thread
-resumes re-aquire @var{lock}.
+Release @var{lock} and suspend the thread until
+@coderef{mp:condition-variable-signal} or
+@coderef{mp:condition-variable-broadcast} is called on @var{cv}. When
+the thread resumes, it re-acquires @var{lock}. May signal an error if
+@var{lock} is not owned by the current thread.
+
+@strong{Note:} In some circumstances, the thread may wake up even if no
+call to @coderef{mp:condition-variable-signal} or
+@coderef{mp:condition-variable-broadcast} has happened. It is
+recommended to check for the condition that triggered the wait in a loop
+around any @code{mp:condition-variable-wait} call.
+
+@strong{Note:} While threads are blocked on the condition variable
+waiting for a signal or broadcast event, any further call to
+@code{mp:condition-variable-wait} must use the same mutex as the
+threads that are already waiting on this condition variable. The
+behaviour is undefined if this constraint is violated.
 
 @end defun
 
@@ -40,7 +54,8 @@ resumes re-aquire @var{lock}.
 
 @defun mp:condition-variable-timedwait cv lock seconds
 @coderef{mp:condition-variable-wait} which timeouts after @var{seconds}
-seconds.
+seconds. May signal an error if @var{lock} is not owned by the current
+thread.
 
 @end defun
 
@@ -51,8 +66,8 @@ seconds.
 @end deftypefun
 
 @defun mp:condition-variable-signal cv
-Signal @var{cv} (wakes up only one waiter). After signal, signaling
-thread keeps lock, waking thread goes on the queue waiting for the lock.
+Wake up at least one of the waiters of @var{cv}. Usually, this will wake
+up only a single thread, but it may also wake up multiple threads.
 See @coderef{mp:condition-variable-wait}.
 @end defun
 
@@ -65,7 +80,7 @@ See @coderef{mp:condition-variable-wait}.
 @end deftypefun
 
 @defun mp:condition-variable-broadcast cv
-Signal @var{cv} (wakes up all waiters).
+Wake up all waiters of @var{cv}.
 See @coderef{mp:condition-variable-wait}.
 @end defun
 
diff --git a/src/doc/manual/extensions/mp_ref_mutex.txi b/src/doc/manual/extensions/mp_ref_mutex.txi
index 4e62dfa86..54710cea5 100644
--- a/src/doc/manual/extensions/mp_ref_mutex.txi
+++ b/src/doc/manual/extensions/mp_ref_mutex.txi
@@ -67,8 +67,9 @@ Returns the name of @var{lock}.
 @end deftypefun
 
 @defun mp:lock-owner lock
-Returns the process owning @var{lock}. For testing whether the current
-thread is holding a lock see @coderef{mp:holding-lock-p}.
+Returns the process owning @var{lock} or @code{nil} if the mutex is not
+owned by any process. For testing whether the current thread
+is holding a lock, see @coderef{mp:holding-lock-p}.
 
 @end defun
 
@@ -79,7 +80,7 @@ thread is holding a lock see @coderef{mp:holding-lock-p}.
 @end deftypefun
 
 @defun mp:lock-count lock
-Returns number of processes waiting for @var{lock}.
+Returns the number of times @var{lock} has been locked.
 
 @end defun
 
@@ -101,7 +102,8 @@ returns @code{ECL_NIL}, otherwise @code{ECL_T}.
 Tries to acquire a lock. @var{wait} indicates whether function should
 block or give up if @var{lock} is already taken. If @var{wait} is
 @code{nil} and @var{lock} can't be acquired returns
-@code{nil}. Succesful operation returns @code{t}.
+@code{nil}. Successful operation returns @code{t}. Signals an error if
+the mutex is non-recursive and the current thread already owns the lock.
 
 @end defun
 
@@ -112,15 +114,17 @@ block or give up if @var{lock} is already taken. If @var{wait} is
 @end deftypefun
 
 @defun mp:giveup-lock lock
-Releases @var{lock}.
+Releases @var{lock} and returns @code{t}. May signal an error if the
+lock is not owned by the current thread.
 
 @end defun
 
 @lspdef mp:with-lock
 
 @defmac mp:with-lock (lock-form) &body body
-Acquire lock for the dynamic scope of @var{body}, which is executed
-with the lock held by current thread. Returns the values of body.
+Acquire the lock for the dynamic scope of @var{body}, which is
+executed with the lock held by the current thread. Returns the
+values of @var{body}.
 
 @c (lock-form &key wait-p timeout)
 
diff --git a/src/doc/manual/extensions/mp_ref_process.txi b/src/doc/manual/extensions/mp_ref_process.txi
index 1cc61e7c9..3857d1a99 100644
--- a/src/doc/manual/extensions/mp_ref_process.txi
+++ b/src/doc/manual/extensions/mp_ref_process.txi
@@ -58,6 +58,12 @@ one has to consider:
 @item Reentrancy: Functions, which usually are not called recursively can be re-entered during execution of the interrupt.
 @item Stack unwinding: Non-local jumps like @code{throw} or @code{return-from} in the interrupting code will handle @code{unwind-protect} forms like usual. However, the cleanup forms of an @code{unwind-protect} can still be interrupted. In that case the execution flow will jump to the next @code{unwind-protect}.
 @end itemize
+Note also that no guarantees are made that functions from the Common
+Lisp standard or ECL extensions are interrupt safe (although most of
+them will be). In particular, the compiler (@code{compile} and
+@code{compile-file} functions), FFI calls, and acquire/release functions
+for multithreading synchronization objects like mutexes or condition
+variables should not be interrupted by @coderef{mp:interrupt-process}.
 
 @exindex Process interruption
 Example:
--
GitLab


From 579a8d4380a566415c2f020cfc8b4f95eb67c040 Mon Sep 17 00:00:00 2001
From: Marius Gerbershagen
Date: Sat, 29 Aug 2020 19:51:58 +0200
Subject: [PATCH 06/13] run-program: simplify with-process-lock

We already have a race condition between mp:get-lock and
mp:holding-lock-p, so there is no point in trying to make sure the
lock is released at all costs during an interrupt.
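As a rough illustration of the simplified behaviour (hypothetical call
site: `proc' and `do-work' are placeholders, and the real macro binds
uninterned variable names; see the diff below), a with-process-lock
form now acts roughly like:

    (let ((lock (external-process-%lock proc)))
      (when (mp:get-lock lock t)
        (mp:without-interrupts
          (unwind-protect
               (mp::with-restored-interrupts
                 (do-work))
            (mp:giveup-lock lock)))))

If an interrupt unwinds right after mp:get-lock succeeds but before
mp:without-interrupts takes effect, the lock may simply stay held;
that is the trade-off accepted above.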
--- src/lsp/process.lsp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/lsp/process.lsp b/src/lsp/process.lsp index 36215c2b0..c9392b61e 100644 --- a/src/lsp/process.lsp +++ b/src/lsp/process.lsp @@ -12,12 +12,11 @@ (ext:with-unique-names (lock wait-p) `(let ((,lock (external-process-%lock ,process)) (,wait-p ,wait)) - (mp:without-interrupts - (unwind-protect (mp::with-restored-interrupts - (when (mp:get-lock ,lock ,wait-p) - (locally ,@body))) - (when (mp:holding-lock-p ,lock) - (mp:giveup-lock ,lock)))))) + (when (mp:get-lock ,lock ,wait-p) + (mp:without-interrupts + (unwind-protect (mp::with-restored-interrupts + (locally ,@body)) + (mp:giveup-lock ,lock)))))) #-threads `(progn ,@body)) (defstruct (external-process (:constructor make-external-process ())) -- GitLab From b332f2c592c1ebae2fbfdc30e407ccb29b3f05a2 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 5 Sep 2020 20:53:00 +0200 Subject: [PATCH 07/13] multithreading: implement barriers using native mutexes --- src/c/alloc_2.d | 15 ++-- src/c/threads/barrier.d | 194 +++++++++++++++++++++------------------- src/h/object.h | 13 +-- 3 files changed, 119 insertions(+), 103 deletions(-) diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 8957586b3..a36428c9c 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -463,8 +463,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl MAYBE_MARK(o->semaphore.name); break; case t_barrier: - MAYBE_MARK(o->barrier.queue_list); - MAYBE_MARK(o->barrier.queue_spinlock); MAYBE_MARK(o->barrier.name); break; case t_mailbox: @@ -1030,9 +1028,7 @@ init_alloc(void) to_bitmap(&o, &(o.semaphore.queue_list)) | to_bitmap(&o, &(o.semaphore.queue_spinlock)); type_info[t_barrier].descriptor = - to_bitmap(&o, &(o.barrier.name)) | - to_bitmap(&o, &(o.barrier.queue_list)) | - to_bitmap(&o, &(o.barrier.queue_spinlock)); + to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | to_bitmap(&o, &(o.mailbox.data)) | @@ -1127,6 +1123,14 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_barrier: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->barrier.mutex); + ecl_cond_var_destroy(&o->barrier.cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1176,6 +1180,7 @@ register_finalizer(cl_object o, void *finalized_object, #if defined(ECL_THREADS) case t_lock: case t_condition_variable: + case t_barrier: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/barrier.d b/src/c/threads/barrier.d index c5512d83c..3aaaf39d3 100755 --- a/src/c/threads/barrier.d +++ b/src/c/threads/barrier.d @@ -5,6 +5,7 @@ * barrier.d - wait barriers * * Copyright (c) 2012 Juan Jose Garcia Ripoll + * Copyright (c) 2020 Marius Gerbershagen * * See file 'LICENSE' for the copyright details. 
* @@ -13,21 +14,21 @@ #include #include -static ECL_INLINE void -FEerror_not_a_barrier(cl_object barrier) -{ - FEwrong_type_argument(@'mp::barrier', barrier); -} - cl_object ecl_make_barrier(cl_object name, cl_index count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_barrier); + output->barrier.disabled = FALSE; + output->barrier.wakeup = 0; output->barrier.name = name; - output->barrier.arrivers_count = count; + output->barrier.arrivers_count = 0; output->barrier.count = count; - output->barrier.queue_list = ECL_NIL; - output->barrier.queue_spinlock = ECL_NIL; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->barrier.cv); + ecl_mutex_init(&output->barrier.mutex, FALSE); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -43,7 +44,7 @@ mp_barrier_name(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-name], barrier, @[mp::barrier]); } ecl_return1(env, barrier->barrier.name); } @@ -53,7 +54,7 @@ mp_barrier_count(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-count], barrier, @[mp::barrier]); } ecl_return1(env, ecl_make_fixnum(barrier->barrier.count)); } @@ -61,104 +62,111 @@ mp_barrier_count(cl_object barrier) cl_object mp_barrier_arrivers_count(cl_object barrier) { - cl_fixnum arrivers, count; cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-arrivers_count], barrier, @[mp::barrier]); } - arrivers = barrier->barrier.arrivers_count; - count = barrier->barrier.count; - if (arrivers < 0) - arrivers = 0; /* Disabled barrier */ - else - arrivers = count - arrivers; - ecl_return1(env, ecl_make_fixnum(arrivers)); + ecl_return1(env, ecl_make_fixnum(barrier->barrier.arrivers_count)); } +/* INV: locking the mutex in mp_barrier_unblock and mp_barrier_wait + * will always succeed since the functions are not reentrant and only + * lock/unlock the mutex while interrupts are disabled, therefore + * deadlocks can't happen. */ + @(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting) - int ping_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL; - int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL | ECL_WAKEUP_ALL; @ unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_nth_arg(@[mp::barrier-unblock], 1, barrier, @[mp::barrier]); + } + ecl_disable_interrupts_env(the_env); + AGAIN: + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.wakeup) { + /* we are currently waking up blocked threads; loop until all + * threads have woken up */ + ecl_mutex_unlock(&barrier->barrier.mutex); + goto AGAIN; } - if (!Null(reset_count)) + if (!Null(reset_count)) { barrier->barrier.count = fixnnint(reset_count); - if (!Null(disable)) - barrier->barrier.arrivers_count = -1; - else - barrier->barrier.arrivers_count = barrier->barrier.count; - ecl_wakeup_waiters(the_env, barrier, - Null(kill_waiting)? 
ping_flags : kill_flags); + } + if (!Null(disable)) { + barrier->barrier.disabled = TRUE; + } else { + barrier->barrier.disabled = FALSE; + } + if (barrier->barrier.arrivers_count > 0) { + if (!Null(kill_waiting)) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_KILL; + } else { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + } + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_enable_interrupts_env(the_env); @(return); @) -static cl_object -barrier_wait_condition(cl_env_ptr env, cl_object barrier) -{ - /* We were signaled */ - if (env->own_process->process.woken_up != ECL_NIL) - return ECL_T; - /* Disabled barrier */ - else if (barrier->barrier.arrivers_count < 0) - return ECL_T; - else - return ECL_NIL; -} - -static cl_fixnum -decrement_counter(cl_fixnum *counter) -{ - /* The logic is as follows: - * - If the counter is negative, we abort. This is a way of - * disabling the counter. - * - Otherwise, we decrease the counter only if it is positive - * - If the counter is currently zero, then we block. This - * situation implies that some other thread is unblocking. - */ - cl_fixnum c; - do { - c = *counter; - if (c < 0) { - return c; - } else if (c > 0) { - if (AO_compare_and_swap_full((AO_t*)counter, - (AO_t)c, (AO_t)(c-1))) - return c; - } - } while (1); -} - cl_object -mp_barrier_wait(cl_object barrier) -{ - cl_object output; - cl_fixnum counter; +mp_barrier_wait(cl_object barrier) { cl_env_ptr the_env = ecl_process_env(); - + volatile int wakeup = 0; unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-wait], barrier, @[mp::barrier]); } - ecl_disable_interrupts_env(the_env); - counter = decrement_counter(&barrier->barrier.arrivers_count); - if (counter == 1) { - print_lock("barrier %p saturated", barrier, barrier); - /* There are (count-1) threads in the queue and we - * are the last one. We thus unblock all threads and - * proceed. */ - ecl_enable_interrupts_env(the_env); - mp_barrier_unblock(1, barrier); - output = @':unblocked'; - } else if (counter > 1) { - print_lock("barrier %p waiting", barrier, barrier); - ecl_enable_interrupts_env(the_env); - ecl_wait_on(the_env, barrier_wait_condition, barrier); - output = ECL_T; - } else { - print_lock("barrier %p pass-through", barrier, barrier); - ecl_enable_interrupts_env(the_env); - /* Barrier disabled */ - output = ECL_NIL; + ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + /* check if the barrier is disabled */ + do { + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.disabled) { + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_NIL; + } + if (barrier->barrier.wakeup) { + /* We are currently waking up blocked threads; loop until all threads have + * woken up. 
*/ + ecl_mutex_unlock(&barrier->barrier.mutex); + } else { + break; + } + } while(1); + /* check if we have reached the maximum count */ + if ((barrier->barrier.arrivers_count+1) == barrier->barrier.count) { + if (barrier->barrier.arrivers_count > 0) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return @':unblocked'; } - return output; + /* barrier is neither disabled nor unblocked, start waiting */ + barrier->barrier.arrivers_count++; + ECL_UNWIND_PROTECT_BEGIN(the_env) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(&barrier->barrier.cv, &barrier->barrier.mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while(!barrier->barrier.wakeup); + wakeup = barrier->barrier.wakeup; + if (barrier->barrier.arrivers_count - 1 == 0) { + /* we are the last thread to wake up, reset the barrier */ + barrier->barrier.wakeup = 0; + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + --barrier->barrier.arrivers_count; + ecl_mutex_unlock(&barrier->barrier.mutex); + if (wakeup == ECL_BARRIER_WAKEUP_KILL) { + mp_exit_process(); + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_T; } + diff --git a/src/h/object.h b/src/h/object.h index 11fe20fc2..df1bd271d 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -993,13 +993,16 @@ struct ecl_semaphore { cl_fixnum counter; }; +#define ECL_BARRIER_WAKEUP_NORMAL 1 +#define ECL_BARRIER_WAKEUP_KILL 2 + struct ecl_barrier { - _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; + _ECL_HDR2(disabled,wakeup); cl_object name; - cl_fixnum count; - cl_fixnum arrivers_count; + cl_index count; + cl_index arrivers_count; + ecl_mutex_t mutex; + ecl_cond_var_t cv; }; struct ecl_lock { -- GitLab From 968083738afd060e69f6f10f6a91a6ad4666125f Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sun, 6 Sep 2020 21:13:09 +0200 Subject: [PATCH 08/13] multithreading: implement semaphores using native mutexes --- src/c/alloc_2.d | 15 ++++-- src/c/threads/semaphore.d | 111 +++++++++++++++++++------------------- src/h/object.h | 5 +- 3 files changed, 70 insertions(+), 61 deletions(-) diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index a36428c9c..144748e6c 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -458,8 +458,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl break; # endif case t_semaphore: - MAYBE_MARK(o->semaphore.queue_list); - MAYBE_MARK(o->semaphore.queue_spinlock); MAYBE_MARK(o->semaphore.name); break; case t_barrier: @@ -1024,9 +1022,7 @@ init_alloc(void) # endif type_info[t_condition_variable].descriptor = 0; type_info[t_semaphore].descriptor = - to_bitmap(&o, &(o.semaphore.name)) | - to_bitmap(&o, &(o.semaphore.queue_list)) | - to_bitmap(&o, &(o.semaphore.queue_spinlock)); + to_bitmap(&o, &(o.semaphore.name)); type_info[t_barrier].descriptor = to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = @@ -1131,6 +1127,14 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_semaphore: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->semaphore.mutex); + ecl_cond_var_destroy(&o->semaphore.cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ 
-1181,6 +1185,7 @@ register_finalizer(cl_object o, void *finalized_object, case t_lock: case t_condition_variable: case t_barrier: + case t_semaphore: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/semaphore.d b/src/c/threads/semaphore.d index 8811be3f0..86c191ca8 100644 --- a/src/c/threads/semaphore.d +++ b/src/c/threads/semaphore.d @@ -5,33 +5,28 @@ * semaphore.d - POSIX-like semaphores * * Copyright (c) 2011 Juan Jose Garcia Ripoll + * Copyright (c) 2020 Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ -#define AO_ASSUME_WINDOWS98 /* We need this for CAS */ #include #include -#if !defined(AO_HAVE_fetch_and_add_full) -#error "Cannot implement semaphores without AO_fetch_and_add_full" -#endif - -static ECL_INLINE void -FEerror_not_a_semaphore(cl_object semaphore) -{ - FEwrong_type_argument(@'mp::semaphore', semaphore); -} - cl_object ecl_make_semaphore(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_semaphore); output->semaphore.name = name; output->semaphore.counter = count; - output->semaphore.queue_list = ECL_NIL; - output->semaphore.queue_spinlock = ECL_NIL; + output->semaphore.wait_count = 0; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->semaphore.cv); + ecl_mutex_init(&output->semaphore.mutex, FALSE); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -45,7 +40,7 @@ mp_semaphore_name(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-name], semaphore, @[mp::semaphore]); } ecl_return1(env, semaphore->semaphore.name); } @@ -55,7 +50,7 @@ mp_semaphore_count(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-count], semaphore, @[mp::semaphore]); } ecl_return1(env, ecl_make_fixnum(semaphore->semaphore.counter)); } @@ -65,69 +60,77 @@ mp_semaphore_wait_count(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-wait-count], semaphore, @[mp::semaphore]); } - ecl_return1(env, cl_length(semaphore->semaphore.queue_list)); + ecl_return1(env, ecl_make_fixnum(semaphore->semaphore.wait_count)); } @(defun mp::signal-semaphore (semaphore &optional (count ecl_make_fixnum(1))) @ { cl_fixnum n = fixnnint(count); - cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_nth_arg(@[mp::signal-semaphore], 1, semaphore, @[mp::semaphore]); } - AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, n); - if (semaphore->semaphore.queue_list != ECL_NIL) { - ecl_wakeup_waiters(env, semaphore, ECL_WAKEUP_ALL); + ecl_disable_interrupts_env(the_env); + ecl_mutex_lock(&semaphore->semaphore.mutex); + semaphore->semaphore.counter += n; + for (; n > 0; n--) { + ecl_cond_var_signal(&semaphore->semaphore.cv); } + ecl_mutex_unlock(&semaphore->semaphore.mutex); + ecl_enable_interrupts_env(the_env); @(return); } @) -static cl_object -get_semaphore_inner(cl_env_ptr env, cl_object semaphore) -{ - cl_object output; - ecl_disable_interrupts_env(env); - do { - cl_fixnum counter = semaphore->semaphore.counter; - if (!counter) { - output = ECL_NIL; - break; - } - 
if (AO_compare_and_swap_full((AO_t*)&(semaphore->semaphore.counter), - (AO_t)counter, (AO_t)(counter-1))) { - output = ecl_make_fixnum(counter); - break; - } - ecl_process_yield(); - } while (1); - ecl_enable_interrupts_env(env); - return output; -} - cl_object mp_wait_on_semaphore(cl_object semaphore) { - cl_env_ptr env = ecl_process_env(); - cl_object output; + cl_env_ptr the_env = ecl_process_env(); + volatile cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::wait-on-semaphore], semaphore, @[mp::semaphore]); } - output = get_semaphore_inner(env, semaphore); - if (Null(output)) { - output = ecl_wait_on(env, get_semaphore_inner, semaphore); + ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + ecl_mutex_lock(&semaphore->semaphore.mutex); + if (semaphore->semaphore.counter == 0) { + semaphore->semaphore.wait_count++; + ECL_UNWIND_PROTECT_BEGIN(the_env) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(&semaphore->semaphore.cv, &semaphore->semaphore.mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while (semaphore->semaphore.counter == 0); + output = ecl_make_fixnum(semaphore->semaphore.counter--); + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + semaphore->semaphore.wait_count--; + ecl_mutex_unlock(&semaphore->semaphore.mutex); + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + } else { + output = ecl_make_fixnum(semaphore->semaphore.counter--); + ecl_mutex_unlock(&semaphore->semaphore.mutex); } - ecl_return1(env, output); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + ecl_return1(the_env, output); } cl_object mp_try_get_semaphore(cl_object semaphore) { - cl_env_ptr env = ecl_process_env(); + cl_env_ptr the_env = ecl_process_env(); + cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::try-get-semaphore], semaphore, @[mp::semaphore]); + } + ecl_disable_interrupts_env(the_env); + ecl_mutex_lock(&semaphore->semaphore.mutex); + if (semaphore->semaphore.counter > 0) { + output = ecl_make_fixnum(semaphore->semaphore.counter--); + } else { + output = ECL_NIL; } - ecl_return1(env, get_semaphore_inner(env, semaphore)); + ecl_mutex_unlock(&semaphore->semaphore.mutex); + ecl_enable_interrupts_env(the_env); + ecl_return1(the_env, output); } diff --git a/src/h/object.h b/src/h/object.h index df1bd271d..ec969fa6f 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -987,10 +987,11 @@ struct ecl_queue { struct ecl_semaphore { _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; cl_object name; cl_fixnum counter; + cl_fixnum wait_count; + ecl_mutex_t mutex; + ecl_cond_var_t cv; }; #define ECL_BARRIER_WAKEUP_NORMAL 1 -- GitLab From 23a7ade20ccf3b458e50337d4c47fb448da152aa Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Mon, 7 Sep 2020 22:05:49 +0200 Subject: [PATCH 09/13] multithreading: implement mailboxes using native mutexes The old implementation was not race condition free. If two threads (A and B) were writing at the same time while one thread (C) was reading, the following could happen: 1. thread A increases the write pointer (but does not store the message yet) 2. thread B increases the write pointer, stores the message and signals thread C 3. thread C tries to read from the location that thread A has not yet written to The new implementation is a simple and obvious solution using a common mutex and two condition variables for reading/writing. 
We don't bother with a (complex) interrupt safe implementation. --- src/c/alloc_2.d | 16 +++-- src/c/threads/mailbox.d | 125 +++++++++++++++++++++++----------------- src/h/ecl_atomics.h | 3 - src/h/object.h | 7 ++- 4 files changed, 88 insertions(+), 63 deletions(-) diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 144748e6c..03bacf67a 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -466,8 +466,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl case t_mailbox: MAYBE_MARK(o->mailbox.data); MAYBE_MARK(o->mailbox.name); - MAYBE_MARK(o->mailbox.reader_semaphore); - MAYBE_MARK(o->mailbox.writer_semaphore); break; # endif case t_codeblock: @@ -1027,9 +1025,7 @@ init_alloc(void) to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | - to_bitmap(&o, &(o.mailbox.data)) | - to_bitmap(&o, &(o.mailbox.reader_semaphore)) | - to_bitmap(&o, &(o.mailbox.writer_semaphore)); + to_bitmap(&o, &(o.mailbox.data)); # endif type_info[t_codeblock].descriptor = to_bitmap(&o, &(o.cblock.data)) | @@ -1135,6 +1131,15 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_mailbox: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->mailbox.mutex); + ecl_cond_var_destroy(&o->mailbox.reader_cv); + ecl_cond_var_destroy(&o->mailbox.writer_cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1186,6 +1191,7 @@ register_finalizer(cl_object o, void *finalized_object, case t_condition_variable: case t_barrier: case t_semaphore: + case t_mailbox: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/mailbox.d b/src/c/threads/mailbox.d index 3d89f516e..556166613 100755 --- a/src/c/threads/mailbox.d +++ b/src/c/threads/mailbox.d @@ -13,22 +13,13 @@ #include #include -static ECL_INLINE void -FEerror_not_a_mailbox(cl_object mailbox) -{ - FEwrong_type_argument(@'mp::mailbox', mailbox); -} +/* NOTE: The mailbox functions are not interrupt safe. 
*/ cl_object ecl_make_mailbox(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_mailbox); - cl_fixnum mask; - for (mask = 1; mask < count; mask <<= 1) {} - if (mask == 1) - mask = 63; - count = mask; - mask = count - 1; output->mailbox.name = name; output->mailbox.data = si_make_vector(ECL_T, /* element type */ ecl_make_fixnum(count), /* size */ @@ -36,13 +27,15 @@ ecl_make_mailbox(cl_object name, cl_fixnum count) ECL_NIL, /* fill pointer */ ECL_NIL, /* displaced to */ ECL_NIL); /* displacement */ - output->mailbox.reader_semaphore = - ecl_make_semaphore(name, 0); - output->mailbox.writer_semaphore = - ecl_make_semaphore(name, count); + output->mailbox.message_count = 0; output->mailbox.read_pointer = 0; output->mailbox.write_pointer = 0; - output->mailbox.mask = mask; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->mailbox.mutex, FALSE); + ecl_cond_var_init(&output->mailbox.reader_cv); + ecl_cond_var_init(&output->mailbox.writer_cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -56,7 +49,7 @@ mp_mailbox_name(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-name], mailbox, @[mp::mailbox]); } ecl_return1(env, mailbox->mailbox.name); } @@ -66,7 +59,7 @@ mp_mailbox_count(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-count], mailbox, @[mp::mailbox]); } ecl_return1(env, ecl_make_fixnum(mailbox->mailbox.data->vector.dim)); } @@ -76,27 +69,41 @@ mp_mailbox_empty_p(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-empty-p], mailbox, @[mp::mailbox]); } - ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? ECL_NIL : ECL_T); + ecl_return1(env, mailbox->mailbox.message_count? 
ECL_NIL : ECL_T); +} + +static cl_object +read_message(cl_object mailbox) +{ + cl_object output; + cl_fixnum ndx = mailbox->mailbox.read_pointer++; + if (mailbox->mailbox.read_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.read_pointer = 0; + } + output = mailbox->mailbox.data->vector.self.t[ndx]; + mailbox->mailbox.message_count--; + ecl_cond_var_signal(&mailbox->mailbox.writer_cv); + return output; } cl_object mp_mailbox_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-read], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; + while (mailbox->mailbox.message_count == 0) { + ecl_cond_var_wait(&mailbox->mailbox.reader_cv, &mailbox->mailbox.mutex); + } + output = read_message(mailbox); } - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } @@ -104,37 +111,50 @@ cl_object mp_mailbox_try_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-read], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.reader_semaphore); - if (output != ECL_NIL) { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == 0) { + output = ECL_NIL; + } else { + output = read_message(mailbox); + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } +static void +store_message(cl_object mailbox, cl_object msg) +{ + cl_fixnum ndx = mailbox->mailbox.write_pointer++; + if (mailbox->mailbox.write_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.write_pointer = 0; + } + mailbox->mailbox.data->vector.self.t[ndx] = msg; + mailbox->mailbox.message_count++; + ecl_cond_var_signal(&mailbox->mailbox.reader_cv); +} + cl_object mp_mailbox_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-send], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; + while (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + ecl_cond_var_wait(&mailbox->mailbox.writer_cv, &mailbox->mailbox.mutex); + } + store_message(mailbox, msg); } - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); - ecl_return0(env); + ecl_mutex_unlock(&mailbox->mailbox.mutex); + ecl_return1(env, msg); } cl_object @@ -142,18 +162,19 @@ mp_mailbox_try_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); cl_object output; - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - 
FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-send], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.writer_semaphore); - if (output != ECL_NIL) { - output = msg; - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + output = ECL_NIL; + } else { + store_message(mailbox, msg); + output = msg; + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } diff --git a/src/h/ecl_atomics.h b/src/h/ecl_atomics.h index eb9243f92..ee1c6c86c 100644 --- a/src/h/ecl_atomics.h +++ b/src/h/ecl_atomics.h @@ -29,9 +29,6 @@ # if !defined(AO_HAVE_compare_and_swap) # error "ECL needs AO_compare_and_swap or an equivalent" # endif -# if !defined(AO_HAVE_fetch_and_add1) -# error "Cannot implement mailboxs without AO_fetch_and_add1" -# endif # if !defined(AO_HAVE_fetch_and_add) # error "ECL needs AO_fetch_and_add or an equivalent" # endif diff --git a/src/h/object.h b/src/h/object.h index ec969fa6f..6d6b58120 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -1018,11 +1018,12 @@ struct ecl_mailbox { _ECL_HDR; cl_object name; cl_object data; - cl_object reader_semaphore; - cl_object writer_semaphore; + ecl_mutex_t mutex; + ecl_cond_var_t reader_cv; + ecl_cond_var_t writer_cv; + cl_index message_count; cl_index read_pointer; cl_index write_pointer; - cl_index mask; }; struct ecl_rwlock { -- GitLab From de5d56b4c63afca10673497998a628e8f41da1d1 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 26 Sep 2020 16:53:01 +0200 Subject: [PATCH 10/13] multithreading: replace various synchronization objects by native mutexes - Spinlocks have been replaced by ordinary locks. Without access to the underyling scheduler, spinlocks provide no performace benefit and may even be harmful in case of high contention. - Synchronization of process creation and exiting has been simplified. Instead of a spinlock, a barrier and atomic operations we now use only a single lock protecting the shared process state and a condition variable for implementing process joins. - Some locks which were implemented using Lisp objects now directly use a native mutex. - Our own mutex implementation has been removed as it is now unused. 
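The join/exit handshake in particular reduces to the standard mutex plus condition variable idiom. A minimal sketch in plain pthreads (illustrative names only, not the actual ECL functions):

  #include <pthread.h>

  enum phase { INACTIVE, BOOTING, ACTIVE, EXITING };

  struct proc {
    enum phase phase;           /* changed only while lock is held       */
    pthread_mutex_t lock;       /* protects phase and the exit values    */
    pthread_cond_t exit_cv;     /* broadcast when phase becomes INACTIVE */
  };

  void proc_exit(struct proc *p)           /* runs in the dying thread */
  {
    pthread_mutex_lock(&p->lock);
    p->phase = INACTIVE;
    pthread_cond_broadcast(&p->exit_cv);   /* wake every joiner */
    pthread_mutex_unlock(&p->lock);
  }

  void proc_join(struct proc *p)           /* runs in any other thread */
  {
    pthread_mutex_lock(&p->lock);
    while (p->phase != INACTIVE)           /* loop guards against spurious wakeups */
      pthread_cond_wait(&p->exit_cv, &p->lock);
    pthread_mutex_unlock(&p->lock);
  }

Broadcasting instead of signalling lets any number of joiners observe the transition to the inactive phase.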
--- msvc/c/Makefile | 2 +- src/c/alloc_2.d | 17 +- src/c/main.d | 135 +++++++------- src/c/threads/process.d | 129 ++++++------- src/c/threads/queue.d | 401 ---------------------------------------- src/c/unixint.d | 46 ++--- src/configure | 2 +- src/configure.ac | 2 +- src/ecl/configpre.h | 3 - src/h/external.h | 10 +- src/h/internal.h | 44 ++--- src/h/object.h | 11 +- src/util/emacs.el | 1 - 13 files changed, 193 insertions(+), 610 deletions(-) delete mode 100755 src/c/threads/queue.d diff --git a/msvc/c/Makefile b/msvc/c/Makefile index 7b2637ac3..c123fb0b2 100755 --- a/msvc/c/Makefile +++ b/msvc/c/Makefile @@ -13,7 +13,7 @@ ECL_FPE_CODE=fpe_x86.c !if "$(ECL_THREADS)" != "" ECL_THREADS_FLAG=1 THREADS_OBJ= process.obj mutex.obj condition_variable.obj rwlock.obj \ - semaphore.obj barrier.obj mailbox.obj atomic.obj queue.obj + semaphore.obj barrier.obj mailbox.obj atomic.obj !else ECL_THREADS_FLAG=0 THREADS_OBJ= diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 03bacf67a..7931c34c2 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -102,7 +102,7 @@ out_of_memory(size_t requested_bytes) /* The out of memory condition may happen in more than one thread */ /* But then we have to ensure the error has not been solved */ #ifdef ECL_THREADS - mp_get_lock_wait(cl_core.error_lock); + ecl_mutex_lock(&cl_core.error_lock); ECL_UNWIND_PROTECT_BEGIN(the_env) #endif { @@ -141,7 +141,7 @@ out_of_memory(size_t requested_bytes) } #ifdef ECL_THREADS ECL_UNWIND_PROTECT_EXIT { - mp_giveup_lock(cl_core.error_lock); + ecl_mutex_unlock(&cl_core.error_lock); } ECL_UNWIND_PROTECT_END; #endif ecl_bds_unwind1(the_env); @@ -432,10 +432,8 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl # ifdef ECL_THREADS case t_process: MAYBE_MARK(o->process.queue_record); - MAYBE_MARK(o->process.start_stop_spinlock); MAYBE_MARK(o->process.woken_up); MAYBE_MARK(o->process.exit_values); - MAYBE_MARK(o->process.exit_barrier); MAYBE_MARK(o->process.parent); MAYBE_MARK(o->process.initial_bindings); MAYBE_MARK(o->process.interrupt); @@ -1002,10 +1000,8 @@ init_alloc(void) to_bitmap(&o, &(o.process.interrupt)) | to_bitmap(&o, &(o.process.initial_bindings)) | to_bitmap(&o, &(o.process.parent)) | - to_bitmap(&o, &(o.process.exit_barrier)) | to_bitmap(&o, &(o.process.exit_values)) | to_bitmap(&o, &(o.process.woken_up)) | - to_bitmap(&o, &(o.process.start_stop_spinlock)) | to_bitmap(&o, &(o.process.queue_record)); type_info[t_lock].descriptor = to_bitmap(&o, &(o.lock.name)) | @@ -1149,6 +1145,14 @@ standard_finalizer(cl_object o) break; } # endif + case t_process: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->process.start_stop_lock); + ecl_cond_var_destroy(&o->process.exit_barrier); + ecl_enable_interrupts_env(the_env); + break; + } case t_symbol: { ecl_atomic_push(&cl_core.reused_indices, ecl_make_fixnum(o->symbol.binding)); @@ -1195,6 +1199,7 @@ register_finalizer(cl_object o, void *finalized_object, # if defined(ECL_RWLOCK) case t_rwlock: # endif + case t_process: #endif /* ECL_THREADS */ /* Don't delete the standard finalizer. 
*/ if (fn == NULL) { diff --git a/src/c/main.d b/src/c/main.d index 095734cbe..7967fc388 100755 --- a/src/c/main.d +++ b/src/c/main.d @@ -181,7 +181,7 @@ ecl_init_env(cl_env_ptr env) env->slot_cache = ecl_make_cache(3, 4096); env->interrupt_struct = ecl_alloc(sizeof(*env->interrupt_struct)); env->interrupt_struct->pending_interrupt = ECL_NIL; - env->interrupt_struct->signal_queue_spinlock = ECL_NIL; + ecl_mutex_init(&env->interrupt_struct->signal_queue_lock, FALSE); { int size = ecl_option_values[ECL_OPT_SIGNAL_QUEUE_SIZE]; env->interrupt_struct->signal_queue = cl_make_list(1, ecl_make_fixnum(size)); @@ -208,6 +208,7 @@ _ecl_dealloc_env(cl_env_ptr env) * a lisp environment set up -- the allocator assumes one -- and we * may have already cleaned up the value of ecl_process_env() */ + ecl_mutex_destroy(&env->interrupt_struct->signal_queue_lock); #if defined(ECL_USE_MPROTECT) if (munmap(env, sizeof(*env))) ecl_internal_error("Unable to deallocate environment structure."); @@ -360,92 +361,88 @@ ecl_def_ct_complex(flt_imag_unit_neg,&flt_zero_data,&flt_one_neg_data,static,con ecl_def_ct_complex(flt_imag_two,&flt_zero_data,&flt_two_data,static,const); struct cl_core_struct cl_core = { - ECL_NIL, /* packages */ - ECL_NIL, /* lisp_package */ - ECL_NIL, /* user_package */ - ECL_NIL, /* keyword_package */ - ECL_NIL, /* system_package */ - ECL_NIL, /* ext_package */ - ECL_NIL, /* clos_package */ + .packages = ECL_NIL, + .lisp_package = ECL_NIL, + .user_package = ECL_NIL, + .keyword_package = ECL_NIL, + .system_package = ECL_NIL, + .ext_package = ECL_NIL, + .clos_package = ECL_NIL, # ifdef ECL_CLOS_STREAMS - ECL_NIL, /* gray_package */ + .gray_package = ECL_NIL, # endif - ECL_NIL, /* mp_package */ - ECL_NIL, /* c_package */ - ECL_NIL, /* ffi_package */ - - ECL_NIL, /* pathname_translations */ - ECL_NIL, /* library_pathname */ - - ECL_NIL, /* terminal_io */ - ECL_NIL, /* null_stream */ - ECL_NIL, /* standard_input */ - ECL_NIL, /* standard_output */ - ECL_NIL, /* error_output */ - ECL_NIL, /* standard_readtable */ - ECL_NIL, /* dispatch_reader */ - ECL_NIL, /* default_dispatch_macro */ - - ECL_NIL, /* char_names */ - (cl_object)&str_empty_data, /* null_string */ - - (cl_object)&plus_half_data, /* plus_half */ - (cl_object)&minus_half_data, /* minus_half */ - (cl_object)&flt_imag_unit_data, /* imag_unit */ - (cl_object)&flt_imag_unit_neg_data, /* minus_imag_unit */ - (cl_object)&flt_imag_two_data, /* imag_two */ - (cl_object)&flt_zero_data, /* singlefloat_zero */ - (cl_object)&dbl_zero_data, /* doublefloat_zero */ - (cl_object)&flt_zero_neg_data, /* singlefloat_minus_zero */ - (cl_object)&dbl_zero_neg_data, /* doublefloat_minus_zero */ - (cl_object)&ldbl_zero_data, /* longfloat_zero */ - (cl_object)&ldbl_zero_neg_data, /* longfloat_minus_zero */ - - (cl_object)&str_G_data, /* gensym_prefix */ - (cl_object)&str_T_data, /* gentemp_prefix */ - ecl_make_fixnum(0), /* gentemp_counter */ - - ECL_NIL, /* Jan1st1970UT */ - - ECL_NIL, /* system_properties */ - ECL_NIL, /* setf_definition */ + .mp_package = ECL_NIL, + .c_package = ECL_NIL, + .ffi_package = ECL_NIL, + + .pathname_translations = ECL_NIL, + .library_pathname = ECL_NIL, + + .terminal_io = ECL_NIL, + .null_stream = ECL_NIL, + .standard_input = ECL_NIL, + .standard_output = ECL_NIL, + .error_output = ECL_NIL, + .standard_readtable = ECL_NIL, + .dispatch_reader = ECL_NIL, + .default_dispatch_macro = ECL_NIL, + + .char_names = ECL_NIL, + .null_string = (cl_object)&str_empty_data, + + .plus_half = (cl_object)&plus_half_data, + .minus_half = 
(cl_object)&minus_half_data, + .imag_unit = (cl_object)&flt_imag_unit_data, + .minus_imag_unit = (cl_object)&flt_imag_unit_neg_data, + .imag_two = (cl_object)&flt_imag_two_data, + .singlefloat_zero = (cl_object)&flt_zero_data, + .doublefloat_zero = (cl_object)&dbl_zero_data, + .singlefloat_minus_zero = (cl_object)&flt_zero_neg_data, + .doublefloat_minus_zero = (cl_object)&dbl_zero_neg_data, + .longfloat_zero = (cl_object)&ldbl_zero_data, + .longfloat_minus_zero = (cl_object)&ldbl_zero_neg_data, + + .gensym_prefix = (cl_object)&str_G_data, + .gentemp_prefix = (cl_object)&str_T_data, + .gentemp_counter = ecl_make_fixnum(0), + + .Jan1st1970UT = ECL_NIL, + + .system_properties = ECL_NIL, + .setf_definitions = ECL_NIL, #ifdef ECL_THREADS - ECL_NIL, /* processes */ - ECL_NIL, /* processes_spinlock */ - ECL_NIL, /* global_lock */ - ECL_NIL, /* error_lock */ - ECL_NIL, /* global_env_lock */ + .processes = ECL_NIL, #endif /* LIBRARIES is an adjustable vector of objects. It behaves as a vector of weak pointers thanks to the magic in gbc.d/alloc_2.d */ - ECL_NIL, /* libraries */ + .libraries = ECL_NIL, - 0, /* max_heap_size */ - ECL_NIL, /* bytes_consed */ - ECL_NIL, /* gc_counter */ - 0, /* gc_stats */ - 0, /* path_max */ + .max_heap_size = 0, + .bytes_consed = ECL_NIL, + .gc_counter = ECL_NIL, + .gc_stats = 0, + .path_max = 0, #ifdef GBC_BOEHM - NULL, /* safety_region */ + .safety_region = NULL, #endif - NULL, /* default_sigmask */ - 0, /* default_sigmask_bytes */ + .default_sigmask = NULL, + .default_sigmask_bytes = 0, #ifdef ECL_THREADS - 0, /* last_var_index */ - ECL_NIL, /* reused_indices */ + .last_var_index = 0, + .reused_indices = ECL_NIL, #endif - (cl_object)&str_slash_data, /* slash */ + .slash = (cl_object)&str_slash_data, - ECL_NIL, /* compiler_dispatch */ + .compiler_dispatch = ECL_NIL, - (cl_object)&default_rehash_size_data, /* rehash_size */ - (cl_object)&default_rehash_threshold_data, /* rehash_threshold */ + .rehash_size = (cl_object)&default_rehash_size_data, + .rehash_threshold = (cl_object)&default_rehash_threshold_data, - ECL_NIL /* known_signals */ + .known_signals = ECL_NIL }; #if !defined(ECL_MS_WINDOWS_HOST) diff --git a/src/c/threads/process.d b/src/c/threads/process.d index 952717b93..d93dc5fbf 100755 --- a/src/c/threads/process.d +++ b/src/c/threads/process.d @@ -27,6 +27,9 @@ #ifdef HAVE_GETTIMEOFDAY # include #endif +#ifdef HAVE_SCHED_H +# include +#endif #include #include @@ -90,7 +93,7 @@ extend_process_vector() cl_object v = cl_core.processes; cl_index new_size = v->vector.dim + v->vector.dim/2; cl_env_ptr the_env = ecl_process_env(); - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object other = cl_core.processes; if (new_size > other->vector.dim) { cl_object new = si_make_vector(ECL_T, @@ -100,7 +103,7 @@ extend_process_vector() ecl_copy_subarray(new, 0, other, 0, other->vector.dim); cl_core.processes = new; } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; } static void @@ -109,7 +112,7 @@ ecl_list_process(cl_object process) cl_env_ptr the_env = ecl_process_env(); bool ok = 0; do { - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object vector = cl_core.processes; cl_index size = vector->vector.dim; cl_index ndx = vector->vector.fillp; @@ -118,7 +121,7 @@ ecl_list_process(cl_object process) vector->vector.fillp = ndx; ok = 1; } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; if 
(ok) break; extend_process_vector(); } while (1); @@ -129,8 +132,7 @@ ecl_list_process(cl_object process) static void ecl_unlist_process(cl_object process) { - cl_env_ptr the_env = ecl_process_env(); - ecl_get_spinlock(the_env, &cl_core.processes_spinlock); + ecl_mutex_lock(&cl_core.processes_lock); cl_object vector = cl_core.processes; cl_index i; for (i = 0; i < vector->vector.fillp; i++) { @@ -143,7 +145,7 @@ ecl_unlist_process(cl_object process) break; } } - ecl_giveup_spinlock(&cl_core.processes_spinlock); + ecl_mutex_unlock(&cl_core.processes_lock); } static cl_object @@ -151,7 +153,7 @@ ecl_process_list() { cl_env_ptr the_env = ecl_process_env(); cl_object output = ECL_NIL; - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object vector = cl_core.processes; cl_object *data = vector->vector.self.t; cl_index i; @@ -160,7 +162,7 @@ ecl_process_list() if (p != ECL_NIL) output = ecl_cons(p, output); } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; return output; } @@ -193,13 +195,11 @@ thread_cleanup(void *aux) /* The following flags will disable all interrupts. */ if (env) { - ecl_get_spinlock(env, &process->process.start_stop_spinlock); - } - AO_store_full((AO_t*)&process->process.phase, ECL_PROCESS_EXITING); - if (env) { - ecl_clear_bignum_registers(env); ecl_disable_interrupts_env(env); + ecl_clear_bignum_registers(env); } + ecl_mutex_lock(&process->process.start_stop_lock); + process->process.phase = ECL_PROCESS_EXITING; #ifdef HAVE_SIGPROCMASK /* ...but we might get stray signals. */ { @@ -214,12 +214,12 @@ thread_cleanup(void *aux) #ifdef ECL_WINDOWS_THREADS CloseHandle(process->process.thread); #endif - mp_barrier_unblock(3, process->process.exit_barrier, @':disable', ECL_T); ecl_set_process_env(NULL); if (env) _ecl_dealloc_env(env); - AO_store_release((AO_t*)&process->process.phase, ECL_PROCESS_INACTIVE); - ecl_giveup_spinlock(&process->process.start_stop_spinlock); + process->process.phase = ECL_PROCESS_INACTIVE; + ecl_cond_var_broadcast(&process->process.exit_barrier); + ecl_mutex_unlock(&process->process.start_stop_lock); } #ifdef ECL_WINDOWS_THREADS @@ -249,9 +249,7 @@ static DWORD WINAPI thread_entry_point(void *arg) pthread_cleanup_push(thread_cleanup, (void *)process); #endif ecl_cs_set_org(env); - ecl_get_spinlock(env, &process->process.start_stop_spinlock); - print_lock("ENVIRON %p %p %p %p", ECL_NIL, process, - env->bds_org, env->bds_top, env->bds_limit); + ecl_mutex_lock(&process->process.start_stop_lock); /* 2) Execute the code. The CATCH_ALL point is the destination * provides us with an elegant way to exit the thread: we just @@ -264,8 +262,8 @@ static DWORD WINAPI thread_entry_point(void *arg) pthread_sigmask(SIG_SETMASK, new, NULL); } #endif - ecl_giveup_spinlock(&process->process.start_stop_spinlock); process->process.phase = ECL_PROCESS_ACTIVE; + ecl_mutex_unlock(&process->process.start_stop_lock); ecl_enable_interrupts_env(env); si_trap_fpe(@'last', ECL_T); ecl_bds_bind(env, @'mp::*current-process*', process); @@ -285,7 +283,6 @@ static DWORD WINAPI thread_entry_point(void *arg) /* ABORT restart. 
*/ process->process.exit_values = args; } ECL_RESTART_CASE_END; - process->process.phase = ECL_PROCESS_EXITING; ecl_bds_unwind1(env); } ECL_CATCH_ALL_END; @@ -305,6 +302,7 @@ static DWORD WINAPI thread_entry_point(void *arg) static cl_object alloc_process(cl_object name, cl_object initial_bindings) { + cl_env_ptr env = ecl_process_env(); cl_object process = ecl_alloc_object(t_process), array; process->process.phase = ECL_PROCESS_INACTIVE; process->process.name = name; @@ -322,12 +320,12 @@ alloc_process(cl_object name, cl_object initial_bindings) } process->process.initial_bindings = array; process->process.woken_up = ECL_NIL; - process->process.start_stop_spinlock = ECL_NIL; process->process.queue_record = ecl_list1(process); - /* Creates the exit barrier so that processes can wait for termination, - * but it is created in a disabled state. */ - process->process.exit_barrier = ecl_make_barrier(name, MOST_POSITIVE_FIXNUM); - mp_barrier_unblock(3, process->process.exit_barrier, @':disable', ECL_T); + ecl_disable_interrupts_env(env); + ecl_mutex_init(&process->process.start_stop_lock, TRUE); + ecl_cond_var_init(&process->process.exit_barrier); + ecl_set_finalizer_unprotected(process, ECL_T); + ecl_enable_interrupts_env(env); return process; } @@ -396,7 +394,7 @@ ecl_import_current_thread(cl_object name, cl_object bindings) env_aux->disable_interrupts = 1; env_aux->interrupt_struct = ecl_alloc_unprotected(sizeof(*env_aux->interrupt_struct)); env_aux->interrupt_struct->pending_interrupt = ECL_NIL; - env_aux->interrupt_struct->signal_queue_spinlock = ECL_NIL; + ecl_mutex_init(&env_aux->interrupt_struct->signal_queue_lock, FALSE); env_aux->interrupt_struct->signal_queue = ECL_NIL; ecl_set_process_env(env_aux); ecl_init_env(env_aux); @@ -421,8 +419,6 @@ ecl_import_current_thread(cl_object name, cl_object bindings) ecl_list_process(process); ecl_enable_interrupts_env(env); - /* Activate the barrier so that processes can immediately start waiting. */ - mp_barrier_unblock(1, process->process.exit_barrier); process->process.phase = ECL_PROCESS_ACTIVE; ecl_bds_bind(env, @'mp::*current-process*', process); @@ -470,11 +466,11 @@ mp_interrupt_process(cl_object process, cl_object function) { cl_env_ptr env = ecl_process_env(); /* Make sure we don't interrupt an exiting process */ - ECL_WITH_SPINLOCK_BEGIN(env, &process->process.start_stop_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(env, &process->process.start_stop_lock) { unlikely_if (mp_process_active_p(process) == ECL_NIL) FEerror("Cannot interrupt the inactive process ~A", 1, process); ecl_interrupt_process(process, function); - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; @(return ECL_T); } @@ -521,7 +517,13 @@ mp_process_kill(cl_object process) cl_object mp_process_yield(void) { - ecl_process_yield(); +#if defined(ECL_WINDOWS_THREADS) + Sleep(0); +#elif defined(HAVE_SCHED_H) + sched_yield(); +#else + ecl_musleep(0.0, 1); +#endif @(return); } @@ -532,17 +534,19 @@ mp_process_enable(cl_object process) * ECL_UNWIND_PROTECT_BEGIN, so they need to be declared volatile */ volatile cl_env_ptr process_env = NULL; cl_env_ptr the_env = ecl_process_env(); - volatile int ok = 0; + volatile int ok = 1; ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* Try to gain exclusive access to the process at the same - * time we ensure that it is inactive. 
This prevents two - * concurrent calls to process-enable from different threads - * on the same process */ - unlikely_if (!AO_compare_and_swap_full((AO_t*)&process->process.phase, - ECL_PROCESS_INACTIVE, - ECL_PROCESS_BOOTING)) { + /* Try to gain exclusive access to the process. This prevents two + * concurrent calls to process-enable from different threads on + * the same process */ + ecl_mutex_lock(&process->process.start_stop_lock); + /* Ensure that the process is inactive. */ + if (process->process.phase != ECL_PROCESS_INACTIVE) { FEerror("Cannot enable the running process ~A.", 1, process); } + ok = 0; + process->process.phase = ECL_PROCESS_BOOTING; + process->process.parent = mp_current_process(); process->process.trap_fpe_bits = process->process.parent->process.env->trap_fpe_bits; @@ -567,12 +571,6 @@ mp_process_enable(cl_object process) process_env->thread_local_bindings = process_env->bindings_array->vector.self.t; - /* Activate the barrier so that processes can immediately start waiting. */ - mp_barrier_unblock(1, process->process.exit_barrier); - - /* Block the thread with this spinlock until it is ready */ - process->process.start_stop_spinlock = ECL_T; - ecl_disable_interrupts_env(the_env); #ifdef ECL_WINDOWS_THREADS { @@ -618,16 +616,15 @@ mp_process_enable(cl_object process) /* INV: interrupts are already disabled through thread safe * unwind-protect */ ecl_unlist_process(process); - /* Disable the barrier and alert possible waiting processes. */ - mp_barrier_unblock(3, process->process.exit_barrier, - @':disable', ECL_T); process->process.phase = ECL_PROCESS_INACTIVE; + /* Alert possible waiting processes. */ + ecl_cond_var_broadcast(&process->process.exit_barrier); process->process.env = NULL; if (process_env != NULL) _ecl_dealloc_env(process_env); } /* Unleash the thread */ - ecl_giveup_spinlock(&process->process.start_stop_spinlock); + ecl_mutex_unlock(&process->process.start_stop_lock); } ECL_UNWIND_PROTECT_THREAD_SAFE_END; @(return (ok? process : ECL_NIL)); @@ -677,13 +674,20 @@ mp_process_whostate(cl_object process) cl_object mp_process_join(cl_object process) { + cl_env_ptr the_env = ecl_process_env(); + volatile cl_object values; + assert_type_process(process); - if (process->process.phase) { - /* We try to acquire a lock that is only owned by the process - * while it is active. 
*/ - mp_barrier_wait(process->process.exit_barrier); - } - return cl_values_list(process->process.exit_values); + ECL_UNWIND_PROTECT_BEGIN(the_env) { + ecl_mutex_lock(&process->process.start_stop_lock); + while (process->process.phase != ECL_PROCESS_INACTIVE) { + ecl_cond_var_wait(&process->process.exit_barrier, &process->process.start_stop_lock); + } + values = cl_values_list(process->process.exit_values); + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + ecl_mutex_unlock(&process->process.start_stop_lock); + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + return values; } cl_object @@ -831,8 +835,8 @@ init_threads(cl_env_ptr env) process->process.env = env; process->process.woken_up = ECL_NIL; process->process.queue_record = ecl_list1(process); - process->process.start_stop_spinlock = ECL_NIL; - process->process.exit_barrier = ecl_make_barrier(process->process.name, MOST_POSITIVE_FIXNUM); + ecl_mutex_init(&process->process.start_stop_lock, TRUE); + ecl_cond_var_init(&process->process.exit_barrier); env->own_process = process; @@ -844,8 +848,9 @@ init_threads(cl_env_ptr env) v->vector.self.t[0] = process; v->vector.fillp = 1; cl_core.processes = v; - cl_core.global_lock = ecl_make_lock(@'mp::global-lock', 1); - cl_core.error_lock = ecl_make_lock(@'mp::error-lock', 1); - cl_core.global_env_lock = ecl_make_rwlock(@'ext::package-lock'); + ecl_mutex_init(&cl_core.processes_lock, 1); + ecl_mutex_init(&cl_core.global_lock, 1); + ecl_mutex_init(&cl_core.error_lock, 1); + ecl_rwlock_init(&cl_core.global_env_lock); } } diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d deleted file mode 100755 index 218e89bda..000000000 --- a/src/c/threads/queue.d +++ /dev/null @@ -1,401 +0,0 @@ -/* -*- Mode: C; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -/* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ - -/* - * queue.d - waiting queue for threads - * - * Copyright (c) 2011 Juan Jose Garcia Ripoll - * - * See file 'LICENSE' for the copyright details. 
- * - */ - -#ifdef HAVE_SCHED_H -#include -#endif -#include -#include -#include - -void ECL_INLINE -ecl_process_yield() -{ -#if defined(ECL_WINDOWS_THREADS) - Sleep(0); -#elif defined(HAVE_SCHED_H) - sched_yield(); -#else - ecl_musleep(0.0, 1); -#endif -} - -void ECL_INLINE -ecl_get_spinlock(cl_env_ptr the_env, cl_object *lock) -{ - cl_object own_process = the_env->own_process; - if(*lock == own_process) - return; - while (!AO_compare_and_swap_full((AO_t*)lock, (AO_t)ECL_NIL, - (AO_t)own_process)) { - ecl_process_yield(); - } -} - -void ECL_INLINE -ecl_giveup_spinlock(cl_object *lock) -{ - AO_store((AO_t*)lock, (AO_t)ECL_NIL); -} - -static ECL_INLINE void -wait_queue_nconc(cl_env_ptr the_env, cl_object q, cl_object new_tail) -{ - /* INV: interrupts are disabled */ - ecl_get_spinlock(the_env, &q->queue.spinlock); - q->queue.list = ecl_nconc(q->queue.list, new_tail); - ecl_giveup_spinlock(&q->queue.spinlock); -} - -static ECL_INLINE cl_object -wait_queue_pop_all(cl_env_ptr the_env, cl_object q) -{ - cl_object output; - ecl_disable_interrupts_env(the_env); - { - ecl_get_spinlock(the_env, &q->queue.spinlock); - output = q->queue.list; - q->queue.list = ECL_NIL; - ecl_giveup_spinlock(&q->queue.spinlock); - } - ecl_enable_interrupts_env(the_env); - return output; -} - -static ECL_INLINE void -wait_queue_delete(cl_env_ptr the_env, cl_object q, cl_object item) -{ - /* INV: interrupts are disabled */ - ecl_get_spinlock(the_env, &q->queue.spinlock); - q->queue.list = ecl_delete_eq(item, q->queue.list); - ecl_giveup_spinlock(&q->queue.spinlock); -} - -/*---------------------------------------------------------------------- - * THREAD SCHEDULER & WAITING - */ - -#if !defined(HAVE_SIGPROCMASK) -static cl_object -bignum_set_time(cl_object bignum, struct ecl_timeval *time) -{ - _ecl_big_set_index(bignum, time->tv_sec); - _ecl_big_mul_ui(bignum, bignum, 1000); - _ecl_big_add_ui(bignum, bignum, (time->tv_usec + 999) / 1000); - return bignum; -} - -static cl_object -elapsed_time(struct ecl_timeval *start) -{ - cl_object delta_big = _ecl_big_register0(); - cl_object aux_big = _ecl_big_register1(); - struct ecl_timeval now; - ecl_get_internal_real_time(&now); - bignum_set_time(aux_big, start); - bignum_set_time(delta_big, &now); - _ecl_big_sub(delta_big, delta_big, aux_big); - _ecl_big_register_free(aux_big); - return delta_big; -} - -static double -waiting_time(cl_index iteration, struct ecl_timeval *start) -{ - /* Waiting time is smaller than 0.10 s */ - double time; - cl_object top = ecl_make_fixnum(10 * 1000); - cl_object delta_big = elapsed_time(start); - _ecl_big_div_ui(delta_big, delta_big, iteration); - if (ecl_number_compare(delta_big, top) < 0) { - time = ecl_to_double(delta_big) * 1.5; - } else { - time = 0.10; - } - _ecl_big_register_free(delta_big); - return time; -} - -static cl_object -ecl_wait_on_timed(cl_env_ptr env, mp_wait_test condition, cl_object mp_object) -{ - volatile const cl_env_ptr the_env = env; - volatile cl_object own_process = the_env->own_process; - volatile cl_object record; - volatile cl_object output; - cl_fixnum iteration = 0; - struct ecl_timeval start; - ecl_get_internal_real_time(&start); - - /* 0) We reserve a record for the queue. 
In order to avoid - * using the garbage collector, we reuse records */ - record = own_process->process.queue_record; - unlikely_if (record == ECL_NIL) { - record = ecl_list1(own_process); - } else { - own_process->process.queue_record = ECL_NIL; - } - - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', ECL_NIL); - ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* 2) Now we add ourselves to the queue. In order to - * avoid a call to the GC, we try to reuse records. */ - print_lock("adding to queue", mp_object); - own_process->process.woken_up = ECL_NIL; - wait_queue_nconc(the_env, mp_object, record); - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', ECL_T); - ecl_check_pending_interrupts(the_env); - - /* This spinlock is here because the default path (fair) is - * too slow */ - for (iteration = 0; iteration < 10; iteration++) { - if (!Null(output = condition(the_env, mp_object))) - break; - } - - /* 3) Unlike the sigsuspend() implementation, this - * implementation does not block signals and the - * wakeup event might be lost before the sleep - * function is invoked. We must thus spin over short - * intervals of time to ensure that we check the - * condition periodically. */ - while (Null(output)) { - ecl_musleep(waiting_time(iteration++, &start), 1); - output = condition(the_env, mp_object); - } - ecl_bds_unwind1(the_env); - } ECL_UNWIND_PROTECT_EXIT { - /* 4) At this point we wrap up. We remove ourselves - * from the queue and unblock the lisp interrupt - * signal. Note that we recover the cons for later use.*/ - wait_queue_delete(the_env, mp_object, own_process); - own_process->process.queue_record = record; - ECL_RPLACD(record, ECL_NIL); - - /* 5) When this process exits, it may be because it - * aborts (which we know because output == ECL_NIL), or - * because the condition is satisfied. In both cases - * we allow the first in the queue to test again its - * condition. This is needed for objects, such as - * semaphores, where the condition may be satisfied - * more than once. */ - if (Null(output)) { - ecl_wakeup_waiters(the_env, mp_object, ECL_WAKEUP_ONE); - } - } ECL_UNWIND_PROTECT_END; - ecl_bds_unwind1(the_env); - return output; -} -#endif - -/********************************************************************** - * BLOCKING WAIT QUEUE ALGORITHM - * - * This object keeps a list of processes waiting for a condition to - * happen. The queue is ordered and the only processes that check for - * the condition are - * - The first process to arrive to the queue, - * - Each process which is awoken. - * - The first process after the list of awoken processes. - * - * The idea is that this will ensure some fairness when unblocking the - * processes, which is important for abstractions such as mutexes or - * semaphores, where we want equal sharing of resources among processes. - * - * This also implies that the waiting processes depend on others to signal - * when to check for a condition. This happens in two situations - * - External code that changes the fields of the queue object - * must signal ecl_wakeup_waiters() (See mutex.d, semaphore.d, etc) - * - When a process exits ecl_wait_on() it always resignals the next - * process in the queue, because a condition may be satisfied more - * than once (for instance when a semaphore is changed, more than - * one process may be released) - * - * The critical part of this algorithm is the fact that processes - * communicating the change of conditions may do so before, during or - * after a process has been registered. 
Since we do not want those signals - * to be lost, a proper ordering of steps is required. - */ - -cl_object -ecl_wait_on(cl_env_ptr env, mp_wait_test condition, cl_object mp_object) -{ -#if defined(HAVE_SIGPROCMASK) - volatile const cl_env_ptr the_env = env; - volatile cl_object own_process = the_env->own_process; - volatile cl_object record; - volatile sigset_t original; - volatile cl_object output; - - /* 0) We reserve a record for the queue. In order to avoid - * using the garbage collector, we reuse records */ - record = own_process->process.queue_record; - unlikely_if (record == ECL_NIL) { - record = ecl_list1(own_process); - } else { - own_process->process.queue_record = ECL_NIL; - } - - /* 1) First we block lisp interrupt signals. This ensures that - * any awake signal that is issued from here is not lost. */ - { - int code = ecl_option_values[ECL_OPT_THREAD_INTERRUPT_SIGNAL]; - sigset_t empty; - sigemptyset(&empty); - sigaddset(&empty, code); - pthread_sigmask(SIG_BLOCK, &empty, (sigset_t *)&original); - } - - /* 2) Now we add ourselves to the queue. */ - own_process->process.woken_up = ECL_NIL; - wait_queue_nconc(the_env, mp_object, record); - - ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* 3) At this point we may receive signals, but we - * might have missed a wakeup event if that happened - * between 0) and 2), which is why we start with the - * check*/ - while (Null(output = condition(the_env, mp_object))) - { - /* This will wait until we get a signal that - * demands some code being executed. Note that - * this includes our communication signals and - * the signals used by the GC. Note also that - * as a consequence we might throw / return - * which is why need to protect it all with - * UNWIND-PROTECT. */ - sigsuspend((sigset_t *)&original); - } - } ECL_UNWIND_PROTECT_EXIT { - /* 4) At this point we wrap up. We remove ourselves - * from the queue and unblock the lisp interrupt - * signal. Note that we recover the cons for later use.*/ - wait_queue_delete(the_env, mp_object, own_process); - own_process->process.queue_record = record; - ECL_RPLACD(record, ECL_NIL); - - /* 5) When this process exits, it may be because it - * aborts (which we know because output == ECL_NIL), or - * because the condition is satisfied. In both cases - * we allow the first in the queue to test again its - * condition. This is needed for objects, such as - * semaphores, where the condition may be satisfied - * more than once. */ - if (Null(output)) { - ecl_wakeup_waiters(the_env, mp_object, ECL_WAKEUP_ONE); - } - - /* 6) Restoring signals is done last, to ensure that - * all cleanup steps are performed. 
*/ - pthread_sigmask(SIG_SETMASK, (sigset_t *)&original, NULL); - } ECL_UNWIND_PROTECT_END; - return output; -#else - return ecl_wait_on_timed(env, condition, mp_object); -#endif -} - -cl_object -ecl_waiter_pop(cl_env_ptr the_env, cl_object q) -{ - cl_object output; - ecl_disable_interrupts_env(the_env); - ecl_get_spinlock(the_env, &q->queue.spinlock); - { - cl_object l; - output = ECL_NIL; - for (l = q->queue.list; l != ECL_NIL; l = ECL_CONS_CDR(l)) { - cl_object p = ECL_CONS_CAR(l); - if (p->process.phase != ECL_PROCESS_INACTIVE && - p->process.phase != ECL_PROCESS_EXITING) { - output = p; - break; - } - } - } - ecl_giveup_spinlock(&q->queue.spinlock); - ecl_enable_interrupts_env(the_env); - return output; -} - -void -ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags) -{ - ecl_disable_interrupts_env(the_env); - ecl_get_spinlock(the_env, &q->queue.spinlock); - if (q->queue.list != ECL_NIL) { - /* We scan the list of waiting processes, awaking one - * or more, depending on flags. In running through the list - * we eliminate zombie processes --- they should not be here - * because of the UNWIND-PROTECT in ecl_wait_on(), but - * sometimes shit happens */ - cl_object *tail, l; - for (tail = &q->queue.list; (l = *tail) != ECL_NIL; ) { - cl_object p = ECL_CONS_CAR(l); - ecl_get_spinlock(the_env, &p->process.start_stop_spinlock); - if (p->process.phase == ECL_PROCESS_INACTIVE || - p->process.phase == ECL_PROCESS_EXITING) { - print_lock("removing %p", q, p); - *tail = ECL_CONS_CDR(l); - } else { - print_lock("awaking %p", q, p); - /* If the process is active, we then - * simply awake it with a signal.*/ - p->process.woken_up = ECL_T; - if (flags & ECL_WAKEUP_DELETE) - *tail = ECL_CONS_CDR(l); - tail = &ECL_CONS_CDR(l); - if (flags & ECL_WAKEUP_KILL) - ecl_interrupt_process(p, @'mp::exit-process'); - else - ecl_wakeup_process(p); - if (!(flags & ECL_WAKEUP_ALL)) { - ecl_giveup_spinlock(&p->process.start_stop_spinlock); - break; - } - } - ecl_giveup_spinlock(&p->process.start_stop_spinlock); - } - } - ecl_giveup_spinlock(&q->queue.spinlock); - ecl_enable_interrupts_env(the_env); - ecl_process_yield(); -} - -#undef print_lock - -void -print_lock(char *prefix, cl_object l, ...) -{ - static cl_object lock = ECL_NIL; - va_list args; - va_start(args, l); - if (l == ECL_NIL - || ecl_t_of(l) == t_condition_variable - || ECL_FIXNUMP(l->lock.name)) { - cl_env_ptr env = ecl_process_env(); - ecl_get_spinlock(env, &lock); - printf("\n%ld\t", ecl_fixnum(env->own_process->process.name)); - vprintf(prefix, args); - if (l != ECL_NIL) { - cl_object p = l->semaphore.queue_list; - while (p != ECL_NIL) { - printf(" %lx", ecl_fixnum(ECL_CONS_CAR(p)->process.name)); - p = ECL_CONS_CDR(p); - } - } - fflush(stdout); - ecl_giveup_spinlock(&lock); - } - va_end(args); -} -/*#define print_lock(a,b,c) (void)0*/ diff --git a/src/c/unixint.d b/src/c/unixint.d index 6bbb42e26..4b607aede 100644 --- a/src/c/unixint.d +++ b/src/c/unixint.d @@ -423,16 +423,16 @@ handle_all_queued_interrupt_safe(cl_env_ptr env) static void queue_signal(cl_env_ptr env, cl_object code, int allocate) { - /* Note: We don't use ECL_WITH_SPINLOCK_BEGIN/END here since - * it checks for pending interrupts after unlocking the - * spinlock. This would lead to the interrupt being handled + /* Note: We don't use ECL_WITH_NATIVE_LOCK_BEGIN/END here + * since it checks for pending interrupts after unlocking the + * mutex. 
This would lead to the interrupt being handled * immediately when queueing an interrupt for the current * thread, even when interrupts are disabled. */ - /* INV: interrupts are disabled, therefore the spinlock will + /* INV: interrupts are disabled, therefore the lock will * always be released */ #ifdef ECL_THREADS - ecl_get_spinlock(ecl_process_env(), &env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_lock(&env->interrupt_struct->signal_queue_lock); #endif cl_object record; @@ -453,7 +453,7 @@ queue_signal(cl_env_ptr env, cl_object code, int allocate) } #ifdef ECL_THREADS - ecl_giveup_spinlock(&env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_unlock(&env->interrupt_struct->signal_queue_lock); #endif } @@ -461,19 +461,17 @@ static cl_object pop_signal(cl_env_ptr env) { cl_object record, value; - /* Note: We don't use ECL_WITH_SPINLOCK_BEGIN/END here since - * it checks for pending interrupts after unlocking the - * spinlock. This would lead to handle_all_queued and - * pop_signal being called again and the interrupts being - * handled in the wrong order. */ - - /* INV: ecl_get_spinlock and ecl_giveup_spinlock don't write - * into env, therefore it is valid to use - * ecl_disable_interrupts_env */ + /* Note: We don't use ECL_WITH_NATIVE_LOCK_BEGIN/END here + * since it checks for pending interrupts after unlocking the + * mutex. This would lead to handle_all_queued and pop_signal + * being called again and the interrupts being handled in the + * wrong order. */ + ecl_disable_interrupts_env(env); #ifdef ECL_THREADS - ecl_get_spinlock(env, &env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_lock(&env->interrupt_struct->signal_queue_lock); #endif + if (env->interrupt_struct->pending_interrupt == ECL_NIL) { value = ECL_NIL; } else { @@ -486,8 +484,9 @@ pop_signal(cl_env_ptr env) env->interrupt_struct->signal_queue = record; } } + #ifdef ECL_THREADS - ecl_giveup_spinlock(&env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_unlock(&env->interrupt_struct->signal_queue_lock); #endif ecl_enable_interrupts_env(env); return value; @@ -570,7 +569,7 @@ typedef struct { } signal_thread_message; static cl_object signal_thread_process = ECL_NIL; static signal_thread_message signal_thread_msg; -static cl_object signal_thread_spinlock = ECL_NIL; +static ecl_mutex_t signal_thread_lock; static int signal_thread_pipe[2] = {-1,-1}; static void @@ -592,9 +591,9 @@ handler_fn_prototype(deferred_signal_handler, int sig, siginfo_t *siginfo, void * Note that read() will abort the thread will get notified. */ signal_thread_msg = msg; } else if (signal_thread_pipe[1] > 0) { - ecl_get_spinlock(the_env, &signal_thread_spinlock); + ecl_mutex_lock(&signal_thread_lock); write(signal_thread_pipe[1], &msg, sizeof(msg)); - ecl_giveup_spinlock(&signal_thread_spinlock); + ecl_mutex_unlock(&signal_thread_lock); } else { /* Nothing to do. There is no way to handle this signal because * the responsible thread is not running */ @@ -630,9 +629,9 @@ asynchronous_signal_servicing_thread() * We create the object for communication. We need a lock to prevent other * threads from writing before the pipe is created. 
*/ - ecl_get_spinlock(the_env, &signal_thread_spinlock); + ecl_mutex_lock(&signal_thread_lock); pipe(signal_thread_pipe); - ecl_giveup_spinlock(&signal_thread_spinlock); + ecl_mutex_unlock(&signal_thread_lock); signal_thread_msg.process = ECL_NIL; for (;;) { cl_object signal_code; @@ -1317,6 +1316,9 @@ install_asynchronous_signal_handlers() sigprocmask(SIG_SETMASK, NULL, sigmask); # endif #endif +#if defined(ECL_THREADS) && defined(HAVE_SIGPROCMASK) + ecl_mutex_init(&signal_thread_lock, TRUE); +#endif #ifdef SIGINT if (ecl_option_values[ECL_OPT_TRAP_SIGINT]) { async_handler(SIGINT, non_evil_signal_handler, sigmask); diff --git a/src/configure b/src/configure index f65d8c6d7..0494f6400 100755 --- a/src/configure +++ b/src/configure @@ -7326,7 +7326,7 @@ done for ac_header in sys/utsname.h float.h pwd.h dlfcn.h link.h \ mach-o/dyld.h dirent.h sys/ioctl.h sys/select.h \ - sys/wait.h semaphore.h + sys/wait.h do : as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh` ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default" diff --git a/src/configure.ac b/src/configure.ac index f294a37fa..876d36363 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -676,7 +676,7 @@ dnl !!! end autoscan AC_CHECK_HEADERS( [sys/utsname.h float.h pwd.h dlfcn.h link.h] \ [mach-o/dyld.h dirent.h sys/ioctl.h sys/select.h] \ - [sys/wait.h semaphore.h] ) + [sys/wait.h] ) AC_CHECK_HEADERS([ulimit.h], [], [], diff --git a/src/ecl/configpre.h b/src/ecl/configpre.h index 6b886dd8b..c26df45bc 100644 --- a/src/ecl/configpre.h +++ b/src/ecl/configpre.h @@ -464,9 +464,6 @@ /* Define to 1 if you have the `select' function. */ #undef HAVE_SELECT -/* Define to 1 if you have the header file. */ -#undef HAVE_SEMAPHORE_H - /* Define to 1 if you have the `setenv' function. 
*/ #undef HAVE_SETENV diff --git a/src/h/external.h b/src/h/external.h index cfec5f24d..9f99d14b6 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -150,7 +150,7 @@ struct cl_env_struct { struct ecl_interrupt_struct { cl_object pending_interrupt; cl_object signal_queue; - cl_object signal_queue_spinlock; + ecl_mutex_t signal_queue_lock; }; #ifndef __GNUC__ @@ -224,10 +224,10 @@ struct cl_core_struct { #ifdef ECL_THREADS cl_object processes; - cl_object processes_spinlock; - cl_object global_lock; - cl_object error_lock; - cl_object global_env_lock; + ecl_mutex_t processes_lock; + ecl_mutex_t global_lock; + ecl_mutex_t error_lock; + ecl_rwlock_t global_env_lock; #endif cl_object libraries; diff --git a/src/h/internal.h b/src/h/internal.h index e88de9f6e..24904bca4 100755 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -338,11 +338,13 @@ extern void cl_write_object(cl_object x, cl_object stream); /* global locks */ +#include + #ifdef ECL_THREADS # define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) \ - ECL_WITH_LOCK_BEGIN(the_env, cl_core.global_lock) + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.global_lock) # define ECL_WITH_GLOBAL_LOCK_END \ - ECL_WITH_LOCK_END + ECL_WITH_NATIVE_LOCK_END # define ECL_WITH_LOCK_BEGIN(the_env,lock) { \ const cl_env_ptr __ecl_the_env = the_env; \ const cl_object __ecl_the_lock = lock; \ @@ -354,39 +356,39 @@ extern void cl_write_object(cl_object x, cl_object stream); ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ mp_giveup_lock(__ecl_the_lock); \ } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } -# define ECL_WITH_SPINLOCK_BEGIN(the_env,lock) { \ +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) { \ const cl_env_ptr __ecl_the_env = (the_env); \ - cl_object *__ecl_the_lock = (lock); \ + ecl_mutex_t* __ecl_the_lock = (lock); \ ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ - ecl_get_spinlock(__ecl_the_env, __ecl_the_lock); -# define ECL_WITH_SPINLOCK_END \ + ecl_mutex_lock(__ecl_the_lock); +# define ECL_WITH_NATIVE_LOCK_END \ ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ - ecl_giveup_spinlock(__ecl_the_lock); \ + ecl_mutex_unlock(__ecl_the_lock); \ } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } #else # define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) # define ECL_WITH_GLOBAL_LOCK_END # define ECL_WITH_LOCK_BEGIN(the_env,lock) # define ECL_WITH_LOCK_END -# define ECL_WITH_SPINLOCK_BEGIN(the_env,lock) -# define ECL_WITH_SPINLOCK_END +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) +# define ECL_WITH_NATIVE_LOCK_END #endif /* ECL_THREADS */ #ifdef ECL_RWLOCK # define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) { \ const cl_env_ptr __ecl_pack_env = the_env; \ ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - mp_get_rwlock_read_wait(cl_core.global_env_lock); + ecl_rwlock_lock_read(&cl_core.global_env_lock); # define ECL_WITH_GLOBAL_ENV_RDLOCK_END \ - mp_giveup_rwlock_read(cl_core.global_env_lock); \ + ecl_rwlock_unlock_read(&cl_core.global_env_lock); \ ecl_bds_unwind1(__ecl_pack_env); \ ecl_check_pending_interrupts(__ecl_pack_env); } # define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) { \ const cl_env_ptr __ecl_pack_env = the_env; \ ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - mp_get_rwlock_write_wait(cl_core.global_env_lock); + ecl_rwlock_lock_write(&cl_core.global_env_lock); # define ECL_WITH_GLOBAL_ENV_WRLOCK_END \ - mp_giveup_rwlock_write(cl_core.global_env_lock); \ + ecl_rwlock_unlock_write(&cl_core.global_env_lock); \ ecl_bds_unwind1(__ecl_pack_env); \ ecl_check_pending_interrupts(__ecl_pack_env); } #else @@ -465,22 +467,6 @@ extern void ecl_musleep(double 
time, bool alertable); #define UTC_time_to_universal_time(x) ecl_plus(ecl_make_integer(x),cl_core.Jan1st1970UT) extern cl_fixnum ecl_runtime(void); -/* threads/mutex.d */ - -#ifdef ECL_THREADS -typedef cl_object (*mp_wait_test)(cl_env_ptr, cl_object); - -extern void ecl_process_yield(void); -extern void print_lock(char *s, cl_object lock, ...); -#define print_lock(...) ((void)0) -extern void ecl_get_spinlock(cl_env_ptr env, cl_object *lock); -extern void ecl_giveup_spinlock(cl_object *lock); -extern cl_object ecl_wait_on(cl_env_ptr env, mp_wait_test test, cl_object object); -extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, int flags); -extern void ecl_wakeup_process(cl_object process); -extern cl_object ecl_waiter_pop(cl_env_ptr the_env, cl_object q); -#endif - /* threads/rwlock.d */ #ifdef ECL_THREADS diff --git a/src/h/object.h b/src/h/object.h index 6d6b58120..b988aba37 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -957,11 +957,11 @@ struct ecl_process { cl_object interrupt; cl_object initial_bindings; cl_object parent; - cl_object exit_barrier; cl_object exit_values; cl_object woken_up; cl_object queue_record; - cl_object start_stop_spinlock; + ecl_mutex_t start_stop_lock; /* phase is updated only when we hold this lock */ + ecl_cond_var_t exit_barrier; /* process-join waits on this barrier */ cl_index phase; #ifdef ECL_WINDOWS_THREADS HANDLE thread; @@ -979,12 +979,6 @@ enum { ECL_WAKEUP_DELETE = 8 }; -struct ecl_queue { - _ECL_HDR; - cl_object list; - cl_object spinlock; -}; - struct ecl_semaphore { _ECL_HDR; cl_object name; @@ -1143,7 +1137,6 @@ union cl_lispunion { struct ecl_instance instance; /* clos instance */ #ifdef ECL_THREADS struct ecl_process process; /* process */ - struct ecl_queue queue; /* lock */ struct ecl_lock lock; /* lock */ struct ecl_rwlock rwlock; /* read/write lock */ struct ecl_condition_variable condition_variable; /* condition-variable */ diff --git a/src/util/emacs.el b/src/util/emacs.el index 38c46c249..925ab5b0e 100644 --- a/src/util/emacs.el +++ b/src/util/emacs.el @@ -208,7 +208,6 @@ "c/threads/mailbox.d" "c/threads/mutex.d" "c/threads/process.d" -"c/threads/queue.d" "c/threads/rwlock.d" "c/threads/semaphore.d" "c/time.d" -- GitLab From 806336ed2efe7b245593df905385891b746c739f Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 26 Sep 2020 18:23:54 +0200 Subject: [PATCH 11/13] multithreading: read-write-lock improvements Read-write locks are always provided; if no operating system primitives exist, emulate them using ordinary locks. Also provide a Windows implementation. 
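One common way to emulate a read-write lock on top of an ordinary mutex is to track the number of active readers with a condition variable, as sketched below in plain pthreads (illustrative only; the actual fallback in this patch may be simpler):

  #include <pthread.h>

  struct rwlock_emul {
    pthread_mutex_t lock;
    pthread_cond_t cv;
    int readers;      /* number of threads holding the read lock */
    int writer;       /* 1 while a thread holds the write lock   */
  };

  void rd_lock(struct rwlock_emul *rw)
  {
    pthread_mutex_lock(&rw->lock);
    while (rw->writer)
      pthread_cond_wait(&rw->cv, &rw->lock);
    rw->readers++;
    pthread_mutex_unlock(&rw->lock);
  }

  void rd_unlock(struct rwlock_emul *rw)
  {
    pthread_mutex_lock(&rw->lock);
    if (--rw->readers == 0)
      pthread_cond_broadcast(&rw->cv);     /* a waiting writer may proceed */
    pthread_mutex_unlock(&rw->lock);
  }

  void wr_lock(struct rwlock_emul *rw)
  {
    pthread_mutex_lock(&rw->lock);
    while (rw->writer || rw->readers > 0)
      pthread_cond_wait(&rw->cv, &rw->lock);
    rw->writer = 1;
    pthread_mutex_unlock(&rw->lock);
  }

  void wr_unlock(struct rwlock_emul *rw)
  {
    pthread_mutex_lock(&rw->lock);
    rw->writer = 0;
    pthread_cond_broadcast(&rw->cv);       /* wake both readers and writers */
    pthread_mutex_unlock(&rw->lock);
  }

(Writers can starve under a steady stream of readers; a production version would add some form of fairness.)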
--- src/aclocal.m4 | 4 +- src/c/alloc_2.d | 15 +- src/c/ecl_features.h | 2 +- src/c/threads/rwlock.d | 207 ++++++++------------ src/configure | 6 +- src/doc/manual/extensions/mp_ref_rwlock.txi | 7 +- src/ecl/configpre.h | 3 - src/h/internal.h | 134 +++++++------ src/h/object.h | 20 +- src/h/threads.h | 200 +++++++++++++++++++ 10 files changed, 367 insertions(+), 231 deletions(-) diff --git a/src/aclocal.m4 b/src/aclocal.m4 index 21eb7e0c8..ae230e2e5 100644 --- a/src/aclocal.m4 +++ b/src/aclocal.m4 @@ -266,7 +266,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox" +THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox threads/rwlock" clibs='-lm' SONAME='' SONAME_LDFLAGS='' @@ -931,11 +931,9 @@ dnl Check whether we have POSIX read/write locks are available AC_DEFUN([ECL_POSIX_RWLOCK],[ AC_CHECK_FUNC( [pthread_rwlock_init], [ AC_CHECK_TYPES([pthread_rwlock_t], [ - AC_DEFINE([ECL_RWLOCK], [], [ECL_RWLOCK]) AC_DEFINE([HAVE_POSIX_RWLOCK], [], [HAVE_POSIX_RWLOCK]) ], []) ], []) -THREAD_OBJ="$THREAD_OBJ threads/rwlock" ]) diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 7931c34c2..511f526b9 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -451,10 +451,7 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl break; case t_rwlock: MAYBE_MARK(o->rwlock.name); -# ifndef ECL_RWLOCK - MAYBE_MARK(o->rwlock.mutex); break; -# endif case t_semaphore: MAYBE_MARK(o->semaphore.name); break; @@ -1006,14 +1003,8 @@ init_alloc(void) type_info[t_lock].descriptor = to_bitmap(&o, &(o.lock.name)) | to_bitmap(&o, &(o.lock.owner)); -# ifdef ECL_RWLOCK type_info[t_rwlock].descriptor = to_bitmap(&o, &(o.rwlock.name)); -# else - type_info[t_rwlock].descriptor = - to_bitmap(&o, &(o.rwlock.name)) | - to_bitmap(&o, &(o.rwlock.mutex)); -# endif type_info[t_condition_variable].descriptor = 0; type_info[t_semaphore].descriptor = to_bitmap(&o, &(o.semaphore.name)); @@ -1136,15 +1127,13 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } -# ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); ecl_disable_interrupts_env(the_env); - pthread_rwlock_destroy(&o->rwlock.mutex); + ecl_rwlock_destroy(&o->rwlock.mutex); ecl_enable_interrupts_env(the_env); break; } -# endif case t_process: { const cl_env_ptr the_env = ecl_process_env(); ecl_disable_interrupts_env(the_env); @@ -1196,9 +1185,7 @@ register_finalizer(cl_object o, void *finalized_object, case t_barrier: case t_semaphore: case t_mailbox: -# if defined(ECL_RWLOCK) case t_rwlock: -# endif case t_process: #endif /* ECL_THREADS */ /* Don't delete the standard finalizer. 
*/ diff --git a/src/c/ecl_features.h b/src/c/ecl_features.h index 963d007d0..f2fc96ee9 100644 --- a/src/c/ecl_features.h +++ b/src/c/ecl_features.h @@ -102,7 +102,7 @@ ecl_def_string_array(feature_names,static,const) = { #ifdef ECL_SEMAPHORES ecl_def_string_array_elt("SEMAPHORES"), #endif -#ifdef ECL_RWLOCK +#if defined(HAVE_POSIX_RWLOCK) || defined(ECL_WINDOWS_THREADS) ecl_def_string_array_elt("ECL-READ-WRITE-LOCK"), #endif #ifdef WORDS_BIGENDIAN diff --git a/src/c/threads/rwlock.d b/src/c/threads/rwlock.d index e55f29a12..d95533175 100644 --- a/src/c/threads/rwlock.d +++ b/src/c/threads/rwlock.d @@ -26,59 +26,16 @@ * READ/WRITE LOCKS */ -static void -FEerror_not_a_rwlock(cl_object lock) -{ - FEwrong_type_argument(@'mp::rwlock', lock); -} - -static void -FEunknown_rwlock_error(cl_object lock, int rc) -{ -#ifdef ECL_WINDOWS_THREADS - FEwin32_error("When acting on rwlock ~A, got an unexpected error.", 1, lock); -#else - const char *msg = NULL; - switch (rc) { - case EINVAL: - msg = "The value specified by rwlock is invalid"; - break; - case EPERM: - msg = "Read/write lock not owned by us"; - break; - case EDEADLK: - msg = "Thread already owns this lock"; - break; - case ENOMEM: - msg = "Out of memory"; - break; - default: - FElibc_error("When acting on rwlock ~A, got an unexpected error.", - 1, lock); - } - FEerror("When acting on rwlock ~A, got the following C library error:~%" - "~A", 2, lock, ecl_make_constant_base_string(msg,-1)); -#endif -} - cl_object ecl_make_rwlock(cl_object name) { - const cl_env_ptr the_env = ecl_process_env(); + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_rwlock); -#ifdef ECL_RWLOCK - int rc; - ecl_disable_interrupts_env(the_env); - rc = pthread_rwlock_init(&output->rwlock.mutex, NULL); - ecl_enable_interrupts_env(the_env); - if (rc) { - FEerror("Unable to create read/write lock", 0); - } - ecl_set_finalizer_unprotected(output, ECL_T); -#else - output->rwlock.mutex = ecl_make_lock(name, 0); -#endif output->rwlock.name = name; + ecl_disable_interrupts_env(env); + ecl_rwlock_init(&output->rwlock.mutex); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -91,134 +48,130 @@ cl_object mp_rwlock_name(cl_object lock) { const cl_env_ptr env = ecl_process_env(); - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::rwlock-name], lock, @[mp::rwlock]); + } ecl_return1(env, lock->rwlock.name); } cl_object mp_giveup_rwlock_read(cl_object lock) { - /* Must be called with interrupts disabled. 
*/ - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - int rc = pthread_rwlock_unlock(&lock->rwlock.mutex); - if (rc) - FEunknown_rwlock_error(lock, rc); - @(return ECL_T); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::giveup-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_unlock_read(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); } -#else - return mp_giveup_lock(lock->rwlock.mutex); -#endif } cl_object mp_giveup_rwlock_write(cl_object lock) { - return mp_giveup_rwlock_read(lock); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::giveup-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_unlock_write(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } } cl_object mp_get_rwlock_read_nowait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - cl_object output = ECL_T; - int rc = pthread_rwlock_tryrdlock(&lock->rwlock.mutex); - if (rc == 0) { - output = ECL_T; - } else if (rc == EBUSY) { - output = ECL_NIL; - } else { - FEunknown_rwlock_error(lock, rc); - } - ecl_return1(env, output); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_trylock_read(&lock->rwlock.mutex); + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env, ECL_NIL); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_nowait(lock->rwlock.mutex); -#endif } cl_object mp_get_rwlock_read_wait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - int rc = pthread_rwlock_rdlock(&lock->rwlock.mutex); - if (rc != 0) { - FEunknown_rwlock_error(lock, rc); - } + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_lock_read(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { ecl_return1(env, ECL_T); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_wait(lock->rwlock.mutex); -#endif } @(defun mp::get-rwlock-read (lock &optional (wait ECL_T)) @ - if (Null(wait)) + if (Null(wait)) { return mp_get_rwlock_read_nowait(lock); - else + } else { return mp_get_rwlock_read_wait(lock); + } @) cl_object mp_get_rwlock_write_nowait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - cl_object output = ECL_T; - int rc = pthread_rwlock_trywrlock(&lock->rwlock.mutex); - if (rc == 0) { - output = ECL_T; - } else if (rc == EBUSY) { - output = ECL_NIL; - } else { - FEunknown_rwlock_error(lock, rc); - } - ecl_return1(env, output); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + 
FEwrong_type_only_arg(@[mp::get-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_trylock_write(&lock->rwlock.mutex); + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env, ECL_NIL); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_nowait(lock->rwlock.mutex); -#endif } cl_object mp_get_rwlock_write_wait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - int rc = pthread_rwlock_wrlock(&lock->rwlock.mutex); - if (rc != 0) { - FEunknown_rwlock_error(lock, rc); - } - @(return ECL_T); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_lock_write(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_wait(lock->rwlock.mutex); -#endif } @(defun mp::get-rwlock-write (lock &optional (wait ECL_T)) @ if (Null(wait)) { return mp_get_rwlock_write_nowait(lock); - } - else { + } else { return mp_get_rwlock_write_wait(lock); } @) diff --git a/src/configure b/src/configure index 0494f6400..6146f44c9 100755 --- a/src/configure +++ b/src/configure @@ -5142,7 +5142,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox" +THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox threads/rwlock" clibs='-lm' SONAME='' SONAME_LDFLAGS='' @@ -6225,9 +6225,6 @@ _ACEOF -$as_echo "#define ECL_RWLOCK /**/" >>confdefs.h - - $as_echo "#define HAVE_POSIX_RWLOCK /**/" >>confdefs.h @@ -6236,7 +6233,6 @@ fi fi -THREAD_OBJ="$THREAD_OBJ threads/rwlock" boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}" for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done diff --git a/src/doc/manual/extensions/mp_ref_rwlock.txi b/src/doc/manual/extensions/mp_ref_rwlock.txi index 305ca1647..973d8700d 100644 --- a/src/doc/manual/extensions/mp_ref_rwlock.txi +++ b/src/doc/manual/extensions/mp_ref_rwlock.txi @@ -7,11 +7,8 @@ Readers-writer (or shared-exclusive ) locks allow concurrent access for read-only operations, while write operations require exclusive -access. @code{mp:rwlock} is non-recursive. - -Readers-writers locks are an optional feature, which is available if -@code{*features*} includes @code{:ecl-read-write-lock}. - +access. @code{mp:rwlock} is non-recursive and cannot be used together +with condition variables. 
@node Readers-writer locks dictionary @subsection Read-Write locks dictionary diff --git a/src/ecl/configpre.h b/src/ecl/configpre.h index c26df45bc..a2ed80c40 100644 --- a/src/ecl/configpre.h +++ b/src/ecl/configpre.h @@ -45,9 +45,6 @@ /* Define if your newline is CRLF */ #undef ECL_NEWLINE_IS_CRLF -/* ECL_RWLOCK */ -#undef ECL_RWLOCK - /* ECL_SIGNED_ZERO */ #undef ECL_SIGNED_ZERO diff --git a/src/h/internal.h b/src/h/internal.h index 24904bca4..b66cdcfda 100755 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -336,69 +336,12 @@ extern void _ecl_string_push_c_string(cl_object s, const char *c); extern void cl_write_object(cl_object x, cl_object stream); -/* global locks */ - -#include +/* threads/rwlock.d */ #ifdef ECL_THREADS -# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) \ - ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.global_lock) -# define ECL_WITH_GLOBAL_LOCK_END \ - ECL_WITH_NATIVE_LOCK_END -# define ECL_WITH_LOCK_BEGIN(the_env,lock) { \ - const cl_env_ptr __ecl_the_env = the_env; \ - const cl_object __ecl_the_lock = lock; \ - ecl_disable_interrupts_env(__ecl_the_env); \ - mp_get_lock_wait(__ecl_the_lock); \ - ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ - ecl_enable_interrupts_env(__ecl_the_env); -# define ECL_WITH_LOCK_END \ - ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ - mp_giveup_lock(__ecl_the_lock); \ - } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } -# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) { \ - const cl_env_ptr __ecl_the_env = (the_env); \ - ecl_mutex_t* __ecl_the_lock = (lock); \ - ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ - ecl_mutex_lock(__ecl_the_lock); -# define ECL_WITH_NATIVE_LOCK_END \ - ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ - ecl_mutex_unlock(__ecl_the_lock); \ - } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } -#else -# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_LOCK_END -# define ECL_WITH_LOCK_BEGIN(the_env,lock) -# define ECL_WITH_LOCK_END -# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) -# define ECL_WITH_NATIVE_LOCK_END -#endif /* ECL_THREADS */ - -#ifdef ECL_RWLOCK -# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) { \ - const cl_env_ptr __ecl_pack_env = the_env; \ - ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - ecl_rwlock_lock_read(&cl_core.global_env_lock); -# define ECL_WITH_GLOBAL_ENV_RDLOCK_END \ - ecl_rwlock_unlock_read(&cl_core.global_env_lock); \ - ecl_bds_unwind1(__ecl_pack_env); \ - ecl_check_pending_interrupts(__ecl_pack_env); } -# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) { \ - const cl_env_ptr __ecl_pack_env = the_env; \ - ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - ecl_rwlock_lock_write(&cl_core.global_env_lock); -# define ECL_WITH_GLOBAL_ENV_WRLOCK_END \ - ecl_rwlock_unlock_write(&cl_core.global_env_lock); \ - ecl_bds_unwind1(__ecl_pack_env); \ - ecl_check_pending_interrupts(__ecl_pack_env); } -#else -# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_ENV_RDLOCK_END -# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_ENV_WRLOCK_END -#endif /* ECL_RWLOCK */ - -#include +extern cl_object mp_get_rwlock_read_wait(cl_object lock); +extern cl_object mp_get_rwlock_write_wait(cl_object lock); +#endif /* read.d */ #ifdef ECL_UNICODE @@ -467,13 +410,6 @@ extern void ecl_musleep(double time, bool alertable); #define UTC_time_to_universal_time(x) ecl_plus(ecl_make_integer(x),cl_core.Jan1st1970UT) extern cl_fixnum ecl_runtime(void); -/* threads/rwlock.d */ - -#ifdef ECL_THREADS -extern cl_object mp_get_rwlock_read_wait(cl_object lock); 
-extern cl_object mp_get_rwlock_write_wait(cl_object lock); -#endif - /* unixfsys.d */ /* Filename encodings: on Unix we use ordinary chars encoded in a user @@ -624,6 +560,68 @@ extern void ecl_interrupt_process(cl_object process, cl_object function); # endif #endif /* ECL_MS_WINDOWS_HOST */ +/* global locks */ + +#include + +#ifdef ECL_THREADS +# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) \ + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.global_lock) +# define ECL_WITH_GLOBAL_LOCK_END \ + ECL_WITH_NATIVE_LOCK_END +# define ECL_WITH_LOCK_BEGIN(the_env,lock) { \ + const cl_env_ptr __ecl_the_env = the_env; \ + const cl_object __ecl_the_lock = lock; \ + ecl_disable_interrupts_env(__ecl_the_env); \ + mp_get_lock_wait(__ecl_the_lock); \ + ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ + ecl_enable_interrupts_env(__ecl_the_env); +# define ECL_WITH_LOCK_END \ + ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ + mp_giveup_lock(__ecl_the_lock); \ + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) { \ + const cl_env_ptr __ecl_the_env = (the_env); \ + ecl_mutex_t* __ecl_the_lock = (lock); \ + ecl_disable_interrupts_env(__ecl_the_env); \ + ecl_mutex_lock(__ecl_the_lock); \ + ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ + ecl_enable_interrupts_env(__ecl_the_env); +# define ECL_WITH_NATIVE_LOCK_END \ + ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ + ecl_mutex_unlock(__ecl_the_lock); \ + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } +# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) { \ + const cl_env_ptr __ecl_pack_env = the_env; \ + ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ + ecl_rwlock_lock_read(&cl_core.global_env_lock); +# define ECL_WITH_GLOBAL_ENV_RDLOCK_END \ + ecl_rwlock_unlock_read(&cl_core.global_env_lock); \ + ecl_bds_unwind1(__ecl_pack_env); \ + ecl_check_pending_interrupts(__ecl_pack_env); } +# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) { \ + const cl_env_ptr __ecl_pack_env = the_env; \ + ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ + ecl_rwlock_lock_write(&cl_core.global_env_lock); +# define ECL_WITH_GLOBAL_ENV_WRLOCK_END \ + ecl_rwlock_unlock_write(&cl_core.global_env_lock); \ + ecl_bds_unwind1(__ecl_pack_env); \ + ecl_check_pending_interrupts(__ecl_pack_env); } +#else +# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_LOCK_END +# define ECL_WITH_LOCK_BEGIN(the_env,lock) +# define ECL_WITH_LOCK_END +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) +# define ECL_WITH_NATIVE_LOCK_END +# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_ENV_RDLOCK_END +# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_ENV_WRLOCK_END +#endif /* ECL_THREADS */ + +#include + /* * Fake several ISO C99 mathematical functions if not available */ diff --git a/src/h/object.h b/src/h/object.h index b988aba37..69c26660f 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -936,9 +936,23 @@ typedef struct ecl_cond_var_t { HANDLE signal_event; cl_index state; } ecl_cond_var_t; +typedef SRWLOCK ecl_rwlock_t; #else typedef pthread_mutex_t ecl_mutex_t; typedef pthread_cond_t ecl_cond_var_t; +# ifdef HAVE_POSIX_RWLOCK +typedef pthread_rwlock_t ecl_rwlock_t; +# else +typedef struct ecl_rwlock_t { + pthread_mutex_t mutex; + pthread_cond_t reader_cv; + pthread_cond_t writer_cv; + /* reader_count = 0: lock is free + * reader_count =-1: writer holds the lock + * reader_count > 0: number of readers */ + cl_fixnum reader_count; +} ecl_rwlock_t; +# endif #endif enum { @@ -1023,11 +1037,7 @@ 
struct ecl_mailbox { struct ecl_rwlock { _ECL_HDR; cl_object name; -#ifdef ECL_RWLOCK - pthread_rwlock_t mutex; -#else - cl_object mutex; -#endif + ecl_rwlock_t mutex; }; struct ecl_condition_variable { diff --git a/src/h/threads.h b/src/h/threads.h index 07d6a9ee7..7cbb89cfa 100644 --- a/src/h/threads.h +++ b/src/h/threads.h @@ -140,6 +140,152 @@ ecl_cond_var_broadcast(ecl_cond_var_t *cv) return pthread_cond_broadcast(cv); } +/* READ-WRITE LOCK */ + +/* If posix rwlocks are not available, we provide our own fallback + * implementation. Note that for reasons of simplicity and performance + * this implementation is a) not interrupt safe, b) will not signal + * errors on unlocking a mutex which is owned by a different thread + * and c) allows for writer starvation, i.e. as long as readers are + * active, the writer will never obtain the lock. */ + +static inline void +ecl_rwlock_init(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + pthread_rwlock_init(rwlock, NULL); +#else + pthread_mutex_init(&rwlock->mutex, NULL); + pthread_cond_init(&rwlock->reader_cv, NULL); + pthread_cond_init(&rwlock->writer_cv, NULL); + rwlock->reader_count = 0; +#endif +} + +static inline void +ecl_rwlock_destroy(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + pthread_rwlock_destroy(rwlock); +#else + pthread_mutex_destroy(&rwlock->mutex); + pthread_cond_destroy(&rwlock->reader_cv); + pthread_cond_destroy(&rwlock->writer_cv); +#endif +} + +static inline int +ecl_rwlock_unlock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_unlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count <= 0) { + rc = ECL_MUTEX_NOT_OWNED; + } else { + if (--rwlock->reader_count == 0) { + pthread_cond_signal(&rwlock->writer_cv); + } + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_unlock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_unlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count >= 0) { + rc = ECL_MUTEX_NOT_OWNED; + } else { + rwlock->reader_count = 0; + pthread_cond_signal(&rwlock->writer_cv); + pthread_cond_broadcast(&rwlock->reader_cv); + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_trylock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_tryrdlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count == -1) { + rc = ECL_MUTEX_LOCKED; + } else { + ++rwlock->reader_count; + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_trylock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_trywrlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count != 0) { + rc = ECL_MUTEX_LOCKED; + } else { + rwlock->reader_count = -1; + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_lock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_rdlock(rwlock); +#else + pthread_mutex_lock(&rwlock->mutex); + while (rwlock->reader_count == -1) { + pthread_cond_wait(&rwlock->reader_cv, &rwlock->mutex); + } + ++rwlock->reader_count; + pthread_mutex_unlock(&rwlock->mutex); + return ECL_MUTEX_SUCCESS; 
+#endif +} + +static inline int +ecl_rwlock_lock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_wrlock(rwlock); +#else + pthread_mutex_lock(&rwlock->mutex); + while (rwlock->reader_count != 0) { + pthread_cond_wait(&rwlock->writer_cv, &rwlock->mutex); + } + rwlock->reader_count = -1; + pthread_mutex_unlock(&rwlock->mutex); + return ECL_MUTEX_SUCCESS; +#endif +} + #else /* ECL_WINDOWS_THREADS */ /* To allow for timed wait operations on locks and for interrupting @@ -467,6 +613,60 @@ ecl_cond_var_broadcast(ecl_cond_var_t *cv) !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); return SetEvent(cv->broadcast_event) ? ECL_MUTEX_SUCCESS : GetLastError(); } + +/* READ-WRITE LOCK */ + +static inline void +ecl_rwlock_init(ecl_rwlock_t *rwlock) +{ + InitializeSRWLock(rwlock); +} + +static inline void +ecl_rwlock_destroy(ecl_rwlock_t *rwlock) { } + +static inline int +ecl_rwlock_unlock_read(ecl_rwlock_t *rwlock) +{ + ReleaseSRWLockShared(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_unlock_write(ecl_rwlock_t *rwlock) +{ + ReleaseSRWLockExclusive(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_trylock_read(ecl_rwlock_t *rwlock) +{ + return (TryAcquireSRWLockShared(rwlock) == 0) ? ECL_MUTEX_LOCKED : ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_trylock_write(ecl_rwlock_t *rwlock) +{ + return (TryAcquireSRWLockExclusive(rwlock) == 0) ? ECL_MUTEX_LOCKED : ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_lock_read(ecl_rwlock_t *rwlock) +{ + AcquireSRWLockShared(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_lock_write(ecl_rwlock_t *rwlock) +{ + AcquireSRWLockExclusive(rwlock); + return ECL_MUTEX_SUCCESS; +} + +#endif /* ECL_WINDOWS_THREADS */ + #endif /* ECL_MUTEX_H */ #endif /* ECL_THREADS */ -- GitLab From 5f65deea5bbeb2210c4b5ecec13468da4770d246 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sun, 11 Oct 2020 20:50:38 +0200 Subject: [PATCH 12/13] multithreading: implement optional timeout for mp:get-lock --- src/aclocal.m4 | 3 +- src/c/symbols_list.h | 1 + src/c/threads/mutex.d | 100 +++++++++++++++++++++ src/configure | 11 +++ src/configure.ac | 2 +- src/doc/manual/extensions/mp_ref_mutex.txi | 4 +- src/ecl/configpre.h | 3 + src/h/config-internal.h.in | 2 + src/h/internal.h | 4 + src/h/threads.h | 30 +++++++ src/lsp/mp.lsp | 9 +- src/tests/normal-tests/multiprocessing.lsp | 39 ++++++++ 12 files changed, 201 insertions(+), 7 deletions(-) diff --git a/src/aclocal.m4 b/src/aclocal.m4 index ae230e2e5..cee0863c9 100644 --- a/src/aclocal.m4 +++ b/src/aclocal.m4 @@ -928,12 +928,13 @@ fi dnl ---------------------------------------------------------------------- dnl Check whether we have POSIX read/write locks are available -AC_DEFUN([ECL_POSIX_RWLOCK],[ +AC_DEFUN([ECL_PTHREAD_EXTENSIONS],[ AC_CHECK_FUNC( [pthread_rwlock_init], [ AC_CHECK_TYPES([pthread_rwlock_t], [ AC_DEFINE([HAVE_POSIX_RWLOCK], [], [HAVE_POSIX_RWLOCK]) ], []) ], []) +AC_CHECK_FUNCS([pthread_mutex_timedlock]) ]) diff --git a/src/c/symbols_list.h b/src/c/symbols_list.h index 80040ea94..fc99cd686 100755 --- a/src/c/symbols_list.h +++ b/src/c/symbols_list.h @@ -1609,6 +1609,7 @@ cl_symbols[] = { {MP_ "LOCK-COUNT" ECL_FUN("mp_lock_count", IF_MP(mp_lock_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "GET-LOCK" ECL_FUN("mp_get_lock", IF_MP(mp_get_lock), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "GIVEUP-LOCK" ECL_FUN("mp_giveup_lock", IF_MP(mp_giveup_lock), 1) ECL_VAR(MP_ORDINARY, 
OBJNULL)}, +{SYS_ "MUTEX-TIMEOUT" ECL_FUN("si_mutex_timeout", IF_MP(si_mutex_timeout), 3) ECL_VAR(SI_SPECIAL, OBJNULL)}, {MP_ "MAKE-CONDITION-VARIABLE" ECL_FUN("mp_make_condition_variable", IF_MP(mp_make_condition_variable), 0) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "CONDITION-VARIABLE-WAIT" ECL_FUN("mp_condition_variable_wait", IF_MP(mp_condition_variable_wait), 2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "CONDITION-VARIABLE-TIMEDWAIT" ECL_FUN("mp_condition_variable_timedwait", IF_MP(mp_condition_variable_timedwait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)}, diff --git a/src/c/threads/mutex.d b/src/c/threads/mutex.d index 4a28a827c..a11a85e0c 100755 --- a/src/c/threads/mutex.d +++ b/src/c/threads/mutex.d @@ -199,6 +199,7 @@ mp_get_lock_wait(cl_object lock) #endif rc = ecl_mutex_lock(&lock->lock.mutex); if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_disable_interrupts_env(env); lock->lock.counter++; lock->lock.owner = own_process; ecl_enable_interrupts_env(env); @@ -208,7 +209,104 @@ mp_get_lock_wait(cl_object lock) FEerror_not_a_recursive_lock(lock); #endif } else { + FEunknown_lock_error(lock); + } +} + +static cl_object +si_abort_wait_on_mutex(cl_narg narg, ...) +{ + const cl_env_ptr the_env = ecl_process_env(); + cl_object env = the_env->function->cclosure.env; + cl_object lock = CAR(env); + if (ECL_SYM_VAL(the_env, @'si::mutex-timeout') == lock) { + ECL_SETQ(the_env, @'si::mutex-timeout', ECL_T); + cl_throw(@'si::mutex-timeout'); + } + @(return) +} + +cl_object +si_mutex_timeout(cl_object process, cl_object lock, cl_object timeout) +{ + const cl_env_ptr the_env = ecl_process_env(); + if (cl_plusp(timeout)) { + cl_sleep(timeout); + } + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &process->process.start_stop_lock) { + if (ecl_likely(mp_process_active_p(process) != ECL_NIL)) { + ecl_interrupt_process(process, + ecl_make_cclosure_va(si_abort_wait_on_mutex, + cl_list(1, lock), + @'si::mutex-timeout', + 0)); + } + } ECL_WITH_NATIVE_LOCK_END; + @(return) +} + +cl_object +mp_get_lock_timedwait(cl_object lock, cl_object timeout) +{ + cl_env_ptr env = ecl_process_env(); + cl_object own_process = env->own_process; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); + } +#endif +#if defined(ECL_WINDOWS_THREADS) || defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) + int rc = ecl_mutex_timedlock(&lock->lock.mutex, ecl_to_double(timeout)); +#else + /* If we don't have pthread_mutex_timedlock available, we create a + * timer thread which interrupts our thread after the specified + * timeout. si::mutex-timeout serves a dual purpose below: the + * symbol itself denotes a catchpoint and its value is used to + * determine a) if the catchpoint is active and b) if the timer has + * fired. 
*/ + volatile int rc; + volatile cl_object timer_thread; + ecl_bds_bind(env, @'si::mutex-timeout', lock); + ECL_CATCH_BEGIN(env, @'si::mutex-timeout') { + timer_thread = mp_process_run_function(5, @'si::mutex-timeout', + @'si::mutex-timeout', + env->own_process, + lock, + timeout); + rc = ecl_mutex_lock(&lock->lock.mutex); + ECL_SETQ(env, @'si::mutex-timeout', ECL_NIL); + } ECL_CATCH_END; + ECL_WITH_NATIVE_LOCK_BEGIN(env, &timer_thread->process.start_stop_lock) { + if (mp_process_active_p(timer_thread)) { + ecl_interrupt_process(timer_thread, @'mp::exit-process'); + } + } ECL_WITH_NATIVE_LOCK_END; + if (ECL_SYM_VAL(env, @'si::mutex-timeout') == ECL_T) { + rc = ECL_MUTEX_TIMEOUT; + /* The mutex might have been locked before we could kill the timer + * thread. Therefore, we unconditionally try to unlock the mutex + * again and treat the operation as having timed out. */ + ecl_mutex_unlock(&lock->lock.mutex); + } + ecl_bds_unwind1(env); +#endif + if (rc == ECL_MUTEX_SUCCESS) { + ecl_disable_interrupts_env(env); + lock->lock.counter++; + lock->lock.owner = own_process; ecl_enable_interrupts_env(env); + ecl_return1(env, lock); + } else if (rc == ECL_MUTEX_TIMEOUT) { + ecl_return1(env,ECL_NIL); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { FEunknown_lock_error(lock); } } @@ -217,6 +315,8 @@ mp_get_lock_wait(cl_object lock) @ if (Null(wait)) { return mp_get_lock_nowait(lock); + } else if (ecl_realp(wait)) { + return mp_get_lock_timedwait(lock, wait); } else { return mp_get_lock_wait(lock); } diff --git a/src/configure b/src/configure index 6146f44c9..407c32fb6 100755 --- a/src/configure +++ b/src/configure @@ -6233,6 +6233,17 @@ fi fi +for ac_func in pthread_mutex_timedlock +do : + ac_fn_c_check_func "$LINENO" "pthread_mutex_timedlock" "ac_cv_func_pthread_mutex_timedlock" +if test "x$ac_cv_func_pthread_mutex_timedlock" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_PTHREAD_MUTEX_TIMEDLOCK 1 +_ACEOF + +fi +done + boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}" for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done diff --git a/src/configure.ac b/src/configure.ac index 876d36363..515931e10 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -584,7 +584,7 @@ if test "${enable_threads}" = "yes" ; then else LIBS="${THREAD_LIBS} ${LIBS}" CFLAGS="${CFLAGS} ${THREAD_CFLAGS}" - ECL_POSIX_RWLOCK + ECL_PTHREAD_EXTENSIONS boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}" for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done AC_MSG_CHECKING([for thread object files]) diff --git a/src/doc/manual/extensions/mp_ref_mutex.txi b/src/doc/manual/extensions/mp_ref_mutex.txi index 54710cea5..5837b7b27 100644 --- a/src/doc/manual/extensions/mp_ref_mutex.txi +++ b/src/doc/manual/extensions/mp_ref_mutex.txi @@ -101,7 +101,9 @@ returns @code{ECL_NIL}, otherwise @code{ECL_T}. @defun mp:get-lock lock &optional (wait t) Tries to acquire a lock. @var{wait} indicates whether function should block or give up if @var{lock} is already taken. If @var{wait} is -@code{nil} and @var{lock} can't be acquired returns +@code{nil}, immediately return, if @var{wait} is a real number +@var{wait} specifies a timeout in seconds and otherwise block until the +lock becomes available. If @var{lock} can't be acquired return @code{nil}. Succesful operation returns @code{t}. Will signal an error if the mutex is non-recursive and current thread already owns the lock. 
@end defun diff --git a/src/ecl/configpre.h b/src/ecl/configpre.h index a2ed80c40..0d0f66c71 100644 --- a/src/ecl/configpre.h +++ b/src/ecl/configpre.h @@ -436,6 +436,9 @@ /* Define to 1 if you have the `powf' function. */ #undef HAVE_POWF +/* Define to 1 if you have the `pthread_mutex_timedlock' function. */ +#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK + /* Define to 1 if the system has the type `pthread_rwlock_t'. */ #undef HAVE_PTHREAD_RWLOCK_T diff --git a/src/h/config-internal.h.in b/src/h/config-internal.h.in index fe36e7f03..612bd2b53 100644 --- a/src/h/config-internal.h.in +++ b/src/h/config-internal.h.in @@ -131,6 +131,8 @@ #undef HAVE_SEM_INIT /* whether we have read/write locks */ #undef HAVE_POSIX_RWLOCK +/* whether we have mutex lock operations with timeout */ +#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK /* uname() for system identification */ #undef HAVE_UNAME #undef HAVE_UNISTD_H diff --git a/src/h/internal.h b/src/h/internal.h index b66cdcfda..f3f095d92 100755 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -311,6 +311,10 @@ extern cl_fixnum ecl_option_values[ECL_OPT_LIMIT+1]; extern void ecl_init_bignum_registers(cl_env_ptr env); extern void ecl_clear_bignum_registers(cl_env_ptr env); +/* threads/mutex.d */ + +extern cl_object si_mutex_timeout(); + /* print.d */ extern cl_object _ecl_stream_or_default_output(cl_object stream); diff --git a/src/h/threads.h b/src/h/threads.h index 7cbb89cfa..38a390ca1 100644 --- a/src/h/threads.h +++ b/src/h/threads.h @@ -101,6 +101,18 @@ add_timeout_delta(struct timespec *ts, double seconds) } } +static inline int +ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds) +{ +#if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) + struct timespec ts; + add_timeout_delta(&ts, seconds); + return pthread_mutex_timedlock(mutex, &ts); +#else + /* Not implemented, see mutex.d for alternative implementation using interrupts */ + return -1; +#endif +} static inline void ecl_cond_var_init(ecl_cond_var_t *cv) @@ -359,6 +371,24 @@ remaining_milliseconds(double seconds, DWORD start_ticks) return (ret < 0) ? 0 : ret; } +static inline int +ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds) +{ + DWORD start_ticks = GetTickCount(); + AGAIN: + switch (WaitForSingleObjectEx(*mutex, remaining_milliseconds(seconds, start_ticks), TRUE)) { + case WAIT_OBJECT_0: + case WAIT_ABANDONED: + return ECL_MUTEX_SUCCESS; + case WAIT_IO_COMPLETION: + goto AGAIN; + case WAIT_TIMEOUT: + return ECL_MUTEX_TIMEOUT; + default: + return GetLastError(); + } +} + /* CONDITION VARIABLE */ /* IMPLEMENTATION diff --git a/src/lsp/mp.lsp b/src/lsp/mp.lsp index 6cbefadff..87eb18ddf 100644 --- a/src/lsp/mp.lsp +++ b/src/lsp/mp.lsp @@ -112,7 +112,7 @@ by ALLOW-WITH-INTERRUPTS." ;;; ;;; Convenience macros for locks ;;; -(defmacro with-lock ((lock-form &rest options) &body body) +(defmacro with-lock ((lock-form &key (wait-form t)) &body body) #-threads `(progn ,@body) ;; We do our best to make sure that the lock is released if we are @@ -121,9 +121,10 @@ by ALLOW-WITH-INTERRUPTS." ;; mutex is locked but before we store the return value of ;; mp:get-lock. 
#+threads - (ext:with-unique-names (lock) - `(let ((,lock ,lock-form)) - (when (mp:get-lock ,lock) + (ext:with-unique-names (lock wait) + `(let ((,lock ,lock-form) + (,wait ,wait-form)) + (when (mp:get-lock ,lock ,wait) (without-interrupts (unwind-protect (with-restored-interrupts diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index 4a34b9c41..c2ca95ec4 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -147,6 +147,45 @@ (is (eq (mp:lock-owner mutex) mp:*current-process*))) (mp:giveup-lock mutex))) +(test-with-timeout (mp.mutex.timedlock-timeout 30) + (let ((mutex (mp:make-lock :name "mutex.timedlock-timeout")) + (flag 0)) + (mp:get-lock mutex) + (setf flag 1) + (let ((waiting-process + (mp:process-run-function + "mutex.timedlock-timeout" + (lambda () + (when (mp:get-lock mutex 1) + (error "Grabbing the mutex shouldn't have succeeded")) + (when (eq (mp:lock-owner mutex) mp:*current-process*) + (error "Wrong lock owner")) + (setf flag 2))))) + (mp:process-join waiting-process) + (is (eq mp:*current-process* (mp:lock-owner mutex))) + (is (= flag 2))))) + +(test-with-timeout (mp.mutex.timedlock-acquire 30) + (let ((mutex (mp:make-lock :name "mutex.timedlock-acquire")) + (flag 0)) + (mp:get-lock mutex) + (setf flag 1) + (let ((waiting-process + (mp:process-run-function + "mutex.timedlock-acquire" + (lambda () + (setf flag 2) + (unless (mp:get-lock mutex 60) + (error "Grabbing the mutex should have succeeded")) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 3) + (mp:giveup-lock mutex))))) + (loop until (> flag 1)) + (sleep 1) + (mp:giveup-lock mutex) + (mp:process-join waiting-process) + (is (= flag 3))))) ;; Semaphores -- GitLab From 4f3dc42ef6cea93dede6bd299cac614fc6bddada Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 14 Nov 2020 20:21:13 +0100 Subject: [PATCH 13/13] time.d: allow for interrupts during (sleep) on Windows There's little reason for not doing so and on Unix systems one can already interrupt sleeping threads. --- src/c/time.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/c/time.d b/src/c/time.d index aba7272a0..37b1dffac 100644 --- a/src/c/time.d +++ b/src/c/time.d @@ -193,7 +193,7 @@ cl_sleep(cl_object z) time = 1e-9; } } ECL_WITHOUT_FPE_END; - ecl_musleep(time, 0); + ecl_musleep(time, 1); @(return ECL_NIL); } -- GitLab