bthread 中使用 libcontext 实现协程间的切换,原理类似《汇编魔法实现 C++ 协程》一文中的方法。先看一个单元测试中的例子(可在线执行):
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <iostream>
// Opaque handle to a suspended execution context: it is simply the saved
// stack pointer of that context.
typedef void *bthread_fcontext_t;
// Creates a new context that will execute fn when first jumped to.
// sp points at the TOP (highest address) of the stack; x86-64 stacks grow
// downwards. Implemented in raw assembly below (x86-64 SysV ABI only).
extern "C" bthread_fcontext_t bthread_make_fcontext(void *sp, size_t size,
void (*fn)(intptr_t));
__asm(
".text\n"
".globl bthread_make_fcontext\n"
".type bthread_make_fcontext,@function\n"
".align 16\n"
"bthread_make_fcontext:\n"
// rdi = stack top; keep it in rax (which is also the return value).
" movq %rdi, %rax\n"
// Align the stack top down to 16 bytes as required by the SysV ABI.
" andq $-16, %rax\n"
// Reserve 0x48 bytes for the initial saved-register frame that
// bthread_jump_fcontext will pop when it first enters this context.
" leaq -0x48(%rax), %rax\n"
// rdx = fn; store it at the frame slot that jump_fcontext pops into
// r8 and jumps to, so the first switch into this context runs fn.
" movq %rdx, 0x38(%rax)\n"
// Save the current SSE and x87 control words into the frame.
" stmxcsr (%rax)\n"
" fnstcw 0x4(%rax)\n"
// Install `finish` above fn's slot: it becomes fn's return address,
// so a returning fn terminates the process via _exit(0).
" leaq finish(%rip), %rcx\n"
" movq %rcx, 0x40(%rax)\n"
" ret \n"
"finish:\n"
" xorq %rdi, %rdi\n"
" call _exit@PLT\n"
" hlt\n"
".size bthread_make_fcontext,.-bthread_make_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n");
// Switches execution from the current context to nfc: saves callee-saved
// registers on the current stack, stores the resulting stack pointer into
// *ofc, then restores nfc's stack and resumes it. vp is transferred across
// the switch: it becomes the return value of the jump that suspended nfc
// (or the first argument of fn if nfc has never run).
extern "C" intptr_t bthread_jump_fcontext(bthread_fcontext_t *ofc,
bthread_fcontext_t nfc, intptr_t vp);
__asm(
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
// Save the SysV callee-saved registers on the current stack.
" pushq %rbp \n"
" pushq %rbx \n"
" pushq %r15 \n"
" pushq %r14 \n"
" pushq %r13 \n"
" pushq %r12 \n"
// Skip one 8-byte slot (make_fcontext keeps mxcsr/fcw there).
" leaq -0x8(%rsp), %rsp\n"
// *ofc = current stack pointer (rdi = ofc).
" movq %rsp, (%rdi)\n"
// Switch to the target context's saved stack pointer (rsi = nfc).
" movq %rsi, %rsp\n"
" leaq 0x8(%rsp), %rsp\n"
// Restore the target context's callee-saved registers.
" popq %r12 \n"
" popq %r13 \n"
" popq %r14 \n"
" popq %r15 \n"
" popq %rbx \n"
" popq %rbp \n"
// r8 = resume address: either the return address of the jump that
// suspended this context, or fn for a freshly made context.
" popq %r8\n"
// vp (rdx) becomes both the return value of the resumed jump call
// (rax) and the first argument of a fresh fn (rdi).
" movq %rdx, %rax\n"
" movq %rdx, %rdi\n"
" jmp *%r8\n"
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n");
// fcm: the main context; fc: the coroutine context.
// NOTE(review): the closing braces of f() and main() appear to have been
// lost in the article's formatting; the code is otherwise intact.
bthread_fcontext_t fcm;
bthread_fcontext_t fc;
typedef std::pair<int, int> pair_t;
// Coroutine body: sums the pair passed via the jump and yields the sum back
// to the main context; when resumed, repeats once with the new pair.
static void f(intptr_t param) {
pair_t *p = (pair_t *)param;
printf("In Routine: fcm %p fc %p\n", fcm, fc);
// Yield to main, transferring the sum; execution resumes here when main
// jumps to fc again, with the new pair pointer as the transferred value.
p = (pair_t *)bthread_jump_fcontext(&fc, fcm,
(intptr_t)(p->first + p->second));
printf("In Routine Again: fcm %p fc %p\n", fcm, fc);
bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first + p->second));
int main() {
fcm = NULL;
// 8KB coroutine stack; make_fcontext takes the TOP of the stack.
std::size_t size(8192);
void *sp = malloc(size);
pair_t p(std::make_pair(2, 7));
fc = bthread_make_fcontext((char *)sp + size, size, f);
printf("Start Routine: fcm %p fc %p\n", fcm, fc);
// First jump starts f(); returns when f() yields, carrying 2+7.
int res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
printf("Back to Main: %d + %d = %d\n", p.first, p.second, res);
p = std::make_pair(5, 6);
printf("Resume Routine: fcm %p fc %p\n", fcm, fc);
// Second jump resumes f() after its first yield, carrying 5+6 back.
res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
printf("Back to Main Again: %d + %d = %d\n", p.first, p.second, res);
return 0;
bthread 中使用 TaskMeta 结构封装协程的元信息:
// Per-bthread metadata. Instances are pooled via butil::get_resource (see
// start_background below), so the constructor runs once per pool slot and
// fields are re-initialized on each reuse.
struct TaskMeta {
// Waiter registered on a butex while this bthread is blocked in butex_wait.
butil::atomic<ButexWaiter*> current_waiter;
// Timer id of an in-flight timed sleep, 0 when not sleeping.
uint64_t current_sleep;
bool stop;
bool interrupted;
bool about_to_quit;
pthread_spinlock_t version_lock;
// Butex holding the version part of tid (see make_tid in start_background).
uint32_t* version_butex;
bthread_t tid;
// Entry function and its argument.
void* (*fn)(void*);
void* arg;
// Coroutine stack; NULL until the first scheduling allocates one.
ContextualStack* stack;
bthread_attr_t attr;
int64_t cpuwide_start_ns;
TaskStatistics stat;
// Bthread-local storage, swapped in/out at each context switch.
LocalStorage local_storage;
public:
TaskMeta()
: current_waiter(NULL)
, current_sleep(0)
, stack(NULL) {
pthread_spin_init(&version_lock, 0);
version_butex = butex_create_checked<uint32_t>();
*version_butex = 1;
~TaskMeta() {
butex_destroy(version_butex);
version_butex = NULL;
pthread_spin_destroy(&version_lock);
// Attach a stack to this task.
void set_stack(ContextualStack* s) {
stack = s;
// Detach and return the stack without freeing it.
ContextualStack* release_stack() {
ContextualStack* tmp = stack;
stack = NULL;
return tmp;
StackType stack_type() const {
return static_cast<StackType>(attr.stack_type);
这里先只关注协程栈 stack。协程首次调度前会分配对应的调用栈:
// Call site: before a bthread runs for the first time, allocate its stack
// with task_runner as the context entry point.
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
// Maps a stack type to its factory. PTHREAD-type bthreads run directly on
// the worker pthread's stack, so no separate stack is allocated (NULL).
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
switch (type) {
case STACK_TYPE_PTHREAD:
return NULL;
case STACK_TYPE_SMALL:
return StackFactory<SmallStackClass>::get_stack(entry);
case STACK_TYPE_NORMAL:
return StackFactory<NormalStackClass>::get_stack(entry);
case STACK_TYPE_LARGE:
return StackFactory<LargeStackClass>::get_stack(entry);
case STACK_TYPE_MAIN:
return StackFactory<MainStackClass>::get_stack(entry);
return NULL;
// Pools Wrapper objects (stack storage + made fcontext) through the butil
// object pool, so stacks of the same size class are reused across bthreads.
template <typename StackClass> struct StackFactory {
struct Wrapper : public ContextualStack {
explicit Wrapper(void (*entry)(intptr_t)) {
if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
FLAGS_guard_page_size) != 0) {
// Allocation failed: leave the wrapper in an empty, safe state.
storage.zeroize();
context = NULL;
return;
// storage.bottom is the highest address; fcontext stacks grow down.
context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
stacktype = (StackType)StackClass::stacktype;
~Wrapper() {
if (context) {
context = NULL;
deallocate_stack_storage(&storage);
storage.zeroize();
static ContextualStack* get_stack(void (*entry)(intptr_t)) {
return butil::get_object<Wrapper>(entry);
static void return_stack(ContextualStack* sc) {
butil::return_object(static_cast<Wrapper*>(sc));
// Raw memory backing one coroutine stack.
struct StackStorage {
int stacksize;
int guardsize;
// Highest address of the usable stack (stacks grow downwards).
void* bottom;
// Id registered with valgrind so it can track this stack; 0 otherwise.
unsigned valgrind_stack_id;
void zeroize() {
stacksize = 0;
guardsize = 0;
bottom = NULL;
valgrind_stack_id = 0;
// Allocate/free stack memory (mmap-based when a guard region is requested).
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
void deallocate_stack_storage(StackStorage* s);
enum StackType {
STACK_TYPE_MAIN = 0,
STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
// A stack plus the fcontext currently saved on it.
struct ContextualStack {
bthread_fcontext_t context;
StackType stacktype;
StackStorage storage;
// Allocates a page-rounded stack of at least stacksize_in bytes with a
// PROT_NONE guard region at the low end to fault on stack overflow.
// Returns 0 on success, -1 on failure. (The guardsize_in <= 0 branch is
// elided in this excerpt.)
int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
const static int PAGESIZE = getpagesize();
const int PAGESIZE_M1 = PAGESIZE - 1;
const int MIN_STACKSIZE = PAGESIZE * 2;
const int MIN_GUARDSIZE = PAGESIZE;
// Round the requested sizes up to whole pages.
const int stacksize =
(std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
~PAGESIZE_M1;
if (guardsize_in <= 0) {
...
} else {
const int guardsize =
(std::max(guardsize_in, MIN_GUARDSIZE) + PAGESIZE_M1) &
~PAGESIZE_M1;
const int memsize = stacksize + guardsize;
void* const mem = mmap(NULL, memsize, (PROT_READ | PROT_WRITE),
(MAP_PRIVATE | MAP_ANONYMOUS), -1, 0);
if (MAP_FAILED == mem) {
PLOG_EVERY_SECOND(ERROR)
<< "Fail to mmap size=" << memsize << " stack_count="
<< s_stack_count.load(butil::memory_order_relaxed)
<< ", possibly limited by /proc/sys/vm/max_map_count";
return -1;
// mmap normally returns page-aligned memory; tolerate the odd case.
void* aligned_mem = (void*)(((intptr_t)mem + PAGESIZE_M1) & ~PAGESIZE_M1);
if (aligned_mem != mem) {
LOG_ONCE(ERROR) << "addr=" << mem << " returned by mmap is not "
"aligned by pagesize=" << PAGESIZE;
const int offset = (char*)aligned_mem - (char*)mem;
// Protect the lowest (guardsize - offset) bytes so that overflowing the
// stack faults immediately instead of corrupting neighboring memory.
if (guardsize <= offset ||
mprotect(aligned_mem, guardsize - offset, PROT_NONE) != 0) {
munmap(mem, memsize);
PLOG_EVERY_SECOND(ERROR)
<< "Fail to mprotect " << (void*)aligned_mem << " length="
<< guardsize - offset;
return -1;
s_stack_count.fetch_add(1, butil::memory_order_relaxed);
// bottom = highest address; the guard pages sit at the low end.
s->bottom = (char*)mem + memsize;
s->stacksize = stacksize;
s->guardsize = guardsize;
if (RunningOnValgrind()) {
s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
s->bottom, (char*)s->bottom - stacksize);
} else {
s->valgrind_stack_id = 0;
return 0;
// Per-class stack sizes are configurable via gflags.
int* SmallStackClass::stack_size_flag = &FLAGS_stack_size_small;
int* NormalStackClass::stack_size_flag = &FLAGS_stack_size_normal;
int* LargeStackClass::stack_size_flag = &FLAGS_stack_size_large;
核心思路是当本线程内没有待执行的任务时,从其他线程的任务队列中窃取任务执行。首先来看 work stealing 时使用的无锁队列 src/bthread/work_stealing_queue.h:
// Lock-free work-stealing deque in the style of the Chase-Lev algorithm:
// a single owner thread pushes/pops at the bottom while other threads
// steal from the top via CAS. Capacity must be a power of two so that
// `index & (_capacity - 1)` implements the ring buffer.
template <typename T>
class WorkStealingQueue {
public:
WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}
int init(size_t capacity) {
if (_capacity != 0) {
LOG(ERROR) << "Already initialized";
return -1;
if (capacity == 0) {
LOG(ERROR) << "Invalid capacity=" << capacity;
return -1;
if (capacity & (capacity - 1)) {
LOG(ERROR) << "Invalid capacity=" << capacity
<< " which must be power of 2";
return -1;
_buffer = new (std::nothrow) T[capacity];
if (NULL == _buffer) {
return -1;
_capacity = capacity;
return 0;
// Owner-only. Returns false when the queue is full.
bool push(const T& x) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
const size_t t = _top.load(butil::memory_order_acquire);
if (b >= t + _capacity) {
return false;
_buffer[b & (_capacity - 1)] = x;
// release: the slot write must be visible before the new bottom is.
_bottom.store(b + 1, butil::memory_order_release);
return true;
// Owner-only. Takes from the bottom; races with steal() only when a
// single element remains, resolved by the CAS on _top below.
bool pop(T* val) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
size_t t = _top.load(butil::memory_order_relaxed);
if (t >= b) {
return false;
const size_t newb = b - 1;
_bottom.store(newb, butil::memory_order_relaxed);
// Full fence: order the _bottom store before re-reading _top, so a
// concurrent steal and this pop agree on who takes the last element.
butil::atomic_thread_fence(butil::memory_order_seq_cst);
t = _top.load(butil::memory_order_relaxed);
if (t > newb) {
// Queue became empty meanwhile: restore _bottom and fail.
_bottom.store(b, butil::memory_order_relaxed);
return false;
*val = _buffer[newb & (_capacity - 1)];
if (t != newb) {
// More than one element left: no contention possible, done.
return true;
// Exactly one element: contend with stealers via CAS on _top.
const bool popped = _top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
_bottom.store(b, butil::memory_order_relaxed);
return popped;
// Thread-safe. Steals the oldest element from the top.
bool steal(T* val) {
size_t t = _top.load(butil::memory_order_acquire);
size_t b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
return false;
do {
butil::atomic_thread_fence(butil::memory_order_seq_cst);
b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
return false;
// Read the slot before the CAS; a successful CAS claims index t.
*val = _buffer[t & (_capacity - 1)];
} while (!_top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed));
return true;
private:
DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);
butil::atomic<size_t> _bottom;
size_t _capacity;
T* _buffer;
// On its own cache line to avoid false sharing with _bottom.
butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;
push 和 pop 仅在底部操作,非线程安全;steal 仅在顶部窃取,通过 CAS 保证线程安全。
接着来看 bthread 启动的流程:
// Sanity test: start one bthread and join it.
TEST_F(BthreadTest, sanity) {
LOG(INFO) << "main thread " << pthread_self();
bthread_t th1;
ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, misc, (void*)1));
LOG(INFO) << "back to main thread " << th1 << " " << pthread_self();
ASSERT_EQ(0, bthread_join(th1, NULL));
// Starts a bthread. When called from a worker thread (tls_task_group set),
// run it in the foreground right away; otherwise hand it to a worker queue.
int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// Inside a worker: the new bthread preempts the current one.
return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
return bthread::start_from_non_worker(tid, attr, fn, arg);
// Starts a bthread from a non-worker thread: pick a TaskGroup and enqueue
// the task into its remote run queue.
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
// Lazily creates the global TaskControl (and its workers) on first use.
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
// NOSIGNAL tasks stick to one group per caller thread so pending
// signals can be flushed together later.
TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
return g->start_background<true>(tid, attr, fn, arg);
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
// Returns the global TaskControl, creating it on first call.
// Double-checked locking: the fast path is a lock-free load.
inline TaskControl* get_or_new_task_control() {
butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
TaskControl* c = p->load(butil::memory_order_consume);
if (c != NULL) {
return c;
BAIDU_SCOPED_LOCK(g_task_control_mutex);
// Re-check under the lock: another thread may have created it.
c = p->load(butil::memory_order_consume);
if (c != NULL) {
return c;
c = new (std::nothrow) TaskControl;
if (NULL == c) {
return NULL;
int concurrency = FLAGS_bthread_min_concurrency > 0 ?
FLAGS_bthread_min_concurrency :
FLAGS_bthread_concurrency;
if (c->init(concurrency) != 0) {
LOG(ERROR) << "Fail to init g_task_control";
delete c;
return NULL;
// Publish only after full initialization (release pairs with consume).
p->store(c, butil::memory_order_release);
return c;
// Spawns `concurrency` worker pthreads plus the global timer thread.
// Returns 0 on success, -1 on failure. Not idempotent.
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
return -1;
if (concurrency <= 0) {
LOG(ERROR) << "Invalid concurrency=" << concurrency;
return -1;
_concurrency = concurrency;
if (get_or_create_global_timer_thread() == NULL) {
LOG(ERROR) << "Fail to get global_timer_thread";
return -1;
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
if (rc) {
LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
return -1;
// Expose bvar metrics for monitoring.
_worker_usage_second.expose("bthread_worker_usage");
_switch_per_second.expose("bthread_switch_second");
_signal_per_second.expose("bthread_signal_second");
_status.expose("bthread_group_status");
// Busy-wait until at least one worker has registered its TaskGroup.
while (_ngroup == 0) {
usleep(100);
return 0;
bthread 后台会开启多个 worker_thread 线程执行 bthread 任务:
// Entry point of each worker pthread: create this worker's TaskGroup,
// publish it in thread-local storage, then loop in run_main_task() until
// the worker is stopped.
void* TaskControl::worker_thread(void* arg) {
run_worker_startfn();
TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group();
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
BT_VLOG << "Created worker=" << pthread_self()
<< " bthread=" << g->main_tid();
// Make this group visible to all code running on this thread.
tls_task_group = g;
// bvar adder: bump the live-worker count.
c->_nworkers << 1;
g->run_main_task();
// Worker is exiting: report stats and tear everything down.
stat = g->main_stat();
BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
<< g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
<< "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
return NULL;
// Scheduling loop of the worker's "main" bthread: wait for a runnable tid,
// switch to it, and repeat each time control returns here.
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
// sched_to may come back on a different group's thread; it rewrites
// `dummy` to the group actually running this code afterwards.
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
// A PTHREAD-type task was scheduled onto the main stack: run it.
TaskGroup::task_runner(1);
// Blocks on the parking lot until a task can be stolen. Returns false when
// the worker is being stopped.
bool TaskGroup::wait_task(bthread_t* tid) {
do {
if (_last_pl_state.stopped()) {
return false;
// Sleep until somebody signals that new work exists.
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
} while (true);
// Prefer this group's remote queue, then steal from other groups.
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
return _control->steal_task(tid, &_steal_seed, _steal_offset);
// Tries to steal one task, scanning groups in a pseudo-random order driven
// by *seed and offset so that different stealers spread their contention.
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
if (0 == ngroup) {
return false;
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
if (g) {
// Try the group's work-stealing queue first, then its remote queue.
if (g->_rq.steal(tid)) {
stolen = true;
break;
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
// Persist the scan position for the next attempt.
*seed = s;
return stolen;
// Resolves next_tid to its TaskMeta, allocating a stack on first run, then
// switches to it. Falls back to the worker's main stack (PTHREAD type)
// when stack allocation fails.
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// Allocation failed (or PTHREAD type): reuse the main stack.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
sched_to(pg, next_meta);
// Context-switches from the current bthread to next_meta. On return *pg is
// the group this code is running on — possibly a different one, since a
// suspended bthread can be stolen and resumed by another worker.
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
// errno and the unique user pointer are per-bthread state: save and
// restore them around the switch.
const int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;
TaskMeta* const cur_meta = g->_cur_meta;
// Account the elapsed cpu time to the outgoing task.
const int64_t now = butil::cpuwide_time_ns();
const int64_t elp_ns = now - g->_last_run_ns;
g->_last_run_ns = now;
cur_meta->stat.cputime_ns += elp_ns;
if (cur_meta->tid != g->main_tid()) {
g->_cumulated_cputime_ns += elp_ns;
++cur_meta->stat.nswitch;
++ g->_nswitch;
if (__builtin_expect(next_meta != cur_meta, 1)) {
g->_cur_meta = next_meta;
// Swap bthread-local storage.
cur_meta->local_storage = tls_bls;
tls_bls = next_meta->local_storage;
if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
(next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
<< next_meta->tid;
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack);
// We may resume on a different worker: refresh g from TLS.
g = tls_task_group;
} else {
LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
// Run deferred callbacks registered by the previous context (e.g.
// wait_for_butex) now that we are safely off its stack.
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
errno = saved_errno;
tls_unique_user_ptr = saved_unique_user_ptr;
*pg = g;
// Thin wrapper over the assembly context switch shown earlier.
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
bthread_jump_fcontext(&from->context, to->context, 0);
从外部线程通过 TaskControl 新增 bthread 的流程:
// Creates a bthread without running it immediately: initialize a pooled
// TaskMeta and enqueue its tid. REMOTE selects the queue: true when called
// from a non-worker thread (remote queue), false for the local queue.
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
if (__builtin_expect(!fn, 0)) {
return EINVAL;
const int64_t start_ns = butil::cpuwide_time_ns();
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
// TaskMeta objects are pooled; get_resource reuses a freed slot.
TaskMeta* m = butil::get_resource(&slot);
if (__builtin_expect(!m, 0)) {
return ENOMEM;
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = fn;
m->arg = arg;
CHECK(m->stack == NULL);
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
// tid combines the version (from version_butex) with the pool slot id.
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
_control->_nbthreads << 1;
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
return 0;
// Enqueues tid into the remote run queue (callers are non-worker threads)
// and signals workers, unless nosignal batching is requested.
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_rq._mutex.lock();
while (!_remote_rq.push_locked(tid)) {
// Queue full: flush pending nosignal tasks and retry after a pause.
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
if (nosignal) {
// Defer the wakeup; count it so a later signal covers this task too.
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock();
// Wake enough workers for this task plus the deferred nosignal ones.
_control->signal_task(1 + additional_signal);
Linux 中提供了快速的用户态锁 futex,bRPC 中进行了简单的封装:
// Thin wrappers over the futex syscall (process-private variant).
// Blocks the caller while *addr1 == expected, until woken or timed out.
inline int futex_wait_private(void *addr1, int expected,
const timespec *timeout) {
return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG), expected,
timeout, NULL, 0);
// Wakes up to nwake threads blocked in futex_wait on addr1.
inline int futex_wake_private(void *addr1, int nwake) {
return syscall(SYS_futex, addr1, (FUTEX_WAKE | FUTEX_PRIVATE_FLAG), nwake,
NULL, NULL, 0);
对于 MacOS,bthread 也提供了用户态的实现,可以自行查看 src/bthread/sys_futex.cpp 中对应的实现。使用 futex 可以构造更快速的互斥锁:
// Futex-based mutex that is cheaper than pthread_mutex when uncontended:
// lock/unlock are a single atomic exchange, and the kernel is entered only
// under contention.
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
void lock();
void unlock();
bool try_lock();
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
unsigned _futex;
// Byte-level view of the futex word: `locked` is the ownership byte,
// `contended` records that some thread may be sleeping in futex_wait.
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
butil::static_atomic<unsigned char> contended;
unsigned short padding;
const MutexInternal MUTEX_CONTENDED_RAW = {{1}, {1}, 0};
const MutexInternal MUTEX_LOCKED_RAW = {{1}, {0}, 0};
#define BTHREAD_MUTEX_CONTENDED \
(*(const unsigned *)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned *)&bthread::MUTEX_LOCKED_RAW)
// Slow path: repeatedly mark the word locked+contended and sleep in the
// kernel until an exchange observes the lock was free (low bit clear).
int FastPthreadMutex::lock_contended() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
// EWOULDBLOCK means the word changed before we slept: retry.
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
errno != EWOULDBLOCK) {
return errno;
return 0;
void FastPthreadMutex::lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
// Fast path: set the locked byte; previous value 0 means we own it.
if (split->locked.exchange(1, butil::memory_order_acquire)) {
(void)lock_contended();
bool FastPthreadMutex::try_lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
return !split->locked.exchange(1, butil::memory_order_acquire);
void FastPthreadMutex::unlock() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
const unsigned prev = whole->exchange(0, butil::memory_order_release);
// Anything besides the plain "locked" pattern implies a possible
// sleeper: issue one wake.
if (prev != BTHREAD_MUTEX_LOCKED) {
futex_wake_private(whole, 1);
写了一个简单的测试样例,这里的 FastPthreadMutex 可以比 std::mutex 快一倍。
对于 bthread,如果直接使用互斥锁,会导致整个工作线程阻塞,进而影响同线程下的其他 bthread。这里更好的解决方案是仅挂起 bthread,主动挂起出让 CPU 让其他 bthread 继续执行。bthread 中使用 butex 提供类 futex 的同步原语,下面来看具体实现:
// futex-like API for bthreads: waiting suspends only the calling bthread,
// not the whole worker pthread.
void *butex_create();
int butex_wake(void *butex);
int butex_wait(void *butex, int expected_value, const timespec *abstime);
// A waiter queued on a Butex. tid == 0 marks a pthread waiter (see
// butex_wake below).
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
bthread_t tid;
// The butex this waiter is currently queued on, NULL if none.
butil::atomic<Butex *> container;
struct ButexBthreadWaiter : public ButexWaiter {
TaskMeta *task_meta;
// Timer registered for the abstime deadline, 0 when there is none.
TimerThread::TaskId sleep_id;
WaiterState waiter_state;
int expected_value;
Butex *initial_butex;
TaskControl *control;
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
struct BAIDU_CACHELINE_ALIGNMENT Butex {
Butex() {}
~Butex() {}
butil::atomic<int> value;
// Waiters are protected by waiter_lock, not by atomics on value.
ButexWaiterList waiters;
internal::FastPthreadMutex waiter_lock;
// The API hands out &value, so value must sit at offset 0 for
// container_of to recover the enclosing Butex.
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
void *butex_create() {
Butex *b = butil::get_object<Butex>();
if (b) {
// Expose only the address of the int value to callers.
return &b->value;
return NULL;
// Suspends the calling bthread until the butex is woken, iff *butex still
// equals expected_value. Returns -1/EWOULDBLOCK on value mismatch,
// -1/ETIMEDOUT on deadline, -1/EINTR when interrupted, 0 on normal wake.
int butex_wait(void *arg, int expected_value, const timespec *abstime) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
butil::atomic_thread_fence(butil::memory_order_acquire);
return -1;
TaskGroup *g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
// Not on a bthread worker stack: block the pthread instead.
return butex_wait_from_pthread(g, b, expected_value, abstime);
// The waiter lives on this bthread's stack; it stays valid because this
// frame is not left until the bthread is woken.
ButexBthreadWaiter bbw;
bbw.tid = g->current_tid();
bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task();
bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value;
bbw.initial_butex = b;
bbw.control = g->control();
if (abstime != NULL) {
...
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
// Queueing onto the butex is deferred to wait_for_butex, which runs
// AFTER we have switched off this stack — avoids being woken while the
// bthread is still running here.
g->set_remained(wait_for_butex, &bbw);
TaskGroup::sched(&g);
// Woken up: cancel the timer and detach from current_waiter before this
// stack frame is reused.
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
30 );
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL,
30 );
bool is_interrupted = false;
if (bbw.task_meta->interrupted) {
// Reset the flag so that subsequent waits/joins still work.
bbw.task_meta->interrupted = false;
is_interrupted = true;
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT;
return -1;
} else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK;
return -1;
} else if (is_interrupted) {
errno = EINTR;
return -1;
return 0;
// Runs as a "remained" callback in the next scheduled context, after the
// waiting bthread has switched off its own stack: re-check the value under
// the lock and either queue the waiter or reschedule the bthread at once.
static void wait_for_butex(void *arg) {
ButexBthreadWaiter *const bw = static_cast<ButexBthreadWaiter *>(arg);
Butex *const b = bw->initial_butex;
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY &&
!bw->task_meta->interrupted) {
// Value still matches: enqueue the waiter and keep it suspended.
b->waiters.Append(bw);
bw->container.store(b, butil::memory_order_relaxed);
return;
// Value changed (or interrupted): cancel the timer and make the waiting
// bthread runnable again immediately.
unsleep_if_necessary(bw, get_global_timer_thread());
tls_task_group->ready_to_run(bw->tid);
// Wakes one waiter (the list head) of the butex. Returns the number of
// waiters woken (0 or 1).
int butex_wake(void *arg) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
ButexWaiter *front = NULL;
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->waiters.empty()) {
return 0;
front = b->waiters.head()->value();
front->RemoveFromList();
front->container.store(NULL, butil::memory_order_relaxed);
if (front->tid == 0) {
// tid == 0: the waiter is a blocked pthread, not a bthread.
wakeup_pthread(static_cast<ButexPthreadWaiter *>(front));
return 1;
ButexBthreadWaiter *bbw = static_cast<ButexBthreadWaiter *>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup *g = tls_task_group;
if (g) {
// On a worker: switch to the woken bthread directly.
TaskGroup::exchange(&g, bbw->tid);
} else {
// Not on a worker: enqueue it on some group's remote queue.
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
return 1;
没错,Tokio 源码分析被“鸽”置了。去年下半年组里项目非常忙,导致周末也不想学习。好在项目赶在年前上线了,年初的答辩也挺顺利,今年就有更多时间自我提升了。开 bRPC 的新坑是因为该项目里有不少先进经验可以应用到自己的工作上,Tokio 会排在 bRPC 之后补上。
Except where otherwise noted, content on this site is licensed under a CC BY-SA 4.0 license.