集成学习
集成学习就是将多个学习器集成在一起完成某个任务。集成学习可以有效的提升模型的性能,取得更好的结果。
Numpy广播 broadcasting
ufunc函数在对两个数组进行计算时,如果两个数组的形状不相同,则会进行广播处理,其处理的规则为:
让所有数组都向其中维数最多的数组看齐,shape属性中不足的部分都通过在前面加1补齐;
输出数组的shape属性是输入数组shape属性在各轴上的最大值;
如果输入数组的某个轴长度为1或与输出数组对应轴的长度相同,这个数组就能够用来计算,否则出错;
当输入数组的某个轴长度为1时,沿着此轴运算时都用此轴上的第一组值;
使用with打开文件的好处不多说,这里记录一下如果要打开多个文件,该怎么书写简捷的代码。
同时打开三个文件,文件行数一样,程序实现每个文件依次读取一行,同时输出。 首先来一种比较容易想到的写法,如下一样嵌套:
What is the different between the list methods append()
and extend()
?
append: appends object at end
gives you: [1,2,3,[4,5]]
extend:extends list by appending elements from the iterable
gives you [1,2,3,4,5]
tensorflow的数据读取
tensorflow在读取像imagenet这种大量图像数据,不能一次性load进内存时有几个坑,Mark一记,以助后来者。
<!-- more -->
关于多GPU和分布式,本文只讨论数据并行方式,即每个GPU上面运行一个网络,称为tower。
* 数据格式TFRecord
* 数据读取第一层封装——Queue,Coordinator,QueueRunner
* 常用封装——tf.train.xxx_input_producer,tf.reader,tf.train.batch
TFRecord本身是一个集合,每条记录是一个tf.train.Example(一种protocol buffer),包含有多个字段(称为feature),TFRecord提供的格式一般包含几个重要的feature,比如代表大小的image/height、image/width,代表类别的image/class/label、image/class/text,代表数据的image/encoded等等。
创建TFRecord文件就是按照feature名把相应字段填充,接口为tf.python_io.TFRecordWriter和tf.train.Example,典型代码如下:
读取则是读出相应字段,接口为tf.TFRecordReader和tf.parse_single_example。他们本身都是op,需要在session中执行,无法过程式执行。对于jpg、png等压缩图片格式,读出image/encoded相应数据后需要用tf.image.decode_jpeg,tf.image.decode_png等op来进行解码,解析TFRecord的典型代码如下(tf.TFRecordReader的代码见下节):
需要特别注意的是,TF中一般一个TFRecord文件会写入多个example,这样可以把原始文件个数减少很多,从而减轻filename Queue(见下节)的负担。读取的方法见下节。
tensorflow的model zoo中的inception模型示例了imagenet的读取方法。
建立TFRecord文件集只需要略微修改build_image_data或者build_imagenet_data 就好了,他们的主体部分实现了多线程读取图像、转换和输出,前提是图像以dir/label/xxx.jpg 的命名规则存放。关键代码就是每幅图像创建tf.train.Example以及多次调用tf.python_io.TFRecordWriter的write方法写入磁盘。
读取的代码在image_processing中,imagenet_train和imagnet_distributed_train都是调用了这个文件
如前所述,典型情况下,在目录dir下有若干TFRecord文件,每个文件有若干记录,每个记录是一幅图像,可以是编码后的也可以是raw data。
流程的起点,tf读取dir下的所有TFRecord文件名并放在内存,整个系统有三个队列缓冲:FilnameQueue,ExampleQueue,BatchQueue。
那么在训练和测试模型的时候,数据读取的流程就是:
前两个操作如下图所示,注意到在用多个reader的时候一般是多线程的,每个线程有一个reader读取图像,还有一个enqueue的op把处理后的图像放进example enqueue
多线程读取
tf.train.XXXbatchXXX()封装了从一个op–examplequeue.dequeue到读出batchsize个example的过程。在这个调用中,tf创建一个新的BatchQueue,并创建多个线程用来读取ExampleQueue并填到BatchQueue中,典型代码如下:
此处有诸多值得推敲的细节:
|
|
或者直接输入到模型中训练或测试:
以上代码是整个数据读取pipeline的核心,计算流图的时候回溯执行,有以下几个步骤:
注意就读取任务而言,单机上单GPU(单tower)、多GPU(tower)是有区别的,而是否运行于多机集群环境,事实上并不重要,关键还是每台机器上面是多GPU还是单GPU。因为集群中ps并不读取图像数据,而每个worker独立读取各自的文件,并不相互影响。多tower的读取留待下篇,与tensorflow的分布式并行一起讨论。
以上介绍了高层API接口,本节其底层原理及接口,下节在此基础上重新看待上述高层接口。本节介绍的接口主要用于理解,以笔者的经验,实际使用中高层接口已经足够了。本节主要解释:
tf.train.queue_runner.QueueRunner的多线程原理
tf.train.batch_join的多线程原理
tf.TFRecordReader读取细节
QueueRunner
典型的多线程FIFO缓冲,
Coordinator的核心实体是一个Python标准库里的threading Event Object,然后封装了一些操作方法来同步各个线程一起stop。所以要在创建线程的时候把Coordinator传进去用,这样的用法需要显式开辟和管理线程,会比较麻烦,而这些线程主要用来进行enqueue,所以tf提供了QueueRunner来管理这些用于enqueue的线程。
QueueRunner在初始化时需要提供queue和一个包含enqueue操作的list,然后调用create_thread,提供Coordinator来创建和同步线程。也可以不显式调用create_thread,可以使用tf.train.start_queue_runner()一次性启动所有runner,该函数内部会自己维护一个coordinatior,从而简化了编程。
QueueRunner的线程就是在loop执行enqueue,而enqueue时如果queue已经满了就直接返回,而dequeue时候如果queue空了则抛出异常OutOfRange,见FIFOQueue的源代码。
注意,QueueRunner的enqueue一般需要dequeue前一个queue,所以QueueRunner的enqueue依赖于前一个queue的dequeue操作,即QueueRunner的每个线程不仅对本queue执行了enqueue一般也会对前一个queue执行dequeue
整个过程就是,初始化的时候在主线程中建立各个queue,然后通过Runner和Coordinator建立相应的enqueue子线程。启动session后,子线程start,填满queue之后的loop实际上无操作,然后主线程执行train或者inference,反向遍历网络中的op,到达数据读取那个op,即对example batch queue的dequeue操作,就会产生连锁反应使得各个queue中的entry出队,一直在loop的子线程就会马上读取entry并enqueue进去。
batch_join()
tf.train中的batch族函数有四个,调用他们时都会创建一个Queue和QueueRunner,就像上节所描述的那样,只是做了封装。
tf.train.batch_join()会把tensors_list中的每个元素都开一个线程来单独获取数据,并把这些得到的数据按照batch_size大小组成新的张量放进Queue,然后再调用Queue的dequeue来返回。
reader.read()
一个重要的问题是,shuffle读取batch的时候如何在一个epoch内不重复?一个epoch之后queue是不是就空了?
第二个问题很好回答,因为调用tf.train.string_input_producer的时候会创建相应的QueueRunner来持续填充FilenameQueue。在一开始,producer会存储下文件名集合,它内部有epoch的概念:epoch的大小也就是集合元素数目,每次enqueue都是顺序从集合中取值,并且会计数,够一个epoch之后会把集合shffule,这样就既保证了一个epoch内的随机性,又保证了一个epoch之内所有样本全部遍历一遍且不重复。
代码导读
在Coordinator的源码中可以看出,其主要接口join、request_stop、should_stop都是对自身成员变量_stop_event的操作。
tf的queue系统是用C++实现的