Compare commits

...

1 Commits

Author SHA1 Message Date
Georgi Gerganov 4e6d2e98ab
ggml : try to improve threading
2 years ago

491
ggml.c

@ -303,6 +303,91 @@ int64_t ggml_cycles_per_ms(void) {
#define ggml_perf_cycles_per_ms() 0
#endif
//
// thread data
//
// synchronization is done via busy loops
// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops
//
#ifdef __APPLE__
//#include <os/lock.h>
//typedef os_unfair_lock ggml_lock_t;
//
//#define ggml_lock_init(x) UNUSED(x)
//#define ggml_lock_destroy(x) UNUSED(x)
//#define ggml_lock_lock os_unfair_lock_lock
//#define ggml_lock_unlock os_unfair_lock_unlock
//
//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT
//typedef int ggml_lock_t;
//
//#define ggml_lock_init(x) UNUSED(x)
//#define ggml_lock_destroy(x) UNUSED(x)
//#define ggml_lock_lock(x) UNUSED(x)
//#define ggml_lock_unlock(x) UNUSED(x)
//
//#define GGML_LOCK_INITIALIZER 0
typedef pthread_mutex_t ggml_lock_t;
typedef pthread_cond_t ggml_cond_t;
#define ggml_lock_init(x) pthread_mutex_init(x, NULL)
#define ggml_lock_destroy(x) pthread_mutex_destroy(x)
#define ggml_lock_lock pthread_mutex_lock
#define ggml_lock_unlock pthread_mutex_unlock
#define ggml_cond_init(x) pthread_cond_init(x, NULL)
#define ggml_cond_destroy(x) pthread_cond_destroy(x)
#define ggml_cond_wait pthread_cond_wait
#define ggml_cond_broadcast pthread_cond_broadcast
#define GGML_LOCK_INITIALIZER PTHREAD_MUTEX_INITIALIZER
#define GGML_COND_INITIALIZER PTHREAD_COND_INITIALIZER
#else
//typedef pthread_spinlock_t ggml_lock_t;
//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE)
//#define ggml_lock_destroy pthread_spin_destroy
//#define ggml_lock_lock pthread_spin_lock
//#define ggml_lock_unlock pthread_spin_unlock
//typedef int ggml_lock_t;
//
//#define ggml_lock_init(x) UNUSED(x)
//#define ggml_lock_destroy(x) UNUSED(x)
//#define ggml_lock_lock(x) UNUSED(x)
//#define ggml_lock_unlock(x) UNUSED(x)
//
//#define GGML_LOCK_INITIALIZER 0
typedef pthread_mutex_t ggml_lock_t;
typedef pthread_cond_t ggml_cond_t;
#define ggml_lock_init(x) pthread_mutex_init(x, NULL)
#define ggml_lock_destroy(x) pthread_mutex_destroy(x)
#define ggml_lock_lock pthread_mutex_lock
#define ggml_lock_unlock pthread_mutex_unlock
#define ggml_cond_init(x) pthread_cond_init(x, NULL)
#define ggml_cond_destroy(x) pthread_cond_destroy(x)
#define ggml_cond_wait pthread_cond_wait
#define ggml_cond_broadcast pthread_cond_broadcast
#define GGML_LOCK_INITIALIZER PTHREAD_MUTEX_INITIALIZER
#define GGML_COND_INITIALIZER PTHREAD_COND_INITIALIZER
#endif
typedef pthread_t ggml_thread_t;
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
//
// cache line
//
@ -1205,12 +1290,38 @@ struct ggml_compute_params {
void * wdata;
};
struct ggml_compute_state_shared;
struct ggml_compute_state {
ggml_thread_t thrd;
struct ggml_compute_params params;
struct ggml_tensor * node;
struct ggml_compute_state_shared * shared;
};
struct ggml_thread_pool {
bool is_used;
int n_threads;
int n_ready;
atomic_bool stop;
ggml_lock_t lock;
ggml_cond_t cond;
struct ggml_compute_state states[GGML_MAX_THREADS];
};
//
// ggml state
//
struct ggml_state {
struct ggml_context_container contexts[GGML_MAX_CONTEXTS];
struct ggml_thread_pool th_pools[GGML_MAX_THREAD_POOLS];
};
// global state
@ -1393,12 +1504,25 @@ struct ggml_context * ggml_init(struct ggml_init_params params) {
g_state = (struct ggml_state) {
/*.contexts =*/ { 0 },
/*.th_pools =*/ { 0 },
};
for (int i = 0; i < GGML_MAX_CONTEXTS; ++i) {
g_state.contexts[i].used = false;
}
for (int i = 0; i < GGML_MAX_THREAD_POOLS; ++i) {
g_state.th_pools[i] = (struct ggml_thread_pool) {
/*.is_used =*/ false,
/*.n_threads =*/ 0,
/*.n_ready =*/ 0,
/*.stop =*/ false,
/*.lock =*/ GGML_LOCK_INITIALIZER,
/*.cond =*/ GGML_COND_INITIALIZER,
/*.states =*/ { 0 },
};
}
const uint64_t t_end = ggml_time_us(); UNUSED(t_end);
GGML_PRINT_DEBUG("%s: g_state initialized in %f ms\n", __func__, (t_end - t_start)/1000.0f);
@ -1450,7 +1574,8 @@ void ggml_free(struct ggml_context * ctx) {
// make this function thread safe
ggml_critical_section_start();
bool found = false;
bool found = false;
bool is_last = true;
for (int i = 0; i < GGML_MAX_CONTEXTS; i++) {
if (&g_state.contexts[i].context == ctx) {
@ -1464,12 +1589,16 @@ void ggml_free(struct ggml_context * ctx) {
}
found = true;
break;
} else if (g_state.contexts[i].used) {
is_last = false;
}
}
if (!found) {
GGML_PRINT_DEBUG("%s: context not found\n", __func__);
} else if (is_last) {
// stop all threads
fprintf(stderr, "%s: stopping all threads XXXXXXXXX\n", __func__);
}
ggml_critical_section_end();
@ -6858,126 +6987,91 @@ struct ggml_cgraph ggml_build_backward(struct ggml_context * ctx, struct ggml_cg
}
//
// thread data
// compute
//
// synchronization is done via busy loops
// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops
//
#ifdef __APPLE__
//#include <os/lock.h>
//typedef os_unfair_lock ggml_lock_t;
//
//#define ggml_lock_init(x) UNUSED(x)
//#define ggml_lock_destroy(x) UNUSED(x)
//#define ggml_lock_lock os_unfair_lock_lock
//#define ggml_lock_unlock os_unfair_lock_unlock
//
//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT
typedef int ggml_lock_t;
#define ggml_lock_init(x) UNUSED(x)
#define ggml_lock_destroy(x) UNUSED(x)
#define ggml_lock_lock(x) UNUSED(x)
#define ggml_lock_unlock(x) UNUSED(x)
#define GGML_LOCK_INITIALIZER 0
typedef pthread_t ggml_thread_t;
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#else
//typedef pthread_spinlock_t ggml_lock_t;
//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE)
//#define ggml_lock_destroy pthread_spin_destroy
//#define ggml_lock_lock pthread_spin_lock
//#define ggml_lock_unlock pthread_spin_unlock
typedef int ggml_lock_t;
#define ggml_lock_init(x) UNUSED(x)
#define ggml_lock_destroy(x) UNUSED(x)
#define ggml_lock_lock(x) UNUSED(x)
#define ggml_lock_unlock(x) UNUSED(x)
#define GGML_LOCK_INITIALIZER 0
typedef pthread_t ggml_thread_t;
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#endif
struct ggml_compute_state_shared {
ggml_lock_t spin;
int n_threads;
// synchronization primitives
ggml_lock_t spin;
atomic_int n_ready;
atomic_bool has_work;
atomic_bool stop; // stop all threads
};
struct ggml_compute_state {
ggml_thread_t thrd;
struct ggml_compute_params params;
struct ggml_tensor * node;
struct ggml_compute_state_shared * shared;
struct ggml_thread_pool * th_pool;
};
// function used by each compute thread
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
const int n_threads = state->shared->n_threads;
while (true) {
if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
atomic_store(&state->shared->has_work, false);
} else {
while (atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
return 0;
}
ggml_lock_lock (&state->shared->spin);
ggml_lock_unlock(&state->shared->spin);
}
ggml_lock_lock(&state->shared->th_pool->lock);
state->shared->th_pool->n_ready++;
ggml_cond_wait(&state->shared->th_pool->cond, &state->shared->th_pool->lock);
state->shared->th_pool->n_ready--;
ggml_lock_unlock(&state->shared->th_pool->lock);
if (state->shared->th_pool->stop) {
break;
}
atomic_fetch_sub(&state->shared->n_ready, 1);
const int n_threads = state->shared->n_threads;
// wait for work
while (!atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
return 0;
}
ggml_lock_lock (&state->shared->spin);
ggml_lock_unlock(&state->shared->spin);
if (state->params.ith > n_threads) {
continue;
}
// check if we should stop
if (atomic_load(&state->shared->stop)) {
break;
}
while (true) {
// spin lock
{
if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
atomic_store(&state->shared->has_work, false);
} else {
while (atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
break;
}
//ggml_lock_lock (&state->shared->spin);
//ggml_lock_unlock(&state->shared->spin);
}
if (state->node) {
ggml_compute_forward(&state->params, state->node);
state->node = NULL;
} else {
break;
if (atomic_load(&state->shared->stop)) {
break;
}
}
atomic_fetch_sub(&state->shared->n_ready, 1);
}
// spin lock
{
// wait for work
while (!atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
break;
}
//ggml_lock_lock (&state->shared->spin);
//ggml_lock_unlock(&state->shared->spin);
}
// check if we should stop
if (atomic_load(&state->shared->stop)) {
break;
}
}
if (state->node) {
ggml_compute_forward(&state->params, state->node);
state->node = NULL;
} else {
break;
}
}
}
printf("thread %d exiting\n", state->params.ith);
return 0;
}
@ -6986,26 +7080,53 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
cgraph->n_threads = 8;
}
if (cgraph->n_threads > GGML_MAX_THREADS) {
cgraph->n_threads = GGML_MAX_THREADS;
}
const int n_threads = cgraph->n_threads;
struct ggml_compute_state_shared state_shared = {
/*.spin =*/ GGML_LOCK_INITIALIZER,
/*.n_threads =*/ n_threads,
/*.spin =*/ GGML_LOCK_INITIALIZER,
/*.n_ready =*/ 0,
/*.has_work =*/ false,
/*.stop =*/ false,
/*.th_pool =*/ NULL,
};
struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;
// create thread pool
struct ggml_compute_state * workers = NULL;
// find thread pool that is not currently in use
if (n_threads > 1) {
ggml_lock_init(&state_shared.spin);
ggml_critical_section_start();
atomic_store(&state_shared.has_work, true);
for (int i = 0; i < GGML_MAX_THREAD_POOLS; ++i) {
if (g_state.th_pools[i].is_used) {
continue;
}
state_shared.th_pool = &g_state.th_pools[i];
state_shared.th_pool->is_used = true;
break;
}
ggml_critical_section_end();
if (!state_shared.th_pool) {
fprintf(stderr, "%s: no thread pool available for graph computation\n", __func__);
GGML_ASSERT(false); // TODO: maybe dynamically allocate threads in the future
return;
}
workers = state_shared.th_pool->states;
for (int j = 0; j < n_threads - 1; j++) {
const ggml_thread_t th_save = workers[j].thrd;
workers[j] = (struct ggml_compute_state) {
.thrd = 0,
.thrd = th_save,
.params = {
.type = GGML_TASK_COMPUTE,
.ith = j + 1,
@ -7016,10 +7137,27 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
.node = NULL,
.shared = &state_shared,
};
int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
assert(rc == 0);
UNUSED(rc);
if (workers[j].thrd == 0) {
int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
assert(rc == 0);
UNUSED(rc);
state_shared.th_pool->n_threads++;
}
}
ggml_lock_lock(&state_shared.th_pool->lock);
while (state_shared.th_pool->n_ready < state_shared.th_pool->n_threads) {
ggml_lock_unlock(&state_shared.th_pool->lock);
ggml_lock_lock (&state_shared.th_pool->lock);
// busy loop
}
atomic_store(&state_shared.has_work, true);
ggml_cond_broadcast(&state_shared.th_pool->cond);
ggml_lock_unlock (&state_shared.th_pool->lock);
}
// initialize tasks + work buffer
@ -7235,13 +7373,16 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
// COMPUTE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
// spin lock
{
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
while (atomic_load(&state_shared.has_work)) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
while (atomic_load(&state_shared.has_work)) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
}
// launch thread pool
@ -7256,14 +7397,17 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
workers[j].node = node;
}
atomic_fetch_sub(&state_shared.n_ready, 1);
// spin lock
{
atomic_fetch_sub(&state_shared.n_ready, 1);
while (atomic_load(&state_shared.n_ready) > 0) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
while (atomic_load(&state_shared.n_ready) > 0) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
atomic_store(&state_shared.has_work, true);
}
atomic_store(&state_shared.has_work, true);
}
params.type = GGML_TASK_COMPUTE;
@ -7271,32 +7415,38 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
// wait for thread pool
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
// spin lock
{
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
while (atomic_load(&state_shared.has_work)) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
}
while (atomic_load(&state_shared.has_work)) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
atomic_fetch_sub(&state_shared.n_ready, 1);
atomic_fetch_sub(&state_shared.n_ready, 1);
while (atomic_load(&state_shared.n_ready) != 0) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
while (atomic_load(&state_shared.n_ready) != 0) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
}
}
// FINALIZE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
// spin lock
{
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
while (atomic_load(&state_shared.has_work)) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
while (atomic_load(&state_shared.has_work)) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
}
// launch thread pool
@ -7311,14 +7461,17 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
workers[j].node = node;
}
atomic_fetch_sub(&state_shared.n_ready, 1);
// spin lock
{
atomic_fetch_sub(&state_shared.n_ready, 1);
while (atomic_load(&state_shared.n_ready) > 0) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
}
while (atomic_load(&state_shared.n_ready) > 0) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
atomic_store(&state_shared.has_work, true);
atomic_store(&state_shared.has_work, true);
}
}
params.type = GGML_TASK_FINALIZE;
@ -7326,20 +7479,23 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
// wait for thread pool
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
// spin lock
{
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
while (atomic_load(&state_shared.has_work)) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
}
while (atomic_load(&state_shared.has_work)) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
atomic_fetch_sub(&state_shared.n_ready, 1);
atomic_fetch_sub(&state_shared.n_ready, 1);
while (atomic_load(&state_shared.n_ready) != 0) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
while (atomic_load(&state_shared.n_ready) != 0) {
//ggml_lock_lock (&state_shared.spin);
//ggml_lock_unlock(&state_shared.spin);
}
}
}
@ -7356,16 +7512,31 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
// join thread pool
if (n_threads > 1) {
atomic_store(&state_shared.stop, true);
atomic_store(&state_shared.has_work, true);
// spin lock
{
atomic_store(&state_shared.stop, true);
atomic_store(&state_shared.has_work, true);
}
for (int j = 0; j < n_threads - 1; j++) {
int rc = ggml_thread_join(workers[j].thrd, NULL);
assert(rc == 0);
UNUSED(rc);
//for (int j = 0; j < n_threads - 1; j++) {
// int rc = ggml_thread_join(workers[j].thrd, NULL);
// assert(rc == 0);
// UNUSED(rc);
//}
ggml_lock_lock(&state_shared.th_pool->lock);
while (state_shared.th_pool->n_ready < state_shared.th_pool->n_threads) {
ggml_lock_unlock(&state_shared.th_pool->lock);
ggml_lock_lock (&state_shared.th_pool->lock);
// busy loop
}
ggml_lock_unlock(&state_shared.th_pool->lock);
ggml_lock_destroy(&state_shared.spin);
ggml_critical_section_start();
state_shared.th_pool->is_used = false;
ggml_critical_section_end();
}
// performance stats (graph)

@ -177,11 +177,13 @@ extern "C" {
#include <stddef.h>
#include <stdbool.h>
#define GGML_MAX_DIMS 4
#define GGML_MAX_NODES 4096
#define GGML_MAX_PARAMS 16
#define GGML_MAX_CONTEXTS 64
#define GGML_MAX_OPT 4
#define GGML_MAX_DIMS 4
#define GGML_MAX_NODES 4096
#define GGML_MAX_PARAMS 16
#define GGML_MAX_CONTEXTS 64
#define GGML_MAX_OPT 4
#define GGML_MAX_THREADS 64
#define GGML_MAX_THREAD_POOLS 16
#ifdef __ARM_NEON
// we use the built-in 16-bit float type

Loading…
Cancel
Save