博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ucontext实现的用户级多线程框架2(抢先式多线程)
阅读量:6087 次
发布时间:2019-06-20

本文共 11008 字,大约阅读时间需要 36 分钟。

以前曾经写过一篇blog,使用linux ucontext实现的用户级多线程框架.那个框架实现的是协作式多线程序,也就是只有当正在执行的coroutine
主动放弃处理器时,其它coroutine才有机会得以执行.
 
今天用ucontext实现了一个抢先式的用户级多线程框架,其主要思想是,用一个物理线程作为中断发生器,以固定的时间间隔发送SIGUSR1信号.
另一个物理线程运行Scheduler和用户级线程。每当这个物理线程收到信号的时候,就会将执行权切换到Scheduler,由Scheduler挑选一个
个用户线程执行.
 
thread.h
// C/C++ header file // Author:   root // File:     /mnt/win-c/project/include/thread.h // Created:  16:18:04 2008-05-04 // Modified: 08:39:43 2008-05-11 // Brief:     #ifndef _THREAD_H #define _THREAD_H #include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* *可执行接口 */ struct runnable { public: runnable():terminated(false){} virtual bool run() = 0; virtual void setTerminate() { printf("runnable terminated/n"); terminated = true; } bool isTerminate() { return terminated; } void clearTerminate() { terminated = false; } protected: volatile bool terminated; }; class Thread { public: Thread(runnable *_runnable,std::string name="kenny",bool _isjoinable=true) :name(name),_runnable(_runnable),isjoinable(_isjoinable) { } Thread(std::string name="kenny",bool _isjoinable = true):name(name),isjoinable(_isjoinable){} void setRunnable(runnable *_runnable) { this->_runnable = _runnable; } ~Thread() { /*if(_runnable) delete _runnable; _runnable = NULL; */ } bool start(); static void *threadFun(void* arg) { runnable *r = (runnable*)arg; r->run(); } void join() { if(isjoinable) ::pthread_join(id, NULL); } bool isJoinable() { return isjoinable; } void setTerminate() { printf("thread setterminate %s/n",name.c_str()); _runnable->setTerminate(); } bool isTerminate() { return _runnable->isTerminate(); } void clearTerminate() { _runnable->clearTerminate(); } /** * /brief 使当前线程睡眠指定的时间,秒 * * * /param sec 指定的时间,秒 */ static void sleep(const long sec) { ::sleep(sec); } /** * /brief 使当前线程睡眠指定的时间,毫秒 * * * /param msec 指定的时间,毫秒 */ static void msleep(const long msec) { ::usleep(1000 * msec); } /** * /brief 使当前线程睡眠指定的时间,微秒 * * * /param usec 指定的时间,微秒 */ static void usleep(const long usec) { ::usleep(usec); } /** * /brief 使当前线程睡眠指定的时间,纳秒 * * * /param nsec 指定的时间,纳秒 */ static void nanosleep(const long nsec) { struct timespec req; req.tv_sec = nsec / 1000000000; req.tv_nsec = nsec; ::nanosleep(&req, NULL); } pthread_t GetId() { return id; } protected: runnable *_runnable; pthread_t id; bool isjoinable; std::string name; }; #endif
thread.cpp
// C++ source file // Author:   root // File:     /mnt/win-c/IOCP/thread.cpp // Created:  16:08:50 2008-05-10 // Modified: 08:34:42 2008-05-11 // Brief:     #include "thread.h" bool Thread::start() {
pthread_attr_t attr; pthread_attr_init(&attr); if(isjoinable) pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE); else pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); pthread_create(&id,&attr,threadFun,(void*)this->_runnable); pthread_attr_destroy(&attr); }
uthread.h
/* * brief: 用ucontext实现的用户级线程框架 * author: kenny huang * date: 2009/10/13 * email: huangweilook@21cn.com */ #ifndef _UTHREAD_H #define _UTHREAD_H #include 
#include
#include
#include
#include
#include
#define MAX_UTHREAD 128 typedef int uthread_id; #define INVAID_ID -1 void int_signal_handler(int sig); //用户态线程的当前状态 enum thread_status { ACTIVED = 0,//可运行的 BLOCKED,//被阻塞 SLEEP,//主动休眠 DIE,//死死亡 }; class uthread_runnable { public: virtual void main_routine() = 0; }; class Scheduler; /* * 用户态线程 */ class u_thread { friend class Scheduler; private: u_thread(uthread_runnable *rable,unsigned int ssize,uthread_id uid) :ssize(ssize),_status(BLOCKED),rable(rable),uid(uid) { stack = new char[ssize]; ucontext.uc_stack.ss_sp = stack; ucontext.uc_stack.ss_size = ssize; getcontext(&ucontext); } ~u_thread() { delete []stack; } static void star_routine(); public: ucontext_t &GetContext() { return ucontext; } void SetStatus(thread_status _status) { this->_status = _status; } thread_status GetStatus() { return _status; } uthread_id GetUid() { return uid; } //休眠time时间 static void sleep(uthread_id utid,int t); private: ucontext_t ucontext; char *stack;//coroutine使用的栈 unsigned int ssize;//栈的大小 thread_status _status; uthread_runnable *rable; uthread_id uid; }; /* * 任务调度器 */ class Scheduler { //friend void u_thread::star_routine(); friend class u_thread; public: static void scheduler_init(); static void schedule(); static uthread_id uthread_create(uthread_runnable *rable,unsigned int stacksize); static void int_sig(); private: static u_thread *GetCurrentUThread() { if(current == -1) return NULL; return threads[current]; } //休眠time时间 static void sleep(uthread_id utid,int t); private: static std::list
activeList;//可运行uthread列表 static std::list
> sleepList;//正在睡眠uthread列表 static char stack[4096]; static ucontext_t ucontext;//Scheduler的context static ucontext_t* p_curcontext;//指向当前正在使用的context static u_thread *threads[MAX_UTHREAD]; static int total_count; static int current;//在uthread创建时使用的 }; /*心跳发射器,发射器必须运行在一个独立的线程中,以固定的间隔 * 往所有运行着coroutine的线程发送中断信号 */ class beat { public: beat(unsigned int interval):interval(interval) {} void addTread(pthread_t id) { allthread.push_back(id); } void loop() { while(true) { //每隔固定时间向所有线程发中断信号 ::usleep(1000 * interval); std::list
::iterator it = allthread.begin(); std::list
::iterator end = allthread.end(); for( ; it != end ;++it) { pthread_kill(*it,SIGUSR1); } } } private: unsigned int interval;//发送中断的间隔(豪秒) std::list
allthread; }; #endif
uthread.cpp
#include "uthread.h" #include 
#include
#include
ucontext_t Scheduler::ucontext; char Scheduler::stack[4096]; ucontext_t* Scheduler::p_curcontext = NULL; u_thread *Scheduler::threads[128]; int Scheduler::total_count = 0; int Scheduler::current = -1; std::list
Scheduler::activeList; std::list
> Scheduler::sleepList; extern sigset_t globalset; void block_sigusr1() { sigprocmask(SIG_BLOCK,&globalset,NULL); } void unblock_sigusr1() { sigprocmask(SIG_UNBLOCK,&globalset,NULL); } void int_signal_handler(int sig) { printf("recv int "); Scheduler::int_sig(); } void u_thread::sleep(uthread_id utid,int t) { Scheduler::sleep(utid,t); } void u_thread::star_routine() { u_thread *current_uthread = Scheduler::GetCurrentUThread(); assert(current_uthread); //回到Scheduler::uthread_create current_uthread->SetStatus(ACTIVED); ucontext_t &cur_context = current_uthread->GetContext(); swapcontext(&cur_context,&Scheduler::ucontext); current_uthread->rable->main_routine(); current_uthread->SetStatus(DIE); } void Scheduler::scheduler_init() { for(int i = 0; i < MAX_UTHREAD; ++i) threads[i] = 0; getcontext(&ucontext); ucontext.uc_stack.ss_sp = stack; ucontext.uc_stack.ss_size = sizeof(stack); ucontext.uc_link = NULL; makecontext(&ucontext,schedule, 0); p_curcontext = &ucontext; } void Scheduler::schedule() { while(total_count > 0) { //首先执行active列表中的uthread std::list
::iterator it = activeList.begin(); std::list
::iterator end = activeList.end(); for( ; it != end; ++it) { if(*it && (*it)->GetStatus() == ACTIVED) { ucontext_t &cur_context = (*it)->GetContext(); p_curcontext = &cur_context; swapcontext(&ucontext,&cur_context); //回到调度器,阻塞SIGUSR1 block_sigusr1(); p_curcontext = &ucontext; uthread_id uid = (*it)->GetUid(); if((*it)->GetStatus() == DIE) { printf("%d die/n",uid); delete threads[uid]; threads[uid] = 0; --total_count; activeList.erase(it); break; } else if((*it)->GetStatus() == SLEEP) { printf("%d sleep/n",uid); activeList.erase(it); break; } } } //看看Sleep列表中是否有uthread该醒来了 std::list
>::iterator its = sleepList.begin(); std::list
>::iterator ends = sleepList.end(); time_t now = time(NULL); for( ; its != ends; ++its) { //可以醒来了 if(now >= its->second) { u_thread *uthread = its->first; uthread->SetStatus(ACTIVED); activeList.push_back(uthread); sleepList.erase(its); break; } } } printf("scheduler end/n"); } uthread_id Scheduler::uthread_create(uthread_runnable *rable,unsigned int stacksize) { if(total_count >= MAX_UTHREAD) return INVAID_ID; int i = 0; for( ; i < MAX_UTHREAD; ++i) { if(threads[i] == 0) { threads[i] = new u_thread(rable,stacksize,i); ++total_count; current = i; ucontext_t &cur_context = threads[i]->GetContext(); cur_context.uc_link = &ucontext; makecontext(&cur_context,u_thread::star_routine, 0); swapcontext(&ucontext, &cur_context); current = -1; activeList.push_back(threads[i]); return i; } } } void Scheduler::sleep(uthread_id utid,int t) { if(utid == INVAID_ID) return; assert(threads[utid]); time_t now = time(NULL); now += t; printf("wake up time %u/n",now); //插入到sleep列表中 sleepList.push_back(std::make_pair(threads[utid],now)); //保存当前上下文切换回scheduler threads[utid]->SetStatus(SLEEP); ucontext_t &cur_context = threads[utid]->GetContext(); swapcontext(&cur_context,&Scheduler::ucontext); //回到coroutine,开启动SIGUSR1 unblock_sigusr1(); } void Scheduler::int_sig() { //收到中断信号,如果当前正在Scheduler中则不做处理 if(p_curcontext == NULL || p_curcontext == &ucontext) return; printf("%d/n",&(*p_curcontext)); //否则,将上下文切换回Scheduler swapcontext(p_curcontext,&Scheduler::ucontext); //回到coroutine,开启动SIGUSR1 unblock_sigusr1(); printf("i'm come back %d/n",&(*p_curcontext)); }
test.cpp
// kcoroutine.cpp : 定义控制台应用程序的入口点。 // #include "uthread.h" #include "thread.h" sigset_t globalset; class runable_test : public uthread_runnable {
public: runable_test(const char *name):name(name){} void main_routine() {
unsigned long c = 0; while(1) {
//if(c % 10000 == 0) // printf("%s/n",name); // ++c; //u_thread::sleep(uid,1); //printf("%s wake up/n",name); } } const char *name; uthread_id uid; }; class uthreadRunner : public runnable {
public: bool run() {
//初始化信号 struct sigaction sigusr1; sigusr1.sa_flags = 0; sigusr1.sa_handler = int_signal_handler; sigemptyset(&sigusr1.sa_mask); int status = sigaction(SIGUSR1,&sigusr1,NULL); if(status == -1) {
printf("error sigaction/n"); return false; } //首先初始化调度器 Scheduler::scheduler_init(); runable_test t1("0"); runable_test t2("1"); runable_test t3("2"); runable_test t4("3"); //创建4个用户级线程 t1.uid = Scheduler::uthread_create(&t1,4096); t2.uid = Scheduler::uthread_create(&t2,4096); t3.uid = Scheduler::uthread_create(&t3,4096); t4.uid = Scheduler::uthread_create(&t4,4096); printf("create finish/n"); //开始调度线程的运行 Scheduler::schedule(); } }; int main() {
sigemptyset(&globalset); sigaddset(&globalset,SIGUSR1); //首先创建运行coroutine的线程 uthreadRunner ur; Thread c(&ur); c.start(); //创建心跳中断线程 beat b(200); b.addTread(c.GetId()); b.loop(); return 0; }

转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/02/2429642.html

你可能感兴趣的文章
第十四篇:获取系统数据文件信息
查看>>
为什么有些语言可以被反编译?而有的不能?
查看>>
JVM 调优
查看>>
最大似然估计
查看>>
如何使用Total Recorder录制软件发出的声音
查看>>
把异步架构延伸到客户端
查看>>
ORACLE数据库表解锁record is locked by another user
查看>>
ImportError: libmysqlclient_r.so.16: cannot open shared object file: No such file or directory
查看>>
Qualcomm 8X camera过程解析【转】
查看>>
配置管理之PackageProvider接口
查看>>
Oracle业务用户密码过期问题的解决
查看>>
EasyBoot使用方法
查看>>
Spring中基于Java的配置@Configuration和@Bean用法 (转)
查看>>
asp.net页面后退,重复弹出上一页对话框处理办法
查看>>
SolidWorks如何绘制抽壳零件
查看>>
js 数组排序
查看>>
简洁的python测试框架——Croner
查看>>
快速编译system.img和boot.img的方法【转】
查看>>
docker 学习
查看>>
《杀死一只知更鸟》哪个译本好?
查看>>