diff --git a/msvc/c/Makefile b/msvc/c/Makefile index 7b2637ac3ad24fa36e6b7c1892320b4de4bbe007..c123fb0b207556919ece8527a5eceed9cb4dccc6 100755 --- a/msvc/c/Makefile +++ b/msvc/c/Makefile @@ -13,7 +13,7 @@ ECL_FPE_CODE=fpe_x86.c !if "$(ECL_THREADS)" != "" ECL_THREADS_FLAG=1 THREADS_OBJ= process.obj mutex.obj condition_variable.obj rwlock.obj \ - semaphore.obj barrier.obj mailbox.obj atomic.obj queue.obj + semaphore.obj barrier.obj mailbox.obj atomic.obj !else ECL_THREADS_FLAG=0 THREADS_OBJ= diff --git a/src/aclocal.m4 b/src/aclocal.m4 index 21eb7e0c8a46de1dbcbf9811f9a83bfcb2958ba3..cee0863c92411c32800503f90f95dcd8f5287083 100644 --- a/src/aclocal.m4 +++ b/src/aclocal.m4 @@ -266,7 +266,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox" +THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox threads/rwlock" clibs='-lm' SONAME='' SONAME_LDFLAGS='' @@ -928,14 +928,13 @@ fi dnl ---------------------------------------------------------------------- dnl Check whether we have POSIX read/write locks are available -AC_DEFUN([ECL_POSIX_RWLOCK],[ +AC_DEFUN([ECL_PTHREAD_EXTENSIONS],[ AC_CHECK_FUNC( [pthread_rwlock_init], [ AC_CHECK_TYPES([pthread_rwlock_t], [ - AC_DEFINE([ECL_RWLOCK], [], [ECL_RWLOCK]) AC_DEFINE([HAVE_POSIX_RWLOCK], [], [HAVE_POSIX_RWLOCK]) ], []) ], []) -THREAD_OBJ="$THREAD_OBJ threads/rwlock" +AC_CHECK_FUNCS([pthread_mutex_timedlock]) ]) diff --git a/src/c/Makefile.in b/src/c/Makefile.in index ef6889afe967062d391124093f25b41bbc7803f3..e0f20dc17e7355efe6236849edece23310c82550 100644 --- a/src/c/Makefile.in +++ b/src/c/Makefile.in @@ -46,8 +46,9 @@ HFILES = $(HDIR)/config.h $(HDIR)/ecl.h $(HDIR)/ecl-cmp.h \ $(HDIR)/number.h $(HDIR)/page.h $(HDIR)/bytecodes.h \ $(HDIR)/cache.h $(HDIR)/config-internal.h $(HDIR)/ecl_atomics.h \ $(HDIR)/ecl-inl.h $(HDIR)/internal.h $(HDIR)/stack-resize.h \ - $(HDIR)/impl/math_dispatch2.h $(HDIR)/impl/math_dispatch.h \ - $(HDIR)/impl/math_fenv.h $(HDIR)/impl/math_fenv_msvc.h + $(HDIR)/threads.h $(HDIR)/impl/math_dispatch2.h \ + $(HDIR)/impl/math_dispatch.h $(HDIR)/impl/math_fenv.h \ + $(HDIR)/impl/math_fenv_msvc.h CLOS_OBJS = clos/cache.o clos/accessor.o clos/instance.o clos/gfun.o OBJS = main.o symbol.o package.o cons.o list.o apply.o eval.o \ interpreter.o compiler.o disassembler.o $(CLOS_OBJS) \ diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index f164482f621e9cc972c32eaa471bec96b1e11f12..511f526b9ba9005918f5f837948fe343117245f9 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -102,7 +102,7 @@ out_of_memory(size_t requested_bytes) /* The out of memory condition may happen in more than one thread */ /* But then we have to ensure the error has not been solved */ #ifdef ECL_THREADS - mp_get_lock_wait(cl_core.error_lock); + ecl_mutex_lock(&cl_core.error_lock); ECL_UNWIND_PROTECT_BEGIN(the_env) #endif { @@ -141,7 +141,7 @@ out_of_memory(size_t requested_bytes) } #ifdef ECL_THREADS ECL_UNWIND_PROTECT_EXIT { - mp_giveup_lock(cl_core.error_lock); + ecl_mutex_unlock(&cl_core.error_lock); } ECL_UNWIND_PROTECT_END; #endif ecl_bds_unwind1(the_env); @@ -432,10 +432,8 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl # ifdef ECL_THREADS case t_process: MAYBE_MARK(o->process.queue_record); - MAYBE_MARK(o->process.start_stop_spinlock); 
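The out_of_memory hunk above swaps the Lisp-level error lock (mp_get_lock_wait / mp_giveup_lock on a lock object) for a native mutex stored in cl_core and released from the ECL_UNWIND_PROTECT_EXIT clause. A minimal standalone sketch of that shape, assuming ecl_mutex_lock/ecl_mutex_unlock wrap pthread_mutex_lock/pthread_mutex_unlock on POSIX; the names below are illustrative, not ECL's:

```c
#include <pthread.h>
#include <stdio.h>

/* Stand-in for cl_core.error_lock: the out-of-memory condition may be
 * entered by several threads at once, so the handler is serialized by a
 * single process-wide native mutex.  The real code releases it from the
 * ECL_UNWIND_PROTECT_EXIT clause so a non-local exit also unlocks. */
static pthread_mutex_t error_lock = PTHREAD_MUTEX_INITIALIZER;

static void *report_oom(void *arg)
{
  pthread_mutex_lock(&error_lock);     /* cf. ecl_mutex_lock(&cl_core.error_lock)   */
  fprintf(stderr, "thread %ld: handling out-of-memory\n", (long)arg);
  pthread_mutex_unlock(&error_lock);   /* cf. ecl_mutex_unlock(&cl_core.error_lock) */
  return NULL;
}

int main(void)
{
  pthread_t a, b;
  pthread_create(&a, NULL, report_oom, (void *)1L);
  pthread_create(&b, NULL, report_oom, (void *)2L);
  pthread_join(a, NULL);
  pthread_join(b, NULL);
  return 0;
}
```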
MAYBE_MARK(o->process.woken_up); MAYBE_MARK(o->process.exit_values); - MAYBE_MARK(o->process.exit_barrier); MAYBE_MARK(o->process.parent); MAYBE_MARK(o->process.initial_bindings); MAYBE_MARK(o->process.interrupt); @@ -446,37 +444,23 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl ecl_mark_env(o->process.env); break; case t_lock: - MAYBE_MARK(o->lock.queue_list); - MAYBE_MARK(o->lock.queue_spinlock); MAYBE_MARK(o->lock.owner); MAYBE_MARK(o->lock.name); break; case t_condition_variable: - MAYBE_MARK(o->condition_variable.queue_spinlock); - MAYBE_MARK(o->condition_variable.queue_list); - MAYBE_MARK(o->condition_variable.lock); break; case t_rwlock: MAYBE_MARK(o->rwlock.name); -# ifndef ECL_RWLOCK - MAYBE_MARK(o->rwlock.mutex); break; -# endif case t_semaphore: - MAYBE_MARK(o->semaphore.queue_list); - MAYBE_MARK(o->semaphore.queue_spinlock); MAYBE_MARK(o->semaphore.name); break; case t_barrier: - MAYBE_MARK(o->barrier.queue_list); - MAYBE_MARK(o->barrier.queue_spinlock); MAYBE_MARK(o->barrier.name); break; case t_mailbox: MAYBE_MARK(o->mailbox.data); MAYBE_MARK(o->mailbox.name); - MAYBE_MARK(o->mailbox.reader_semaphore); - MAYBE_MARK(o->mailbox.writer_semaphore); break; # endif case t_codeblock: @@ -1013,41 +997,22 @@ init_alloc(void) to_bitmap(&o, &(o.process.interrupt)) | to_bitmap(&o, &(o.process.initial_bindings)) | to_bitmap(&o, &(o.process.parent)) | - to_bitmap(&o, &(o.process.exit_barrier)) | to_bitmap(&o, &(o.process.exit_values)) | to_bitmap(&o, &(o.process.woken_up)) | - to_bitmap(&o, &(o.process.start_stop_spinlock)) | to_bitmap(&o, &(o.process.queue_record)); type_info[t_lock].descriptor = to_bitmap(&o, &(o.lock.name)) | - to_bitmap(&o, &(o.lock.owner)) | - to_bitmap(&o, &(o.lock.queue_spinlock)) | - to_bitmap(&o, &(o.lock.queue_list)); -# ifdef ECL_RWLOCK + to_bitmap(&o, &(o.lock.owner)); type_info[t_rwlock].descriptor = to_bitmap(&o, &(o.rwlock.name)); -# else - type_info[t_rwlock].descriptor = - to_bitmap(&o, &(o.rwlock.name)) | - to_bitmap(&o, &(o.rwlock.mutex)); -# endif - type_info[t_condition_variable].descriptor = - to_bitmap(&o, &(o.condition_variable.lock)) | - to_bitmap(&o, &(o.condition_variable.queue_list)) | - to_bitmap(&o, &(o.condition_variable.queue_spinlock)); + type_info[t_condition_variable].descriptor = 0; type_info[t_semaphore].descriptor = - to_bitmap(&o, &(o.semaphore.name)) | - to_bitmap(&o, &(o.semaphore.queue_list)) | - to_bitmap(&o, &(o.semaphore.queue_spinlock)); + to_bitmap(&o, &(o.semaphore.name)); type_info[t_barrier].descriptor = - to_bitmap(&o, &(o.barrier.name)) | - to_bitmap(&o, &(o.barrier.queue_list)) | - to_bitmap(&o, &(o.barrier.queue_spinlock)); + to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | - to_bitmap(&o, &(o.mailbox.data)) | - to_bitmap(&o, &(o.mailbox.reader_semaphore)) | - to_bitmap(&o, &(o.mailbox.writer_semaphore)); + to_bitmap(&o, &(o.mailbox.data)); # endif type_info[t_codeblock].descriptor = to_bitmap(&o, &(o.cblock.data)) | @@ -1123,15 +1088,60 @@ standard_finalizer(cl_object o) GC_unregister_disappearing_link((void**)&(o->weak.value)); break; #ifdef ECL_THREADS -# ifdef ECL_RWLOCK + case t_lock: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->lock.mutex); + ecl_enable_interrupts_env(the_env); + break; + } + case t_condition_variable: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_cond_var_destroy(&o->condition_variable.cv); + 
ecl_enable_interrupts_env(the_env); + break; + } + case t_barrier: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->barrier.mutex); + ecl_cond_var_destroy(&o->barrier.cv); + ecl_enable_interrupts_env(the_env); + break; + } + case t_semaphore: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->semaphore.mutex); + ecl_cond_var_destroy(&o->semaphore.cv); + ecl_enable_interrupts_env(the_env); + break; + } + case t_mailbox: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->mailbox.mutex); + ecl_cond_var_destroy(&o->mailbox.reader_cv); + ecl_cond_var_destroy(&o->mailbox.writer_cv); + ecl_enable_interrupts_env(the_env); + break; + } case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); ecl_disable_interrupts_env(the_env); - pthread_rwlock_destroy(&o->rwlock.mutex); + ecl_rwlock_destroy(&o->rwlock.mutex); + ecl_enable_interrupts_env(the_env); + break; + } + case t_process: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->process.start_stop_lock); + ecl_cond_var_destroy(&o->process.exit_barrier); ecl_enable_interrupts_env(the_env); break; } -# endif case t_symbol: { ecl_atomic_push(&cl_core.reused_indices, ecl_make_fixnum(o->symbol.binding)); @@ -1169,9 +1179,15 @@ register_finalizer(cl_object o, void *finalized_object, case t_codeblock: #endif case t_stream: -#if defined(ECL_THREADS) && defined(ECL_RWLOCK) +#if defined(ECL_THREADS) + case t_lock: + case t_condition_variable: + case t_barrier: + case t_semaphore: + case t_mailbox: case t_rwlock: -#endif + case t_process: +#endif /* ECL_THREADS */ /* Don't delete the standard finalizer. 
*/ if (fn == NULL) { fn = (GC_finalization_proc)wrapped_finalizer; diff --git a/src/c/ecl_features.h b/src/c/ecl_features.h index 963d007d0708d52c02c266c23bffb7c4157a7af4..f2fc96ee9728e816ff69cac38ec72e6200d0ea73 100644 --- a/src/c/ecl_features.h +++ b/src/c/ecl_features.h @@ -102,7 +102,7 @@ ecl_def_string_array(feature_names,static,const) = { #ifdef ECL_SEMAPHORES ecl_def_string_array_elt("SEMAPHORES"), #endif -#ifdef ECL_RWLOCK +#if defined(HAVE_POSIX_RWLOCK) || defined(ECL_WINDOWS_THREADS) ecl_def_string_array_elt("ECL-READ-WRITE-LOCK"), #endif #ifdef WORDS_BIGENDIAN diff --git a/src/c/error.d b/src/c/error.d index bf97b8fca872f53c3be7dc0a94166b9caba2f76d..6ae680e22e0cc42f00f760d12e56d718b4bfeb27 100644 --- a/src/c/error.d +++ b/src/c/error.d @@ -463,6 +463,25 @@ FEinvalid_function_name(cl_object fname) @':datum', fname); } +#ifdef ECL_THREADS +void +FEerror_not_owned(cl_object lock) +{ + FEerror("Attempted to give up lock ~S that is not owned by process ~S", + 2, lock, mp_current_process()); +} + +void +FEunknown_lock_error(cl_object lock) +{ +#ifdef ECL_WINDOWS_THREADS + FEwin32_error("When acting on lock ~A, got an unexpected error.", 1, lock); +#else + FEerror("When acting on lock ~A, got an unexpected error.", 1, lock); +#endif +} +#endif + /* bootstrap version */ static int recursive_error = 0; diff --git a/src/c/main.d b/src/c/main.d index 095734cbe7aa9e7aec72a5a9adbeef8dea5961bb..7967fc388550f3527920e86b0e2052ccb9ef8293 100755 --- a/src/c/main.d +++ b/src/c/main.d @@ -181,7 +181,7 @@ ecl_init_env(cl_env_ptr env) env->slot_cache = ecl_make_cache(3, 4096); env->interrupt_struct = ecl_alloc(sizeof(*env->interrupt_struct)); env->interrupt_struct->pending_interrupt = ECL_NIL; - env->interrupt_struct->signal_queue_spinlock = ECL_NIL; + ecl_mutex_init(&env->interrupt_struct->signal_queue_lock, FALSE); { int size = ecl_option_values[ECL_OPT_SIGNAL_QUEUE_SIZE]; env->interrupt_struct->signal_queue = cl_make_list(1, ecl_make_fixnum(size)); @@ -208,6 +208,7 @@ _ecl_dealloc_env(cl_env_ptr env) * a lisp environment set up -- the allocator assumes one -- and we * may have already cleaned up the value of ecl_process_env() */ + ecl_mutex_destroy(&env->interrupt_struct->signal_queue_lock); #if defined(ECL_USE_MPROTECT) if (munmap(env, sizeof(*env))) ecl_internal_error("Unable to deallocate environment structure."); @@ -360,92 +361,88 @@ ecl_def_ct_complex(flt_imag_unit_neg,&flt_zero_data,&flt_one_neg_data,static,con ecl_def_ct_complex(flt_imag_two,&flt_zero_data,&flt_two_data,static,const); struct cl_core_struct cl_core = { - ECL_NIL, /* packages */ - ECL_NIL, /* lisp_package */ - ECL_NIL, /* user_package */ - ECL_NIL, /* keyword_package */ - ECL_NIL, /* system_package */ - ECL_NIL, /* ext_package */ - ECL_NIL, /* clos_package */ + .packages = ECL_NIL, + .lisp_package = ECL_NIL, + .user_package = ECL_NIL, + .keyword_package = ECL_NIL, + .system_package = ECL_NIL, + .ext_package = ECL_NIL, + .clos_package = ECL_NIL, # ifdef ECL_CLOS_STREAMS - ECL_NIL, /* gray_package */ + .gray_package = ECL_NIL, # endif - ECL_NIL, /* mp_package */ - ECL_NIL, /* c_package */ - ECL_NIL, /* ffi_package */ - - ECL_NIL, /* pathname_translations */ - ECL_NIL, /* library_pathname */ - - ECL_NIL, /* terminal_io */ - ECL_NIL, /* null_stream */ - ECL_NIL, /* standard_input */ - ECL_NIL, /* standard_output */ - ECL_NIL, /* error_output */ - ECL_NIL, /* standard_readtable */ - ECL_NIL, /* dispatch_reader */ - ECL_NIL, /* default_dispatch_macro */ - - ECL_NIL, /* char_names */ - (cl_object)&str_empty_data, /* null_string */ 
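The native handles owned by the threading objects are now reclaimed through the standard finalizer: every lock, condition variable, barrier, semaphore, mailbox, rwlock and process registers one, so the OS object is destroyed when the Lisp object becomes garbage. A hedged sketch of the same idea on top of the Boehm collector that alloc_2.d already uses; the wrapper struct and helper names are invented for illustration, and the header path may be <gc.h> on some installations:

```c
#include <gc/gc.h>
#include <pthread.h>

/* Illustrative GC-managed wrapper owning a native mutex, cf. t_lock. */
struct lock_obj { pthread_mutex_t mutex; };

static void lock_finalizer(void *obj, void *client_data)
{
  /* Analogous to the t_lock case of standard_finalizer(): destroy the
   * OS handle once the collector proves the object unreachable. */
  (void)client_data;
  pthread_mutex_destroy(&((struct lock_obj *)obj)->mutex);
}

static struct lock_obj *make_lock(void)
{
  struct lock_obj *o = GC_MALLOC(sizeof(*o));
  pthread_mutex_init(&o->mutex, NULL);
  GC_REGISTER_FINALIZER(o, lock_finalizer, NULL, NULL, NULL);
  return o;
}

int main(void)
{
  GC_INIT();
  struct lock_obj *l = make_lock();
  pthread_mutex_lock(&l->mutex);
  pthread_mutex_unlock(&l->mutex);
  return 0;   /* the finalizer runs at some later collection, if at all */
}
```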
- - (cl_object)&plus_half_data, /* plus_half */ - (cl_object)&minus_half_data, /* minus_half */ - (cl_object)&flt_imag_unit_data, /* imag_unit */ - (cl_object)&flt_imag_unit_neg_data, /* minus_imag_unit */ - (cl_object)&flt_imag_two_data, /* imag_two */ - (cl_object)&flt_zero_data, /* singlefloat_zero */ - (cl_object)&dbl_zero_data, /* doublefloat_zero */ - (cl_object)&flt_zero_neg_data, /* singlefloat_minus_zero */ - (cl_object)&dbl_zero_neg_data, /* doublefloat_minus_zero */ - (cl_object)&ldbl_zero_data, /* longfloat_zero */ - (cl_object)&ldbl_zero_neg_data, /* longfloat_minus_zero */ - - (cl_object)&str_G_data, /* gensym_prefix */ - (cl_object)&str_T_data, /* gentemp_prefix */ - ecl_make_fixnum(0), /* gentemp_counter */ - - ECL_NIL, /* Jan1st1970UT */ - - ECL_NIL, /* system_properties */ - ECL_NIL, /* setf_definition */ + .mp_package = ECL_NIL, + .c_package = ECL_NIL, + .ffi_package = ECL_NIL, + + .pathname_translations = ECL_NIL, + .library_pathname = ECL_NIL, + + .terminal_io = ECL_NIL, + .null_stream = ECL_NIL, + .standard_input = ECL_NIL, + .standard_output = ECL_NIL, + .error_output = ECL_NIL, + .standard_readtable = ECL_NIL, + .dispatch_reader = ECL_NIL, + .default_dispatch_macro = ECL_NIL, + + .char_names = ECL_NIL, + .null_string = (cl_object)&str_empty_data, + + .plus_half = (cl_object)&plus_half_data, + .minus_half = (cl_object)&minus_half_data, + .imag_unit = (cl_object)&flt_imag_unit_data, + .minus_imag_unit = (cl_object)&flt_imag_unit_neg_data, + .imag_two = (cl_object)&flt_imag_two_data, + .singlefloat_zero = (cl_object)&flt_zero_data, + .doublefloat_zero = (cl_object)&dbl_zero_data, + .singlefloat_minus_zero = (cl_object)&flt_zero_neg_data, + .doublefloat_minus_zero = (cl_object)&dbl_zero_neg_data, + .longfloat_zero = (cl_object)&ldbl_zero_data, + .longfloat_minus_zero = (cl_object)&ldbl_zero_neg_data, + + .gensym_prefix = (cl_object)&str_G_data, + .gentemp_prefix = (cl_object)&str_T_data, + .gentemp_counter = ecl_make_fixnum(0), + + .Jan1st1970UT = ECL_NIL, + + .system_properties = ECL_NIL, + .setf_definitions = ECL_NIL, #ifdef ECL_THREADS - ECL_NIL, /* processes */ - ECL_NIL, /* processes_spinlock */ - ECL_NIL, /* global_lock */ - ECL_NIL, /* error_lock */ - ECL_NIL, /* global_env_lock */ + .processes = ECL_NIL, #endif /* LIBRARIES is an adjustable vector of objects. 
It behaves as a vector of weak pointers thanks to the magic in gbc.d/alloc_2.d */ - ECL_NIL, /* libraries */ + .libraries = ECL_NIL, - 0, /* max_heap_size */ - ECL_NIL, /* bytes_consed */ - ECL_NIL, /* gc_counter */ - 0, /* gc_stats */ - 0, /* path_max */ + .max_heap_size = 0, + .bytes_consed = ECL_NIL, + .gc_counter = ECL_NIL, + .gc_stats = 0, + .path_max = 0, #ifdef GBC_BOEHM - NULL, /* safety_region */ + .safety_region = NULL, #endif - NULL, /* default_sigmask */ - 0, /* default_sigmask_bytes */ + .default_sigmask = NULL, + .default_sigmask_bytes = 0, #ifdef ECL_THREADS - 0, /* last_var_index */ - ECL_NIL, /* reused_indices */ + .last_var_index = 0, + .reused_indices = ECL_NIL, #endif - (cl_object)&str_slash_data, /* slash */ + .slash = (cl_object)&str_slash_data, - ECL_NIL, /* compiler_dispatch */ + .compiler_dispatch = ECL_NIL, - (cl_object)&default_rehash_size_data, /* rehash_size */ - (cl_object)&default_rehash_threshold_data, /* rehash_threshold */ + .rehash_size = (cl_object)&default_rehash_size_data, + .rehash_threshold = (cl_object)&default_rehash_threshold_data, - ECL_NIL /* known_signals */ + .known_signals = ECL_NIL }; #if !defined(ECL_MS_WINDOWS_HOST) diff --git a/src/c/symbols_list.h b/src/c/symbols_list.h index 80040ea9462d7f5199d8edc8e7db925b267f773f..fc99cd6869c15a71712384591b64de693c563623 100755 --- a/src/c/symbols_list.h +++ b/src/c/symbols_list.h @@ -1609,6 +1609,7 @@ cl_symbols[] = { {MP_ "LOCK-COUNT" ECL_FUN("mp_lock_count", IF_MP(mp_lock_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "GET-LOCK" ECL_FUN("mp_get_lock", IF_MP(mp_get_lock), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "GIVEUP-LOCK" ECL_FUN("mp_giveup_lock", IF_MP(mp_giveup_lock), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{SYS_ "MUTEX-TIMEOUT" ECL_FUN("si_mutex_timeout", IF_MP(si_mutex_timeout), 3) ECL_VAR(SI_SPECIAL, OBJNULL)}, {MP_ "MAKE-CONDITION-VARIABLE" ECL_FUN("mp_make_condition_variable", IF_MP(mp_make_condition_variable), 0) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "CONDITION-VARIABLE-WAIT" ECL_FUN("mp_condition_variable_wait", IF_MP(mp_condition_variable_wait), 2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "CONDITION-VARIABLE-TIMEDWAIT" ECL_FUN("mp_condition_variable_timedwait", IF_MP(mp_condition_variable_timedwait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)}, diff --git a/src/c/threads/barrier.d b/src/c/threads/barrier.d index c5512d83c1df36b737c6d213921cd427a4506f6a..3aaaf39d37dda299af531549060a8373119a2ef8 100755 --- a/src/c/threads/barrier.d +++ b/src/c/threads/barrier.d @@ -5,6 +5,7 @@ * barrier.d - wait barriers * * Copyright (c) 2012 Juan Jose Garcia Ripoll + * Copyright (c) 2020 Marius Gerbershagen * * See file 'LICENSE' for the copyright details. 
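The cl_core initializer above also moves from positional values with trailing comments to C99 designated initializers, so removing fields (as this patch does for global_lock, error_lock and the spinlock slots) no longer silently shifts every later value. A trivial illustration with a made-up struct:

```c
#include <stdio.h>

struct core { int packages; int processes; int gc_stats; };

/* Positional: the meaning depends on field order and breaks silently
 * when a member is added or removed. */
static struct core a = { 0, 1, 0 };

/* Designated (C99): each value is tied to its field by name. */
static struct core b = { .packages = 0, .processes = 1, .gc_stats = 0 };

int main(void) { printf("%d %d\n", a.processes, b.processes); return 0; }
```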
* @@ -13,21 +14,21 @@ #include #include -static ECL_INLINE void -FEerror_not_a_barrier(cl_object barrier) -{ - FEwrong_type_argument(@'mp::barrier', barrier); -} - cl_object ecl_make_barrier(cl_object name, cl_index count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_barrier); + output->barrier.disabled = FALSE; + output->barrier.wakeup = 0; output->barrier.name = name; - output->barrier.arrivers_count = count; + output->barrier.arrivers_count = 0; output->barrier.count = count; - output->barrier.queue_list = ECL_NIL; - output->barrier.queue_spinlock = ECL_NIL; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->barrier.cv); + ecl_mutex_init(&output->barrier.mutex, FALSE); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -43,7 +44,7 @@ mp_barrier_name(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-name], barrier, @[mp::barrier]); } ecl_return1(env, barrier->barrier.name); } @@ -53,7 +54,7 @@ mp_barrier_count(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-count], barrier, @[mp::barrier]); } ecl_return1(env, ecl_make_fixnum(barrier->barrier.count)); } @@ -61,104 +62,111 @@ mp_barrier_count(cl_object barrier) cl_object mp_barrier_arrivers_count(cl_object barrier) { - cl_fixnum arrivers, count; cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-arrivers_count], barrier, @[mp::barrier]); } - arrivers = barrier->barrier.arrivers_count; - count = barrier->barrier.count; - if (arrivers < 0) - arrivers = 0; /* Disabled barrier */ - else - arrivers = count - arrivers; - ecl_return1(env, ecl_make_fixnum(arrivers)); + ecl_return1(env, ecl_make_fixnum(barrier->barrier.arrivers_count)); } +/* INV: locking the mutex in mp_barrier_unblock and mp_barrier_wait + * will always succeed since the functions are not reentrant and only + * lock/unlock the mutex while interrupts are disabled, therefore + * deadlocks can't happen. */ + @(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting) - int ping_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL; - int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL | ECL_WAKEUP_ALL; @ unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_nth_arg(@[mp::barrier-unblock], 1, barrier, @[mp::barrier]); + } + ecl_disable_interrupts_env(the_env); + AGAIN: + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.wakeup) { + /* we are currently waking up blocked threads; loop until all + * threads have woken up */ + ecl_mutex_unlock(&barrier->barrier.mutex); + goto AGAIN; } - if (!Null(reset_count)) + if (!Null(reset_count)) { barrier->barrier.count = fixnnint(reset_count); - if (!Null(disable)) - barrier->barrier.arrivers_count = -1; - else - barrier->barrier.arrivers_count = barrier->barrier.count; - ecl_wakeup_waiters(the_env, barrier, - Null(kill_waiting)? 
ping_flags : kill_flags); + } + if (!Null(disable)) { + barrier->barrier.disabled = TRUE; + } else { + barrier->barrier.disabled = FALSE; + } + if (barrier->barrier.arrivers_count > 0) { + if (!Null(kill_waiting)) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_KILL; + } else { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + } + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_enable_interrupts_env(the_env); @(return); @) -static cl_object -barrier_wait_condition(cl_env_ptr env, cl_object barrier) -{ - /* We were signaled */ - if (env->own_process->process.woken_up != ECL_NIL) - return ECL_T; - /* Disabled barrier */ - else if (barrier->barrier.arrivers_count < 0) - return ECL_T; - else - return ECL_NIL; -} - -static cl_fixnum -decrement_counter(cl_fixnum *counter) -{ - /* The logic is as follows: - * - If the counter is negative, we abort. This is a way of - * disabling the counter. - * - Otherwise, we decrease the counter only if it is positive - * - If the counter is currently zero, then we block. This - * situation implies that some other thread is unblocking. - */ - cl_fixnum c; - do { - c = *counter; - if (c < 0) { - return c; - } else if (c > 0) { - if (AO_compare_and_swap_full((AO_t*)counter, - (AO_t)c, (AO_t)(c-1))) - return c; - } - } while (1); -} - cl_object -mp_barrier_wait(cl_object barrier) -{ - cl_object output; - cl_fixnum counter; +mp_barrier_wait(cl_object barrier) { cl_env_ptr the_env = ecl_process_env(); - + volatile int wakeup = 0; unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-wait], barrier, @[mp::barrier]); } - ecl_disable_interrupts_env(the_env); - counter = decrement_counter(&barrier->barrier.arrivers_count); - if (counter == 1) { - print_lock("barrier %p saturated", barrier, barrier); - /* There are (count-1) threads in the queue and we - * are the last one. We thus unblock all threads and - * proceed. */ - ecl_enable_interrupts_env(the_env); - mp_barrier_unblock(1, barrier); - output = @':unblocked'; - } else if (counter > 1) { - print_lock("barrier %p waiting", barrier, barrier); - ecl_enable_interrupts_env(the_env); - ecl_wait_on(the_env, barrier_wait_condition, barrier); - output = ECL_T; - } else { - print_lock("barrier %p pass-through", barrier, barrier); - ecl_enable_interrupts_env(the_env); - /* Barrier disabled */ - output = ECL_NIL; + ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + /* check if the barrier is disabled */ + do { + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.disabled) { + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_NIL; + } + if (barrier->barrier.wakeup) { + /* We are currently waking up blocked threads; loop until all threads have + * woken up. 
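mp:barrier-unblock now takes the barrier's own mutex, records the kind of wakeup in a flag and broadcasts the condition variable, while mp:barrier-wait blocks on the condition variable until that flag is set and late arrivers retry until the previous round has drained. A compilable pthread sketch of that wakeup-flag shape, leaving out interrupt handling and the :reset-count/:disable/:kill-waiting options; all names are illustrative:

```c
#include <pthread.h>
#include <sched.h>

struct wait_barrier {
  pthread_mutex_t mutex;
  pthread_cond_t  cv;
  unsigned        arrivers;  /* threads currently blocked in barrier_wait() */
  unsigned        count;     /* threshold at which the barrier opens        */
  int             wakeup;    /* nonzero while a wakeup round is draining    */
};

void barrier_wait(struct wait_barrier *b)
{
  pthread_mutex_lock(&b->mutex);
  while (b->wakeup) {                  /* a previous round is still draining */
    pthread_mutex_unlock(&b->mutex);
    sched_yield();
    pthread_mutex_lock(&b->mutex);
  }
  if (b->arrivers + 1 == b->count) {   /* we are the last arriver */
    if (b->arrivers > 0) {
      b->wakeup = 1;
      pthread_cond_broadcast(&b->cv);
    }
  } else {
    b->arrivers++;
    while (!b->wakeup)                 /* guards against spurious wakeups */
      pthread_cond_wait(&b->cv, &b->mutex);
    if (--b->arrivers == 0)            /* last waiter out resets the round */
      b->wakeup = 0;
  }
  pthread_mutex_unlock(&b->mutex);
}

/* cf. mp:barrier-unblock without its keyword options */
void barrier_unblock(struct wait_barrier *b)
{
  pthread_mutex_lock(&b->mutex);
  if (b->arrivers > 0) {
    b->wakeup = 1;
    pthread_cond_broadcast(&b->cv);
  }
  pthread_mutex_unlock(&b->mutex);
}
```

Resetting the flag in the last waiter to leave, rather than in the waker, is what lets the same barrier be reused for the next round.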
*/ + ecl_mutex_unlock(&barrier->barrier.mutex); + } else { + break; + } + } while(1); + /* check if we have reached the maximum count */ + if ((barrier->barrier.arrivers_count+1) == barrier->barrier.count) { + if (barrier->barrier.arrivers_count > 0) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return @':unblocked'; } - return output; + /* barrier is neither disabled nor unblocked, start waiting */ + barrier->barrier.arrivers_count++; + ECL_UNWIND_PROTECT_BEGIN(the_env) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(&barrier->barrier.cv, &barrier->barrier.mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while(!barrier->barrier.wakeup); + wakeup = barrier->barrier.wakeup; + if (barrier->barrier.arrivers_count - 1 == 0) { + /* we are the last thread to wake up, reset the barrier */ + barrier->barrier.wakeup = 0; + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + --barrier->barrier.arrivers_count; + ecl_mutex_unlock(&barrier->barrier.mutex); + if (wakeup == ECL_BARRIER_WAKEUP_KILL) { + mp_exit_process(); + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_T; } + diff --git a/src/c/threads/condition_variable.d b/src/c/threads/condition_variable.d index d1dc0cda3868418621e9f8048c855fe0f7d1b667..b8aab505fd395283c5bedc6a8564356c07be2504 100644 --- a/src/c/threads/condition_variable.d +++ b/src/c/threads/condition_variable.d @@ -2,16 +2,27 @@ /* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ /* - * condition_variable.d - condition variables for native threads + * condition_variable.d - condition variables * - * Copyright (c) 2003 Juan Jose Garcia Ripoll + * Copyright (c) 2003, Juan Jose Garcia Ripoll + * Copyright (c) 2020, Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ +#ifndef __sun__ /* See unixinit.d for this */ +#define _XOPEN_SOURCE 600 /* For pthread mutex attributes */ +#endif +#define ECL_INCLUDE_MATH_H #include +#ifdef ECL_WINDOWS_THREADS +# include +#else +# include +#endif #include +#include /*---------------------------------------------------------------------- * CONDITION VARIABLES @@ -20,81 +31,133 @@ cl_object mp_make_condition_variable(void) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_condition_variable); - output->condition_variable.queue_list = ECL_NIL; - output->condition_variable.queue_spinlock = ECL_NIL; - output->condition_variable.lock = ECL_NIL; - @(return output); -} - -static cl_object -condition_variable_wait(cl_env_ptr env, cl_object cv) -{ - cl_object lock = cv->condition_variable.lock; - cl_object own_process = env->own_process; - /* We have entered the queue and still own the mutex? */ - print_lock("cv lock %p is %p =? 
%p", cv, lock, lock->lock.owner, own_process); - if (lock->lock.owner == own_process) { - mp_giveup_lock(lock); - } - /* We always return when we have been explicitly awaken */ - return own_process->process.woken_up; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->condition_variable.cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); + @(return output) } cl_object mp_condition_variable_wait(cl_object cv, cl_object lock) { cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(cv) != t_condition_variable) { + int rc; + cl_fixnum counter; + cl_object owner; + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { FEwrong_type_nth_arg(@[mp::condition-variable-wait], 1, cv, @[mp::condition-variable]); } - unlikely_if (ecl_t_of(lock) != t_lock) { + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { FEwrong_type_nth_arg(@[mp::condition-variable-wait], 2, lock, @[mp::lock]); } - unlikely_if (cv->condition_variable.lock != ECL_NIL && - cv->condition_variable.lock != lock) { - FEerror("Attempt to associate lock ~A~%with condition variable ~A," - "~%which is already associated to lock ~A", 2, lock, - cv, cv->condition_variable.lock); - } - unlikely_if (lock->lock.owner != own_process) { - FEerror("Attempt to wait on a condition variable using lock~%~S" - "~%which is not owned by process~%~S", 2, lock, own_process); - } - unlikely_if (lock->lock.recursive) { + if (ecl_unlikely(lock->lock.recursive)) { FEerror("mp:condition-variable-wait can not be used with recursive" " locks:~%~S", 1, lock); } - print_lock("waiting cv %p", cv, cv); - cv->condition_variable.lock = lock; - ecl_wait_on(env, condition_variable_wait, cv); - mp_get_lock_wait(lock); - @(return ECL_T); + if (ecl_unlikely(lock->lock.owner != env->own_process)) { + FEerror("Attempt to wait on a condition variable using lock~%~S" + "~%which is not owned by process~%~S", 2, lock, env->own_process); + } + ecl_disable_interrupts_env(env); + counter = lock->lock.counter; + owner = lock->lock.owner; + lock->lock.counter = 0; + lock->lock.owner = ECL_NIL; + ecl_enable_interrupts_env(env); + rc = ecl_cond_var_wait(&cv->condition_variable.cv, &lock->lock.mutex); + ecl_disable_interrupts_env(env); + lock->lock.owner = owner; + lock->lock.counter = counter; + ecl_enable_interrupts_env(env); + if (ecl_unlikely(rc != ECL_MUTEX_SUCCESS)) { + if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } + } + @(return ECL_T) } cl_object mp_condition_variable_timedwait(cl_object cv, cl_object lock, cl_object seconds) { - FEerror("Timed condition variables are not supported.", 0); + cl_env_ptr env = ecl_process_env(); + int rc; + cl_fixnum counter; + cl_object owner; + + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_nth_arg(@[mp::condition-variable-timedwait], + 1, cv, @[mp::condition-variable]); + } + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::condition-variable-timedwait], + 2, lock, @[mp::lock]); + } + if (ecl_unlikely(lock->lock.recursive)) { + FEerror("mp:condition-variable-timedwait can not be used with recursive" + " locks:~%~S", 1, lock); + } + if (ecl_unlikely(lock->lock.owner != env->own_process)) { + FEerror("Attempt to wait on a condition variable using lock~%~S" + "~%which is not owned by process~%~S", 2, lock, env->own_process); + } + /* INV: ecl_minusp() makes sure `seconds' is real */ + if (ecl_unlikely(ecl_minusp(seconds))) { + cl_error(9, 
@'simple-type-error', @':format-control', + make_constant_base_string("Not a non-negative number ~S"), + @':format-arguments', cl_list(1, seconds), + @':expected-type', @'real', @':datum', seconds); + } + ecl_disable_interrupts_env(env); + counter = lock->lock.counter; + owner = lock->lock.owner; + lock->lock.counter = 0; + lock->lock.owner = ECL_NIL; + ecl_enable_interrupts_env(env); + + rc = ecl_cond_var_timedwait(&cv->condition_variable.cv, &lock->lock.mutex, ecl_to_double(seconds)); + + ecl_disable_interrupts_env(env); + lock->lock.owner = owner; + lock->lock.counter = counter; + ecl_enable_interrupts_env(env); + + if (ecl_unlikely(rc != ECL_MUTEX_SUCCESS && rc != ECL_MUTEX_TIMEOUT)) { + if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } + } + @(return (rc == ECL_MUTEX_SUCCESS ? ECL_T : ECL_NIL)) } cl_object mp_condition_variable_signal(cl_object cv) { - print_lock("signal cv %p", cv, cv); - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ONE | ECL_WAKEUP_DELETE); - @(return ECL_T); + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_only_arg(@[mp::condition-variable-signal], + cv, @[mp::condition-variable]); + } + ecl_cond_var_signal(&cv->condition_variable.cv); + @(return ECL_T) } cl_object mp_condition_variable_broadcast(cl_object cv) { - print_lock("broadcast cv %p", cv); - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL | ECL_WAKEUP_DELETE); - @(return ECL_T); + if (ecl_unlikely(ecl_t_of(cv) != t_condition_variable)) { + FEwrong_type_only_arg(@[mp::condition-variable-broadcast], + cv, @[mp::condition-variable]); + } + ecl_cond_var_broadcast(&cv->condition_variable.cv); + @(return ECL_T) } + diff --git a/src/c/threads/mailbox.d b/src/c/threads/mailbox.d index 3d89f516e3bef0af963df06656065ae20dad943c..556166613382ac4bf42fe767c80276166e258123 100755 --- a/src/c/threads/mailbox.d +++ b/src/c/threads/mailbox.d @@ -13,22 +13,13 @@ #include #include -static ECL_INLINE void -FEerror_not_a_mailbox(cl_object mailbox) -{ - FEwrong_type_argument(@'mp::mailbox', mailbox); -} +/* NOTE: The mailbox functions are not interrupt safe. 
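mp:condition-variable-wait above clears the lock's Lisp-side owner/counter pair before calling ecl_cond_var_wait (which releases and later reacquires the underlying native mutex) and restores it afterwards; that is what keeps mp:lock-owner from reporting a blocked waiter as the holder. The same shape in standalone form, with an illustrative struct in place of ECL's lock object:

```c
#include <pthread.h>

struct lisp_lock {
  pthread_mutex_t mutex;   /* the native mutex, cf. lock.mutex        */
  void           *owner;   /* Lisp-side bookkeeping, cf. lock.owner   */
  long            counter; /* Lisp-side bookkeeping, cf. lock.counter */
};

/* Caller holds lk->mutex and is recorded as its owner. */
void condition_wait(pthread_cond_t *cv, struct lisp_lock *lk)
{
  /* pthread_cond_wait releases the mutex while blocking, so the
   * bookkeeping is cleared first: while we sleep the lock really is
   * free, and the owner field must not claim otherwise. */
  long  counter = lk->counter;
  void *owner   = lk->owner;
  lk->counter = 0;
  lk->owner   = NULL;

  pthread_cond_wait(cv, &lk->mutex);   /* atomically unlocks, then relocks */

  /* The mutex is held again: restore the owner/count pair. */
  lk->owner   = owner;
  lk->counter = counter;
}
```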
*/ cl_object ecl_make_mailbox(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_mailbox); - cl_fixnum mask; - for (mask = 1; mask < count; mask <<= 1) {} - if (mask == 1) - mask = 63; - count = mask; - mask = count - 1; output->mailbox.name = name; output->mailbox.data = si_make_vector(ECL_T, /* element type */ ecl_make_fixnum(count), /* size */ @@ -36,13 +27,15 @@ ecl_make_mailbox(cl_object name, cl_fixnum count) ECL_NIL, /* fill pointer */ ECL_NIL, /* displaced to */ ECL_NIL); /* displacement */ - output->mailbox.reader_semaphore = - ecl_make_semaphore(name, 0); - output->mailbox.writer_semaphore = - ecl_make_semaphore(name, count); + output->mailbox.message_count = 0; output->mailbox.read_pointer = 0; output->mailbox.write_pointer = 0; - output->mailbox.mask = mask; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->mailbox.mutex, FALSE); + ecl_cond_var_init(&output->mailbox.reader_cv); + ecl_cond_var_init(&output->mailbox.writer_cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -56,7 +49,7 @@ mp_mailbox_name(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-name], mailbox, @[mp::mailbox]); } ecl_return1(env, mailbox->mailbox.name); } @@ -66,7 +59,7 @@ mp_mailbox_count(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-count], mailbox, @[mp::mailbox]); } ecl_return1(env, ecl_make_fixnum(mailbox->mailbox.data->vector.dim)); } @@ -76,27 +69,41 @@ mp_mailbox_empty_p(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-empty-p], mailbox, @[mp::mailbox]); } - ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? ECL_NIL : ECL_T); + ecl_return1(env, mailbox->mailbox.message_count? 
ECL_NIL : ECL_T); +} + +static cl_object +read_message(cl_object mailbox) +{ + cl_object output; + cl_fixnum ndx = mailbox->mailbox.read_pointer++; + if (mailbox->mailbox.read_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.read_pointer = 0; + } + output = mailbox->mailbox.data->vector.self.t[ndx]; + mailbox->mailbox.message_count--; + ecl_cond_var_signal(&mailbox->mailbox.writer_cv); + return output; } cl_object mp_mailbox_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-read], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; + while (mailbox->mailbox.message_count == 0) { + ecl_cond_var_wait(&mailbox->mailbox.reader_cv, &mailbox->mailbox.mutex); + } + output = read_message(mailbox); } - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } @@ -104,37 +111,50 @@ cl_object mp_mailbox_try_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-read], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.reader_semaphore); - if (output != ECL_NIL) { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == 0) { + output = ECL_NIL; + } else { + output = read_message(mailbox); + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } +static void +store_message(cl_object mailbox, cl_object msg) +{ + cl_fixnum ndx = mailbox->mailbox.write_pointer++; + if (mailbox->mailbox.write_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.write_pointer = 0; + } + mailbox->mailbox.data->vector.self.t[ndx] = msg; + mailbox->mailbox.message_count++; + ecl_cond_var_signal(&mailbox->mailbox.reader_cv); +} + cl_object mp_mailbox_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-send], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; + while (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + ecl_cond_var_wait(&mailbox->mailbox.writer_cv, &mailbox->mailbox.mutex); + } + store_message(mailbox, msg); } - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); - ecl_return0(env); + ecl_mutex_unlock(&mailbox->mailbox.mutex); + ecl_return1(env, msg); } cl_object @@ -142,18 +162,19 @@ mp_mailbox_try_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); cl_object output; - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - 
FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-send], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.writer_semaphore); - if (output != ECL_NIL) { - output = msg; - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + output = ECL_NIL; + } else { + store_message(mailbox, msg); + output = msg; + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } diff --git a/src/c/threads/mutex.d b/src/c/threads/mutex.d index 62bde1b7b41a459e53c5cba130656e184f26e7ff..a11a85e0c171f8bbe82dd70492ab7ce3a0e6b707 100755 --- a/src/c/threads/mutex.d +++ b/src/c/threads/mutex.d @@ -2,27 +2,50 @@ /* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ /* - * mutex.d - mutually exclusive locks + * mutex.d - mutually exclusive locks. * - * Copyright (c) 2003 Juan Jose Garcia Ripoll + * Copyright (c) 2003, Juan Jose Garcia Ripoll + * Copyright (c) 2020, Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ +#ifndef __sun__ /* See unixinit.d for this */ +#define _XOPEN_SOURCE 600 /* For pthread mutex attributes */ +#endif +#include #include #include - /*---------------------------------------------------------------------- * LOCKS or MUTEX */ -static void -FEerror_not_a_lock(cl_object lock) -{ - FEwrong_type_argument(@'mp::lock', lock); -} + +/* THREAD SAFETY + * + * mp:lock-owner, mp:holding-lock-p and mp:lock-count will return + * wrong values in the following scenarios: + * 1. Another thread is in the process of locking/unlocking the mutex. + * This in unavoidable since count and owner cannot both be stored + * atomically. + * 2. A call to mp:get-lock-wait is interrupted after the mutex has + * been locked but before count and owner have been set. In this + * case, the mutex will appear to be unlocked even though it is + * already locked. If the interrupting code performs a nonlocal + * jump up the call stack, this will persist even after the + * interrupt. However, the mutex can still be unlocked by + * mp:giveup-lock since the check whether the mutex is locked or + * not is done by OS level functions. + * 3. A call to mp:condition-variable-(timed)wait is interrupted after + * the mutex has been unlocked/relocked but after/before count and + * owner have been set. The consequences are equivalent to scenario 2. + * In summary, owner can be nil and count 0 even though the mutex is + * locked but the converse (owner != nil and count != 0 when the mutex + * is unlocked) cannot happen. 
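The mailbox rewrite above drops the power-of-two mask and the reader/writer semaphores in favour of a plain bounded buffer: one mutex, a reader and a writer condition variable, and message_count tracking occupancy. That is the classic producer/consumer shape, sketched standalone below with a fixed capacity; initialise the fields with pthread_mutex_init/pthread_cond_init, as ecl_make_mailbox does through the ecl_* wrappers:

```c
#include <pthread.h>

#define CAPACITY 64

struct mailbox {
  pthread_mutex_t mutex;
  pthread_cond_t  reader_cv;   /* signalled when a message becomes available */
  pthread_cond_t  writer_cv;   /* signalled when a slot becomes free         */
  void *data[CAPACITY];
  int read_pointer, write_pointer, message_count;
};

void mailbox_send(struct mailbox *mb, void *msg)
{
  pthread_mutex_lock(&mb->mutex);
  while (mb->message_count == CAPACITY)             /* full: wait for a reader */
    pthread_cond_wait(&mb->writer_cv, &mb->mutex);
  mb->data[mb->write_pointer] = msg;
  mb->write_pointer = (mb->write_pointer + 1) % CAPACITY;
  mb->message_count++;
  pthread_cond_signal(&mb->reader_cv);
  pthread_mutex_unlock(&mb->mutex);
}

void *mailbox_read(struct mailbox *mb)
{
  void *msg;
  pthread_mutex_lock(&mb->mutex);
  while (mb->message_count == 0)                    /* empty: wait for a writer */
    pthread_cond_wait(&mb->reader_cv, &mb->mutex);
  msg = mb->data[mb->read_pointer];
  mb->read_pointer = (mb->read_pointer + 1) % CAPACITY;
  mb->message_count--;
  pthread_cond_signal(&mb->writer_cv);
  pthread_mutex_unlock(&mb->mutex);
  return msg;
}
```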
+ * + */ static void FEerror_not_a_recursive_lock(cl_object lock) @@ -31,76 +54,73 @@ FEerror_not_a_recursive_lock(cl_object lock) 2, lock, lock->lock.owner); } -static void -FEerror_not_owned(cl_object lock) -{ - FEerror("Attempted to give up lock ~S that is not owned by process ~S", - 2, lock, mp_current_process()); -} - cl_object ecl_make_lock(cl_object name, bool recursive) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_lock); output->lock.name = name; output->lock.owner = ECL_NIL; output->lock.counter = 0; output->lock.recursive = recursive; - output->lock.queue_list = ECL_NIL; - output->lock.queue_spinlock = ECL_NIL; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->lock.mutex, recursive); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @(defun mp::make-lock (&key name (recursive ECL_NIL)) @ - @(return ecl_make_lock(name, !Null(recursive))); + @(return ecl_make_lock(name, !Null(recursive))) @) cl_object mp_recursive_lock_p(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::recursive-lock-p], lock, @[mp::lock]); + } ecl_return1(env, lock->lock.recursive? ECL_T : ECL_NIL); } cl_object -mp_holding_lock_p(cl_object lock) +mp_lock_name(cl_object lock) { cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(lock) != t_lock) - FEerror_not_a_lock(lock); - ecl_return1(env, (lock->lock.owner == own_process) ? ECL_T : ECL_NIL); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-name], lock, @[mp::lock]); + } + ecl_return1(env, lock->lock.name); } cl_object -mp_lock_name(cl_object lock) +mp_lock_owner(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-owner], lock, @[mp::lock]); } - ecl_return1(env, lock->lock.name); + ecl_return1(env, lock->lock.owner); } cl_object -mp_lock_owner(cl_object lock) +mp_holding_lock_p(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::holding-lock-p], lock, @[mp::lock]); } - ecl_return1(env, lock->lock.owner); + ecl_return1(env, (lock->lock.owner == mp_current_process())? ECL_T : ECL_NIL); } cl_object mp_lock_count(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::lock-count], lock, @[mp::lock]); } ecl_return1(env, ecl_make_fixnum(lock->lock.counter)); } @@ -108,103 +128,196 @@ mp_lock_count(cl_object lock) cl_object mp_giveup_lock(cl_object lock) { - /* Must be called with interrupts disabled. */ cl_env_ptr env = ecl_process_env(); - cl_object own_process = env->own_process; - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_only_arg(@[mp::giveup-lock], lock, @[mp::lock]); + } + ecl_disable_interrupts_env(env); + if ((lock->lock.counter > 0 ? 
--lock->lock.counter : 0) == 0) { + lock->lock.owner = ECL_NIL; } - unlikely_if (lock->lock.owner != own_process) { + rc = ecl_mutex_unlock(&lock->lock.mutex); + ecl_enable_interrupts_env(env); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); } - if (--lock->lock.counter == 0) { - cl_object first = ecl_waiter_pop(env, lock);; - if (first == ECL_NIL) { - lock->lock.owner = ECL_NIL; - } else { - lock->lock.counter = 1; - lock->lock.owner = first; - ecl_wakeup_process(first); - } - } - ecl_return1(env, ECL_T); } -static cl_object -get_lock_inner(cl_env_ptr env, cl_object lock) +cl_object +mp_get_lock_nowait(cl_object lock) { - cl_object output; + cl_env_ptr env = ecl_process_env(); cl_object own_process = env->own_process; + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); + } +#endif ecl_disable_interrupts_env(env); - if (AO_compare_and_swap_full((AO_t*)&(lock->lock.owner), - (AO_t)ECL_NIL, (AO_t)own_process)) { - lock->lock.counter = 1; - output = ECL_T; - print_lock("acquired %p\t", lock, lock); - } else if (lock->lock.owner == own_process) { - unlikely_if (!lock->lock.recursive) { - FEerror_not_a_recursive_lock(lock); - } - ++lock->lock.counter; - output = ECL_T; - } else { - print_lock("failed acquiring %p for %d\t", lock, lock, - lock->lock.owner); - output = ECL_NIL; + if ((rc = ecl_mutex_trylock(&lock->lock.mutex)) == ECL_MUTEX_SUCCESS) { + lock->lock.counter++; + lock->lock.owner = own_process; } ecl_enable_interrupts_env(env); - return output; + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env,lock); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env,ECL_NIL); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { + FEunknown_lock_error(lock); + } } cl_object -mp_get_lock_nowait(cl_object lock) +mp_get_lock_wait(cl_object lock) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + cl_object own_process = env->own_process; + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); + } +#endif + rc = ecl_mutex_lock(&lock->lock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_disable_interrupts_env(env); + lock->lock.counter++; + lock->lock.owner = own_process; + ecl_enable_interrupts_env(env); + ecl_return1(env, lock); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { + FEunknown_lock_error(lock); } - ecl_return1(env, get_lock_inner(env, lock)); } static cl_object -own_or_get_lock(cl_env_ptr env, cl_object lock) +si_abort_wait_on_mutex(cl_narg narg, ...) 
{ - cl_object output; - cl_object own_process = env->own_process; - ecl_disable_interrupts_env(env); - if (AO_compare_and_swap_full((AO_t*)&(lock->lock.owner), - (AO_t)ECL_NIL, (AO_t)own_process)) { - lock->lock.counter = 1; - output = ECL_T; - print_lock("acquired %p\t", lock, lock); - } else if (lock->lock.owner == own_process) { - output = ECL_T; - } else { - output = ECL_NIL; + const cl_env_ptr the_env = ecl_process_env(); + cl_object env = the_env->function->cclosure.env; + cl_object lock = CAR(env); + if (ECL_SYM_VAL(the_env, @'si::mutex-timeout') == lock) { + ECL_SETQ(the_env, @'si::mutex-timeout', ECL_T); + cl_throw(@'si::mutex-timeout'); } - ecl_enable_interrupts_env(env); - return output; + @(return) } cl_object -mp_get_lock_wait(cl_object lock) +si_mutex_timeout(cl_object process, cl_object lock, cl_object timeout) +{ + const cl_env_ptr the_env = ecl_process_env(); + if (cl_plusp(timeout)) { + cl_sleep(timeout); + } + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &process->process.start_stop_lock) { + if (ecl_likely(mp_process_active_p(process) != ECL_NIL)) { + ecl_interrupt_process(process, + ecl_make_cclosure_va(si_abort_wait_on_mutex, + cl_list(1, lock), + @'si::mutex-timeout', + 0)); + } + } ECL_WITH_NATIVE_LOCK_END; + @(return) +} + +cl_object +mp_get_lock_timedwait(cl_object lock, cl_object timeout) { cl_env_ptr env = ecl_process_env(); - unlikely_if (ecl_t_of(lock) != t_lock) { - FEerror_not_a_lock(lock); + cl_object own_process = env->own_process; + if (ecl_unlikely(ecl_t_of(lock) != t_lock)) { + FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]); + } +#if !defined(ECL_MUTEX_DEADLOCK) + if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) { + /* INV: owner != nil only if the mutex is locked */ + FEerror_not_a_recursive_lock(lock); + } +#endif +#if defined(ECL_WINDOWS_THREADS) || defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) + int rc = ecl_mutex_timedlock(&lock->lock.mutex, ecl_to_double(timeout)); +#else + /* If we don't have pthread_mutex_timedlock available, we create a + * timer thread which interrupts our thread after the specified + * timeout. si::mutex-timeout serves a dual purpose below: the + * symbol itself denotes a catchpoint and its value is used to + * determine a) if the catchpoint is active and b) if the timer has + * fired. */ + volatile int rc; + volatile cl_object timer_thread; + ecl_bds_bind(env, @'si::mutex-timeout', lock); + ECL_CATCH_BEGIN(env, @'si::mutex-timeout') { + timer_thread = mp_process_run_function(5, @'si::mutex-timeout', + @'si::mutex-timeout', + env->own_process, + lock, + timeout); + rc = ecl_mutex_lock(&lock->lock.mutex); + ECL_SETQ(env, @'si::mutex-timeout', ECL_NIL); + } ECL_CATCH_END; + ECL_WITH_NATIVE_LOCK_BEGIN(env, &timer_thread->process.start_stop_lock) { + if (mp_process_active_p(timer_thread)) { + ecl_interrupt_process(timer_thread, @'mp::exit-process'); + } + } ECL_WITH_NATIVE_LOCK_END; + if (ECL_SYM_VAL(env, @'si::mutex-timeout') == ECL_T) { + rc = ECL_MUTEX_TIMEOUT; + /* The mutex might have been locked before we could kill the timer + * thread. Therefore, we unconditionally try to unlock the mutex + * again and treat the operation as having timed out. 
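The get-lock/giveup-lock functions above are written against the return codes of the new ecl_mutex layer (ECL_MUTEX_SUCCESS, ECL_MUTEX_LOCKED, ECL_MUTEX_NOT_OWNED and, where the platform detects it, ECL_MUTEX_DEADLOCK). On POSIX these plausibly derive from error-checking and recursive mutex attributes; the mapping below is an assumption about what the wrapper looks like, not ECL's actual code:

```c
#ifndef __sun__
#define _XOPEN_SOURCE 600   /* for pthread mutex attributes, as in mutex.d */
#endif
#include <errno.h>
#include <pthread.h>

enum { MTX_SUCCESS, MTX_LOCKED, MTX_NOT_OWNED, MTX_DEADLOCK, MTX_ERROR };

/* Recursive locks use PTHREAD_MUTEX_RECURSIVE; plain locks use
 * PTHREAD_MUTEX_ERRORCHECK so relocking reports EDEADLK and unlocking
 * by a non-owner reports EPERM instead of being undefined behaviour. */
void mutex_init(pthread_mutex_t *m, int recursive)
{
  pthread_mutexattr_t attr;
  pthread_mutexattr_init(&attr);
  pthread_mutexattr_settype(&attr, recursive ? PTHREAD_MUTEX_RECURSIVE
                                             : PTHREAD_MUTEX_ERRORCHECK);
  pthread_mutex_init(m, &attr);
  pthread_mutexattr_destroy(&attr);
}

int mutex_lock(pthread_mutex_t *m)
{
  int rc = pthread_mutex_lock(m);
  if (rc == 0)       return MTX_SUCCESS;
  if (rc == EDEADLK) return MTX_DEADLOCK;   /* errorcheck mutex relocked by owner */
  return MTX_ERROR;
}

int mutex_trylock(pthread_mutex_t *m)
{
  int rc = pthread_mutex_trylock(m);
  if (rc == 0)      return MTX_SUCCESS;
  if (rc == EBUSY)  return MTX_LOCKED;      /* already held by some thread */
  return MTX_ERROR;
}

int mutex_unlock(pthread_mutex_t *m)
{
  int rc = pthread_mutex_unlock(m);
  if (rc == 0)      return MTX_SUCCESS;
  if (rc == EPERM)  return MTX_NOT_OWNED;   /* caller does not hold the mutex */
  return MTX_ERROR;
}
```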
*/ + ecl_mutex_unlock(&lock->lock.mutex); } - if (get_lock_inner(env, lock) == ECL_NIL) { - ecl_wait_on(env, own_or_get_lock, lock); + ecl_bds_unwind1(env); +#endif + if (rc == ECL_MUTEX_SUCCESS) { + ecl_disable_interrupts_env(env); + lock->lock.counter++; + lock->lock.owner = own_process; + ecl_enable_interrupts_env(env); + ecl_return1(env, lock); + } else if (rc == ECL_MUTEX_TIMEOUT) { + ecl_return1(env,ECL_NIL); +#if defined(ECL_MUTEX_DEADLOCK) + } else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) { + FEerror_not_a_recursive_lock(lock); +#endif + } else { + FEunknown_lock_error(lock); } - @(return ECL_T); } @(defun mp::get-lock (lock &optional (wait ECL_T)) @ if (Null(wait)) { return mp_get_lock_nowait(lock); - } - else { + } else if (ecl_realp(wait)) { + return mp_get_lock_timedwait(lock, wait); + } else { return mp_get_lock_wait(lock); } @) diff --git a/src/c/threads/process.d b/src/c/threads/process.d index 952717b93a0f00c531574a7b88ebfa28d9c965df..d93dc5fbfec56c02bf430e425af2959b266b13ec 100755 --- a/src/c/threads/process.d +++ b/src/c/threads/process.d @@ -27,6 +27,9 @@ #ifdef HAVE_GETTIMEOFDAY # include #endif +#ifdef HAVE_SCHED_H +# include +#endif #include #include @@ -90,7 +93,7 @@ extend_process_vector() cl_object v = cl_core.processes; cl_index new_size = v->vector.dim + v->vector.dim/2; cl_env_ptr the_env = ecl_process_env(); - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object other = cl_core.processes; if (new_size > other->vector.dim) { cl_object new = si_make_vector(ECL_T, @@ -100,7 +103,7 @@ extend_process_vector() ecl_copy_subarray(new, 0, other, 0, other->vector.dim); cl_core.processes = new; } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; } static void @@ -109,7 +112,7 @@ ecl_list_process(cl_object process) cl_env_ptr the_env = ecl_process_env(); bool ok = 0; do { - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object vector = cl_core.processes; cl_index size = vector->vector.dim; cl_index ndx = vector->vector.fillp; @@ -118,7 +121,7 @@ ecl_list_process(cl_object process) vector->vector.fillp = ndx; ok = 1; } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; if (ok) break; extend_process_vector(); } while (1); @@ -129,8 +132,7 @@ ecl_list_process(cl_object process) static void ecl_unlist_process(cl_object process) { - cl_env_ptr the_env = ecl_process_env(); - ecl_get_spinlock(the_env, &cl_core.processes_spinlock); + ecl_mutex_lock(&cl_core.processes_lock); cl_object vector = cl_core.processes; cl_index i; for (i = 0; i < vector->vector.fillp; i++) { @@ -143,7 +145,7 @@ ecl_unlist_process(cl_object process) break; } } - ecl_giveup_spinlock(&cl_core.processes_spinlock); + ecl_mutex_unlock(&cl_core.processes_lock); } static cl_object @@ -151,7 +153,7 @@ ecl_process_list() { cl_env_ptr the_env = ecl_process_env(); cl_object output = ECL_NIL; - ECL_WITH_SPINLOCK_BEGIN(the_env, &cl_core.processes_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.processes_lock) { cl_object vector = cl_core.processes; cl_object *data = vector->vector.self.t; cl_index i; @@ -160,7 +162,7 @@ ecl_process_list() if (p != ECL_NIL) output = ecl_cons(p, output); } - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; return output; } @@ -193,13 +195,11 @@ thread_cleanup(void *aux) /* The following flags will disable all interrupts. 
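Where configure finds pthread_mutex_timedlock (the new HAVE_PTHREAD_MUTEX_TIMEDLOCK check in aclocal.m4), the timed branch of mp:get-lock shown above relies on ecl_mutex_timedlock instead of the timer-thread fallback. A standalone guess at what such a timed lock amounts to on POSIX, converting the relative seconds argument into an absolute CLOCK_REALTIME deadline:

```c
#define _POSIX_C_SOURCE 200112L
#include <errno.h>
#include <pthread.h>
#include <time.h>

enum { MTX_SUCCESS, MTX_TIMEOUT, MTX_ERROR };

int mutex_timedlock(pthread_mutex_t *m, double seconds)
{
  struct timespec deadline;
  clock_gettime(CLOCK_REALTIME, &deadline);       /* timedlock takes an absolute time */
  deadline.tv_sec  += (time_t)seconds;
  deadline.tv_nsec += (long)((seconds - (time_t)seconds) * 1e9);
  if (deadline.tv_nsec >= 1000000000L) {          /* carry into seconds */
    deadline.tv_nsec -= 1000000000L;
    deadline.tv_sec++;
  }
  switch (pthread_mutex_timedlock(m, &deadline)) {
  case 0:         return MTX_SUCCESS;
  case ETIMEDOUT: return MTX_TIMEOUT;             /* cf. ECL_MUTEX_TIMEOUT above */
  default:        return MTX_ERROR;
  }
}
```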
*/ if (env) { - ecl_get_spinlock(env, &process->process.start_stop_spinlock); - } - AO_store_full((AO_t*)&process->process.phase, ECL_PROCESS_EXITING); - if (env) { - ecl_clear_bignum_registers(env); ecl_disable_interrupts_env(env); + ecl_clear_bignum_registers(env); } + ecl_mutex_lock(&process->process.start_stop_lock); + process->process.phase = ECL_PROCESS_EXITING; #ifdef HAVE_SIGPROCMASK /* ...but we might get stray signals. */ { @@ -214,12 +214,12 @@ thread_cleanup(void *aux) #ifdef ECL_WINDOWS_THREADS CloseHandle(process->process.thread); #endif - mp_barrier_unblock(3, process->process.exit_barrier, @':disable', ECL_T); ecl_set_process_env(NULL); if (env) _ecl_dealloc_env(env); - AO_store_release((AO_t*)&process->process.phase, ECL_PROCESS_INACTIVE); - ecl_giveup_spinlock(&process->process.start_stop_spinlock); + process->process.phase = ECL_PROCESS_INACTIVE; + ecl_cond_var_broadcast(&process->process.exit_barrier); + ecl_mutex_unlock(&process->process.start_stop_lock); } #ifdef ECL_WINDOWS_THREADS @@ -249,9 +249,7 @@ static DWORD WINAPI thread_entry_point(void *arg) pthread_cleanup_push(thread_cleanup, (void *)process); #endif ecl_cs_set_org(env); - ecl_get_spinlock(env, &process->process.start_stop_spinlock); - print_lock("ENVIRON %p %p %p %p", ECL_NIL, process, - env->bds_org, env->bds_top, env->bds_limit); + ecl_mutex_lock(&process->process.start_stop_lock); /* 2) Execute the code. The CATCH_ALL point is the destination * provides us with an elegant way to exit the thread: we just @@ -264,8 +262,8 @@ static DWORD WINAPI thread_entry_point(void *arg) pthread_sigmask(SIG_SETMASK, new, NULL); } #endif - ecl_giveup_spinlock(&process->process.start_stop_spinlock); process->process.phase = ECL_PROCESS_ACTIVE; + ecl_mutex_unlock(&process->process.start_stop_lock); ecl_enable_interrupts_env(env); si_trap_fpe(@'last', ECL_T); ecl_bds_bind(env, @'mp::*current-process*', process); @@ -285,7 +283,6 @@ static DWORD WINAPI thread_entry_point(void *arg) /* ABORT restart. */ process->process.exit_values = args; } ECL_RESTART_CASE_END; - process->process.phase = ECL_PROCESS_EXITING; ecl_bds_unwind1(env); } ECL_CATCH_ALL_END; @@ -305,6 +302,7 @@ static DWORD WINAPI thread_entry_point(void *arg) static cl_object alloc_process(cl_object name, cl_object initial_bindings) { + cl_env_ptr env = ecl_process_env(); cl_object process = ecl_alloc_object(t_process), array; process->process.phase = ECL_PROCESS_INACTIVE; process->process.name = name; @@ -322,12 +320,12 @@ alloc_process(cl_object name, cl_object initial_bindings) } process->process.initial_bindings = array; process->process.woken_up = ECL_NIL; - process->process.start_stop_spinlock = ECL_NIL; process->process.queue_record = ecl_list1(process); - /* Creates the exit barrier so that processes can wait for termination, - * but it is created in a disabled state. 
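Throughout process.d the global process vector and the per-process start/stop state are now guarded by ECL_WITH_NATIVE_LOCK_BEGIN/END over native mutexes instead of spinlocks. The macro pair presumably amounts to lock-then-unwind-protected-unlock; a rough pthread approximation is sketched below (an assumption, not the real macro definition):

```c
#include <pthread.h>

static void unlock_on_exit(void *m) { pthread_mutex_unlock((pthread_mutex_t *)m); }

/* Rough stand-in for ECL_WITH_NATIVE_LOCK_BEGIN/END: run the body with
 * the mutex held, and release it even on a non-local exit. */
#define WITH_NATIVE_LOCK_BEGIN(mutex)                         \
  do {                                                        \
    pthread_mutex_t *with_lock_ = (mutex);                    \
    pthread_mutex_lock(with_lock_);                           \
    pthread_cleanup_push(unlock_on_exit, with_lock_);

#define WITH_NATIVE_LOCK_END                                  \
    pthread_cleanup_pop(1);                                   \
  } while (0)

static pthread_mutex_t processes_lock = PTHREAD_MUTEX_INITIALIZER;
static int process_count;

void unlist_one_process(void)
{
  WITH_NATIVE_LOCK_BEGIN(&processes_lock) {
    process_count--;              /* e.g. the body of ecl_unlist_process() */
  } WITH_NATIVE_LOCK_END;
}
```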
*/ - process->process.exit_barrier = ecl_make_barrier(name, MOST_POSITIVE_FIXNUM); - mp_barrier_unblock(3, process->process.exit_barrier, @':disable', ECL_T); + ecl_disable_interrupts_env(env); + ecl_mutex_init(&process->process.start_stop_lock, TRUE); + ecl_cond_var_init(&process->process.exit_barrier); + ecl_set_finalizer_unprotected(process, ECL_T); + ecl_enable_interrupts_env(env); return process; } @@ -396,7 +394,7 @@ ecl_import_current_thread(cl_object name, cl_object bindings) env_aux->disable_interrupts = 1; env_aux->interrupt_struct = ecl_alloc_unprotected(sizeof(*env_aux->interrupt_struct)); env_aux->interrupt_struct->pending_interrupt = ECL_NIL; - env_aux->interrupt_struct->signal_queue_spinlock = ECL_NIL; + ecl_mutex_init(&env_aux->interrupt_struct->signal_queue_lock, FALSE); env_aux->interrupt_struct->signal_queue = ECL_NIL; ecl_set_process_env(env_aux); ecl_init_env(env_aux); @@ -421,8 +419,6 @@ ecl_import_current_thread(cl_object name, cl_object bindings) ecl_list_process(process); ecl_enable_interrupts_env(env); - /* Activate the barrier so that processes can immediately start waiting. */ - mp_barrier_unblock(1, process->process.exit_barrier); process->process.phase = ECL_PROCESS_ACTIVE; ecl_bds_bind(env, @'mp::*current-process*', process); @@ -470,11 +466,11 @@ mp_interrupt_process(cl_object process, cl_object function) { cl_env_ptr env = ecl_process_env(); /* Make sure we don't interrupt an exiting process */ - ECL_WITH_SPINLOCK_BEGIN(env, &process->process.start_stop_spinlock) { + ECL_WITH_NATIVE_LOCK_BEGIN(env, &process->process.start_stop_lock) { unlikely_if (mp_process_active_p(process) == ECL_NIL) FEerror("Cannot interrupt the inactive process ~A", 1, process); ecl_interrupt_process(process, function); - } ECL_WITH_SPINLOCK_END; + } ECL_WITH_NATIVE_LOCK_END; @(return ECL_T); } @@ -521,7 +517,13 @@ mp_process_kill(cl_object process) cl_object mp_process_yield(void) { - ecl_process_yield(); +#if defined(ECL_WINDOWS_THREADS) + Sleep(0); +#elif defined(HAVE_SCHED_H) + sched_yield(); +#else + ecl_musleep(0.0, 1); +#endif @(return); } @@ -532,17 +534,19 @@ mp_process_enable(cl_object process) * ECL_UNWIND_PROTECT_BEGIN, so they need to be declared volatile */ volatile cl_env_ptr process_env = NULL; cl_env_ptr the_env = ecl_process_env(); - volatile int ok = 0; + volatile int ok = 1; ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* Try to gain exclusive access to the process at the same - * time we ensure that it is inactive. This prevents two - * concurrent calls to process-enable from different threads - * on the same process */ - unlikely_if (!AO_compare_and_swap_full((AO_t*)&process->process.phase, - ECL_PROCESS_INACTIVE, - ECL_PROCESS_BOOTING)) { + /* Try to gain exclusive access to the process. This prevents two + * concurrent calls to process-enable from different threads on + * the same process */ + ecl_mutex_lock(&process->process.start_stop_lock); + /* Ensure that the process is inactive. */ + if (process->process.phase != ECL_PROCESS_INACTIVE) { FEerror("Cannot enable the running process ~A.", 1, process); } + ok = 0; + process->process.phase = ECL_PROCESS_BOOTING; + process->process.parent = mp_current_process(); process->process.trap_fpe_bits = process->process.parent->process.env->trap_fpe_bits; @@ -567,12 +571,6 @@ mp_process_enable(cl_object process) process_env->thread_local_bindings = process_env->bindings_array->vector.self.t; - /* Activate the barrier so that processes can immediately start waiting. 
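alloc_process above initializes process.start_stop_lock with ecl_mutex_init(..., TRUE), i.e. as a recursive mutex. Per the ecl_mutex_init definition added by this patch in src/h/threads.h, on POSIX this amounts to the usual attribute dance; a self-contained sketch of that mapping:

    #include <pthread.h>

    /* What ecl_mutex_init(&m, recursive) amounts to on POSIX per the
     * threads.h added by this patch: recursive mutexes where the owner may
     * re-acquire the lock, error-checking mutexes otherwise. */
    static void
    init_mutex(pthread_mutex_t *m, int recursive)
    {
      pthread_mutexattr_t attr;
      pthread_mutexattr_init(&attr);
      pthread_mutexattr_settype(&attr, recursive ? PTHREAD_MUTEX_RECURSIVE
                                                 : PTHREAD_MUTEX_ERRORCHECK);
      pthread_mutex_init(m, &attr);
      pthread_mutexattr_destroy(&attr);
    }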
*/ - mp_barrier_unblock(1, process->process.exit_barrier); - - /* Block the thread with this spinlock until it is ready */ - process->process.start_stop_spinlock = ECL_T; - ecl_disable_interrupts_env(the_env); #ifdef ECL_WINDOWS_THREADS { @@ -618,16 +616,15 @@ mp_process_enable(cl_object process) /* INV: interrupts are already disabled through thread safe * unwind-protect */ ecl_unlist_process(process); - /* Disable the barrier and alert possible waiting processes. */ - mp_barrier_unblock(3, process->process.exit_barrier, - @':disable', ECL_T); process->process.phase = ECL_PROCESS_INACTIVE; + /* Alert possible waiting processes. */ + ecl_cond_var_broadcast(&process->process.exit_barrier); process->process.env = NULL; if (process_env != NULL) _ecl_dealloc_env(process_env); } /* Unleash the thread */ - ecl_giveup_spinlock(&process->process.start_stop_spinlock); + ecl_mutex_unlock(&process->process.start_stop_lock); } ECL_UNWIND_PROTECT_THREAD_SAFE_END; @(return (ok? process : ECL_NIL)); @@ -677,13 +674,20 @@ mp_process_whostate(cl_object process) cl_object mp_process_join(cl_object process) { + cl_env_ptr the_env = ecl_process_env(); + volatile cl_object values; + assert_type_process(process); - if (process->process.phase) { - /* We try to acquire a lock that is only owned by the process - * while it is active. */ - mp_barrier_wait(process->process.exit_barrier); - } - return cl_values_list(process->process.exit_values); + ECL_UNWIND_PROTECT_BEGIN(the_env) { + ecl_mutex_lock(&process->process.start_stop_lock); + while (process->process.phase != ECL_PROCESS_INACTIVE) { + ecl_cond_var_wait(&process->process.exit_barrier, &process->process.start_stop_lock); + } + values = cl_values_list(process->process.exit_values); + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + ecl_mutex_unlock(&process->process.start_stop_lock); + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + return values; } cl_object @@ -831,8 +835,8 @@ init_threads(cl_env_ptr env) process->process.env = env; process->process.woken_up = ECL_NIL; process->process.queue_record = ecl_list1(process); - process->process.start_stop_spinlock = ECL_NIL; - process->process.exit_barrier = ecl_make_barrier(process->process.name, MOST_POSITIVE_FIXNUM); + ecl_mutex_init(&process->process.start_stop_lock, TRUE); + ecl_cond_var_init(&process->process.exit_barrier); env->own_process = process; @@ -844,8 +848,9 @@ init_threads(cl_env_ptr env) v->vector.self.t[0] = process; v->vector.fillp = 1; cl_core.processes = v; - cl_core.global_lock = ecl_make_lock(@'mp::global-lock', 1); - cl_core.error_lock = ecl_make_lock(@'mp::error-lock', 1); - cl_core.global_env_lock = ecl_make_rwlock(@'ext::package-lock'); + ecl_mutex_init(&cl_core.processes_lock, 1); + ecl_mutex_init(&cl_core.global_lock, 1); + ecl_mutex_init(&cl_core.error_lock, 1); + ecl_rwlock_init(&cl_core.global_env_lock); } } diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d deleted file mode 100755 index 09237c58398894ce31083fd0aef6583b8ad78719..0000000000000000000000000000000000000000 --- a/src/c/threads/queue.d +++ /dev/null @@ -1,401 +0,0 @@ -/* -*- Mode: C; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -/* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ - -/* - * queue.d - waiting queue for threads - * - * Copyright (c) 2011 Juan Jose Garcia Ripoll - * - * See file 'LICENSE' for the copyright details. 
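The new mp_process_join above is the waiting side of the same monitor: it loops on the phase field under start_stop_lock until the thread has published ECL_PROCESS_INACTIVE. Stripped of ECL's unwind-protect and interrupt bookkeeping, the underlying pthread idiom looks like this (same hypothetical globals as in the cleanup sketch):

    #include <pthread.h>

    static pthread_mutex_t start_stop_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  exit_barrier    = PTHREAD_COND_INITIALIZER;
    static int             phase = 2;              /* anything but 0 (inactive) */

    static void
    wait_for_exit(void)
    {
      pthread_mutex_lock(&start_stop_lock);
      while (phase != 0) {                         /* loop: wakeups may be spurious */
        pthread_cond_wait(&exit_barrier, &start_stop_lock);
      }
      /* the exit values can be read safely at this point */
      pthread_mutex_unlock(&start_stop_lock);
    }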
- * - */ - -#ifdef HAVE_SCHED_H -#include -#endif -#include -#include -#include - -void ECL_INLINE -ecl_process_yield() -{ -#if defined(ECL_WINDOWS_THREADS) - Sleep(0); -#elif defined(HAVE_SCHED_H) - sched_yield(); -#else - ecl_musleep(0.0, 1); -#endif -} - -void ECL_INLINE -ecl_get_spinlock(cl_env_ptr the_env, cl_object *lock) -{ - cl_object own_process = the_env->own_process; - if(*lock == own_process) - return; - while (!AO_compare_and_swap_full((AO_t*)lock, (AO_t)ECL_NIL, - (AO_t)own_process)) { - ecl_process_yield(); - } -} - -void ECL_INLINE -ecl_giveup_spinlock(cl_object *lock) -{ - AO_store((AO_t*)lock, (AO_t)ECL_NIL); -} - -static ECL_INLINE void -wait_queue_nconc(cl_env_ptr the_env, cl_object q, cl_object new_tail) -{ - /* INV: interrupts are disabled */ - ecl_get_spinlock(the_env, &q->queue.spinlock); - q->queue.list = ecl_nconc(q->queue.list, new_tail); - ecl_giveup_spinlock(&q->queue.spinlock); -} - -static ECL_INLINE cl_object -wait_queue_pop_all(cl_env_ptr the_env, cl_object q) -{ - cl_object output; - ecl_disable_interrupts_env(the_env); - { - ecl_get_spinlock(the_env, &q->queue.spinlock); - output = q->queue.list; - q->queue.list = ECL_NIL; - ecl_giveup_spinlock(&q->queue.spinlock); - } - ecl_enable_interrupts_env(the_env); - return output; -} - -static ECL_INLINE void -wait_queue_delete(cl_env_ptr the_env, cl_object q, cl_object item) -{ - /* INV: interrupts are disabled */ - ecl_get_spinlock(the_env, &q->queue.spinlock); - q->queue.list = ecl_delete_eq(item, q->queue.list); - ecl_giveup_spinlock(&q->queue.spinlock); -} - -/*---------------------------------------------------------------------- - * THREAD SCHEDULER & WAITING - */ - -#if !defined(HAVE_SIGPROCMASK) -static cl_object -bignum_set_time(cl_object bignum, struct ecl_timeval *time) -{ - _ecl_big_set_index(bignum, time->tv_sec); - _ecl_big_mul_ui(bignum, bignum, 1000); - _ecl_big_add_ui(bignum, bignum, (time->tv_usec + 999) / 1000); - return bignum; -} - -static cl_object -elapsed_time(struct ecl_timeval *start) -{ - cl_object delta_big = _ecl_big_register0(); - cl_object aux_big = _ecl_big_register1(); - struct ecl_timeval now; - ecl_get_internal_real_time(&now); - bignum_set_time(aux_big, start); - bignum_set_time(delta_big, &now); - _ecl_big_sub(delta_big, delta_big, aux_big); - _ecl_big_register_free(aux_big); - return delta_big; -} - -static double -waiting_time(cl_index iteration, struct ecl_timeval *start) -{ - /* Waiting time is smaller than 0.10 s */ - double time; - cl_object top = ecl_make_fixnum(10 * 1000); - cl_object delta_big = elapsed_time(start); - _ecl_big_div_ui(delta_big, delta_big, iteration); - if (ecl_number_compare(delta_big, top) < 0) { - time = ecl_to_double(delta_big) * 1.5; - } else { - time = 0.10; - } - _ecl_big_register_free(delta_big); - return time; -} - -static cl_object -ecl_wait_on_timed(cl_env_ptr env, mp_wait_test condition, cl_object mp_object) -{ - volatile const cl_env_ptr the_env = env; - volatile cl_object own_process = the_env->own_process; - volatile cl_object record; - volatile cl_object output; - cl_fixnum iteration = 0; - struct ecl_timeval start; - ecl_get_internal_real_time(&start); - - /* 0) We reserve a record for the queue. 
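The deleted ecl_get_spinlock/ecl_giveup_spinlock above were the primitive the old waiting machinery was built on: a CAS loop on an owner slot (ECL_NIL versus the owning process) that yields the CPU between attempts. A rough portable restatement with C11 atomics, to show the busy-wait behaviour the patch trades for blocking native mutexes:

    #include <stdatomic.h>
    #include <sched.h>

    /* Rough C11 restatement of the removed spinlock: NULL plays the role of
     * ECL_NIL and `self' the role of the owning process pointer. The removed
     * code also returned at once when *lock was already equal to `self'. */
    static void
    spin_lock(_Atomic(void *) *lock, void *self)
    {
      void *expected = NULL;
      while (!atomic_compare_exchange_weak(lock, &expected, self)) {
        expected = NULL;   /* a failed CAS overwrites `expected' */
        sched_yield();     /* the old code called ecl_process_yield() here */
      }
    }

    static void
    spin_unlock(_Atomic(void *) *lock)
    {
      atomic_store(lock, NULL);
    }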
In order to avoid - * using the garbage collector, we reuse records */ - record = own_process->process.queue_record; - unlikely_if (record == ECL_NIL) { - record = ecl_list1(own_process); - } else { - own_process->process.queue_record = ECL_NIL; - } - - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', ECL_NIL); - ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* 2) Now we add ourselves to the queue. In order to - * avoid a call to the GC, we try to reuse records. */ - print_lock("adding to queue", mp_object); - own_process->process.woken_up = ECL_NIL; - wait_queue_nconc(the_env, mp_object, record); - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', ECL_T); - ecl_check_pending_interrupts(the_env); - - /* This spinlock is here because the default path (fair) is - * too slow */ - for (iteration = 0; iteration < 10; iteration++) { - if (!Null(output = condition(the_env, mp_object))) - break; - } - - /* 3) Unlike the sigsuspend() implementation, this - * implementation does not block signals and the - * wakeup event might be lost before the sleep - * function is invoked. We must thus spin over short - * intervals of time to ensure that we check the - * condition periodically. */ - while (Null(output)) { - ecl_musleep(waiting_time(iteration++, &start), 1); - output = condition(the_env, mp_object); - } - ecl_bds_unwind1(the_env); - } ECL_UNWIND_PROTECT_EXIT { - /* 4) At this point we wrap up. We remove ourselves - * from the queue and unblock the lisp interrupt - * signal. Note that we recover the cons for later use.*/ - wait_queue_delete(the_env, mp_object, own_process); - own_process->process.queue_record = record; - ECL_RPLACD(record, ECL_NIL); - - /* 5) When this process exits, it may be because it - * aborts (which we know because output == ECL_NIL), or - * because the condition is satisfied. In both cases - * we allow the first in the queue to test again its - * condition. This is needed for objects, such as - * semaphores, where the condition may be satisfied - * more than once. */ - if (Null(output)) { - ecl_wakeup_waiters(the_env, mp_object, ECL_WAKEUP_ONE); - } - } ECL_UNWIND_PROTECT_END; - ecl_bds_unwind1(the_env); - return output; -} -#endif - -/********************************************************************** - * BLOCKING WAIT QUEUE ALGORITHM - * - * This object keeps a list of processes waiting for a condition to - * happen. The queue is ordered and the only processes that check for - * the condition are - * - The first process to arrive to the queue, - * - Each process which is awoken. - * - The first process after the list of awoken processes. - * - * The idea is that this will ensure some fairness when unblocking the - * processes, which is important for abstractions such as mutexes or - * semaphores, where we want equal sharing of resources among processes. - * - * This also implies that the waiting processes depend on others to signal - * when to check for a condition. This happens in two situations - * - External code that changes the fields of the queue object - * must signal ecl_wakeup_waiters() (See mutex.d, semaphore.d, etc) - * - When a process exits ecl_wait_on() it always resignals the next - * process in the queue, because a condition may be satisfied more - * than once (for instance when a semaphore is changed, more than - * one process may be released) - * - * The critical part of this algorithm is the fact that processes - * communicating the change of conditions may do so before, during or - * after a process has been registered. 
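The deleted fallback path above (used only where sigprocmask is unavailable) polled the wait condition, sleeping between checks for an interval derived from the average elapsed time per iteration and capped at 0.10 s. The bignum arithmetic obscures the idea; restated with plain doubles it is roughly:

    /* Rough restatement of the removed waiting_time(): pause for a time
     * proportional to the average elapsed time per polling iteration,
     * scaled by 1.5 and capped at 0.10 seconds. `elapsed_seconds' stands
     * in for the bignum-based elapsed_time() of the deleted code. */
    static double
    polling_interval(double elapsed_seconds, unsigned iteration)
    {
      double avg = elapsed_seconds / (double)(iteration ? iteration : 1);
      double t = avg * 1.5;
      return (t < 0.10) ? t : 0.10;
    }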
Since we do not want those signals - * to be lost, a proper ordering of steps is required. - */ - -cl_object -ecl_wait_on(cl_env_ptr env, mp_wait_test condition, cl_object mp_object) -{ -#if defined(HAVE_SIGPROCMASK) - volatile const cl_env_ptr the_env = env; - volatile cl_object own_process = the_env->own_process; - volatile cl_object record; - volatile sigset_t original; - volatile cl_object output; - - /* 0) We reserve a record for the queue. In order to avoid - * using the garbage collector, we reuse records */ - record = own_process->process.queue_record; - unlikely_if (record == ECL_NIL) { - record = ecl_list1(own_process); - } else { - own_process->process.queue_record = ECL_NIL; - } - - /* 1) First we block lisp interrupt signals. This ensures that - * any awake signal that is issued from here is not lost. */ - { - int code = ecl_option_values[ECL_OPT_THREAD_INTERRUPT_SIGNAL]; - sigset_t empty; - sigemptyset(&empty); - sigaddset(&empty, code); - pthread_sigmask(SIG_BLOCK, &empty, (sigset_t *)&original); - } - - /* 2) Now we add ourselves to the queue. */ - own_process->process.woken_up = ECL_NIL; - wait_queue_nconc(the_env, mp_object, record); - - ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* 3) At this point we may receive signals, but we - * might have missed a wakeup event if that happened - * between 0) and 2), which is why we start with the - * check*/ - while (Null(output = condition(the_env, mp_object))) - { - /* This will wait until we get a signal that - * demands some code being executed. Note that - * this includes our communication signals and - * the signals used by the GC. Note also that - * as a consequence we might throw / return - * which is why need to protect it all with - * UNWIND-PROTECT. */ - sigsuspend((sigset_t *)&original); - } - } ECL_UNWIND_PROTECT_EXIT { - /* 4) At this point we wrap up. We remove ourselves - * from the queue and unblock the lisp interrupt - * signal. Note that we recover the cons for later use.*/ - wait_queue_delete(the_env, mp_object, own_process); - own_process->process.queue_record = record; - ECL_RPLACD(record, ECL_NIL); - - /* 5) When this process exits, it may be because it - * aborts (which we know because output == ECL_NIL), or - * because the condition is satisfied. In both cases - * we allow the first in the queue to test again its - * condition. This is needed for objects, such as - * semaphores, where the condition may be satisfied - * more than once. */ - if (Null(output)) { - ecl_wakeup_waiters(the_env, mp_object, ECL_WAKEUP_ONE); - } - - /* 6) Restoring signals is done last, to ensure that - * all cleanup steps are performed. 
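The sigprocmask-based ecl_wait_on above is the other half being retired. Its key ordering, block the wakeup signal before enqueueing so a wakeup sent in between cannot be lost, then sleep atomically with sigsuspend, distils to the classic POSIX pattern below. SIGUSR1 and condition_holds() are stand-ins for the thread interrupt signal and the mp_wait_test callback; queue handling and unwind-protect are omitted:

    #include <pthread.h>
    #include <signal.h>

    extern int condition_holds(void);   /* stand-in for the mp_wait_test callback */

    static void
    wait_for_condition(void)
    {
      sigset_t blocked, original;
      sigemptyset(&blocked);
      sigaddset(&blocked, SIGUSR1);
      pthread_sigmask(SIG_BLOCK, &blocked, &original);  /* 1) no lost wakeups */
      /* 2) ... register ourselves on the object's wait queue here ... */
      while (!condition_holds()) {                      /* 3) recheck before sleeping */
        sigsuspend(&original);    /* atomically unblock the signal and sleep */
      }
      pthread_sigmask(SIG_SETMASK, &original, NULL);    /* 4) restore the mask */
    }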
*/ - pthread_sigmask(SIG_SETMASK, (sigset_t *)&original, NULL); - } ECL_UNWIND_PROTECT_END; - return output; -#else - return ecl_wait_on_timed(env, condition, mp_object); -#endif -} - -cl_object -ecl_waiter_pop(cl_env_ptr the_env, cl_object q) -{ - cl_object output; - ecl_disable_interrupts_env(the_env); - ecl_get_spinlock(the_env, &q->queue.spinlock); - { - cl_object l; - output = ECL_NIL; - for (l = q->queue.list; l != ECL_NIL; l = ECL_CONS_CDR(l)) { - cl_object p = ECL_CONS_CAR(l); - if (p->process.phase != ECL_PROCESS_INACTIVE && - p->process.phase != ECL_PROCESS_EXITING) { - output = p; - break; - } - } - } - ecl_giveup_spinlock(&q->queue.spinlock); - ecl_enable_interrupts_env(the_env); - return output; -} - -void -ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags) -{ - ecl_disable_interrupts_env(the_env); - ecl_get_spinlock(the_env, &q->queue.spinlock); - if (q->queue.list != ECL_NIL) { - /* We scan the list of waiting processes, awaking one - * or more, depending on flags. In running through the list - * we eliminate zombie processes --- they should not be here - * because of the UNWIND-PROTECT in ecl_wait_on(), but - * sometimes shit happens */ - cl_object *tail, l; - for (tail = &q->queue.list; (l = *tail) != ECL_NIL; ) { - cl_object p = ECL_CONS_CAR(l); - ecl_get_spinlock(the_env, &p->process.start_stop_spinlock); - if (p->process.phase == ECL_PROCESS_INACTIVE || - p->process.phase == ECL_PROCESS_EXITING) { - print_lock("removing %p", q, p); - *tail = ECL_CONS_CDR(l); - } else { - print_lock("awaking %p", q, p); - /* If the process is active, we then - * simply awake it with a signal.*/ - p->process.woken_up = ECL_T; - if (flags & ECL_WAKEUP_DELETE) - *tail = ECL_CONS_CDR(l); - tail = &ECL_CONS_CDR(l); - if (flags & ECL_WAKEUP_KILL) - ecl_interrupt_process(p, @'mp::exit-process'); - else - ecl_wakeup_process(p); - if (!(flags & ECL_WAKEUP_ALL)) { - ecl_giveup_spinlock(&p->process.start_stop_spinlock); - break; - } - } - ecl_giveup_spinlock(&p->process.start_stop_spinlock); - } - } - ecl_giveup_spinlock(&q->queue.spinlock); - ecl_enable_interrupts_env(the_env); - ecl_process_yield(); -} - -#undef print_lock - -void -print_lock(char *prefix, cl_object l, ...) 
-{ - static cl_object lock = ECL_NIL; - va_list args; - va_start(args, l); - if (l == ECL_NIL - || ecl_t_of(l) == t_condition_variable - || ECL_FIXNUMP(l->lock.name)) { - cl_env_ptr env = ecl_process_env(); - ecl_get_spinlock(env, &lock); - printf("\n%ld\t", ecl_fixnum(env->own_process->process.name)); - vprintf(prefix, args); - if (l != ECL_NIL) { - cl_object p = l->lock.queue_list; - while (p != ECL_NIL) { - printf(" %lx", ecl_fixnum(ECL_CONS_CAR(p)->process.name)); - p = ECL_CONS_CDR(p); - } - } - fflush(stdout); - ecl_giveup_spinlock(&lock); - } - va_end(args); -} -/*#define print_lock(a,b,c) (void)0*/ diff --git a/src/c/threads/rwlock.d b/src/c/threads/rwlock.d index e55f29a12ca28158c8cc8a8d0bf26939c2bd80bf..d95533175dd98160897b9c1b699387385db54a9c 100644 --- a/src/c/threads/rwlock.d +++ b/src/c/threads/rwlock.d @@ -26,59 +26,16 @@ * READ/WRITE LOCKS */ -static void -FEerror_not_a_rwlock(cl_object lock) -{ - FEwrong_type_argument(@'mp::rwlock', lock); -} - -static void -FEunknown_rwlock_error(cl_object lock, int rc) -{ -#ifdef ECL_WINDOWS_THREADS - FEwin32_error("When acting on rwlock ~A, got an unexpected error.", 1, lock); -#else - const char *msg = NULL; - switch (rc) { - case EINVAL: - msg = "The value specified by rwlock is invalid"; - break; - case EPERM: - msg = "Read/write lock not owned by us"; - break; - case EDEADLK: - msg = "Thread already owns this lock"; - break; - case ENOMEM: - msg = "Out of memory"; - break; - default: - FElibc_error("When acting on rwlock ~A, got an unexpected error.", - 1, lock); - } - FEerror("When acting on rwlock ~A, got the following C library error:~%" - "~A", 2, lock, ecl_make_constant_base_string(msg,-1)); -#endif -} - cl_object ecl_make_rwlock(cl_object name) { - const cl_env_ptr the_env = ecl_process_env(); + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_rwlock); -#ifdef ECL_RWLOCK - int rc; - ecl_disable_interrupts_env(the_env); - rc = pthread_rwlock_init(&output->rwlock.mutex, NULL); - ecl_enable_interrupts_env(the_env); - if (rc) { - FEerror("Unable to create read/write lock", 0); - } - ecl_set_finalizer_unprotected(output, ECL_T); -#else - output->rwlock.mutex = ecl_make_lock(name, 0); -#endif output->rwlock.name = name; + ecl_disable_interrupts_env(env); + ecl_rwlock_init(&output->rwlock.mutex); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -91,134 +48,130 @@ cl_object mp_rwlock_name(cl_object lock) { const cl_env_ptr env = ecl_process_env(); - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::rwlock-name], lock, @[mp::rwlock]); + } ecl_return1(env, lock->rwlock.name); } cl_object mp_giveup_rwlock_read(cl_object lock) { - /* Must be called with interrupts disabled. 
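ecl_make_rwlock above now delegates to the ecl_rwlock_* primitives introduced by this patch in src/h/threads.h (pthread_rwlock_t where HAVE_POSIX_RWLOCK is set, otherwise the mutex/condvar fallback defined near the end of that header). A small illustrative use of that C-level API; the shared_value variable and the helper names are hypothetical:

    /* Illustrative only: protecting shared state with the ecl_rwlock_*
     * primitives added by this patch. Errors (ECL_MUTEX_NOT_OWNED, etc.)
     * are ignored here; the rewritten rwlock.d maps them to lisp errors. */
    static ecl_rwlock_t lock;      /* ecl_rwlock_init(&lock) once at startup */
    static int shared_value;       /* hypothetical shared state */

    static int
    read_value(void)
    {
      int v;
      ecl_rwlock_lock_read(&lock);   /* any number of concurrent readers */
      v = shared_value;
      ecl_rwlock_unlock_read(&lock);
      return v;
    }

    static void
    write_value(int v)
    {
      ecl_rwlock_lock_write(&lock);  /* exclusive: blocks readers and writers */
      shared_value = v;
      ecl_rwlock_unlock_write(&lock);
    }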
*/ - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - int rc = pthread_rwlock_unlock(&lock->rwlock.mutex); - if (rc) - FEunknown_rwlock_error(lock, rc); - @(return ECL_T); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::giveup-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_unlock_read(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); } -#else - return mp_giveup_lock(lock->rwlock.mutex); -#endif } cl_object mp_giveup_rwlock_write(cl_object lock) { - return mp_giveup_rwlock_read(lock); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::giveup-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_unlock_write(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_NOT_OWNED) { + FEerror_not_owned(lock); + } else { + FEunknown_lock_error(lock); + } } cl_object mp_get_rwlock_read_nowait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - cl_object output = ECL_T; - int rc = pthread_rwlock_tryrdlock(&lock->rwlock.mutex); - if (rc == 0) { - output = ECL_T; - } else if (rc == EBUSY) { - output = ECL_NIL; - } else { - FEunknown_rwlock_error(lock, rc); - } - ecl_return1(env, output); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_trylock_read(&lock->rwlock.mutex); + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env, ECL_NIL); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_nowait(lock->rwlock.mutex); -#endif } cl_object mp_get_rwlock_read_wait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - int rc = pthread_rwlock_rdlock(&lock->rwlock.mutex); - if (rc != 0) { - FEunknown_rwlock_error(lock, rc); - } + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-read], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_lock_read(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { ecl_return1(env, ECL_T); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_wait(lock->rwlock.mutex); -#endif } @(defun mp::get-rwlock-read (lock &optional (wait ECL_T)) @ - if (Null(wait)) + if (Null(wait)) { return mp_get_rwlock_read_nowait(lock); - else + } else { return mp_get_rwlock_read_wait(lock); + } @) cl_object mp_get_rwlock_write_nowait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - const cl_env_ptr env = ecl_process_env(); - cl_object output = ECL_T; - int rc = pthread_rwlock_trywrlock(&lock->rwlock.mutex); - if (rc == 0) { - output = ECL_T; - } else if (rc == EBUSY) { - output = ECL_NIL; - } else { - FEunknown_rwlock_error(lock, rc); - } - ecl_return1(env, output); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + 
FEwrong_type_only_arg(@[mp::get-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_trylock_write(&lock->rwlock.mutex); + if (rc == ECL_MUTEX_SUCCESS) { + ecl_return1(env, ECL_T); + } else if (rc == ECL_MUTEX_LOCKED) { + ecl_return1(env, ECL_NIL); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_nowait(lock->rwlock.mutex); -#endif } cl_object mp_get_rwlock_write_wait(cl_object lock) { - if (ecl_t_of(lock) != t_rwlock) - FEerror_not_a_rwlock(lock); -#ifdef ECL_RWLOCK - { - int rc = pthread_rwlock_wrlock(&lock->rwlock.mutex); - if (rc != 0) { - FEunknown_rwlock_error(lock, rc); - } - @(return ECL_T); + const cl_env_ptr env = ecl_process_env(); + int rc; + if (ecl_unlikely(ecl_t_of(lock) != t_rwlock)) { + FEwrong_type_only_arg(@[mp::get-rwlock-write], lock, @[mp::rwlock]); + } + rc = ecl_rwlock_lock_write(&lock->rwlock.mutex); + if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) { + ecl_return1(env, ECL_T); + } else { + FEunknown_lock_error(lock); } -#else - return mp_get_lock_wait(lock->rwlock.mutex); -#endif } @(defun mp::get-rwlock-write (lock &optional (wait ECL_T)) @ if (Null(wait)) { return mp_get_rwlock_write_nowait(lock); - } - else { + } else { return mp_get_rwlock_write_wait(lock); } @) diff --git a/src/c/threads/semaphore.d b/src/c/threads/semaphore.d index 8811be3f0ddcae2831bc582110252982662378c2..86c191ca8ab1f1a3ee0de1d743c854987a5877fd 100644 --- a/src/c/threads/semaphore.d +++ b/src/c/threads/semaphore.d @@ -5,33 +5,28 @@ * semaphore.d - POSIX-like semaphores * * Copyright (c) 2011 Juan Jose Garcia Ripoll + * Copyright (c) 2020 Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * */ -#define AO_ASSUME_WINDOWS98 /* We need this for CAS */ #include #include -#if !defined(AO_HAVE_fetch_and_add_full) -#error "Cannot implement semaphores without AO_fetch_and_add_full" -#endif - -static ECL_INLINE void -FEerror_not_a_semaphore(cl_object semaphore) -{ - FEwrong_type_argument(@'mp::semaphore', semaphore); -} - cl_object ecl_make_semaphore(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_semaphore); output->semaphore.name = name; output->semaphore.counter = count; - output->semaphore.queue_list = ECL_NIL; - output->semaphore.queue_spinlock = ECL_NIL; + output->semaphore.wait_count = 0; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->semaphore.cv); + ecl_mutex_init(&output->semaphore.mutex, FALSE); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -45,7 +40,7 @@ mp_semaphore_name(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-name], semaphore, @[mp::semaphore]); } ecl_return1(env, semaphore->semaphore.name); } @@ -55,7 +50,7 @@ mp_semaphore_count(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-count], semaphore, @[mp::semaphore]); } ecl_return1(env, ecl_make_fixnum(semaphore->semaphore.counter)); } @@ -65,69 +60,77 @@ mp_semaphore_wait_count(cl_object semaphore) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::semaphore-wait-count], semaphore, @[mp::semaphore]); } - ecl_return1(env, cl_length(semaphore->semaphore.queue_list)); + 
ecl_return1(env, ecl_make_fixnum(semaphore->semaphore.wait_count)); } @(defun mp::signal-semaphore (semaphore &optional (count ecl_make_fixnum(1))) @ { cl_fixnum n = fixnnint(count); - cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_nth_arg(@[mp::signal-semaphore], 1, semaphore, @[mp::semaphore]); } - AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, n); - if (semaphore->semaphore.queue_list != ECL_NIL) { - ecl_wakeup_waiters(env, semaphore, ECL_WAKEUP_ALL); + ecl_disable_interrupts_env(the_env); + ecl_mutex_lock(&semaphore->semaphore.mutex); + semaphore->semaphore.counter += n; + for (; n > 0; n--) { + ecl_cond_var_signal(&semaphore->semaphore.cv); } + ecl_mutex_unlock(&semaphore->semaphore.mutex); + ecl_enable_interrupts_env(the_env); @(return); } @) -static cl_object -get_semaphore_inner(cl_env_ptr env, cl_object semaphore) -{ - cl_object output; - ecl_disable_interrupts_env(env); - do { - cl_fixnum counter = semaphore->semaphore.counter; - if (!counter) { - output = ECL_NIL; - break; - } - if (AO_compare_and_swap_full((AO_t*)&(semaphore->semaphore.counter), - (AO_t)counter, (AO_t)(counter-1))) { - output = ecl_make_fixnum(counter); - break; - } - ecl_process_yield(); - } while (1); - ecl_enable_interrupts_env(env); - return output; -} - cl_object mp_wait_on_semaphore(cl_object semaphore) { - cl_env_ptr env = ecl_process_env(); - cl_object output; + cl_env_ptr the_env = ecl_process_env(); + volatile cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::wait-on-semaphore], semaphore, @[mp::semaphore]); } - output = get_semaphore_inner(env, semaphore); - if (Null(output)) { - output = ecl_wait_on(env, get_semaphore_inner, semaphore); + ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + ecl_mutex_lock(&semaphore->semaphore.mutex); + if (semaphore->semaphore.counter == 0) { + semaphore->semaphore.wait_count++; + ECL_UNWIND_PROTECT_BEGIN(the_env) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(&semaphore->semaphore.cv, &semaphore->semaphore.mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while (semaphore->semaphore.counter == 0); + output = ecl_make_fixnum(semaphore->semaphore.counter--); + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + semaphore->semaphore.wait_count--; + ecl_mutex_unlock(&semaphore->semaphore.mutex); + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + } else { + output = ecl_make_fixnum(semaphore->semaphore.counter--); + ecl_mutex_unlock(&semaphore->semaphore.mutex); } - ecl_return1(env, output); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + ecl_return1(the_env, output); } cl_object mp_try_get_semaphore(cl_object semaphore) { - cl_env_ptr env = ecl_process_env(); + cl_env_ptr the_env = ecl_process_env(); + cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEerror_not_a_semaphore(semaphore); + FEwrong_type_only_arg(@[mp::try-get-semaphore], semaphore, @[mp::semaphore]); + } + ecl_disable_interrupts_env(the_env); + ecl_mutex_lock(&semaphore->semaphore.mutex); + if (semaphore->semaphore.counter > 0) { + output = ecl_make_fixnum(semaphore->semaphore.counter--); + } else { + output = ECL_NIL; } - ecl_return1(env, get_semaphore_inner(env, semaphore)); + ecl_mutex_unlock(&semaphore->semaphore.mutex); + ecl_enable_interrupts_env(the_env); + ecl_return1(the_env, output); } diff --git a/src/c/time.d b/src/c/time.d 
index aba7272a07209fc00901c1ae441ba2a5c506c5c8..37b1dffac74aba1903cb04dba87651157b046082 100644 --- a/src/c/time.d +++ b/src/c/time.d @@ -193,7 +193,7 @@ cl_sleep(cl_object z) time = 1e-9; } } ECL_WITHOUT_FPE_END; - ecl_musleep(time, 0); + ecl_musleep(time, 1); @(return ECL_NIL); } diff --git a/src/c/unixint.d b/src/c/unixint.d index 373adbf8f367032f6bbefb6c8fdad426e8a4fcfc..4b607aede0481680f7f647a0527031154f79024e 100644 --- a/src/c/unixint.d +++ b/src/c/unixint.d @@ -75,8 +75,6 @@ #include #include #include -/* To get APCProc calls */ -#define _WIN32_WINNT 0x400 #include #if defined(_MSC_VER) || defined(__MINGW32__) @@ -425,16 +423,16 @@ handle_all_queued_interrupt_safe(cl_env_ptr env) static void queue_signal(cl_env_ptr env, cl_object code, int allocate) { - /* Note: We don't use ECL_WITH_SPINLOCK_BEGIN/END here since - * it checks for pending interrupts after unlocking the - * spinlock. This would lead to the interrupt being handled + /* Note: We don't use ECL_WITH_NATIVE_LOCK_BEGIN/END here + * since it checks for pending interrupts after unlocking the + * mutex. This would lead to the interrupt being handled * immediately when queueing an interrupt for the current * thread, even when interrupts are disabled. */ - /* INV: interrupts are disabled, therefore the spinlock will + /* INV: interrupts are disabled, therefore the lock will * always be released */ #ifdef ECL_THREADS - ecl_get_spinlock(ecl_process_env(), &env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_lock(&env->interrupt_struct->signal_queue_lock); #endif cl_object record; @@ -455,7 +453,7 @@ queue_signal(cl_env_ptr env, cl_object code, int allocate) } #ifdef ECL_THREADS - ecl_giveup_spinlock(&env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_unlock(&env->interrupt_struct->signal_queue_lock); #endif } @@ -463,19 +461,17 @@ static cl_object pop_signal(cl_env_ptr env) { cl_object record, value; - /* Note: We don't use ECL_WITH_SPINLOCK_BEGIN/END here since - * it checks for pending interrupts after unlocking the - * spinlock. This would lead to handle_all_queued and - * pop_signal being called again and the interrupts being - * handled in the wrong order. */ - - /* INV: ecl_get_spinlock and ecl_giveup_spinlock don't write - * into env, therefore it is valid to use - * ecl_disable_interrupts_env */ + /* Note: We don't use ECL_WITH_NATIVE_LOCK_BEGIN/END here + * since it checks for pending interrupts after unlocking the + * mutex. This would lead to handle_all_queued and pop_signal + * being called again and the interrupts being handled in the + * wrong order. 
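Stripped of its interrupt bookkeeping and wait_count accounting, the semaphore rewrite a few hunks above is the textbook counting-semaphore monitor: a counter plus a condition variable, with waiters looping while the counter is zero. A self-contained pthread rendering of that scheme, hypothetical names throughout:

    #include <pthread.h>

    typedef struct {
      pthread_mutex_t mutex;
      pthread_cond_t  cv;
      long            counter;
    } csem_t;

    static void
    csem_wait(csem_t *s)
    {
      pthread_mutex_lock(&s->mutex);
      while (s->counter == 0) {        /* loop: wakeups may be spurious or stolen */
        pthread_cond_wait(&s->cv, &s->mutex);
      }
      s->counter--;
      pthread_mutex_unlock(&s->mutex);
    }

    static void
    csem_signal(csem_t *s, long n)
    {
      pthread_mutex_lock(&s->mutex);
      s->counter += n;
      while (n-- > 0) {
        pthread_cond_signal(&s->cv);   /* wake one waiter per unit, as mp::signal-semaphore does */
      }
      pthread_mutex_unlock(&s->mutex);
    }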
*/ + ecl_disable_interrupts_env(env); #ifdef ECL_THREADS - ecl_get_spinlock(env, &env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_lock(&env->interrupt_struct->signal_queue_lock); #endif + if (env->interrupt_struct->pending_interrupt == ECL_NIL) { value = ECL_NIL; } else { @@ -488,8 +484,9 @@ pop_signal(cl_env_ptr env) env->interrupt_struct->signal_queue = record; } } + #ifdef ECL_THREADS - ecl_giveup_spinlock(&env->interrupt_struct->signal_queue_spinlock); + ecl_mutex_unlock(&env->interrupt_struct->signal_queue_lock); #endif ecl_enable_interrupts_env(env); return value; @@ -572,7 +569,7 @@ typedef struct { } signal_thread_message; static cl_object signal_thread_process = ECL_NIL; static signal_thread_message signal_thread_msg; -static cl_object signal_thread_spinlock = ECL_NIL; +static ecl_mutex_t signal_thread_lock; static int signal_thread_pipe[2] = {-1,-1}; static void @@ -594,9 +591,9 @@ handler_fn_prototype(deferred_signal_handler, int sig, siginfo_t *siginfo, void * Note that read() will abort the thread will get notified. */ signal_thread_msg = msg; } else if (signal_thread_pipe[1] > 0) { - ecl_get_spinlock(the_env, &signal_thread_spinlock); + ecl_mutex_lock(&signal_thread_lock); write(signal_thread_pipe[1], &msg, sizeof(msg)); - ecl_giveup_spinlock(&signal_thread_spinlock); + ecl_mutex_unlock(&signal_thread_lock); } else { /* Nothing to do. There is no way to handle this signal because * the responsible thread is not running */ @@ -632,9 +629,9 @@ asynchronous_signal_servicing_thread() * We create the object for communication. We need a lock to prevent other * threads from writing before the pipe is created. */ - ecl_get_spinlock(the_env, &signal_thread_spinlock); + ecl_mutex_lock(&signal_thread_lock); pipe(signal_thread_pipe); - ecl_giveup_spinlock(&signal_thread_spinlock); + ecl_mutex_unlock(&signal_thread_lock); signal_thread_msg.process = ECL_NIL; for (;;) { cl_object signal_code; @@ -1319,6 +1316,9 @@ install_asynchronous_signal_handlers() sigprocmask(SIG_SETMASK, NULL, sigmask); # endif #endif +#if defined(ECL_THREADS) && defined(HAVE_SIGPROCMASK) + ecl_mutex_init(&signal_thread_lock, TRUE); +#endif #ifdef SIGINT if (ecl_option_values[ECL_OPT_TRAP_SIGINT]) { async_handler(SIGINT, non_evil_signal_handler, sigmask); diff --git a/src/configure b/src/configure index f65d8c6d72ff18f0bd508156d2589f58a965d59f..407c32fb609866ed454446212e5aca6c3f3519b0 100755 --- a/src/configure +++ b/src/configure @@ -5142,7 +5142,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox" +THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox threads/rwlock" clibs='-lm' SONAME='' SONAME_LDFLAGS='' @@ -6225,18 +6225,25 @@ _ACEOF -$as_echo "#define ECL_RWLOCK /**/" >>confdefs.h +$as_echo "#define HAVE_POSIX_RWLOCK /**/" >>confdefs.h -$as_echo "#define HAVE_POSIX_RWLOCK /**/" >>confdefs.h +fi fi +for ac_func in pthread_mutex_timedlock +do : + ac_fn_c_check_func "$LINENO" "pthread_mutex_timedlock" "ac_cv_func_pthread_mutex_timedlock" +if test "x$ac_cv_func_pthread_mutex_timedlock" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_PTHREAD_MUTEX_TIMEDLOCK 1 +_ACEOF fi +done -THREAD_OBJ="$THREAD_OBJ threads/rwlock" boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}" for k in $THREAD_OBJ; do 
EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done @@ -7326,7 +7333,7 @@ done for ac_header in sys/utsname.h float.h pwd.h dlfcn.h link.h \ mach-o/dyld.h dirent.h sys/ioctl.h sys/select.h \ - sys/wait.h semaphore.h + sys/wait.h do : as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh` ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default" diff --git a/src/configure.ac b/src/configure.ac index f294a37fab976b6d08c0328e36054c9f5eedf6e0..515931e10c9ddfbdd1fe91b41aeec3bb0f95c44c 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -584,7 +584,7 @@ if test "${enable_threads}" = "yes" ; then else LIBS="${THREAD_LIBS} ${LIBS}" CFLAGS="${CFLAGS} ${THREAD_CFLAGS}" - ECL_POSIX_RWLOCK + ECL_PTHREAD_EXTENSIONS boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}" for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done AC_MSG_CHECKING([for thread object files]) @@ -676,7 +676,7 @@ dnl !!! end autoscan AC_CHECK_HEADERS( [sys/utsname.h float.h pwd.h dlfcn.h link.h] \ [mach-o/dyld.h dirent.h sys/ioctl.h sys/select.h] \ - [sys/wait.h semaphore.h] ) + [sys/wait.h] ) AC_CHECK_HEADERS([ulimit.h], [], [], diff --git a/src/doc/manual/extensions/mp_ref_cv.txi b/src/doc/manual/extensions/mp_ref_cv.txi index 659c8e0e4ccb5a6ec2dfc0e211ced6044a0cf040..0618e688facc9f338367e21f368fc63c455b74aa 100644 --- a/src/doc/manual/extensions/mp_ref_cv.txi +++ b/src/doc/manual/extensions/mp_ref_cv.txi @@ -26,9 +26,23 @@ Creates a condition variable. @end deftypefun @defun mp:condition-variable-wait cv lock -Release @var{lock} and suspend thread until condition -@coderef{mp:condition-variable-signal} is called on @var{cv}. When thread -resumes re-aquire @var{lock}. +Release @var{lock} and suspend thread until +@coderef{mp:condition-variable-signal} or +@coderef{mp:condition-variable-broadcast} is called on @var{cv}. When +thread resumes re-aquire @var{lock}. May signal an error if @var{lock} +is not owned by the current thread. + +@strong{Note:} In some circumstances, the thread may wake up even if no +call to @coderef{mp:condition-variable-signal} or +@coderef{mp:condition-variable-broadcast} has happened. It is +recommended to check for the condition that triggered the wait in a loop +around any @code{mp:condition-variable-wait} call. + +@strong{Note:} While the condition variable is blocked waiting for a +signal or broadcast event, calling @code{mp:condition-variable-wait} +from further threads must be done using the same mutex as that used by +the threads that are already waiting on this condition variable. The +behaviour is undefined if this constraint is violated. @end defun @@ -40,7 +54,8 @@ resumes re-aquire @var{lock}. @defun mp:condition-variable-timedwait cv lock seconds @coderef{mp:condition-variable-wait} which timeouts after @var{seconds} -seconds. +seconds. May signal an error if @var{lock} is not owned by the current +thread. @end defun @@ -51,8 +66,8 @@ seconds. @end deftypefun @defun mp:condition-variable-signal cv -Signal @var{cv} (wakes up only one waiter). After signal, signaling -thread keeps lock, waking thread goes on the queue waiting for the lock. +Wake up at least one of the waiters of @var{cv}. Usually, this will wake +up only a single thread, but it may also wake up multiple threads. See @coderef{mp:condition-variable-wait}. @end defun @@ -65,7 +80,7 @@ See @coderef{mp:condition-variable-wait}. @end deftypefun @defun mp:condition-variable-broadcast cv -Signal @var{cv} (wakes up all waiters). +Wake up all waiters of @var{cv}. 
See @coderef{mp:condition-variable-wait}. @end defun diff --git a/src/doc/manual/extensions/mp_ref_mutex.txi b/src/doc/manual/extensions/mp_ref_mutex.txi index 4e62dfa862b0d8c28dc0bd3be3c88988d6d00ea9..5837b7b274acd862265648c32adfd76e24a335d7 100644 --- a/src/doc/manual/extensions/mp_ref_mutex.txi +++ b/src/doc/manual/extensions/mp_ref_mutex.txi @@ -67,8 +67,9 @@ Returns the name of @var{lock}. @end deftypefun @defun mp:lock-owner lock -Returns the process owning @var{lock}. For testing whether the current -thread is holding a lock see @coderef{mp:holding-lock-p}. +Returns the process owning @var{lock} or @code{nil} if the mutex is not +owned by any process. For testing whether the current thread +is holding a lock see @coderef{mp:holding-lock-p}. @end defun @@ -79,7 +80,7 @@ thread is holding a lock see @coderef{mp:holding-lock-p}. @end deftypefun @defun mp:lock-count lock -Returns number of processes waiting for @var{lock}. +Returns number of times @var{lock} has been locked. @end defun @@ -100,8 +101,11 @@ returns @code{ECL_NIL}, otherwise @code{ECL_T}. @defun mp:get-lock lock &optional (wait t) Tries to acquire a lock. @var{wait} indicates whether function should block or give up if @var{lock} is already taken. If @var{wait} is -@code{nil} and @var{lock} can't be acquired returns -@code{nil}. Succesful operation returns @code{t}. +@code{nil}, immediately return, if @var{wait} is a real number +@var{wait} specifies a timeout in seconds and otherwise block until the +lock becomes available. If @var{lock} can't be acquired return +@code{nil}. Succesful operation returns @code{t}. Will signal an error +if the mutex is non-recursive and current thread already owns the lock. @end defun @@ -112,15 +116,17 @@ block or give up if @var{lock} is already taken. If @var{wait} is @end deftypefun @defun mp:giveup-lock lock -Releases @var{lock}. +Releases @var{lock} and returns @code{t}. May signal an error if the +lock is not owned by the current thread. @end defun @lspdef mp:with-lock @defmac mp:with-lock (lock-form) &body body -Acquire lock for the dynamic scope of @var{body}, which is executed -with the lock held by current thread. Returns the values of body. +Acquire lock for the dynamic scope of @var{body}, which is executed with +the lock held by current thread. Returns the values of +body. @c (lock-form &key wait-p timeout) diff --git a/src/doc/manual/extensions/mp_ref_process.txi b/src/doc/manual/extensions/mp_ref_process.txi index 1cc61e7c9a71a302f415ef6f9885f5ec045c3726..3857d1a99d2714cddcb6edf1ea162a20f48be7f7 100644 --- a/src/doc/manual/extensions/mp_ref_process.txi +++ b/src/doc/manual/extensions/mp_ref_process.txi @@ -58,6 +58,12 @@ one has to consider: @item Reentrancy: Functions, which usually are not called recursively can be re-entered during execution of the interrupt. @item Stack unwinding: Non-local jumps like @code{throw} or @code{return-from} in the interrupting code will handle @code{unwind-protect} forms like usual. However, the cleanup forms of an @code{unwind-protect} can still be interrupted. In that case the execution flow will jump to the next @code{unwind-protect}. @end itemize +Note also that no guarantees are made that functions from the Common +Lisp standard or ECL extensions are interrupt safe (although most of +them will be). 
In particular, the compiler (@code{compile} and +@code{compile-file} functions), FFI calls and aquire/release functions +for multithreading synchronization objects like mutexes or condition +variables should not be interrupted by @coderef{mp:interrupt-process}. @exindex Process interruption Example: diff --git a/src/doc/manual/extensions/mp_ref_rwlock.txi b/src/doc/manual/extensions/mp_ref_rwlock.txi index 305ca16475b9a62eca284485b28944ecf14dbefd..973d8700dccece3ef304604849e85a9c00d6ba08 100644 --- a/src/doc/manual/extensions/mp_ref_rwlock.txi +++ b/src/doc/manual/extensions/mp_ref_rwlock.txi @@ -7,11 +7,8 @@ Readers-writer (or shared-exclusive ) locks allow concurrent access for read-only operations, while write operations require exclusive -access. @code{mp:rwlock} is non-recursive. - -Readers-writers locks are an optional feature, which is available if -@code{*features*} includes @code{:ecl-read-write-lock}. - +access. @code{mp:rwlock} is non-recursive and cannot be used together +with condition variables. @node Readers-writer locks dictionary @subsection Read-Write locks dictionary diff --git a/src/ecl/configpre.h b/src/ecl/configpre.h index 6b886dd8baa2f3a3e42c3a7016230283a60c4dc0..0d0f66c71473747a5caf7b87010570f387ecc979 100644 --- a/src/ecl/configpre.h +++ b/src/ecl/configpre.h @@ -45,9 +45,6 @@ /* Define if your newline is CRLF */ #undef ECL_NEWLINE_IS_CRLF -/* ECL_RWLOCK */ -#undef ECL_RWLOCK - /* ECL_SIGNED_ZERO */ #undef ECL_SIGNED_ZERO @@ -439,6 +436,9 @@ /* Define to 1 if you have the `powf' function. */ #undef HAVE_POWF +/* Define to 1 if you have the `pthread_mutex_timedlock' function. */ +#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK + /* Define to 1 if the system has the type `pthread_rwlock_t'. */ #undef HAVE_PTHREAD_RWLOCK_T @@ -464,9 +464,6 @@ /* Define to 1 if you have the `select' function. */ #undef HAVE_SELECT -/* Define to 1 if you have the header file. */ -#undef HAVE_SEMAPHORE_H - /* Define to 1 if you have the `setenv' function. 
*/ #undef HAVE_SETENV diff --git a/src/h/config-internal.h.in b/src/h/config-internal.h.in index fe36e7f03417227b73bda2754604939582196179..612bd2b5308e8c6af58fc9721ac849c529e1e274 100644 --- a/src/h/config-internal.h.in +++ b/src/h/config-internal.h.in @@ -131,6 +131,8 @@ #undef HAVE_SEM_INIT /* whether we have read/write locks */ #undef HAVE_POSIX_RWLOCK +/* whether we have mutex lock operations with timeout */ +#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK /* uname() for system identification */ #undef HAVE_UNAME #undef HAVE_UNISTD_H diff --git a/src/h/ecl_atomics.h b/src/h/ecl_atomics.h index eb9243f92f85deabd96bbfd1b6d0088ed1ba9044..ee1c6c86ca7ef6f20f43e918fa0bc489154b55e3 100644 --- a/src/h/ecl_atomics.h +++ b/src/h/ecl_atomics.h @@ -29,9 +29,6 @@ # if !defined(AO_HAVE_compare_and_swap) # error "ECL needs AO_compare_and_swap or an equivalent" # endif -# if !defined(AO_HAVE_fetch_and_add1) -# error "Cannot implement mailboxs without AO_fetch_and_add1" -# endif # if !defined(AO_HAVE_fetch_and_add) # error "ECL needs AO_fetch_and_add or an equivalent" # endif diff --git a/src/h/external.h b/src/h/external.h index 549d48661e09bdcfdffc705e7f0b9efa145934d0..9f99d14b65469a8b80095203c2664612e500c6ad 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -150,7 +150,7 @@ struct cl_env_struct { struct ecl_interrupt_struct { cl_object pending_interrupt; cl_object signal_queue; - cl_object signal_queue_spinlock; + ecl_mutex_t signal_queue_lock; }; #ifndef __GNUC__ @@ -224,10 +224,10 @@ struct cl_core_struct { #ifdef ECL_THREADS cl_object processes; - cl_object processes_spinlock; - cl_object global_lock; - cl_object error_lock; - cl_object global_env_lock; + ecl_mutex_t processes_lock; + ecl_mutex_t global_lock; + ecl_mutex_t error_lock; + ecl_rwlock_t global_env_lock; #endif cl_object libraries; @@ -598,6 +598,8 @@ extern ECL_API void FEinvalid_function(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEinvalid_function_name(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEprint_not_readable(cl_object obj) ecl_attr_noreturn; extern ECL_API void FEtimeout() ecl_attr_noreturn; +extern ECL_API void FEerror_not_owned(cl_object lock) ecl_attr_noreturn; +extern ECL_API void FEunknown_lock_error(cl_object lock) ecl_attr_noreturn; extern ECL_API cl_object CEerror(cl_object c, const char *err_str, int narg, ...); extern ECL_API void FElibc_error(const char *msg, int narg, ...) 
ecl_attr_noreturn; #if defined(ECL_MS_WINDOWS_HOST) || defined(cygwin) diff --git a/src/h/internal.h b/src/h/internal.h index e88de9f6e2c5fa5298eda1768e7a6a2872a4ef64..f3f095d9226dfea6b2fea2ce5cf4686d65bdd200 100755 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -311,6 +311,10 @@ extern cl_fixnum ecl_option_values[ECL_OPT_LIMIT+1]; extern void ecl_init_bignum_registers(cl_env_ptr env); extern void ecl_clear_bignum_registers(cl_env_ptr env); +/* threads/mutex.d */ + +extern cl_object si_mutex_timeout(); + /* print.d */ extern cl_object _ecl_stream_or_default_output(cl_object stream); @@ -336,67 +340,12 @@ extern void _ecl_string_push_c_string(cl_object s, const char *c); extern void cl_write_object(cl_object x, cl_object stream); -/* global locks */ +/* threads/rwlock.d */ #ifdef ECL_THREADS -# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) \ - ECL_WITH_LOCK_BEGIN(the_env, cl_core.global_lock) -# define ECL_WITH_GLOBAL_LOCK_END \ - ECL_WITH_LOCK_END -# define ECL_WITH_LOCK_BEGIN(the_env,lock) { \ - const cl_env_ptr __ecl_the_env = the_env; \ - const cl_object __ecl_the_lock = lock; \ - ecl_disable_interrupts_env(__ecl_the_env); \ - mp_get_lock_wait(__ecl_the_lock); \ - ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ - ecl_enable_interrupts_env(__ecl_the_env); -# define ECL_WITH_LOCK_END \ - ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ - mp_giveup_lock(__ecl_the_lock); \ - } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } -# define ECL_WITH_SPINLOCK_BEGIN(the_env,lock) { \ - const cl_env_ptr __ecl_the_env = (the_env); \ - cl_object *__ecl_the_lock = (lock); \ - ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ - ecl_get_spinlock(__ecl_the_env, __ecl_the_lock); -# define ECL_WITH_SPINLOCK_END \ - ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ - ecl_giveup_spinlock(__ecl_the_lock); \ - } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } -#else -# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_LOCK_END -# define ECL_WITH_LOCK_BEGIN(the_env,lock) -# define ECL_WITH_LOCK_END -# define ECL_WITH_SPINLOCK_BEGIN(the_env,lock) -# define ECL_WITH_SPINLOCK_END -#endif /* ECL_THREADS */ - -#ifdef ECL_RWLOCK -# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) { \ - const cl_env_ptr __ecl_pack_env = the_env; \ - ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - mp_get_rwlock_read_wait(cl_core.global_env_lock); -# define ECL_WITH_GLOBAL_ENV_RDLOCK_END \ - mp_giveup_rwlock_read(cl_core.global_env_lock); \ - ecl_bds_unwind1(__ecl_pack_env); \ - ecl_check_pending_interrupts(__ecl_pack_env); } -# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) { \ - const cl_env_ptr __ecl_pack_env = the_env; \ - ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ - mp_get_rwlock_write_wait(cl_core.global_env_lock); -# define ECL_WITH_GLOBAL_ENV_WRLOCK_END \ - mp_giveup_rwlock_write(cl_core.global_env_lock); \ - ecl_bds_unwind1(__ecl_pack_env); \ - ecl_check_pending_interrupts(__ecl_pack_env); } -#else -# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_ENV_RDLOCK_END -# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) -# define ECL_WITH_GLOBAL_ENV_WRLOCK_END -#endif /* ECL_RWLOCK */ - -#include +extern cl_object mp_get_rwlock_read_wait(cl_object lock); +extern cl_object mp_get_rwlock_write_wait(cl_object lock); +#endif /* read.d */ #ifdef ECL_UNICODE @@ -465,29 +414,6 @@ extern void ecl_musleep(double time, bool alertable); #define UTC_time_to_universal_time(x) ecl_plus(ecl_make_integer(x),cl_core.Jan1st1970UT) extern cl_fixnum ecl_runtime(void); -/* threads/mutex.d */ - -#ifdef ECL_THREADS 
-typedef cl_object (*mp_wait_test)(cl_env_ptr, cl_object); - -extern void ecl_process_yield(void); -extern void print_lock(char *s, cl_object lock, ...); -#define print_lock(...) ((void)0) -extern void ecl_get_spinlock(cl_env_ptr env, cl_object *lock); -extern void ecl_giveup_spinlock(cl_object *lock); -extern cl_object ecl_wait_on(cl_env_ptr env, mp_wait_test test, cl_object object); -extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, int flags); -extern void ecl_wakeup_process(cl_object process); -extern cl_object ecl_waiter_pop(cl_env_ptr the_env, cl_object q); -#endif - -/* threads/rwlock.d */ - -#ifdef ECL_THREADS -extern cl_object mp_get_rwlock_read_wait(cl_object lock); -extern cl_object mp_get_rwlock_write_wait(cl_object lock); -#endif - /* unixfsys.d */ /* Filename encodings: on Unix we use ordinary chars encoded in a user @@ -638,6 +564,68 @@ extern void ecl_interrupt_process(cl_object process, cl_object function); # endif #endif /* ECL_MS_WINDOWS_HOST */ +/* global locks */ + +#include + +#ifdef ECL_THREADS +# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) \ + ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &cl_core.global_lock) +# define ECL_WITH_GLOBAL_LOCK_END \ + ECL_WITH_NATIVE_LOCK_END +# define ECL_WITH_LOCK_BEGIN(the_env,lock) { \ + const cl_env_ptr __ecl_the_env = the_env; \ + const cl_object __ecl_the_lock = lock; \ + ecl_disable_interrupts_env(__ecl_the_env); \ + mp_get_lock_wait(__ecl_the_lock); \ + ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ + ecl_enable_interrupts_env(__ecl_the_env); +# define ECL_WITH_LOCK_END \ + ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ + mp_giveup_lock(__ecl_the_lock); \ + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) { \ + const cl_env_ptr __ecl_the_env = (the_env); \ + ecl_mutex_t* __ecl_the_lock = (lock); \ + ecl_disable_interrupts_env(__ecl_the_env); \ + ecl_mutex_lock(__ecl_the_lock); \ + ECL_UNWIND_PROTECT_BEGIN(__ecl_the_env); \ + ecl_enable_interrupts_env(__ecl_the_env); +# define ECL_WITH_NATIVE_LOCK_END \ + ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { \ + ecl_mutex_unlock(__ecl_the_lock); \ + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } +# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) { \ + const cl_env_ptr __ecl_pack_env = the_env; \ + ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ + ecl_rwlock_lock_read(&cl_core.global_env_lock); +# define ECL_WITH_GLOBAL_ENV_RDLOCK_END \ + ecl_rwlock_unlock_read(&cl_core.global_env_lock); \ + ecl_bds_unwind1(__ecl_pack_env); \ + ecl_check_pending_interrupts(__ecl_pack_env); } +# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) { \ + const cl_env_ptr __ecl_pack_env = the_env; \ + ecl_bds_bind(__ecl_pack_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); \ + ecl_rwlock_lock_write(&cl_core.global_env_lock); +# define ECL_WITH_GLOBAL_ENV_WRLOCK_END \ + ecl_rwlock_unlock_write(&cl_core.global_env_lock); \ + ecl_bds_unwind1(__ecl_pack_env); \ + ecl_check_pending_interrupts(__ecl_pack_env); } +#else +# define ECL_WITH_GLOBAL_LOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_LOCK_END +# define ECL_WITH_LOCK_BEGIN(the_env,lock) +# define ECL_WITH_LOCK_END +# define ECL_WITH_NATIVE_LOCK_BEGIN(the_env,lock) +# define ECL_WITH_NATIVE_LOCK_END +# define ECL_WITH_GLOBAL_ENV_RDLOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_ENV_RDLOCK_END +# define ECL_WITH_GLOBAL_ENV_WRLOCK_BEGIN(the_env) +# define ECL_WITH_GLOBAL_ENV_WRLOCK_END +#endif /* ECL_THREADS */ + +#include + /* * Fake several ISO C99 mathematical functions if not available */ diff --git a/src/h/object.h b/src/h/object.h 
index 4b2ebd3e1e62c12995b46ab9f02d9baf39d1662e..69c26660f490582e54563327d74f056b843bb4a0 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -928,12 +928,40 @@ struct ecl_dummy { }; #ifdef ECL_THREADS + +#ifdef ECL_WINDOWS_THREADS +typedef HANDLE ecl_mutex_t; +typedef struct ecl_cond_var_t { + HANDLE broadcast_event; + HANDLE signal_event; + cl_index state; +} ecl_cond_var_t; +typedef SRWLOCK ecl_rwlock_t; +#else +typedef pthread_mutex_t ecl_mutex_t; +typedef pthread_cond_t ecl_cond_var_t; +# ifdef HAVE_POSIX_RWLOCK +typedef pthread_rwlock_t ecl_rwlock_t; +# else +typedef struct ecl_rwlock_t { + pthread_mutex_t mutex; + pthread_cond_t reader_cv; + pthread_cond_t writer_cv; + /* reader_count = 0: lock is free + * reader_count =-1: writer holds the lock + * reader_count > 0: number of readers */ + cl_fixnum reader_count; +} ecl_rwlock_t; +# endif +#endif + enum { ECL_PROCESS_INACTIVE = 0, ECL_PROCESS_BOOTING, ECL_PROCESS_ACTIVE, ECL_PROCESS_EXITING }; + struct ecl_process { _ECL_HDR; cl_object name; @@ -943,11 +971,11 @@ struct ecl_process { cl_object interrupt; cl_object initial_bindings; cl_object parent; - cl_object exit_barrier; cl_object exit_values; cl_object woken_up; cl_object queue_record; - cl_object start_stop_spinlock; + ecl_mutex_t start_stop_lock; /* phase is updated only when we hold this lock */ + ecl_cond_var_t exit_barrier; /* process-join waits on this barrier */ cl_index phase; #ifdef ECL_WINDOWS_THREADS HANDLE thread; @@ -965,33 +993,30 @@ enum { ECL_WAKEUP_DELETE = 8 }; -struct ecl_queue { - _ECL_HDR; - cl_object list; - cl_object spinlock; -}; - struct ecl_semaphore { _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; cl_object name; cl_fixnum counter; + cl_fixnum wait_count; + ecl_mutex_t mutex; + ecl_cond_var_t cv; }; +#define ECL_BARRIER_WAKEUP_NORMAL 1 +#define ECL_BARRIER_WAKEUP_KILL 2 + struct ecl_barrier { - _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; + _ECL_HDR2(disabled,wakeup); cl_object name; - cl_fixnum count; - cl_fixnum arrivers_count; + cl_index count; + cl_index arrivers_count; + ecl_mutex_t mutex; + ecl_cond_var_t cv; }; struct ecl_lock { _ECL_HDR1(recursive); - cl_object queue_list; - cl_object queue_spinlock; + ecl_mutex_t mutex; cl_object owner; /* thread holding the lock or NIL */ cl_object name; cl_fixnum counter; @@ -1001,28 +1026,23 @@ struct ecl_mailbox { _ECL_HDR; cl_object name; cl_object data; - cl_object reader_semaphore; - cl_object writer_semaphore; + ecl_mutex_t mutex; + ecl_cond_var_t reader_cv; + ecl_cond_var_t writer_cv; + cl_index message_count; cl_index read_pointer; cl_index write_pointer; - cl_index mask; }; struct ecl_rwlock { _ECL_HDR; cl_object name; -#ifdef ECL_RWLOCK - pthread_rwlock_t mutex; -#else - cl_object mutex; -#endif + ecl_rwlock_t mutex; }; struct ecl_condition_variable { _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; - cl_object lock; + ecl_cond_var_t cv; }; #endif /* ECL_THREADS */ @@ -1127,7 +1147,6 @@ union cl_lispunion { struct ecl_instance instance; /* clos instance */ #ifdef ECL_THREADS struct ecl_process process; /* process */ - struct ecl_queue queue; /* lock */ struct ecl_lock lock; /* lock */ struct ecl_rwlock rwlock; /* read/write lock */ struct ecl_condition_variable condition_variable; /* condition-variable */ diff --git a/src/h/threads.h b/src/h/threads.h new file mode 100644 index 0000000000000000000000000000000000000000..38a390ca1c11349a093ed023836272ceba472fa8 --- /dev/null +++ b/src/h/threads.h @@ -0,0 +1,702 @@ +/* -*- Mode: C; c-basic-offset: 2; 
indent-tabs-mode: nil -*- */ +/* vim: set filetype=c tabstop=2 shiftwidth=2 expandtab: */ + +/* + * threads.h - wrapper for mutex and condition variable operating + * system primitives + * + * Copyright (c) 2020, Marius Gerbershagen + * + * See file 'LICENSE' for the copyright details. + * + */ + +#ifdef ECL_THREADS + +#ifndef ECL_MUTEX_H +#define ECL_MUTEX_H + +#include +#ifdef ECL_WINDOWS_THREADS +# include +# include +#else +# include +#endif +#include +#ifdef HAVE_GETTIMEOFDAY +# include +#endif +#include + +#if !defined(ECL_WINDOWS_THREADS) + +#define ECL_MUTEX_SUCCESS 0 +#define ECL_MUTEX_LOCKED EBUSY +#define ECL_MUTEX_NOT_OWNED EPERM +#define ECL_MUTEX_TIMEOUT ETIMEDOUT +#define ECL_MUTEX_DEADLOCK EDEADLK + +/* MUTEX */ + +/* Non-recursive locks are only provided as an optimization; on + * Windows locks are always recursive. If ECL_MUTEX_DEADLOCK is + * undefined, recursive locks are not available. */ + +static inline void +ecl_mutex_init(ecl_mutex_t *mutex, bool recursive) +{ + pthread_mutexattr_t mutexattr[1]; + pthread_mutexattr_init(mutexattr); + if (recursive) { + pthread_mutexattr_settype(mutexattr, PTHREAD_MUTEX_RECURSIVE); + } else { + pthread_mutexattr_settype(mutexattr, PTHREAD_MUTEX_ERRORCHECK); + } + pthread_mutex_init(mutex, mutexattr); +} + +static inline void +ecl_mutex_destroy(ecl_mutex_t *mutex) +{ + pthread_mutex_destroy(mutex); +} + +static inline int +ecl_mutex_unlock(ecl_mutex_t *mutex) +{ + return pthread_mutex_unlock(mutex); +} + +static inline int +ecl_mutex_trylock(ecl_mutex_t *mutex) +{ + return pthread_mutex_trylock(mutex); +} + +static inline int +ecl_mutex_lock(ecl_mutex_t *mutex) +{ + return pthread_mutex_lock(mutex); +} + +/* CONDITION VARIABLE */ + +static inline void +add_timeout_delta(struct timespec *ts, double seconds) +{ + struct timeval tp; + + gettimeofday(&tp, NULL); + /* Convert from timeval to timespec */ + ts->tv_sec = tp.tv_sec; + ts->tv_nsec = tp.tv_usec * 1000; + + /* Add `seconds' delta */ + ts->tv_sec += (time_t)floor(seconds); + ts->tv_nsec += (long)((seconds - floor(seconds)) * 1e9); + if (ts->tv_nsec >= 1e9) { + ts->tv_nsec -= 1e9; + ts->tv_sec++; + } +} + +static inline int +ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds) +{ +#if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) + struct timespec ts; + add_timeout_delta(&ts, seconds); + return pthread_mutex_timedlock(mutex, &ts); +#else + /* Not implemented, see mutex.d for alternative implementation using interrupts */ + return -1; +#endif +} + +static inline void +ecl_cond_var_init(ecl_cond_var_t *cv) +{ + pthread_cond_init(cv, NULL); +} + +static inline void +ecl_cond_var_destroy(ecl_cond_var_t *cv) +{ + pthread_cond_destroy(cv); +} + +static inline int +ecl_cond_var_wait(ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + return pthread_cond_wait(cv, mutex); +} + +static inline int +ecl_cond_var_timedwait(ecl_cond_var_t *cv, ecl_mutex_t *mutex, double seconds) +{ + struct timespec ts; + add_timeout_delta(&ts, seconds); + return pthread_cond_timedwait(cv, mutex, &ts); +} + +static inline int +ecl_cond_var_signal(ecl_cond_var_t *cv) +{ + return pthread_cond_signal(cv); +} + +static inline int +ecl_cond_var_broadcast(ecl_cond_var_t *cv) +{ + return pthread_cond_broadcast(cv); +} + +/* READ-WRITE LOCK */ + +/* If posix rwlocks are not available, we provide our own fallback + * implementation. 
Note that for reasons of simplicity and performance + * this implementation is a) not interrupt safe, b) will not signal + * errors on unlocking a mutex which is owned by a different thread + * and c) allows for writer starvation, i.e. as long as readers are + * active, the writer will never obtain the lock. */ + +static inline void +ecl_rwlock_init(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + pthread_rwlock_init(rwlock, NULL); +#else + pthread_mutex_init(&rwlock->mutex, NULL); + pthread_cond_init(&rwlock->reader_cv, NULL); + pthread_cond_init(&rwlock->writer_cv, NULL); + rwlock->reader_count = 0; +#endif +} + +static inline void +ecl_rwlock_destroy(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + pthread_rwlock_destroy(rwlock); +#else + pthread_mutex_destroy(&rwlock->mutex); + pthread_cond_destroy(&rwlock->reader_cv); + pthread_cond_destroy(&rwlock->writer_cv); +#endif +} + +static inline int +ecl_rwlock_unlock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_unlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count <= 0) { + rc = ECL_MUTEX_NOT_OWNED; + } else { + if (--rwlock->reader_count == 0) { + pthread_cond_signal(&rwlock->writer_cv); + } + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_unlock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_unlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count >= 0) { + rc = ECL_MUTEX_NOT_OWNED; + } else { + rwlock->reader_count = 0; + pthread_cond_signal(&rwlock->writer_cv); + pthread_cond_broadcast(&rwlock->reader_cv); + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_trylock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_tryrdlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count == -1) { + rc = ECL_MUTEX_LOCKED; + } else { + ++rwlock->reader_count; + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_trylock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_trywrlock(rwlock); +#else + int rc; + pthread_mutex_lock(&rwlock->mutex); + if (rwlock->reader_count != 0) { + rc = ECL_MUTEX_LOCKED; + } else { + rwlock->reader_count = -1; + rc = ECL_MUTEX_SUCCESS; + } + pthread_mutex_unlock(&rwlock->mutex); + return rc; +#endif +} + +static inline int +ecl_rwlock_lock_read(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_rdlock(rwlock); +#else + pthread_mutex_lock(&rwlock->mutex); + while (rwlock->reader_count == -1) { + pthread_cond_wait(&rwlock->reader_cv, &rwlock->mutex); + } + ++rwlock->reader_count; + pthread_mutex_unlock(&rwlock->mutex); + return ECL_MUTEX_SUCCESS; +#endif +} + +static inline int +ecl_rwlock_lock_write(ecl_rwlock_t *rwlock) +{ +#if defined(HAVE_POSIX_RWLOCK) + return pthread_rwlock_wrlock(rwlock); +#else + pthread_mutex_lock(&rwlock->mutex); + while (rwlock->reader_count != 0) { + pthread_cond_wait(&rwlock->writer_cv, &rwlock->mutex); + } + rwlock->reader_count = -1; + pthread_mutex_unlock(&rwlock->mutex); + return ECL_MUTEX_SUCCESS; +#endif +} + +#else /* ECL_WINDOWS_THREADS */ + +/* To allow for timed wait operations on locks and for interrupting + * deadlocked threads, we use 
Windows mutexes instead of critical
+ * section objects (which would serve the same purpose and be slightly
+ * faster). This also requires implementing our own version of
+ * condition variables, since the ones provided by Windows only work
+ * together with critical sections. */
+
+#define ECL_MUTEX_SUCCESS 0
+#define ECL_MUTEX_LOCKED -1
+#define ECL_MUTEX_NOT_OWNED ERROR_NOT_OWNER
+#define ECL_MUTEX_TIMEOUT ERROR_TIMEOUT
+#undef ECL_MUTEX_DEADLOCK
+
+/* MUTEX */
+
+static inline void
+ecl_mutex_init(ecl_mutex_t *mutex, bool recursive)
+{
+  *mutex = CreateMutex(NULL, FALSE, NULL);
+}
+
+static inline void
+ecl_mutex_destroy(ecl_mutex_t *mutex)
+{
+  CloseHandle(*mutex);
+}
+
+static inline int
+ecl_mutex_unlock(ecl_mutex_t *mutex)
+{
+  return ReleaseMutex(*mutex) ? ECL_MUTEX_SUCCESS : GetLastError();
+}
+
+static inline int
+ecl_mutex_trylock(ecl_mutex_t *mutex)
+{
+  switch (WaitForSingleObject(*mutex, 0)) {
+  case WAIT_OBJECT_0:
+  case WAIT_ABANDONED:
+    return ECL_MUTEX_SUCCESS;
+  case WAIT_TIMEOUT:
+    return ECL_MUTEX_LOCKED;
+  default:
+    return GetLastError();
+  }
+}
+
+static inline int
+ecl_mutex_lock(ecl_mutex_t *mutex)
+{
+ AGAIN:
+  switch (WaitForSingleObjectEx(*mutex, INFINITE, TRUE)) {
+  case WAIT_OBJECT_0:
+  case WAIT_ABANDONED:
+    return ECL_MUTEX_SUCCESS;
+  case WAIT_IO_COMPLETION:
+    goto AGAIN;
+  case WAIT_TIMEOUT:
+    return ECL_MUTEX_LOCKED;
+  default:
+    return GetLastError();
+  }
+}
+
+static inline DWORD
+remaining_milliseconds(double seconds, DWORD start_ticks)
+{
+  double ret = (seconds * 1000.0) - (GetTickCount() - start_ticks);
+  return (ret < 0) ? 0 : (DWORD)ret;
+}
+
+static inline int
+ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds)
+{
+  DWORD start_ticks = GetTickCount();
+ AGAIN:
+  switch (WaitForSingleObjectEx(*mutex, remaining_milliseconds(seconds, start_ticks), TRUE)) {
+  case WAIT_OBJECT_0:
+  case WAIT_ABANDONED:
+    return ECL_MUTEX_SUCCESS;
+  case WAIT_IO_COMPLETION:
+    goto AGAIN;
+  case WAIT_TIMEOUT:
+    return ECL_MUTEX_TIMEOUT;
+  default:
+    return GetLastError();
+  }
+}
+
+/* CONDITION VARIABLE */
+
+/* IMPLEMENTATION
+ * --------------
+ * Condition variables are implemented on top of the event
+ * synchronization objects provided by the Windows operating
+ * system.
+ *
+ * Event objects allow a thread to wait until a certain event is
+ * signaled. If the event has already been signaled before the thread
+ * checks for it, no wait is performed and WaitForSingleObject or
+ * WaitForMultipleObjects returns immediately. Event objects come in a
+ * manual reset flavor, which stays signaled until explicitly reset,
+ * and an auto reset flavor, which wakes up only one thread and then
+ * resets itself automatically. We use a manual reset event for
+ * broadcasting (waking up all threads waiting on the condition
+ * variable) and an auto reset event for signaling (waking up only a
+ * single thread).
+ *
+ * The complete state of the condition variable (how many threads are
+ * waiting and whether we are waiting, waking up threads or resetting
+ * the broadcasting event) is stored in an integer which is
+ * manipulated using atomic operations.
+ *
+ * RACE CONDITION SAFETY
+ * ---------------------
+ * To prevent wakeup signals from getting lost, condition variables
+ * are required to atomically unlock the associated mutex when
+ * starting a wait operation. While it is not possible to atomically
+ * unlock a mutex and wait for an event on Windows, the implementation
+ * is nevertheless race condition free in this regard.
+ *
+ * Consider two threads, thread A which starts a wait operation on a
+ * condition variable and thread B which wakes up the same condition
+ * variable. A increases the wait count, releases the mutex but
+ * doesn't wait for a wakeup event yet. Thread B acquires the mutex
+ * and signals an event (broadcast or signal). Since the event is
+ * reset only after a waiting thread has been released, at the time
+ * that thread A starts waiting the event is still signaled and so the
+ * wakeup event is not lost. Note that for this to work, it is crucial
+ * that the wait count is increased before the mutex is released, in
+ * order for thread B to know that a thread is about to start waiting.
+ *
+ * Further race conditions between multiple broadcasting/signaling
+ * threads are prevented by allowing only one operation (signaling or
+ * broadcasting) to happen at a time. The case in which threads start
+ * waiting during a wakeup operation is treated as if the waiting
+ * thread had arrived before the wakeup. Race conditions between
+ * threads which start waiting while the broadcast event is being
+ * reset are prevented by letting the threads that are about to wait
+ * spin until the reset has finished.
+ *
+ * INTERRUPT SAFETY
+ * ----------------
+ * INV: On Windows, interrupts happen only when accessing the thread
+ * local environment or when entering an alertable wait state. In the
+ * latter case, an arriving interrupt causes the wait function to
+ * return WAIT_IO_COMPLETION.
+ *
+ * This allows us to provide an interrupt-safe implementation by
+ * queueing the interrupt, fixing up the wait count of the condition
+ * variable and then executing the interrupt. If the interrupt doesn't
+ * execute a non-local jump, we resume waiting again.
+ *
+ * Note that while this prevents the state of the condition variable
+ * from getting corrupted, it doesn't prevent wakeup events which
+ * arrive while the interrupt is executing from getting lost. It is up
+ * to the user to prevent this type of logical bug.
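+ *
+ * As a concrete example of the state encoding used below: with three
+ * threads waiting and a broadcast in progress, the state word is
+ * (3 << 2) | ECL_COND_VAR_BROADCAST. Each woken thread decrements the
+ * wait count; the last one switches the status to ECL_COND_VAR_RESET,
+ * resets the manual reset broadcast event and finally stores 0, at
+ * which point new waiters may register themselves again.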
+ */ + +#define ECL_COND_VAR_BROADCAST 1 +#define ECL_COND_VAR_SIGNAL 2 +#define ECL_COND_VAR_RESET 3 + +static inline cl_index +ecl_cond_var_wait_count(cl_index state) +{ + return state >> 2; +} + +static inline cl_index +ecl_cond_var_status(cl_index state) +{ + return state & 3; +} + +static inline cl_index +ecl_cond_var_make_state(cl_index wait_count, cl_index status) +{ + return (wait_count << 2) | status; +} + +static inline void +ecl_cond_var_increment_wait_count(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + new_state = ecl_cond_var_make_state(ecl_cond_var_wait_count(old_state) + 1, + ecl_cond_var_status(old_state)); + } while (ecl_cond_var_status(old_state) == ECL_COND_VAR_RESET || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); +} + +static inline void +ecl_cond_var_decrement_wait_count(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + new_state = ecl_cond_var_make_state(ecl_cond_var_wait_count(old_state) - 1, + ecl_cond_var_status(old_state)); + } while (!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); +} + +static inline int +ecl_cond_var_handle_event(DWORD rc, ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + cl_index old_state, new_state, wait_count; + switch (rc) { + case WAIT_OBJECT_0: + case WAIT_ABANDONED: + /* broadcast event */ + do { + /* INV: ecl_cond_var_status(old_state) == ECL_COND_VAR_BROADCAST */ + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state) - 1; + if (wait_count == 0) { + new_state = ecl_cond_var_make_state(0, ECL_COND_VAR_RESET); + while (!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + ResetEvent(cv->broadcast_event); + AO_store_release((AO_t*)&cv->state, 0); + break; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_BROADCAST); + } while(!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return ecl_mutex_lock(mutex); + case WAIT_OBJECT_0 + 1: + case WAIT_ABANDONED + 1: + /* signal event */ + do { + /* INV: ecl_cond_var_status(old_state) == ECL_COND_VAR_SIGNAL */ + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state) - 1; + new_state = ecl_cond_var_make_state(wait_count, 0); + } while(!AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return ecl_mutex_lock(mutex); + case WAIT_TIMEOUT: + ecl_cond_var_decrement_wait_count(cv); + rc = ecl_mutex_lock(mutex); + return (rc == ECL_MUTEX_SUCCESS) ? 
ECL_MUTEX_TIMEOUT : rc; + default: + ecl_cond_var_decrement_wait_count(cv); + return GetLastError(); + } +} + +static inline void +ecl_cond_var_init(ecl_cond_var_t *cv) +{ + cv->broadcast_event = CreateEvent(NULL, TRUE, FALSE, NULL); /* manual reset event */ + cv->signal_event = CreateEvent(NULL, FALSE, FALSE, NULL); /* auto reset event */ + cv->state = 0; +} + +static inline void +ecl_cond_var_destroy(ecl_cond_var_t *cv) +{ + CloseHandle(cv->broadcast_event); + CloseHandle(cv->signal_event); +} + +static inline int +ecl_cond_var_wait(ecl_cond_var_t *cv, ecl_mutex_t *mutex) +{ + cl_env_ptr env = ecl_process_env(); + DWORD rc; + HANDLE events[2]; + events[0] = cv->broadcast_event; + events[1] = cv->signal_event; + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + if (!ReleaseMutex(*mutex)) { + return GetLastError(); + } + while ((rc = WaitForMultipleObjectsEx(2, events, FALSE, INFINITE, TRUE)) == WAIT_IO_COMPLETION) { + /* we got an interrupt; first fix up the state of the condition variable, ... */ + ecl_cond_var_decrement_wait_count(cv); + /* ... then handle the interrupt ... */ + ecl_enable_interrupts_env(env); + ecl_check_pending_interrupts(env); + ecl_disable_interrupts_env(env); + /* ... and start waiting again ... */ + ecl_cond_var_increment_wait_count(cv); + } + return ecl_cond_var_handle_event(rc, cv, mutex); +} + +static inline int +ecl_cond_var_timedwait(ecl_cond_var_t *cv, ecl_mutex_t *mutex, double seconds) +{ + cl_env_ptr env = ecl_process_env(); + DWORD rc; + DWORD start_ticks = GetTickCount(); + HANDLE events[2]; + events[0] = cv->broadcast_event; + events[1] = cv->signal_event; + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + if (!ReleaseMutex(*mutex)) { + return GetLastError(); + } + while ((rc = WaitForMultipleObjectsEx(2, events, FALSE, remaining_milliseconds(seconds, start_ticks), TRUE)) == WAIT_IO_COMPLETION) { + ecl_cond_var_decrement_wait_count(cv); + ecl_enable_interrupts_env(env); + ecl_check_pending_interrupts(env); + ecl_disable_interrupts_env(env); + ecl_cond_var_increment_wait_count(cv); + } + return ecl_cond_var_handle_event(rc, cv, mutex); +} + +static inline int +ecl_cond_var_signal(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state, wait_count, status; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state); + status = ecl_cond_var_status(old_state); + if (wait_count == 0 || status == ECL_COND_VAR_BROADCAST) { + return ECL_MUTEX_SUCCESS; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_SIGNAL); + } while(status != 0 || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return SetEvent(cv->signal_event) ? ECL_MUTEX_SUCCESS : GetLastError(); +} + +static inline int +ecl_cond_var_broadcast(ecl_cond_var_t *cv) +{ + cl_index old_state, new_state, wait_count, status; + do { + old_state = AO_load_acquire((AO_t*)&cv->state); + wait_count = ecl_cond_var_wait_count(old_state); + status = ecl_cond_var_status(old_state); + if (wait_count == 0 || status == ECL_COND_VAR_BROADCAST) { + return ECL_MUTEX_SUCCESS; + } + new_state = ecl_cond_var_make_state(wait_count, ECL_COND_VAR_BROADCAST); + } while(status != 0 || + !AO_compare_and_swap_full((AO_t*)&cv->state, (AO_t)old_state, (AO_t)new_state)); + return SetEvent(cv->broadcast_event) ? 
ECL_MUTEX_SUCCESS : GetLastError(); +} + +/* READ-WRITE LOCK */ + +static inline void +ecl_rwlock_init(ecl_rwlock_t *rwlock) +{ + InitializeSRWLock(rwlock); +} + +static inline void +ecl_rwlock_destroy(ecl_rwlock_t *rwlock) { } + +static inline int +ecl_rwlock_unlock_read(ecl_rwlock_t *rwlock) +{ + ReleaseSRWLockShared(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_unlock_write(ecl_rwlock_t *rwlock) +{ + ReleaseSRWLockExclusive(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_trylock_read(ecl_rwlock_t *rwlock) +{ + return (TryAcquireSRWLockShared(rwlock) == 0) ? ECL_MUTEX_LOCKED : ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_trylock_write(ecl_rwlock_t *rwlock) +{ + return (TryAcquireSRWLockExclusive(rwlock) == 0) ? ECL_MUTEX_LOCKED : ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_lock_read(ecl_rwlock_t *rwlock) +{ + AcquireSRWLockShared(rwlock); + return ECL_MUTEX_SUCCESS; +} + +static inline int +ecl_rwlock_lock_write(ecl_rwlock_t *rwlock) +{ + AcquireSRWLockExclusive(rwlock); + return ECL_MUTEX_SUCCESS; +} + +#endif /* ECL_WINDOWS_THREADS */ + +#endif /* ECL_MUTEX_H */ + +#endif /* ECL_THREADS */ diff --git a/src/lsp/mp.lsp b/src/lsp/mp.lsp index ab3d1da04278db2e2d6b570cfe945d45694989e0..87eb18ddf7be8d640001b3129f9763e6bef5c500 100644 --- a/src/lsp/mp.lsp +++ b/src/lsp/mp.lsp @@ -112,36 +112,24 @@ by ALLOW-WITH-INTERRUPTS." ;;; ;;; Convenience macros for locks ;;; -(defmacro with-lock ((lock-form &rest options) &body body) +(defmacro with-lock ((lock-form &key (wait-form t)) &body body) #-threads `(progn ,@body) - ;; Why do we need %count? Even if get-lock succeeeds, an interrupt may - ;; happen between the end of get-lock and when we save the output of - ;; the function. That means we lose the information and ignore that - ;; the lock was actually acquired. Furthermore, a lock can be recursive - ;; and mp:lock-holder is also not reliable. - ;; - ;; Next notice how we need to disable interrupts around the body and - ;; the get-lock statement, to ensure that the unlocking is done with - ;; interrupts disabled. + ;; We do our best to make sure that the lock is released if we are + ;; interrupted and jump up the call stack, however there is an + ;; unavoidable race condition if the interrupt happens after the + ;; mutex is locked but before we store the return value of + ;; mp:get-lock. 
#+threads - (ext:with-unique-names (lock owner count process) - `(let* ((,lock ,lock-form) - (,owner (mp:lock-owner ,lock)) - (,count (mp:lock-count ,lock))) - (declare (type fixnum ,count)) - (without-interrupts + (ext:with-unique-names (lock wait) + `(let ((,lock ,lock-form) + (,wait ,wait-form)) + (when (mp:get-lock ,lock ,wait) + (without-interrupts (unwind-protect (with-restored-interrupts - (mp::get-lock ,lock) - (locally ,@body)) - (let ((,process mp:*current-process*)) - (declare (optimize (speed 3) (safety 0) (debug 0))) - (when (and (eq ,process (mp:lock-owner ,lock)) - (or (not (eq ,owner ,process)) - (> (the fixnum (mp:lock-count ,lock)) - (the fixnum ,count)))) - (mp::giveup-lock ,lock)))))))) + (locally ,@body)) + (mp:giveup-lock ,lock))))))) #+ecl-read-write-lock (defmacro with-rwlock ((lock op) &body body) diff --git a/src/lsp/process.lsp b/src/lsp/process.lsp index 36215c2b0342d791fbb3ec70df0e58398f97b5a7..c9392b61e9947a0bb48e33fe42729a8340eacb89 100644 --- a/src/lsp/process.lsp +++ b/src/lsp/process.lsp @@ -12,12 +12,11 @@ (ext:with-unique-names (lock wait-p) `(let ((,lock (external-process-%lock ,process)) (,wait-p ,wait)) - (mp:without-interrupts - (unwind-protect (mp::with-restored-interrupts - (when (mp:get-lock ,lock ,wait-p) - (locally ,@body))) - (when (mp:holding-lock-p ,lock) - (mp:giveup-lock ,lock)))))) + (when (mp:get-lock ,lock ,wait-p) + (mp:without-interrupts + (unwind-protect (mp::with-restored-interrupts + (locally ,@body)) + (mp:giveup-lock ,lock)))))) #-threads `(progn ,@body)) (defstruct (external-process (:constructor make-external-process ())) diff --git a/src/tests/2am.lisp b/src/tests/2am.lisp index 7543da6b0b065bf3574fd1657d55c30c08db34cb..b99ea2a03d99394b4cf47f95810eefad88c7cb81 100644 --- a/src/tests/2am.lisp +++ b/src/tests/2am.lisp @@ -24,21 +24,26 @@ #| to avoid conflict with the library name package 2am-ecl |# (defpackage #:2am-ecl (:use #:cl) - (:export #:test #:is #:signals #:finishes #:run #:suite)) + (:export #:test #:test-with-timeout #:is #:signals #:finishes + #:run #:suite)) (in-package #:2am-ecl) (defvar *tests* nil "A name of the default tests suite.") (defvar *suites* (make-hash-table) "A collection of test suites.") (defvar *hierarchy* (make-hash-table) "A hierarchy of test suites.") -(defvar *failures* nil) -(defvar *crashes* nil) -(defvar *test-name* nil) -(defvar *test-count* nil) -(defvar *pass-count* nil) -(defvar *fail-count* nil) +(defvar *stats* nil "Collection of test statistics.") (defvar *running* nil) +(defvar *test-name* nil) (defvar *last-fail* nil) +(defvar *default-timeout* 60.0 "Default timeout in seconds.") + +(defstruct test-stats + (failures (make-hash-table)) + (crashes 0) + (test-count 0) + (pass-count 0) + (fail-count 0)) (define-condition test-failure (simple-condition) ((test-name :initarg :name @@ -71,7 +76,12 @@ (defun shuffle (sequence) (%shuffle (map 'vector #'identity sequence))) -(defun report (test-count pass-count fail-count crashes) +(defun report (stats &aux + (test-count (test-stats-test-count stats)) + (pass-count (test-stats-pass-count stats)) + (fail-count (test-stats-fail-count stats)) + (crashes (test-stats-crashes stats)) + (failures (test-stats-failures stats))) (let ((num-check (+ pass-count fail-count))) (if *running* (format t "~&Did ~s test~:p (~s crashed), ~s check~:p.~%" test-count crashes num-check) @@ -92,16 +102,12 @@ (format t " CRASH [~A]: " (type-of fail))) (format t "~A~%" fail)) (format t "~&--------------------------------~%")) - *failures*))) + failures))) (defun %run 
(fn) - (let ((*test-count* 0) - (*pass-count* 0) - (*fail-count* 0) - (*failures* (make-hash-table)) - (*crashes* 0)) + (let ((*stats* (make-test-stats))) (multiple-value-prog1 (funcall fn) - (report *test-count* *pass-count* *fail-count* *crashes*)))) + (report *stats*)))) (defun %run-suite (name) (let ((visited nil) @@ -131,41 +137,90 @@ (map nil #'funcall (shuffle tests))))))) (values)) -(defun call-test (fn) - (format t "~&Running test ~s " *test-name*) - (finish-output) - (if *running* - (handler-case - (progn (incf *test-count*) - (funcall fn)) - (serious-condition (c) - (write-char #\X) - (incf *crashes*) - (push c (gethash *test-name* *failures*)))) - (%run fn)) - (values)) +(defun call-test (name fn) + (let ((*test-name* name)) + (format t "~&Running test ~s " *test-name*) + (finish-output) + (if *running* + (handler-case + (progn (incf (test-stats-test-count *stats*)) + (funcall fn)) + (serious-condition (c) + (write-char #\X) + (incf (test-stats-crashes *stats*)) + (push c (gethash *test-name* (test-stats-failures *stats*))))) + (%run fn)) + (values))) (defmacro test (name &body body) - "Define a test function and add it to `*tests*'." `(progn (defun ,name () - (let ((*test-name* ',name)) - (call-test (lambda () ,@body)))) + (call-test ',name (lambda () ,@body))) (pushnew ',name (gethash *tests* *suites*)) ',name)) +(defun kill-processes (process-list &optional original) + "Kills a list of processes, which may be the difference between two lists." + (let ((process-list (set-difference process-list original))) + (when (member mp:*current-process* process-list) + (error "Found myself in the kill list")) + (mapc #'mp:process-kill process-list) + process-list)) + +#+threads +(defun call-test-with-timeout (name timeout fn) + (let* ((all-processes (mp:all-processes)) + (finished nil) + (runner (mp:process-run-function + "runner" + #'(lambda (stats running) + (let ((*stats* stats) + (*running* running)) + (call-test name fn) + (setf finished t))) + *stats* *running*))) + (loop with *test-name* = name + with timestep = 0.2 + for time from 0.0 upto timeout by timestep + do (if finished + (return) + (sleep timestep)) + finally (mp:process-kill runner) + (failed (make-condition 'test-failure + :name name + :format-control "Timeout after ~A seconds" + :format-arguments (list timeout))) + (return-from call-test-with-timeout)) + (mp:process-join runner) + (let ((leftovers (kill-processes (mp:all-processes) all-processes))) + (when leftovers + (format t "~%;;; Stray processes: ~A~%" leftovers))))) + +#+threads +(defmacro test-with-timeout (name-and-timeout &body body) + (let (name timeout) + (if (listp name-and-timeout) + (setf name (first name-and-timeout) + timeout (second name-and-timeout)) + (setf name name-and-timeout + timeout '*default-timeout*)) + `(progn + (defun ,name () + (call-test-with-timeout ',name ,timeout (lambda () ,@body))) + (pushnew ',name (gethash *tests* *suites*)) + ',name))) + (defun passed () (write-char #\.) 
- (when *pass-count* - (incf *pass-count*)) + (when *stats* + (incf (test-stats-pass-count *stats*))) T) (defun failed (c) (write-char #\f) - (when *fail-count* - (incf *fail-count*)) - (when *failures* - (push c (gethash *test-name* *failures*))) + (when *stats* + (incf (test-stats-fail-count *stats*)) + (push c (gethash *test-name* (test-stats-failures *stats*)))) (setf *last-fail* c) nil) diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index 0f3b8506fe89bae0bbb2717fadc34d4c775b7598..c2ca95ec4d2a046cfe26969b3aade8f2aebb21e8 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -9,42 +9,16 @@ (suite 'mp) - -;; Auxiliary routines for multithreaded tests - -(defun kill-and-wait (process-list &optional original wait) - "Kills a list of processes, which may be the difference between two lists, -waiting for all processes to finish. Currently it has no timeout, meaning -it may block hard the lisp image." - (let ((process-list (set-difference process-list original))) - (when (member mp:*current-process* process-list) - (error "Found myself in the kill list")) - (mapc #'mp:process-kill process-list) - (when wait - (loop for i in process-list - do (mp:process-join i))) - process-list)) - -(defun mp-test-run (closure) - (let* ((all-processes (mp:all-processes)) - (output (multiple-value-list (funcall closure)))) - (sleep 0.2) ; time to exit some processes - (let ((leftovers (kill-and-wait (mp:all-processes) all-processes))) - (cond (leftovers - (format t "~%;;; Stray processes: ~A" leftovers)) - (t - (values-list output)))))) - -(defmacro def-mp-test (name body expected-value) - "Runs some test code and only returns the output when the code exited without -creating stray processes." - (let ((all-processes (gensym)) - (output (gensym)) - (leftover (gensym))) - `(test ,name - (is-equal - (mp-test-run #'(lambda () ,body)) - ,expected-value)))) +;;; Important Note: +;;; +;;; Testing multithreading primitives such as locks or semaphores +;;; often requires synchronizing multiple threads. To keep the tests +;;; as simple as possible, this is synchronization is often done by +;;; using busy waits. As a consequence, test failures may manifest as +;;; timeouts. Some tests for semaphores and barriers also use mutexes +;;; for synchronization purposes and will fail if mutexes don't work +;;; correctly. Nearly all tests also assume that creating, killing or +;;; joining threads works properly, which is not tested separately. ;; Locks @@ -57,13 +31,13 @@ creating stray processes." ;;; When a WITH-LOCK is interrupted, it is not able to release ;;; the resulting lock and an error is signaled. ;;; -(test mp-0001-with-lock - (let ((flag t) - (lock (mp:make-lock :name "mp-0001-with-lock" :recursive nil))) +(test-with-timeout mp.mutex.with-lock + (let ((flag 0) + (lock (mp:make-lock :name "mutex.with-lock" :recursive nil))) (mp:with-lock (lock) (let ((background-process (mp:process-run-function - "mp-0001-with-lock" + "mutex.with-lock" #'(lambda () (handler-case (progn @@ -73,234 +47,359 @@ creating stray processes." (error (c) (princ c)(terpri) (setf flag c))) - (setf flag 2))))) + (setf flag 3))))) ;; The background process should not be able to get ;; the lock, and will simply wait. 
Now we interrupt it ;; and the process should gracefully quit, without ;; signalling any serious condition - (and (progn (sleep 1) - (is (mp:process-kill background-process))) - (progn (sleep 1) - (is (not (mp:process-active-p background-process)))) - (is (eq flag 1))))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-kill background-process)) + (mp:process-join background-process) + (is (= flag 1)))))) + +;;; Date: 12/04/2012 +;;; Non-recursive mutexes should signal an error when they +;;; cannot be relocked. +(test-with-timeout mp.mutex.recursive-error + (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) + (is (mp:get-lock mutex)) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (signals error (mp:get-lock mutex)) + (is (mp:giveup-lock mutex)) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) + +;;; Date: 12/04/2012 +;;; Recursive locks increase the counter. +(test-with-timeout mp.mutex.recursive-count + (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) + (is (loop for i from 1 upto 10 + always (and (mp:get-lock mutex) + (= (mp:lock-count mutex) i) + (eq (mp:lock-owner mutex) mp:*current-process*)))) + (is (loop for i from 9 downto 0 + always (and (eq (mp:lock-owner mutex) mp:*current-process*) + (mp:giveup-lock mutex) + (= (mp:lock-count mutex) i)))) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) + +;;; Date: 12/04/2012 +;;; When multiple threads compete for a mutex, they should +;;; all get the same chance of accessing the resource +;;; +;;; Disabled since underlying OS functions don't guarantee fairness -- +;;; mg 2020-08-22 +#+(or) +(test-with-timeout mp.mutex.fairness + (let* ((mutex (mp:make-lock :name "mutex.fairness")) + (nthreads 10) + (count 10) + (counter (* nthreads count)) + (array (make-array count :element-type 'fixnum :initial-element 0))) + (flet ((slave (n) + (loop with continue = t + for i from 1 by 1 + while continue do + (mp:get-lock mutex) + (cond ((plusp counter) + (decf counter) + (setf (aref array n) i)) + (t + (setf continue nil))) + (mp:giveup-lock mutex)))) + ;; Launch all agents. They will be locked + (let ((all-processes + (mp:with-lock (mutex) + (loop for n from 0 below nthreads + collect (mp:process-run-function n #'slave n) + ;; ... and give them some time to block on this mutex + finally (sleep 1))))) + ;; Now they are released and operate. They should all have + ;; the same share of counts. + (loop for p in all-processes + do (mp:process-join p)) + (loop for i from 0 below nthreads + (is (= (aref array i) count))))))) + +;;; Date: 12/04/2012 +;;; It is possible to kill processes waiting for a lock. 
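Besides interrupting a blocked waiter, the rewrite lets callers bound the wait themselves: the timed-lock tests further below go through the new ecl_mutex_timedlock wrapper. A minimal sketch of its calling convention on the C side; the helper try_timed_lock is hypothetical and not part of the patch:

static int
try_timed_lock(ecl_mutex_t *lock, double seconds)
{
  int rc = ecl_mutex_timedlock(lock, seconds);
  if (rc == ECL_MUTEX_SUCCESS)
    return 1;  /* lock acquired */
  if (rc == ECL_MUTEX_TIMEOUT)
    return 0;  /* timed out */
  return -1;   /* other error, or timed locking unavailable on this build */
}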
+;;; +(test-with-timeout mp.mutex.interruptible + (let ((mutex (mp:make-lock :name "mutex.interruptible")) + (flag 0)) + (mp:get-lock mutex) + (let ((sleeper-thread + (mp:process-run-function + "mutex.interruptible" + #'(lambda () + (setf flag 1) + (mp:with-lock (mutex) + (setf flag 2)))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-active-p sleeper-thread)) + (mp:process-kill sleeper-thread) + (mp:process-join sleeper-thread) + (is (= flag 1)) + (is (eq (mp:lock-owner mutex) mp:*current-process*))) + (mp:giveup-lock mutex))) + +(test-with-timeout (mp.mutex.timedlock-timeout 30) + (let ((mutex (mp:make-lock :name "mutex.timedlock-timeout")) + (flag 0)) + (mp:get-lock mutex) + (setf flag 1) + (let ((waiting-process + (mp:process-run-function + "mutex.timedlock-timeout" + (lambda () + (when (mp:get-lock mutex 1) + (error "Grabbing the mutex shouldn't have succeeded")) + (when (eq (mp:lock-owner mutex) mp:*current-process*) + (error "Wrong lock owner")) + (setf flag 2))))) + (mp:process-join waiting-process) + (is (eq mp:*current-process* (mp:lock-owner mutex))) + (is (= flag 2))))) + +(test-with-timeout (mp.mutex.timedlock-acquire 30) + (let ((mutex (mp:make-lock :name "mutex.timedlock-acquire")) + (flag 0)) + (mp:get-lock mutex) + (setf flag 1) + (let ((waiting-process + (mp:process-run-function + "mutex.timedlock-acquire" + (lambda () + (setf flag 2) + (unless (mp:get-lock mutex 60) + (error "Grabbing the mutex should have succeeded")) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 3) + (mp:giveup-lock mutex))))) + (loop until (> flag 1)) + (sleep 1) + (mp:giveup-lock mutex) + (mp:process-join waiting-process) + (is (= flag 3))))) ;; Semaphores ;;; Date: 14/04/2012 ;;; Ensure that at creation name and counter are set -(test sem-make-and-counter - (is (loop with name = "sem-make-and-counter" - for count from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (eq (mp:semaphore-name sem) name) - (= (mp:semaphore-count sem) count) - (zerop (mp:semaphore-wait-count sem)))))) +(test mp.sem.make-and-counter + (loop with name = "sem.make-and-counter" + for count from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (eq (mp:semaphore-name sem) name)) + (is (= (mp:semaphore-count sem) count)) + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; Ensure that signal changes the counter by the specified amount -(test sem-signal-semaphore-count - (is - (loop with name = "sem-signal-semaphore-count" - for count from 0 to 10 - always (loop for delta from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (= (mp:semaphore-count sem) count) - (null (mp:signal-semaphore sem delta)) - (= (mp:semaphore-count sem ) (+ count delta))))))) +(test-with-timeout mp.sem.signal-semaphore-count + (loop with name = "sem.signal-semaphore-count" + for count from 0 to 10 + do (loop for delta from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (= (mp:semaphore-count sem) count)) + (is (null (mp:signal-semaphore sem delta))) + (is (= (mp:semaphore-count sem ) (+ count delta)))))) ;;; Date: 14/04/2012 ;;; A semaphore with a count of zero blocks a process -(def-mp-test sem-signal-one-process - (let* ((flag nil) - (sem (mp:make-semaphore :name "sem-signal-one")) - (a-process (mp:process-run-function - "sem-signal-one-process" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t))))) - (and (null flag) - (mp:process-active-p 
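The semaphore tests below exercise the counter and wait-count bookkeeping made possible by the new struct ecl_semaphore fields (counter, wait_count, mutex, cv in object.h above). The following is only a sketch of the wait/signal pattern those fields suggest, not the actual code in threads/semaphore.d, and the function names are hypothetical:

static void
semaphore_wait_sketch(cl_object sem)
{
  ecl_mutex_lock(&sem->semaphore.mutex);
  if (sem->semaphore.counter <= 0) {
    sem->semaphore.wait_count++;
    do {
      /* releases the mutex while blocked, reacquires it on wakeup */
      ecl_cond_var_wait(&sem->semaphore.cv, &sem->semaphore.mutex);
    } while (sem->semaphore.counter <= 0);
    sem->semaphore.wait_count--;
  }
  sem->semaphore.counter--;
  ecl_mutex_unlock(&sem->semaphore.mutex);
}

static void
semaphore_signal_sketch(cl_object sem)
{
  ecl_mutex_lock(&sem->semaphore.mutex);
  sem->semaphore.counter++;
  if (sem->semaphore.wait_count > 0)
    ecl_cond_var_signal(&sem->semaphore.cv);
  ecl_mutex_unlock(&sem->semaphore.mutex);
}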
a-process) - (progn (mp:signal-semaphore sem) (sleep 0.2) flag) - (= (mp:semaphore-count sem) 0))) - t) +(test-with-timeout mp.sem.signal-one-process + (let* ((flag nil) + (sem (mp:make-semaphore :name "sem.signal-one")) + (a-process (mp:process-run-function + "sem.signal-one-process" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t))))) + (is (null flag)) + (is (mp:process-active-p a-process)) + (mp:signal-semaphore sem) + (mp:process-join a-process) + (is flag) + (is (= (mp:semaphore-count sem) 0)))) ;;; Date: 14/04/2012 ;;; We can signal multiple processes -(test sem-signal-n-processes +(test-with-timeout mp.sem.signal-n-processes (loop for count from 1 upto 10 always (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes (loop for i from 1 upto count collect (mp:process-run-function - "sem-signal-n-processes" + "sem.signal-n-processes" #'(lambda () (mp:wait-on-semaphore sem) (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) count) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - count (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem count) - (sleep 0.2) - (= counter count)) - "Counter should be ~s (but is ~s)." count counter) - (is (= (mp:semaphore-count sem) 0)))))) + (loop until (= (mp:semaphore-wait-count sem) count)) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem count) + (mapc #'mp:process-join all-processes) + (is (= counter count) + "Counter should be ~s (but is ~s)." count counter) + (is (= (mp:semaphore-count sem) 0))))) ;;; Date: 14/04/2012 ;;; When we signal N processes and N+M are waiting, only N awake -(test sem-signal-only-n-processes - (loop for m from 1 upto 3 always - (loop for n from 1 upto 4 always +(test-with-timeout mp.sem.signal-only-n-processes + (loop for m from 1 upto 3 do + (loop for n from 1 upto 4 do (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes (loop for i from 1 upto (+ n m) collect (mp:process-run-function - "sem-signal-n-processes" + "sem.signal-n-processes" #'(lambda () (mp:wait-on-semaphore sem) (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) (+ m n)) - "Number of threads waiting on semaphore should be ~s (but is ~s)." - (+ m n) (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem n) - (sleep 0.02) - (= counter n))) - (is (= (mp:semaphore-wait-count sem) m) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - m (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem m) - (sleep 0.02) - (= counter (+ n m))) - "Counter should be ~s (but is ~s)." 
(+ n m) counter)))))) + (loop until (= (mp:semaphore-wait-count sem) (+ m n))) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem n) + (loop until (= (count-if #'mp:process-active-p all-processes) m)) + (is (= counter n)) + (is (= (mp:semaphore-wait-count sem) m) + "Number of threads waiting on semaphore should be ~s (but is ~s)." + m (mp:semaphore-wait-count sem)) + (mp:signal-semaphore sem m) + (mapc #'mp:process-join all-processes) + (is (= counter (+ n m)) + "Counter should be ~s (but is ~s)." + (+ n m) counter))))) ;;; Date: 14/04/2012 ;;; It is possible to kill processes waiting for a semaphore. ;;; -(def-mp-test sem-interruptible - (loop with sem = (mp:make-semaphore :name "sem-interruptible") - with flag = nil - for count from 1 to 10 - for all-processes = (loop for i from 1 upto count - collect (mp:process-run-function - "sem-interruptible" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t)))) - always (and (progn (sleep 0.2) (null flag)) - (every #'mp:process-active-p all-processes) - (= (mp:semaphore-wait-count sem) count) - (mapc #'mp:process-kill all-processes) - (progn (sleep 0.2) (notany #'mp:process-active-p all-processes)) - (null flag) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) +(test-with-timeout mp.sem.interruptible + (loop with sem = (mp:make-semaphore :name "sem.interruptible") + with flag = nil + for count from 1 to 10 + for all-processes = (loop for i from 1 upto count + collect (mp:process-run-function + "sem.interruptible" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t)))) + do (loop until (= (mp:semaphore-wait-count sem) count)) + (is (null flag)) + (is (every #'mp:process-active-p all-processes)) + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes) + (is (null flag)) + ;; Usually, the wait count should be zero at this point. We may + ;; get higher values since the interrupt doesn't lock the mutex + ;; associated to the semaphore and thus multiple threads may write + ;; the wait count at the same time. However, interrupts are provided + ;; only for debugging purposes, for which this behaviour is acceptable. + (is (<= (mp:semaphore-wait-count sem) count)))) ;;; Date: 14/04/2012 ;;; When we kill a process, it is removed from the wait queue. ;;; -(def-mp-test sem-interrupt-updates-queue - (let* ((sem (mp:make-semaphore :name "sem-interrupt-updates-queue")) +(test-with-timeout mp.sem.interrupt-updates-queue + (let* ((sem (mp:make-semaphore :name "sem.interrupt-updates-queue")) (process (mp:process-run-function - "sem-interrupt-updates-queue" + "sem.interrupt-updates-queue" #'(lambda () (mp:wait-on-semaphore sem))))) - (sleep 0.2) - (and (= (mp:semaphore-wait-count sem) 1) - (mp:process-active-p process) - (progn (mp:process-kill process) - (sleep 0.2) - (not (mp:process-active-p process))) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) + (loop until (= (mp:semaphore-wait-count sem) 1)) + (is (mp:process-active-p process)) + (mp:process-kill process) + (mp:process-join process) + ;; In contrast to the previous test, if we interrupt only a single thread + ;; the wait count must be correct, since only a single thread is writing. + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; When we kill a process, it signals another one. This is tricky, ;;; because we need the awake signal to arrive _after_ the process is ;;; killed, but the process must still be in the queue for the semaphore -;;; to awake it. 
The way we solve this is by intercepting the kill signal. +;;; to awake it. ;;; -(test sem-interrupted-resignals - (let* ((sem (mp:make-semaphore :name "sem-interrupted-resignals")) +(test-with-timeout mp.sem.interrupted-resignals + (let* ((sem (mp:make-semaphore :name "sem.interrupted-resignals")) (flag1 nil) (flag2 nil) (process1 (mp:process-run-function - "sem-interrupted-resignals" + "sem.interrupted-resignals-1" #'(lambda () (unwind-protect (mp:wait-on-semaphore sem) - (sleep 4) - (setf flag1 t) - )))) + (loop repeat (* 60 100) do (sleep 1/100)) + (setf flag1 t))))) (process2 (mp:process-run-function - "sem-interrupted-resignals" + "sem.interrupted-resignals-2" #'(lambda () (mp:wait-on-semaphore sem) (setf flag2 t))))) - (sleep 0.2) - (and (is (= (mp:semaphore-wait-count sem) 2)) - (is (mp:process-active-p process1)) - (is (mp:process-active-p process2)) - ;; We kill the process but ensure it is still running - (is (progn (mp:process-kill process1) - (mp:process-active-p process1))) - (is (null flag1)) - ;; ... and in the queue - (is (= (mp:semaphore-wait-count sem) 2)) - ;; We awake it and it should awake the other one - (is (progn (format t "~%;;; Signaling semaphore") - (mp:signal-semaphore sem) - (sleep 1) - (zerop (mp:semaphore-wait-count sem)))) - (is flag2)))) + (loop until (= (mp:semaphore-wait-count sem) 2)) + (is (mp:process-active-p process1)) + (is (mp:process-active-p process2)) + ;; We kill the process but ensure it is still running + (mp:process-kill process1) + (is (mp:process-active-p process1)) + (is (null flag1)) + ;; Wait until the process is no longer waiting for the semaphore + (loop until (= (mp:semaphore-wait-count sem) 1)) + ;; ... then awake it and the other process should start working + (mp:signal-semaphore sem) + (mp:process-join process2) + (is (zerop (mp:semaphore-wait-count sem))) + (is flag2) + ;; Finally we kill the first process (which will by this time be + ;; stuck in the unwind-protect call) again. + (mp:process-kill process1) + (mp:process-join process1) + (is (null flag1)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumers, non-blocking, because the initial count ;;; is larger than the consumed data. -(def-mp-test sem-1-to-n-non-blocking +(test-with-timeout mp.sem.1-to-n-non-blocking (loop with counter = 0 - with lock = (mp:make-lock :name "sem-1-to-n-communication") + with lock = (mp:make-lock :name "sem.1-to-n-communication") for n from 1 to 10 for m = (round 128 n) for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count length) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count length) for producers = (progn (setf counter 0) (loop for i from 0 below n collect (mp:process-run-function - "sem-1-to-n-consumer" + "sem.1-to-n-consumer" #'(lambda () (loop for i from 0 below m do (mp:wait-on-semaphore sem) do (mp:with-lock (lock) (incf counter))))))) do (mapc #'mp:process-join producers) - always (and (= counter length) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) + (is (= counter length)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumers, blocking due to a slow producer. 
-(def-mp-test sem-1-to-n-blocking - (loop with lock = (mp:make-lock :name "sem-1-to-n-communication") +(test-with-timeout mp.sem.1-to-n-blocking + (loop with lock = (mp:make-lock :name "sem.1-to-n-communication") for n from 1 to 10 for m = (round 10000 n) for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count 0) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count 0) for counter = 0 for producers = (loop for i from 0 below n collect (mp:process-run-function - "sem-1-to-n-consumer" + "sem.1-to-n-consumer" #'(lambda () (loop for i from 0 below m do (mp:wait-on-semaphore sem)) @@ -308,156 +407,50 @@ creating stray processes." do (loop for i from 0 below length do (mp:signal-semaphore sem)) do (mapc #'mp:process-join producers) - always (and (= counter n) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) - - -;; Mutexes -;;; Date: 12/04/2012 -;;; Non-recursive mutexes should signal an error when they -;;; cannot be relocked. -(test mutex-001-recursive-error - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) - (and - (mp:get-lock mutex) - (eq (mp:lock-owner mutex) mp:*current-process*) - (handler-case - (progn (mp:get-lock mutex) nil) - (error (c) t)) - (mp:giveup-lock mutex) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) - -;;; Date: 12/04/2012 -;;; Recursive locks increase the counter. -(test mutex-002-recursive-count - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) - (and - (loop for i from 1 upto 10 - always (and (mp:get-lock mutex) - (= (mp:lock-count mutex) i) - (eq (mp:lock-owner mutex) mp:*current-process*))) - (loop for i from 9 downto 0 - always (and (eq (mp:lock-owner mutex) mp:*current-process*) - (mp:giveup-lock mutex) - (= (mp:lock-count mutex) i))) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) - - -;;; Date: 12/04/2012 -;;; When multiple threads compete for a mutex, they should -;;; all get the same chance of accessing the resource -;;; -(def-mp-test mutex-003-fairness - (let* ((mutex (mp:make-lock :name 'mutex-001-fairness)) - (nthreads 10) - (count 10) - (counter (* nthreads count)) - (array (make-array count :element-type 'fixnum :initial-element 0))) - (flet ((slave (n) - (loop with continue = t - for i from 1 by 1 - while continue do - (mp:get-lock mutex) - (cond ((plusp counter) - (decf counter) - (setf (aref array n) i)) - (t - (setf continue nil))) - (mp:giveup-lock mutex)))) - ;; Launch all agents. They will be locked - (let ((all-processes - (mp:with-lock (mutex) - (loop for n from 0 below nthreads - collect (mp:process-run-function n #'slave n) - ;; ... and give them some time to block on this mutex - finally (sleep 1))))) - ;; Now they are released and operate. They should all have - ;; the same share of counts. - (loop for p in all-processes - do (mp:process-join p)) - (loop for i from 0 below nthreads - always (= (aref array i) count))))) - t) - -;;; Date: 12/04/2012 -;;; It is possible to kill processes waiting for a lock. We launch a lot of -;;; processes, 50% of which are zombies: they acquire the lock and do not -;;; do anything. These processes are then killed, resulting in the others -;;; doing their job. 
-;;; -(def-mp-test mutex-004-interruptible - (let* ((mutex (mp:make-lock :name "mutex-003-fairness")) - (nprocesses 20) - (counter 0)) - (flet ((normal-thread () - (mp:with-lock (mutex) - (incf counter))) - (zombie-thread () - (mp:with-lock (mutex) - (loop (sleep 10))))) - (let* ((all-processes (loop for i from 0 below nprocesses - for zombie = (zerop (mod i 2)) - for fn = (if zombie #'zombie-thread #'normal-thread) - collect (cons zombie - (mp:process-run-function - "mutex-003-fairness" - fn)))) - (zombies (mapcar #'cdr (remove-if-not #'car all-processes)))) - (and (zerop counter) ; No proces works because the first one is a zombie - (kill-and-wait zombies) - (progn (sleep 0.2) (= counter (/ nprocesses 2))) - (not (mp:lock-owner mutex)) - t)))) - t) + (is (= counter n)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) ;; Mailbox ;;; Date: 14/04/2012 ;;; Ensure that at creation name and counter are set, and mailbox is empty. -(test mailbox-make-and-counter - (is - (loop with name = "mbox-make-and-counter" - for count from 4 to 63 - for mbox = (mp:make-mailbox :name name :count count) - always (and (eq (mp:mailbox-name mbox) name) - (>= (mp:mailbox-count mbox) count) - (mp:mailbox-empty-p mbox))))) +(test mp.mbox.make-and-counter + (loop with name = "mbox.make-and-counter" + for count from 4 to 63 + for mbox = (mp:make-mailbox :name name :count count) + do (is (eq (mp:mailbox-name mbox) name)) + (is (>= (mp:mailbox-count mbox) count)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; Ensure that the mailbox works in a nonblocking fashion (when the ;;; number of messages < mailbox size in a single producer and single ;;; consumer setting. We do not need to create new threads for this. -(test mbox-mailbox-nonblocking-io-1-to-1 - (is - (loop with count = 30 - with name = "mbox-mailbox-nonblocking-io-1-to-1" - with mbox = (mp:make-mailbox :name name :count count) - for l from 1 to 10 - for messages = (loop for i from 1 to l - do (mp:mailbox-send mbox i) - collect i) - always - (and (not (mp:mailbox-empty-p mbox)) - (equalp (loop for i from 1 to l - collect (mp:mailbox-read mbox)) - messages) - (mp:mailbox-empty-p mbox))))) +(test-with-timeout mp.mbox.nonblocking-io-1-to-1 + (loop with count = 30 + with name = "mbox.nonblocking-io-1-to-1" + with mbox = (mp:make-mailbox :name name :count count) + for l from 1 to 10 + for messages = (loop for i from 1 to l + do (mp:mailbox-send mbox i) + collect i) + do + (is (not (mp:mailbox-empty-p mbox))) + (is (equalp (loop for i from 1 to l + collect (mp:mailbox-read mbox)) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; The mailbox blocks a process when it saturates the write queue. -(def-mp-test mbox-blocks-1-to-1 +(test-with-timeout mp.mbox.blocks-1-to-1 (let* ((flag nil) - (mbox (mp:make-mailbox :name "mbox-signal-one" :count 32)) + (mbox (mp:make-mailbox :name "mbox.blocks-1-to-1" :count 32)) (size (mp:mailbox-count mbox)) (a-process (mp:process-run-function - "mbox-signal-one-process" + "mbox.blocks-1-to-1" #'(lambda () ;; This does not block (loop for i from 1 to size @@ -467,85 +460,83 @@ creating stray processes." 
(mp:mailbox-send mbox (1+ size)) ;; Now we unblock (setf flag nil))))) - (sleep 0.2) ; give time for all messages to arrive - (and (not (mp:mailbox-empty-p mbox)) ; the queue has messages - (mp:process-active-p a-process) ; the process is active - flag ; and it is blocked - (loop for i from 1 to (1+ size) ; messages arrive in order - always (= i (mp:mailbox-read mbox))) - (null flag) ; and process unblocked - (mp:mailbox-empty-p mbox) - t)) - t) + (sleep 0.2) + (is (not (mp:mailbox-empty-p mbox))) ; the queue has messages + (is (mp:process-active-p a-process)) ; the process is active + (is flag) ; and it is blocked + (is (loop for i from 1 to (1+ size) ; messages arrive in order + always (= i (mp:mailbox-read mbox)))) + (mp:process-join a-process) + (is (null flag)) ; and process unblocked + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; N producers and 1 consumer -(def-mp-test mbox-n-to-1-communication +(test-with-timeout mp.mbox.n-to-1-communication (loop with length = 10000 - with mbox = (mp:make-mailbox :name "mbox-n-to-1-communication" :count 128) - for n from 1 to 10 - for m = (round length n) - for messages = (loop for i from 0 below (* n m) collect i) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-n-to-1-producer" - (let ((proc-no i)) - #'(lambda () - (loop for i from 0 below m - for msg = (+ i (* proc-no m)) - do (mp:mailbox-send mbox msg)))))) - always (and (equalp - (sort (loop for i from 1 to (* n m) - collect (mp:mailbox-read mbox)) - #'<) - messages) - (mp:mailbox-empty-p mbox))) - t) + with mbox = (mp:make-mailbox :name "mbox.n-to-1-communication" :count 128) + for n from 1 to 10 + for m = (round length n) + for messages = (loop for i from 0 below (* n m) collect i) + for producers = (loop for i from 0 below n + do (mp:process-run-function + "mbox.n-to-1-producer" + (let ((proc-no i)) + #'(lambda () + (loop for i from 0 below m + for msg = (+ i (* proc-no m)) + do (mp:mailbox-send mbox msg)))))) + do (is (equalp + (sort (loop for i from 1 to (* n m) + collect (mp:mailbox-read mbox)) + #'<) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumer, but they do not block, because the ;;; queue is large enough and pre-filled with messages -(test mbox-1-to-n-non-blocking +(test-with-timeout mp.mbox.1-to-n-non-blocking (loop - for n from 1 to 10 - for m = (round 128 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for aux = (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for n from 1 to 10 + for m = (round 128 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mbox.1-to-n-non-blocking" :count length) + for flags = (make-array length :initial-element nil) + for aux = (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) 
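The blocking behaviour checked by the next test maps onto the reader_cv/writer_cv pair that struct ecl_mailbox now carries. Below is only a sketch of the send side of that ring-buffer pattern, not the code in threads/mailbox.d; mailbox_send_sketch is hypothetical, and ecl_length/ecl_aset1 are the existing generic array helpers of the C API. The read side mirrors it, waiting on reader_cv while message_count is zero and signaling writer_cv after removing an element:

static void
mailbox_send_sketch(cl_object mbox, cl_object msg)
{
  cl_index capacity = ecl_length(mbox->mailbox.data);
  ecl_mutex_lock(&mbox->mailbox.mutex);
  while (mbox->mailbox.message_count >= capacity) {
    /* mailbox full: wait until a reader frees a slot */
    ecl_cond_var_wait(&mbox->mailbox.writer_cv, &mbox->mailbox.mutex);
  }
  ecl_aset1(mbox->mailbox.data, mbox->mailbox.write_pointer, msg);
  mbox->mailbox.write_pointer = (mbox->mailbox.write_pointer + 1) % capacity;
  mbox->mailbox.message_count++;
  /* a reader may be blocked on an empty mailbox */
  ecl_cond_var_signal(&mbox->mailbox.reader_cv);
  ecl_mutex_unlock(&mbox->mailbox.mutex);
}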
;;; Date: 14/04/2012 ;;; 1 producer and N consumers, which block, because the producer ;;; is started _after_ them and is slower. -(test mbox-1-to-n-blocking +(test-with-timeout mp.mbox.1-to-n-blocking (loop for n from 1 to 10 - for m = (round 10000 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for m = (round 10000 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mp.mbox.1-to-n-blocking" :count length) + for flags = (make-array length :initial-element nil) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 2016-11-10 @@ -561,19 +552,22 @@ creating stray processes." ;;; HOLDING-LOCK-P verifies, if the current process holds the ;;; lock. ;;; -(test mp-holding-lock-p - (let ((lock (mp:make-lock :name "mp-holding-lock-p" :recursive nil))) +(test-with-timeout mp.mutex.holding-lock-p + (let ((lock (mp:make-lock :name "mutex.holding-lock-p" :recursive nil))) (is-false (mp:holding-lock-p lock)) (mp:with-lock (lock) (is-true (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock))))) (is-false (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock)))))) + +;; Atomics + (ext:with-clean-symbols (test-struct test-class *x*) (defstruct (test-struct :atomic-accessors) (slot1 0) @@ -582,14 +576,13 @@ creating stray processes." ((slot1 :initform 0) (slot2))) (defvar *x*) - ;;; Date: 2018-09-21 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-update works correctly. ;;; - (test atomic-update + (test-with-timeout mp.atomics.atomic-update (let* ((n-threads 100) (n-updates 1000) (n-total (* n-threads n-updates))) @@ -614,8 +607,7 @@ creating stray processes." (list (first plist) (1+ (second plist))))) (mp:atomic-update (slot-value object 'slot1) #'1+) - (mp:atomic-update (test-struct-slot1 struct) #'1+) - (sleep 0.00001)))))) + (mp:atomic-update (test-struct-slot1 struct) #'1+)))))) (is (car cons) n-total) (is (cdr cons) n-total) (is (svref vector 1) n-total) @@ -627,14 +619,14 @@ creating stray processes." (eq (slot-value object 'slot2) 1))) (signals error (eval '(mp:compare-and-swap (test-struct-slot2 struct) 1 2)))))) - + ;;; Date: 2018-09-22 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-push and atomic-pop work correctly. ;;; - (test atomic-push/pop + (test-with-timeout mp.atomics.atomic-push/pop (let* ((n-threads 100) (n-updates 1000)) (setf *x* nil) @@ -644,21 +636,18 @@ creating stray processes." 
"" (lambda () (loop for i below n-updates do - (mp:atomic-push i (symbol-value '*x*)) - (sleep 0.00001)) + (mp:atomic-push i (symbol-value '*x*))) (loop repeat (1- n-updates) do - (mp:atomic-pop (symbol-value '*x*)) - (sleep 0.00001)))))) + (mp:atomic-pop (symbol-value '*x*))))))) (is (length *x*) n-threads))) - ;;; Date: 2018-09-29 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-incf and atomic-decf work correctly. ;;; - (test atomic-incf/decf + (test-with-timeout mp.atomics.atomic-incf/decf (let* ((n-threads 100) (n-updates 1000) (increment (1+ (random most-positive-fixnum))) @@ -680,8 +669,7 @@ creating stray processes." (mp:atomic-incf (car cons) increment) (mp:atomic-incf (svref vector 1) increment) (mp:atomic-incf (symbol-value '*x*) increment) - (mp:atomic-incf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-incf (slot-value object 'slot1) increment)))))) (is (car cons) final-value) (is (cdr cons) final-value) (is (svref vector 1) final-value) @@ -697,8 +685,7 @@ creating stray processes." (mp:atomic-decf (car cons) increment) (mp:atomic-decf (svref vector 1) increment) (mp:atomic-decf (symbol-value '*x*) increment) - (mp:atomic-decf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-decf (slot-value object 'slot1) increment)))))) (is (car cons) 0) (is (cdr cons) 0) (is (svref vector 1) 0) @@ -712,7 +699,7 @@ creating stray processes." ;;; Verifies that CAS expansion may be removed. ;;; (ext:with-clean-symbols (*obj* foo) - (test defcas/remcas + (test mp.atomics.defcas/remcas (mp:defcas foo (lambda (object old new) (assert (consp object)) (setf (car object) old @@ -730,13 +717,15 @@ creating stray processes." ;;; ;;; Verifies that CAS modifications honor the package locks. ;;; -(test cas-locked-package +(test mp.atomics.cas-locked-package (signals package-error (mp:defcas cl:car (lambda (obj old new) nil))) (signals package-error (mp:remcas 'cl:car)) (finishes (mp:defcas cor (lambda (obj old new) nil))) (finishes (mp:remcas 'cor))) +;; Barriers + ;;; Date: 2020-08-14 ;;; From: Daniel KochmaƄski ;;; Description: @@ -750,105 +739,226 @@ creating stray processes." 
(is (= 3 (mp:barrier-count barrier))) (is (= 0 (mp:barrier-arrivers-count barrier))))) -(test mp.barrier.blocking - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 3 0) - (check-barrier 4 3 1) - (check-barrier 5 3 2) - (check-barrier 6 6 0)))) - -(test mp.barrier.unblock-1 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function +(let (barrier before-barrier after-barrier all-processes mutex) + (labels ((try-barrier () + (push (mp:process-run-function "try-barrier" (lambda () - (incf before-barrier) + (mp:with-lock (mutex) + (incf before-barrier)) (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier)))) - (wake-barrier () - (mp:barrier-unblock barrier :kill-waiting nil)) - (kill-barrier () - (mp:barrier-unblock barrier :kill-waiting t))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (wake-barrier) - (sleep 0.01) - (check-barrier 3 2 1) - (check-barrier 4 2 2) - (kill-barrier) - (sleep 0.01) - (check-barrier 5 2 1)))) - -(test mp.barrier.unblock-2 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :disable t) - (check-barrier 1 1 0) - (check-barrier 2 2 0) - (check-barrier 3 3 0) - (check-barrier 4 4 0)))) - -(test mp.barrier.unblock-3 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :reset-count 4) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 0 3) - (check-barrier 4 4 0) - (check-barrier 5 4 1) - (check-barrier 6 4 2)))) + (mp:with-lock (mutex) + (incf after-barrier)))) + all-processes)) + (check-barrier (before after arrivers) + (try-barrier) + (loop until (= before before-barrier)) + (loop until (= after after-barrier)) + (loop until (= arrivers (mp:barrier-arrivers-count barrier)))) + (wake-barrier () + (mp:barrier-unblock barrier :kill-waiting nil)) + (kill-barrier () + (mp:barrier-unblock barrier :kill-waiting t))) + + (test-with-timeout mp.barrier.blocking + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 3 0) 
+ (check-barrier 4 3 1) + (check-barrier 5 3 2) + (check-barrier 6 6 0) + ;; clean up + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-1 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (wake-barrier) + (mapc #'mp:process-join all-processes) + (check-barrier 3 2 1) + (check-barrier 4 2 2) + (kill-barrier) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 2 1) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-2 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :disable t) + (check-barrier 1 1 0) + (check-barrier 2 2 0) + (check-barrier 3 3 0) + (check-barrier 4 4 0) + ;; clean up + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-3 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :reset-count 4) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 0 3) + (check-barrier 4 4 0) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 4 1) + (check-barrier 6 4 2) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes)))) + + +;; Condition variables +(test-with-timeout mp.cv.wait-and-signal + (let* ((mutex (mp:make-lock :name "cv.wait-and-signal")) + (cv (mp:make-condition-variable)) + (counter-before 0) + (counter-after 0) + (n-threads 10) + (waiting-processes + (loop repeat n-threads collect + (mp:process-run-function + "cv.wait-and-signal" + (lambda () + (mp:get-lock mutex) + (incf counter-before) + (mp:condition-variable-wait cv mutex) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (incf counter-after) + (mp:giveup-lock mutex)))))) + (loop until (mp:with-lock (mutex) (= counter-before n-threads))) + ;; signal wakes up _at least_ one thread each time it is called (for + ;; efficiency reasons, the posix specification explicitely allows waking + ;; up more than one thread) + (loop with n-signaled = 0 ; minimum number of threads that have woken up + until (= n-signaled n-threads) + do (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (mp:condition-variable-signal cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (loop until (mp:with-lock (mutex) + (and (> counter-after n-signaled) + (setf n-signaled counter-after))))))) + +(test-with-timeout mp.cv.wait-and-broadcast + (let* ((mutex (mp:make-lock :name "cv.wait-and-broadcast")) + (cv (mp:make-condition-variable)) + (counter-before 0) + (counter-after 0) + (wakeup nil) + (n-threads 10) + (waiting-processes + (loop repeat n-threads collect + (mp:process-run-function + "cv.wait-and-broadcast" + (lambda () + (mp:get-lock mutex) + (incf counter-before) + (loop until wakeup ; ignore spurious wakeups (allowed by posix) + do (mp:condition-variable-wait cv mutex)) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (incf counter-after) + (mp:giveup-lock mutex)))))) + (loop until (mp:with-lock (mutex) (= counter-before n-threads))) + ;; broadcast wakes up all threads + (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) 
mp:*current-process*)) + (setf wakeup t) + (mp:condition-variable-broadcast cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (mapc #'mp:process-join waiting-processes) + (is (= counter-after n-threads)))) + +(test-with-timeout (mp.cv.timedwait-timeout 30) ; whole test times out after 30 seconds + (let* ((mutex (mp:make-lock :name "cv.timedwait-timeout")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.timedwait-timeout" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + ;; condition variable times out after 1 second, before + ;; whole test times out + (mp:condition-variable-timedwait cv mutex 1) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 2) + (mp:giveup-lock mutex))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (mp:process-join waiting-process) + (is (null (mp:lock-owner mutex))) + (is (= flag 2)))) + +(test-with-timeout (mp.cv.timedwait-signal 30) ; whole test times out after 30 seconds + (let* ((mutex (mp:make-lock :name "cv.timedwait-signal")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.timedwait-signal" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + ;; condition variable times out after 60 seconds, after + ;; whole test times out + (mp:condition-variable-timedwait cv mutex 60) + (when (not (eq (mp:lock-owner mutex) mp:*current-process*)) + (error "Wrong lock owner")) + (setf flag 2) + (mp:giveup-lock mutex))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (mp:get-lock mutex) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (mp:condition-variable-signal cv) + (mp:giveup-lock mutex) + (is (not (eq (mp:lock-owner mutex) mp:*current-process*))) + (mp:process-join waiting-process) + (is (= flag 2)))) + +(test-with-timeout mp.cv.interruptible + (let* ((mutex (mp:make-lock :name "cv.interruptible")) + (cv (mp:make-condition-variable)) + (flag 0) + (waiting-process + (mp:process-run-function + "cv.interruptible" + (lambda () + (mp:get-lock mutex) + (setf flag 1) + (mp:condition-variable-wait cv mutex) + (setf flag 2) + (error "We shouldn't have gotten to this point"))))) + (loop until (mp:with-lock (mutex) (/= flag 0))) + (sleep 0.1) + (is (mp:process-kill waiting-process)) + (mp:process-join waiting-process) + (is (null (mp:lock-owner mutex))) + (is (= flag 1)))) + diff --git a/src/util/emacs.el b/src/util/emacs.el index 1e11c74ac03a7ef92f03de07228ed707ac9d2dbb..925ab5b0ef818debc83007f9826fa8142aa13fb6 100644 --- a/src/util/emacs.el +++ b/src/util/emacs.el @@ -208,7 +208,6 @@ "c/threads/mailbox.d" "c/threads/mutex.d" "c/threads/process.d" -"c/threads/queue.d" "c/threads/rwlock.d" "c/threads/semaphore.d" "c/time.d" @@ -237,6 +236,7 @@ "h/object.h" "h/page.h" "h/stacks.h" +"h/threads.h" "lsp/arraylib.lsp" "lsp/assert.lsp" "lsp/autoload.lsp"
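For readers skimming the patch, the sketch below (not part of the diff) shows the synchronization shape the rewritten tests rely on: a waiter loops on the condition variable so that spurious wakeups are tolerated, the parent signals under the same lock, and MP:PROCESS-JOIN replaces the old SLEEP-based waits before any assertion on shared state. Variable names here are invented for illustration; every MP: operator used is one already exercised by the tests above.

;;; Illustrative sketch only, not part of the patch.
(let* ((lock (mp:make-lock :name "sketch"))
       (cv (mp:make-condition-variable))
       (ready nil)
       (result nil)
       (worker (mp:process-run-function
                "sketch-worker"
                (lambda ()
                  (mp:with-lock (lock)
                    ;; loop guards against spurious wakeups
                    (loop until ready
                          do (mp:condition-variable-wait cv lock))
                    (setf result :done))))))
  (mp:with-lock (lock)
    (setf ready t)
    (mp:condition-variable-broadcast cv))
  ;; join the worker instead of sleeping before asserting
  (mp:process-join worker)
  (assert (eq result :done)))

The same idea (join, then assert) is what lets the rewritten mailbox, barrier and atomics tests drop their fixed SLEEP calls without becoming racy.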