博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
简单多线程拷贝单文件v2.1
阅读量:4597 次
发布时间:2019-06-09

本文共 4703 字,大约阅读时间需要 15 分钟。

相对于《》中,将文件下载的任务分块也作为线程,可以动态添加文件块任务。

即是生产者与消费者模型。

用了本Blog的《》中的实现。

用了《》中提到的参考代码方式。

作为基类的msg_block_t。

typedef struct ares_msg_block {
pthread_t pid; enum msg_object_type msg_type; void *data;///<*用于扩展的私有数据指针 }msg_block_t;

继承的msg_block_t的msg_thread_block_t

typedef struct msg_thread_block {
msg_block_t parent; ///<*parent size_t start_position; ///<*文件的写入起始位置 size_t block_size; ///<*本次线程执行写入的大小 }msg_thread_block_t;

消息邮箱的定义,其实现参考《》。

typedef struct ares_msg_box {
pthread_cond_t not_full; pthread_cond_t not_empty; pthread_mutex_t mutex; //消息的循环队列 size_t first; size_t last; size_t size; size_t nready; ///多少个消息,判断满与空 msg_block_t * msg_array[0]; ///柔性数组 }msg_box_t;

相对于v2,添加一个新的关于任务的结构 thread_task_block_t ,其next指针表明,可能有多个任务,做扩展用,每个任务自带一个邮箱的指针。

typedef struct thread_task_block {
int infd; int outfd; size_t file_size; size_t thread_cnt; msg_box_t *mbox; int finished; struct thread_task_block *next; }thread_task_block_t;

有三个函数

int thread_block_copy(msg_thread_block_t *block,char *buf,size_t len); void *thread_task_split(void *arg); void *thread_copy_fn(void *arg);

thread_block_copy 是《》中的thread_copy_fn改写的,在此不贴代码了。

但注意其声明中,添加了一个len的参数。

因为write的时候,写入的最后一个参数为sizeof(buf)——此时应该修改为len——当数组作为函数的参数传入函数内后,

此时sizeof(buf) = 4 (IA32-x86切记)。

 

thread_task_split 是作为生产者,将下载的单文件任务分解成为单个线程下载的线程块。

代码流程不难理解,new一个消息,post到邮箱中,然后sleep下。

thread_copy_fn 是消费者,将调用thread_block_copy。

从邮箱中fetch到邮箱中,然后调用thread_block_copy 执行拷贝线程任务。

对于终止条件的判断,做了双重判断,应该可以有改进的余地——可能需要信号量的支持。

while(!(tblock->finished && msgbox_empty(tblock->mbox)))

生产者线程

thread_task_split
void *thread_task_split(void *arg) {
thread_task_block_t *tblock = (thread_task_block_t *)arg; size_t block_size = tblock->file_size /tblock->thread_cnt; size_t task_size; int i = 0; printf("At split thread \n"); thread_task_block_printf(tblock); for(; i <= tblock->thread_cnt;++i) {
msg_thread_block_t *mblock =\ (msg_thread_block_t *)msgblock_new(MSG_Object_Thread,pthread_self()); if(i == tblock->thread_cnt) task_size = tblock->file_size % tblock->thread_cnt; else task_size = block_size; msg_thread_block_init(mblock,tblock,i * block_size,task_size); msg_thread_block_printf(mblock); msgbox_post(tblock->mbox,&mblock->parent); msgbox_printf(tblock->mbox); sleep(1); } tblock->finished = 1; printf("#####Post Thread exit %ld#####\n",pthread_self()); pthread_exit(NULL); }

消费者线程

thread_copy_fn
void *thread_copy_fn(void *arg) {
thread_task_block_t *tblock = (thread_task_block_t *)arg; //msg_box_t *mbox = tblock->mbox; char buf[THREADS_BUFF_SIZE]; int ret; struct msg_thread_block *block; thread_task_block_printf(tblock); //while((!(tblock->finished))||(!msgbox_empty(mbox))) while(!(tblock->finished && msgbox_empty(tblock->mbox))) {
//printf("###at copy $$$$$$$$$$$\n"); msgbox_printf(tblock->mbox); block = (msg_thread_block_t *)msgbox_fetch(tblock->mbox); msg_thread_block_printf(block); ret = thread_block_copy(block,buf,sizeof(buf)); msgblock_free(&block->parent); sleep(1); //printf("End of copy ########\n"); }//end while printf("#####Copy Thread exit %ld#####\n",pthread_self()); pthread_exit(NULL); }

 

main测试函数

main
nt main(int argc,char *argv[]) {
if(argc < 3) {
usage(); return -1; } /*msg_box_t mbox; msgbox_init(&mbox,MBOX_SIZE); */ msg_box_t *mbox = msgbox_new(MBOX_SIZE); //获取任务队列 thread_task_block_t tblock; thread_task_block_init(&tblock,mbox,argv[1],argv[2]); thread_task_block_printf(&tblock); /** 消息分块线程 */ pthread_t task_split; pthread_create(&task_split,NULL,thread_task_split,&tblock); size_t thread_size = get_task_threadcnt(&tblock); pthread_t ptid[thread_size]; ///创建线程 printf("#######At main#########\n"); thread_task_block_printf(&tblock); int i ; for(i = 0 ; i < thread_size; ++i) {
pthread_create(&ptid[i],NULL,thread_copy_fn,&tblock); } ///线程Join for(i = 0 ; i < thread_size; ++i) {
pthread_join(ptid[i],NULL); } pthread_join(task_split,NULL); thread_task_block_destroy(&tblock); //msgbox_destroy(&mbox); msgbox_free(mbox); return 0; }

由于在msg_box中用到了柔性数组,所以必须new生成msg_box,这也是写《》的原因——因为出过错误,上过当。
实现的问题:

由于fetch实现是一直等待邮箱有消息,所以出现了当生产者线程结束,只能有一个消费者线程接受,其它线程都在等待fetch中,只能强制终止程序,有什么方案能够通知其也结束呢——这几天一直考虑中。

输入a.out list.c list.c.kb 得到如下

成功拷贝。

 

转载于:https://www.cnblogs.com/westfly/archive/2012/03/23/2414324.html

你可能感兴趣的文章
转:大灰狼的汇编视频教程笔记(下)
查看>>
javascript常见的几种事件类型
查看>>
关于大型网站技术演进的思考(八)--存储的瓶颈终篇(8)
查看>>
20+ 个很棒的 jQuery 文件上传插件或教程
查看>>
关于Struts2的多文件上传
查看>>
hosts学习整理
查看>>
github上的版本和本地版本冲突的解决方法
查看>>
ModalPopupExtender控件属性、功能
查看>>
js 工厂模式、简单模式、抽象模式
查看>>
扩展卡尔曼滤波(MRPT)
查看>>
如何解决Bluetooth系统设计的棘手问题
查看>>
加班两个星期做的一个小系统~(winform)
查看>>
排版系列1
查看>>
IDEA 生成 jar 包
查看>>
加减乘除混合版
查看>>
linux基础6-bash shell编程
查看>>
php 语法
查看>>
回顾MySpace架构的坎坷之路
查看>>
ubuntu系统无eth0网卡解决办法
查看>>
六.计算机网络互联基础
查看>>