相对于《》中,将文件下载的任务分块也作为线程,可以动态添加文件块任务。
即是生产者与消费者模型。
用了本Blog的《》中的实现。
用了《》中提到的参考代码方式。
作为基类的msg_block_t。
typedef struct ares_msg_block { pthread_t pid; enum msg_object_type msg_type; void *data;///<*用于扩展的私有数据指针 }msg_block_t;
继承的msg_block_t的msg_thread_block_t
typedef struct msg_thread_block { msg_block_t parent; ///<*parent size_t start_position; ///<*文件的写入起始位置 size_t block_size; ///<*本次线程执行写入的大小 }msg_thread_block_t;
消息邮箱的定义,其实现参考《》。
typedef struct ares_msg_box { pthread_cond_t not_full; pthread_cond_t not_empty; pthread_mutex_t mutex; //消息的循环队列 size_t first; size_t last; size_t size; size_t nready; ///多少个消息,判断满与空 msg_block_t * msg_array[0]; ///柔性数组 }msg_box_t;
相对于v2,添加一个新的关于任务的结构 thread_task_block_t ,其next指针表明,可能有多个任务,做扩展用,每个任务自带一个邮箱的指针。
typedef struct thread_task_block { int infd; int outfd; size_t file_size; size_t thread_cnt; msg_box_t *mbox; int finished; struct thread_task_block *next; }thread_task_block_t;
有三个函数
int thread_block_copy(msg_thread_block_t *block,char *buf,size_t len); void *thread_task_split(void *arg); void *thread_copy_fn(void *arg);
thread_block_copy 是《》中的thread_copy_fn改写的,在此不贴代码了。
但注意其声明中,添加了一个len的参数。
因为write的时候,写入的最后一个参数为sizeof(buf)——此时应该修改为len——当数组作为函数的参数传入函数内后,
此时sizeof(buf) = 4 (IA32-x86切记)。
thread_task_split 是作为生产者,将下载的单文件任务分解成为单个线程下载的线程块。
代码流程不难理解,new一个消息,post到邮箱中,然后sleep下。
thread_copy_fn 是消费者,将调用thread_block_copy。从邮箱中fetch到邮箱中,然后调用thread_block_copy 执行拷贝线程任务。
对于终止条件的判断,做了双重判断,应该可以有改进的余地——可能需要信号量的支持。
while(!(tblock->finished && msgbox_empty(tblock->mbox)))
生产者线程
void *thread_task_split(void *arg) { thread_task_block_t *tblock = (thread_task_block_t *)arg; size_t block_size = tblock->file_size /tblock->thread_cnt; size_t task_size; int i = 0; printf("At split thread \n"); thread_task_block_printf(tblock); for(; i <= tblock->thread_cnt;++i) { msg_thread_block_t *mblock =\ (msg_thread_block_t *)msgblock_new(MSG_Object_Thread,pthread_self()); if(i == tblock->thread_cnt) task_size = tblock->file_size % tblock->thread_cnt; else task_size = block_size; msg_thread_block_init(mblock,tblock,i * block_size,task_size); msg_thread_block_printf(mblock); msgbox_post(tblock->mbox,&mblock->parent); msgbox_printf(tblock->mbox); sleep(1); } tblock->finished = 1; printf("#####Post Thread exit %ld#####\n",pthread_self()); pthread_exit(NULL); }
消费者线程
void *thread_copy_fn(void *arg) { thread_task_block_t *tblock = (thread_task_block_t *)arg; //msg_box_t *mbox = tblock->mbox; char buf[THREADS_BUFF_SIZE]; int ret; struct msg_thread_block *block; thread_task_block_printf(tblock); //while((!(tblock->finished))||(!msgbox_empty(mbox))) while(!(tblock->finished && msgbox_empty(tblock->mbox))) { //printf("###at copy $$$$$$$$$$$\n"); msgbox_printf(tblock->mbox); block = (msg_thread_block_t *)msgbox_fetch(tblock->mbox); msg_thread_block_printf(block); ret = thread_block_copy(block,buf,sizeof(buf)); msgblock_free(&block->parent); sleep(1); //printf("End of copy ########\n"); }//end while printf("#####Copy Thread exit %ld#####\n",pthread_self()); pthread_exit(NULL); }
main测试函数
nt main(int argc,char *argv[]) { if(argc < 3) { usage(); return -1; } /*msg_box_t mbox; msgbox_init(&mbox,MBOX_SIZE); */ msg_box_t *mbox = msgbox_new(MBOX_SIZE); //获取任务队列 thread_task_block_t tblock; thread_task_block_init(&tblock,mbox,argv[1],argv[2]); thread_task_block_printf(&tblock); /** 消息分块线程 */ pthread_t task_split; pthread_create(&task_split,NULL,thread_task_split,&tblock); size_t thread_size = get_task_threadcnt(&tblock); pthread_t ptid[thread_size]; ///创建线程 printf("#######At main#########\n"); thread_task_block_printf(&tblock); int i ; for(i = 0 ; i < thread_size; ++i) { pthread_create(&ptid[i],NULL,thread_copy_fn,&tblock); } ///线程Join for(i = 0 ; i < thread_size; ++i) { pthread_join(ptid[i],NULL); } pthread_join(task_split,NULL); thread_task_block_destroy(&tblock); //msgbox_destroy(&mbox); msgbox_free(mbox); return 0; }
由于fetch实现是一直等待邮箱有消息,所以出现了当生产者线程结束,只能有一个消费者线程接受,其它线程都在等待fetch中,只能强制终止程序,有什么方案能够通知其也结束呢——这几天一直考虑中。
输入a.out list.c list.c.kb 得到如下
成功拷贝。