#define THREAD 32
#define QUEUE 256
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"
int tasks = 0, done = 0;
pthread_mutex_t lock;
void dummy_task(void *arg) {
usleep(10000);
pthread_mutex_lock(&lock);
/* 记录成功完成的任务数 */
done++;
pthread_mutex_unlock(&lock);
int main(int argc, char **argv)
threadpool_t *pool;
/* 初始化互斥锁 */
pthread_mutex_init(&lock, NULL);
/* 断言线程池创建成功 */
assert((pool = threadpool_create(THREAD, QUEUE, 0)) != NULL);
fprintf(stderr, "Pool started with %d threads and "
"queue size of %d\n", THREAD, QUEUE);
/* 只要任务队列还没满,就一直添加 */
while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
pthread_mutex_lock(&lock);
tasks++;
pthread_mutex_unlock(&lock);
fprintf(stderr, "Added %d tasks\n", tasks);
/* 不断检查任务数是否完成一半以上,没有则继续休眠 */
while((tasks / 2) > done) {
usleep(10000);
/* 这时候销毁线程池,0 代表 immediate_shutdown */
assert(threadpool_destroy(pool, 0) == 0);
fprintf(stderr, "Did %d tasks\n", done);
return 0;
源码注释一并放在 github, 点我。
threadpool.h
* All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#ifdef __cplusplus
/* 对于 C++ 编译器,指定用 C 的语法编译 */
extern "C" {
#endif
* @file threadpool.h
* @brief Threadpool Header File
* Increase this constants at your own risk
* Large values might slow down your system
#define MAX_THREADS 64
#define MAX_QUEUE 65536
/* 简化变量定义 */
typedef struct threadpool_t threadpool_t;
/* 定义错误码 */
typedef enum {
threadpool_invalid = -1,
threadpool_lock_failure = -2,
threadpool_queue_full = -3,
threadpool_shutdown = -4,
threadpool_thread_failure = -5
} threadpool_error_t;
typedef enum {
threadpool_graceful = 1
} threadpool_destroy_flags_t;
/* 以下是线程池三个对外 API */
* @function threadpool_create
* @brief Creates a threadpool_t object.
* @param thread_count Number of worker threads.
* @param queue_size Size of the queue.
* @param flags Unused parameter.
* @return a newly created thread pool or NULL
* 创建线程池,有 thread_count 个线程,容纳 queue_size 个的任务队列,flags 参数没有使用
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);
* @function threadpool_add
* @brief add a new task in the queue of a thread pool
* @param pool Thread pool to which add the task.
* @param function Pointer to the function that will perform the task.
* @param argument Argument to be passed to the function.
* @param flags Unused parameter.
* @return 0 if all goes well, negative values in case of error (@see
* threadpool_error_t for codes).
* 添加任务到线程池, pool 为线程池指针,routine 为函数指针, arg 为函数参数, flags 未使用
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
void *arg, int flags);
* @function threadpool_destroy
* @brief Stops and destroys a thread pool.
* @param pool Thread pool to destroy.
* @param flags Flags for shutdown
* Known values for flags are 0 (default) and threadpool_graceful in
* which case the thread pool doesn't accept any new tasks but
* processes all pending tasks before shutdown.
* 销毁线程池,flags 可以用来指定关闭的方式
int threadpool_destroy(threadpool_t *pool, int flags);
#ifdef __cplusplus
#endif
#endif /* _THREADPOOL_H_ */
].
].
.
].
.
].
.
.