diff --git a/ggml.c b/ggml.c index 7d2f465..cd17a15 100644 --- a/ggml.c +++ b/ggml.c @@ -303,6 +303,91 @@ int64_t ggml_cycles_per_ms(void) { #define ggml_perf_cycles_per_ms() 0 #endif +// +// thread data +// +// synchronization is done via busy loops +// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops +// + +#ifdef __APPLE__ + +//#include + +//typedef os_unfair_lock ggml_lock_t; +// +//#define ggml_lock_init(x) UNUSED(x) +//#define ggml_lock_destroy(x) UNUSED(x) +//#define ggml_lock_lock os_unfair_lock_lock +//#define ggml_lock_unlock os_unfair_lock_unlock +// +//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT + +//typedef int ggml_lock_t; +// +//#define ggml_lock_init(x) UNUSED(x) +//#define ggml_lock_destroy(x) UNUSED(x) +//#define ggml_lock_lock(x) UNUSED(x) +//#define ggml_lock_unlock(x) UNUSED(x) +// +//#define GGML_LOCK_INITIALIZER 0 + +typedef pthread_mutex_t ggml_lock_t; +typedef pthread_cond_t ggml_cond_t; + +#define ggml_lock_init(x) pthread_mutex_init(x, NULL) +#define ggml_lock_destroy(x) pthread_mutex_destroy(x) +#define ggml_lock_lock pthread_mutex_lock +#define ggml_lock_unlock pthread_mutex_unlock +#define ggml_cond_init(x) pthread_cond_init(x, NULL) +#define ggml_cond_destroy(x) pthread_cond_destroy(x) +#define ggml_cond_wait pthread_cond_wait +#define ggml_cond_broadcast pthread_cond_broadcast + +#define GGML_LOCK_INITIALIZER PTHREAD_MUTEX_INITIALIZER +#define GGML_COND_INITIALIZER PTHREAD_COND_INITIALIZER + +#else + +//typedef pthread_spinlock_t ggml_lock_t; + +//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE) +//#define ggml_lock_destroy pthread_spin_destroy +//#define ggml_lock_lock pthread_spin_lock +//#define ggml_lock_unlock pthread_spin_unlock + +//typedef int ggml_lock_t; +// +//#define ggml_lock_init(x) UNUSED(x) +//#define ggml_lock_destroy(x) UNUSED(x) +//#define ggml_lock_lock(x) UNUSED(x) +//#define ggml_lock_unlock(x) UNUSED(x) +// +//#define GGML_LOCK_INITIALIZER 0 + +typedef pthread_mutex_t ggml_lock_t; +typedef pthread_cond_t ggml_cond_t; + +#define ggml_lock_init(x) pthread_mutex_init(x, NULL) +#define ggml_lock_destroy(x) pthread_mutex_destroy(x) +#define ggml_lock_lock pthread_mutex_lock +#define ggml_lock_unlock pthread_mutex_unlock +#define ggml_cond_init(x) pthread_cond_init(x, NULL) +#define ggml_cond_destroy(x) pthread_cond_destroy(x) +#define ggml_cond_wait pthread_cond_wait +#define ggml_cond_broadcast pthread_cond_broadcast + +#define GGML_LOCK_INITIALIZER PTHREAD_MUTEX_INITIALIZER +#define GGML_COND_INITIALIZER PTHREAD_COND_INITIALIZER + +#endif + +typedef pthread_t ggml_thread_t; + +#define ggml_thread_create pthread_create +#define ggml_thread_join pthread_join + + // // cache line // @@ -1205,12 +1290,38 @@ struct ggml_compute_params { void * wdata; }; +struct ggml_compute_state_shared; + +struct ggml_compute_state { + ggml_thread_t thrd; + + struct ggml_compute_params params; + struct ggml_tensor * node; + + struct ggml_compute_state_shared * shared; +}; + +struct ggml_thread_pool { + bool is_used; + int n_threads; + int n_ready; + + atomic_bool stop; + + ggml_lock_t lock; + ggml_cond_t cond; + + struct ggml_compute_state states[GGML_MAX_THREADS]; +}; + // // ggml state // struct ggml_state { struct ggml_context_container contexts[GGML_MAX_CONTEXTS]; + + struct ggml_thread_pool th_pools[GGML_MAX_THREAD_POOLS]; }; // global state @@ -1393,12 +1504,25 @@ struct ggml_context * ggml_init(struct ggml_init_params params) { g_state = (struct ggml_state) { /*.contexts =*/ { 0 }, + /*.th_pools =*/ { 0 }, }; for (int i = 0; i < GGML_MAX_CONTEXTS; ++i) { g_state.contexts[i].used = false; } + for (int i = 0; i < GGML_MAX_THREAD_POOLS; ++i) { + g_state.th_pools[i] = (struct ggml_thread_pool) { + /*.is_used =*/ false, + /*.n_threads =*/ 0, + /*.n_ready =*/ 0, + /*.stop =*/ false, + /*.lock =*/ GGML_LOCK_INITIALIZER, + /*.cond =*/ GGML_COND_INITIALIZER, + /*.states =*/ { 0 }, + }; + } + const uint64_t t_end = ggml_time_us(); UNUSED(t_end); GGML_PRINT_DEBUG("%s: g_state initialized in %f ms\n", __func__, (t_end - t_start)/1000.0f); @@ -1450,7 +1574,8 @@ void ggml_free(struct ggml_context * ctx) { // make this function thread safe ggml_critical_section_start(); - bool found = false; + bool found = false; + bool is_last = true; for (int i = 0; i < GGML_MAX_CONTEXTS; i++) { if (&g_state.contexts[i].context == ctx) { @@ -1464,12 +1589,16 @@ void ggml_free(struct ggml_context * ctx) { } found = true; - break; + } else if (g_state.contexts[i].used) { + is_last = false; } } if (!found) { GGML_PRINT_DEBUG("%s: context not found\n", __func__); + } else if (is_last) { + // stop all threads + fprintf(stderr, "%s: stopping all threads XXXXXXXXX\n", __func__); } ggml_critical_section_end(); @@ -6858,126 +6987,91 @@ struct ggml_cgraph ggml_build_backward(struct ggml_context * ctx, struct ggml_cg } // -// thread data +// compute // -// synchronization is done via busy loops -// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops -// - -#ifdef __APPLE__ - -//#include - -//typedef os_unfair_lock ggml_lock_t; -// -//#define ggml_lock_init(x) UNUSED(x) -//#define ggml_lock_destroy(x) UNUSED(x) -//#define ggml_lock_lock os_unfair_lock_lock -//#define ggml_lock_unlock os_unfair_lock_unlock -// -//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#define ggml_lock_lock(x) UNUSED(x) -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -typedef pthread_t ggml_thread_t; - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#else - -//typedef pthread_spinlock_t ggml_lock_t; - -//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE) -//#define ggml_lock_destroy pthread_spin_destroy -//#define ggml_lock_lock pthread_spin_lock -//#define ggml_lock_unlock pthread_spin_unlock - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#define ggml_lock_lock(x) UNUSED(x) -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -typedef pthread_t ggml_thread_t; - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#endif struct ggml_compute_state_shared { - ggml_lock_t spin; - int n_threads; - // synchronization primitives + ggml_lock_t spin; + atomic_int n_ready; atomic_bool has_work; atomic_bool stop; // stop all threads -}; -struct ggml_compute_state { - ggml_thread_t thrd; - - struct ggml_compute_params params; - struct ggml_tensor * node; - - struct ggml_compute_state_shared * shared; + struct ggml_thread_pool * th_pool; }; +// function used by each compute thread static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_state * state = (struct ggml_compute_state *) data; - const int n_threads = state->shared->n_threads; - while (true) { - if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) { - atomic_store(&state->shared->has_work, false); - } else { - while (atomic_load(&state->shared->has_work)) { - if (atomic_load(&state->shared->stop)) { - return 0; - } - ggml_lock_lock (&state->shared->spin); - ggml_lock_unlock(&state->shared->spin); - } + ggml_lock_lock(&state->shared->th_pool->lock); + state->shared->th_pool->n_ready++; + ggml_cond_wait(&state->shared->th_pool->cond, &state->shared->th_pool->lock); + state->shared->th_pool->n_ready--; + ggml_lock_unlock(&state->shared->th_pool->lock); + + if (state->shared->th_pool->stop) { + break; } - atomic_fetch_sub(&state->shared->n_ready, 1); + const int n_threads = state->shared->n_threads; - // wait for work - while (!atomic_load(&state->shared->has_work)) { - if (atomic_load(&state->shared->stop)) { - return 0; - } - ggml_lock_lock (&state->shared->spin); - ggml_lock_unlock(&state->shared->spin); + if (state->params.ith > n_threads) { + continue; } - // check if we should stop - if (atomic_load(&state->shared->stop)) { - break; - } + while (true) { + // spin lock + { + if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) { + atomic_store(&state->shared->has_work, false); + } else { + while (atomic_load(&state->shared->has_work)) { + if (atomic_load(&state->shared->stop)) { + break; + } + //ggml_lock_lock (&state->shared->spin); + //ggml_lock_unlock(&state->shared->spin); + } - if (state->node) { - ggml_compute_forward(&state->params, state->node); - state->node = NULL; - } else { - break; + if (atomic_load(&state->shared->stop)) { + break; + } + } + + atomic_fetch_sub(&state->shared->n_ready, 1); + } + + // spin lock + { + // wait for work + while (!atomic_load(&state->shared->has_work)) { + if (atomic_load(&state->shared->stop)) { + break; + } + //ggml_lock_lock (&state->shared->spin); + //ggml_lock_unlock(&state->shared->spin); + } + + // check if we should stop + if (atomic_load(&state->shared->stop)) { + break; + } + } + + if (state->node) { + ggml_compute_forward(&state->params, state->node); + state->node = NULL; + } else { + break; + } } } + printf("thread %d exiting\n", state->params.ith); return 0; } @@ -6986,26 +7080,53 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) cgraph->n_threads = 8; } + if (cgraph->n_threads > GGML_MAX_THREADS) { + cgraph->n_threads = GGML_MAX_THREADS; + } + const int n_threads = cgraph->n_threads; struct ggml_compute_state_shared state_shared = { - /*.spin =*/ GGML_LOCK_INITIALIZER, /*.n_threads =*/ n_threads, + /*.spin =*/ GGML_LOCK_INITIALIZER, /*.n_ready =*/ 0, /*.has_work =*/ false, /*.stop =*/ false, + /*.th_pool =*/ NULL, }; - struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL; - // create thread pool + struct ggml_compute_state * workers = NULL; + + // find thread pool that is not currently in use if (n_threads > 1) { - ggml_lock_init(&state_shared.spin); + ggml_critical_section_start(); - atomic_store(&state_shared.has_work, true); + for (int i = 0; i < GGML_MAX_THREAD_POOLS; ++i) { + if (g_state.th_pools[i].is_used) { + continue; + } + + state_shared.th_pool = &g_state.th_pools[i]; + state_shared.th_pool->is_used = true; + + break; + } + + ggml_critical_section_end(); + + if (!state_shared.th_pool) { + fprintf(stderr, "%s: no thread pool available for graph computation\n", __func__); + GGML_ASSERT(false); // TODO: maybe dynamically allocate threads in the future + return; + } + + workers = state_shared.th_pool->states; for (int j = 0; j < n_threads - 1; j++) { + const ggml_thread_t th_save = workers[j].thrd; + workers[j] = (struct ggml_compute_state) { - .thrd = 0, + .thrd = th_save, .params = { .type = GGML_TASK_COMPUTE, .ith = j + 1, @@ -7016,10 +7137,27 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) .node = NULL, .shared = &state_shared, }; - int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]); - assert(rc == 0); - UNUSED(rc); + + if (workers[j].thrd == 0) { + int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]); + assert(rc == 0); + UNUSED(rc); + + state_shared.th_pool->n_threads++; + } } + + ggml_lock_lock(&state_shared.th_pool->lock); + while (state_shared.th_pool->n_ready < state_shared.th_pool->n_threads) { + ggml_lock_unlock(&state_shared.th_pool->lock); + ggml_lock_lock (&state_shared.th_pool->lock); + // busy loop + } + + atomic_store(&state_shared.has_work, true); + + ggml_cond_broadcast(&state_shared.th_pool->cond); + ggml_lock_unlock (&state_shared.th_pool->lock); } // initialize tasks + work buffer @@ -7235,13 +7373,16 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // COMPUTE if (node->n_tasks > 1) { - if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { - atomic_store(&state_shared.has_work, false); - } + // spin lock + { + if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { + atomic_store(&state_shared.has_work, false); + } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); + while (atomic_load(&state_shared.has_work)) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } } // launch thread pool @@ -7256,14 +7397,17 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) workers[j].node = node; } - atomic_fetch_sub(&state_shared.n_ready, 1); + // spin lock + { + atomic_fetch_sub(&state_shared.n_ready, 1); + + while (atomic_load(&state_shared.n_ready) > 0) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } - while (atomic_load(&state_shared.n_ready) > 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); + atomic_store(&state_shared.has_work, true); } - - atomic_store(&state_shared.has_work, true); } params.type = GGML_TASK_COMPUTE; @@ -7271,32 +7415,38 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // wait for thread pool if (node->n_tasks > 1) { - if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { - atomic_store(&state_shared.has_work, false); - } + // spin lock + { + if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { + atomic_store(&state_shared.has_work, false); + } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + while (atomic_load(&state_shared.has_work)) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } - atomic_fetch_sub(&state_shared.n_ready, 1); + atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) != 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); + while (atomic_load(&state_shared.n_ready) != 0) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } } } // FINALIZE if (node->n_tasks > 1) { - if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { - atomic_store(&state_shared.has_work, false); - } + // spin lock + { + if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { + atomic_store(&state_shared.has_work, false); + } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); + while (atomic_load(&state_shared.has_work)) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } } // launch thread pool @@ -7311,14 +7461,17 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) workers[j].node = node; } - atomic_fetch_sub(&state_shared.n_ready, 1); + // spin lock + { + atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) > 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + while (atomic_load(&state_shared.n_ready) > 0) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } - atomic_store(&state_shared.has_work, true); + atomic_store(&state_shared.has_work, true); + } } params.type = GGML_TASK_FINALIZE; @@ -7326,20 +7479,23 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // wait for thread pool if (node->n_tasks > 1) { - if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { - atomic_store(&state_shared.has_work, false); - } + // spin lock + { + if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { + atomic_store(&state_shared.has_work, false); + } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + while (atomic_load(&state_shared.has_work)) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } - atomic_fetch_sub(&state_shared.n_ready, 1); + atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) != 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); + while (atomic_load(&state_shared.n_ready) != 0) { + //ggml_lock_lock (&state_shared.spin); + //ggml_lock_unlock(&state_shared.spin); + } } } @@ -7356,16 +7512,31 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // join thread pool if (n_threads > 1) { - atomic_store(&state_shared.stop, true); - atomic_store(&state_shared.has_work, true); + // spin lock + { + atomic_store(&state_shared.stop, true); + atomic_store(&state_shared.has_work, true); + } - for (int j = 0; j < n_threads - 1; j++) { - int rc = ggml_thread_join(workers[j].thrd, NULL); - assert(rc == 0); - UNUSED(rc); + //for (int j = 0; j < n_threads - 1; j++) { + // int rc = ggml_thread_join(workers[j].thrd, NULL); + // assert(rc == 0); + // UNUSED(rc); + //} + + ggml_lock_lock(&state_shared.th_pool->lock); + while (state_shared.th_pool->n_ready < state_shared.th_pool->n_threads) { + ggml_lock_unlock(&state_shared.th_pool->lock); + ggml_lock_lock (&state_shared.th_pool->lock); + // busy loop } + ggml_lock_unlock(&state_shared.th_pool->lock); - ggml_lock_destroy(&state_shared.spin); + ggml_critical_section_start(); + + state_shared.th_pool->is_used = false; + + ggml_critical_section_end(); } // performance stats (graph) diff --git a/ggml.h b/ggml.h index a217d2d..3c706e9 100644 --- a/ggml.h +++ b/ggml.h @@ -177,11 +177,13 @@ extern "C" { #include #include -#define GGML_MAX_DIMS 4 -#define GGML_MAX_NODES 4096 -#define GGML_MAX_PARAMS 16 -#define GGML_MAX_CONTEXTS 64 -#define GGML_MAX_OPT 4 +#define GGML_MAX_DIMS 4 +#define GGML_MAX_NODES 4096 +#define GGML_MAX_PARAMS 16 +#define GGML_MAX_CONTEXTS 64 +#define GGML_MAX_OPT 4 +#define GGML_MAX_THREADS 64 +#define GGML_MAX_THREAD_POOLS 16 #ifdef __ARM_NEON // we use the built-in 16-bit float type