Skip to content

Commit

Permalink
mp_thread: prefer tracking threads with id
Browse files Browse the repository at this point in the history
This change essentially removes mp_thread_self() and instead add
mp_thread_id to track threads and have ability to query current thread
id during runtime.

This will be useful for upcoming win32 implementation, where accessing
thread handle is different than on pthreads. Greatly reduces complexity.
Otherweis locked map of tid <-> handle is required which is completely
unnecessary for all mpv use-cases.

Note that this is the mp_thread_id, not to confuse with system tid. For
example on threads-posix implementation it is simply pthread_t.
  • Loading branch information
kasper93 authored and Dudemanguy committed Nov 5, 2023
1 parent 174df99 commit 55ed50b
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 40 deletions.
10 changes: 5 additions & 5 deletions common/stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct stat_entry {
int64_t val_th;
int64_t time_start_ns;
int64_t cpu_start_ns;
mp_thread thread;
mp_thread_id thread_id;
};

#define IS_ACTIVE(ctx) \
Expand Down Expand Up @@ -181,7 +181,7 @@ void stats_global_query(struct mpv_global *global, struct mpv_node *out)
break;
}
case VAL_THREAD_CPU_TIME: {
int64_t t = mp_thread_cpu_time_ns(e->thread);
int64_t t = mp_thread_cpu_time_ns(e->thread_id);
if (!e->cpu_start_ns)
e->cpu_start_ns = t;
double t_msec = MP_TIME_NS_TO_MS(t - e->cpu_start_ns);
Expand Down Expand Up @@ -273,7 +273,7 @@ void stats_time_start(struct stats_ctx *ctx, const char *name)
return;
mp_mutex_lock(&ctx->base->lock);
struct stat_entry *e = find_entry(ctx, name);
e->cpu_start_ns = mp_thread_cpu_time_ns(mp_thread_self());
e->cpu_start_ns = mp_thread_cpu_time_ns(mp_thread_current_id());
e->time_start_ns = mp_time_ns();
mp_mutex_unlock(&ctx->base->lock);
}
Expand All @@ -288,7 +288,7 @@ void stats_time_end(struct stats_ctx *ctx, const char *name)
if (e->time_start_ns) {
e->type = VAL_TIME;
e->val_rt += mp_time_ns() - e->time_start_ns;
e->val_th += mp_thread_cpu_time_ns(mp_thread_self()) - e->cpu_start_ns;
e->val_th += mp_thread_cpu_time_ns(mp_thread_current_id()) - e->cpu_start_ns;
e->time_start_ns = 0;
}
mp_mutex_unlock(&ctx->base->lock);
Expand All @@ -311,7 +311,7 @@ static void register_thread(struct stats_ctx *ctx, const char *name,
mp_mutex_lock(&ctx->base->lock);
struct stat_entry *e = find_entry(ctx, name);
e->type = type;
e->thread = mp_thread_self();
e->thread_id = mp_thread_current_id();
mp_mutex_unlock(&ctx->base->lock);
}

Expand Down
2 changes: 1 addition & 1 deletion common/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ void stats_event(struct stats_ctx *ctx, const char *name);
// or stats_unregister_thread() is called, otherwise UB will occur.
void stats_register_thread_cputime(struct stats_ctx *ctx, const char *name);

// Remove reference to mp_thread_self().
// Remove reference to the current thread.
void stats_unregister_thread(struct stats_ctx *ctx, const char *name);
2 changes: 1 addition & 1 deletion input/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ void mp_input_src_init_done(struct mp_input_src *src)
{
assert(!src->in->init_done);
assert(src->in->thread_running);
assert(mp_thread_equal(src->in->thread, mp_thread_self()));
assert(mp_thread_id_equal(mp_thread_get_id(src->in->thread), mp_thread_current_id()));
src->in->init_done = true;
mp_rendezvous(&src->in->init_done, 0);
}
Expand Down
3 changes: 1 addition & 2 deletions input/ipc-unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ static int ipc_write_str(struct client_arg *client, const char *buf)

static MP_THREAD_VOID client_thread(void *p)
{
pthread_detach(mp_thread_self());

// We don't use MSG_NOSIGNAL because the moldy fruit OS doesn't support it.
struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = SA_RESTART };
sigfillset(&sa.sa_mask);
Expand Down Expand Up @@ -234,6 +232,7 @@ static bool ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client,
mp_thread client_thr;
if (mp_thread_create(&client_thr, client_thread, client))
goto err;
mp_thread_detach(client_thr);

return true;

Expand Down
3 changes: 1 addition & 2 deletions input/ipc-win.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ static void report_read_error(struct client_arg *arg, DWORD error)

static MP_THREAD_VOID client_thread(void *p)
{
pthread_detach(mp_thread_self());

struct client_arg *arg = p;
char buf[4096];
HANDLE wakeup_event = CreateEventW(NULL, TRUE, FALSE, NULL);
Expand Down Expand Up @@ -321,6 +319,7 @@ static void ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client)
CloseHandle(client->client_h);
talloc_free(client);
}
mp_thread_detach(client_thr);
}

static void ipc_start_client_json(struct mp_ipc_ctx *ctx, int id, HANDLE h)
Expand Down
14 changes: 7 additions & 7 deletions misc/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct mp_dispatch_queue {
// The target thread is in mp_dispatch_queue_process() (and either idling,
// locked, or running a dispatch callback).
bool in_process;
mp_thread in_process_thread;
mp_thread_id in_process_thread_id;
// The target thread is in mp_dispatch_queue_process(), and currently
// something has exclusive access to it (e.g. running a dispatch callback,
// or a different thread got it with mp_dispatch_lock()).
Expand All @@ -48,7 +48,7 @@ struct mp_dispatch_queue {
size_t lock_requests;
// locked==true is due to a mp_dispatch_lock() call (for debugging).
bool locked_explicit;
mp_thread locked_explicit_thread;
mp_thread_id locked_explicit_thread_id;
};

struct mp_dispatch_item {
Expand Down Expand Up @@ -275,7 +275,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0;
assert(!queue->in_process); // recursion not allowed
queue->in_process = true;
queue->in_process_thread = mp_thread_self();
queue->in_process_thread_id = mp_thread_current_id();
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_requests)
mp_cond_broadcast(&queue->cond);
Expand Down Expand Up @@ -366,10 +366,10 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
mp_mutex_lock(&queue->lock);
// Must not be called recursively from dispatched callbacks.
if (queue->in_process)
assert(!mp_thread_equal(queue->in_process_thread, mp_thread_self()));
assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id()));
// Must not be called recursively at all.
if (queue->locked_explicit)
assert(!mp_thread_equal(queue->locked_explicit_thread, mp_thread_self()));
assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
Expand All @@ -394,7 +394,7 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
assert(!queue->locked_explicit);
queue->locked = true;
queue->locked_explicit = true;
queue->locked_explicit_thread = mp_thread_self();
queue->locked_explicit_thread_id = mp_thread_current_id();
mp_mutex_unlock(&queue->lock);
}

Expand All @@ -405,7 +405,7 @@ void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
assert(queue->locked);
// Must be called after a mp_dispatch_lock(), from the same thread.
assert(queue->locked_explicit);
assert(mp_thread_equal(queue->locked_explicit_thread, mp_thread_self()));
assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
// "Unlock".
queue->locked = false;
queue->locked_explicit = false;
Expand Down
6 changes: 4 additions & 2 deletions misc/thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ static MP_THREAD_VOID worker_thread(void *arg)
// timeout, and nobody is waiting for us. We have to remove ourselves.
if (!pool->terminate) {
for (int n = 0; n < pool->num_threads; n++) {
if (mp_thread_equal(pool->threads[n], mp_thread_self())) {
pthread_detach(mp_thread_self());
if (mp_thread_id_equal(mp_thread_get_id(pool->threads[n]),
mp_thread_current_id()))
{
mp_thread_detach(pool->threads[n]);
MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
mp_mutex_unlock(&pool->lock);
MP_THREAD_RETURN();
Expand Down
10 changes: 7 additions & 3 deletions osdep/threads-posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ typedef pthread_cond_t mp_cond;
typedef pthread_mutex_t mp_mutex;
typedef pthread_mutex_t mp_static_mutex;
typedef pthread_once_t mp_once;
typedef pthread_t mp_thread_id;
typedef pthread_t mp_thread;

#define MP_STATIC_COND_INITIALIZER PTHREAD_COND_INITIALIZER
Expand Down Expand Up @@ -105,8 +106,11 @@ static inline int mp_cond_timedwait_until(mp_cond *cond, mp_mutex *mutex, int64_

#define mp_thread_create(t, f, a) pthread_create(t, NULL, f, a)
#define mp_thread_join(t) pthread_join(t, NULL)
#define mp_thread_self pthread_self
#define mp_thread_equal pthread_equal
#define mp_thread_join_id(t) pthread_join(t, NULL)
#define mp_thread_detach pthread_detach
#define mp_thread_current_id pthread_self
#define mp_thread_id_equal(a, b) ((a) == (b))
#define mp_thread_get_id(thread) (thread)

static inline void mp_thread_set_name(const char *name)
{
Expand All @@ -123,7 +127,7 @@ static inline void mp_thread_set_name(const char *name)
#endif
}

static inline int64_t mp_thread_cpu_time_ns(mp_thread thread)
static inline int64_t mp_thread_cpu_time_ns(mp_thread_id thread)
{
#if defined(_POSIX_TIMERS) && _POSIX_TIMERS > 0 && defined(_POSIX_THREAD_CPUTIME)
clockid_t id;
Expand Down
10 changes: 5 additions & 5 deletions player/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,9 @@ static void abort_async(struct MPContext *mpctx, mpv_handle *ctx,
mp_mutex_unlock(&mpctx->abort_lock);
}

static void get_thread(void *ptr)
static void get_thread_id(void *ptr)
{
*(mp_thread *)ptr = mp_thread_self();
*(mp_thread_id *)ptr = mp_thread_current_id();
}

static void mp_destroy_client(mpv_handle *ctx, bool terminate)
Expand Down Expand Up @@ -522,8 +522,8 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
mpctx->stop_play = PT_QUIT;
mp_dispatch_unlock(mpctx->dispatch);

mp_thread playthread;
mp_dispatch_run(mpctx->dispatch, get_thread, &playthread);
mp_thread_id playthread;
mp_dispatch_run(mpctx->dispatch, get_thread_id, &playthread);

// Ask the core thread to stop.
mp_mutex_lock(&clients->lock);
Expand All @@ -532,7 +532,7 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
mp_wakeup_core(mpctx);

// Blocking wait for all clients and core thread to terminate.
mp_thread_join(playthread);
mp_thread_join_id(playthread);

mp_destroy(mpctx);
}
Expand Down
6 changes: 3 additions & 3 deletions player/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -4848,7 +4848,7 @@ struct cmd_list_ctx {
struct mp_cmd_ctx *parent;

bool current_valid;
mp_thread current;
mp_thread_id current_tid;
bool completed_recursive;

// list of sub commands yet to run
Expand All @@ -4862,7 +4862,7 @@ static void on_cmd_list_sub_completion(struct mp_cmd_ctx *cmd)
{
struct cmd_list_ctx *list = cmd->on_completion_priv;

if (list->current_valid && mp_thread_equal(list->current, mp_thread_self())) {
if (list->current_valid && mp_thread_id_equal(list->current_tid, mp_thread_current_id())) {
list->completed_recursive = true;
} else {
continue_cmd_list(list);
Expand All @@ -4885,7 +4885,7 @@ static void continue_cmd_list(struct cmd_list_ctx *list)

list->completed_recursive = false;
list->current_valid = true;
list->current = mp_thread_self();
list->current_tid = mp_thread_current_id();

run_command(list->mpctx, sub, NULL, on_cmd_list_sub_completion, list);

Expand Down
3 changes: 1 addition & 2 deletions player/scripting.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ static void run_script(struct mp_script_args *arg)

static MP_THREAD_VOID script_thread(void *p)
{
pthread_detach(mp_thread_self());

struct mp_script_args *arg = p;
run_script(arg);

Expand Down Expand Up @@ -198,6 +196,7 @@ static int64_t mp_load_script(struct MPContext *mpctx, const char *fname)
talloc_free(arg);
return -1;
}
mp_thread_detach(thread);
}

return id;
Expand Down
8 changes: 4 additions & 4 deletions video/out/dr_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

struct dr_helper {
mp_mutex thread_lock;
mp_thread thread;
mp_thread_id thread_id;
bool thread_valid; // (POSIX defines no "unset" mp_thread value yet)

struct mp_dispatch_queue *dispatch;
Expand Down Expand Up @@ -57,7 +57,7 @@ void dr_helper_acquire_thread(struct dr_helper *dr)
mp_mutex_lock(&dr->thread_lock);
assert(!dr->thread_valid); // fails on API user errors
dr->thread_valid = true;
dr->thread = mp_thread_self();
dr->thread_id = mp_thread_current_id();
mp_mutex_unlock(&dr->thread_lock);
}

Expand All @@ -66,7 +66,7 @@ void dr_helper_release_thread(struct dr_helper *dr)
mp_mutex_lock(&dr->thread_lock);
// Fails on API user errors.
assert(dr->thread_valid);
assert(mp_thread_equal(dr->thread, mp_thread_self()));
assert(mp_thread_id_equal(dr->thread_id, mp_thread_current_id()));
dr->thread_valid = false;
mp_mutex_unlock(&dr->thread_lock);
}
Expand Down Expand Up @@ -94,7 +94,7 @@ static void free_dr_buffer_on_dr_thread(void *opaque, uint8_t *data)

mp_mutex_lock(&dr->thread_lock);
bool on_this_thread =
dr->thread_valid && mp_thread_equal(ctx->dr->thread, mp_thread_self());
dr->thread_valid && mp_thread_id_equal(ctx->dr->thread_id, mp_thread_current_id());
mp_mutex_unlock(&dr->thread_lock);

// The image could be unreffed even on the DR thread. In practice, this
Expand Down
6 changes: 3 additions & 3 deletions video/out/dr_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ struct dr_helper *dr_helper_create(struct mp_dispatch_queue *dispatch,
int stride_align, int flags),
void *get_image_ctx);

// Make DR release calls (freeing images) reentrant if they are called on this
// (mp_thread_self()) thread. That means any free call will directly release the
// image as allocated with get_image().
// Make DR release calls (freeing images) reentrant if they are called on current
// thread. That means any free call will directly release the image as allocated
// with get_image().
// Only 1 thread can use this at a time. Note that it would make no sense to
// call this on more than 1 thread, as get_image is assumed not thread-safe.
void dr_helper_acquire_thread(struct dr_helper *dr);
Expand Down

0 comments on commit 55ed50b

Please sign in to comment.