(转)IikPlayer packet queue分析
ffplay packet queue分析
ffplay用PacketQueue保存解封装后的数据,即保存AVPacket。
ffplay首先定义了一个结构体MyAVPacketList:
typedef struct MyAVPacketList {
AVPacket pkt;//解封装后的数据
struct MyAVPacketList *next;//下一个节点
int serial;//序列号
} MyAVPacketList;
可以理解为是队列的一个节点。可以通过其next字段访问下一个节点。
所以这里我认为命名为MyAVPacketNode更为合理 serial字段主要用于标记当前节点的序列号,ffplay中多处用到serial的概念,一般用于区分是否连续数据。在后面的代码分析中我们还会看到它的作用。
接着定义另一个结构体PacketQueue:
typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt;//队首,队尾
int nb_packets;//队列中一共有多少个节点
int size;//队列所有节点字节总数,用于计算cache大小
int64_t duration;//队列所有节点的合计时长
int abort_request;//是否要中止队列操作,用于安全快速退出播放
int serial;//序列号,和MyAVPacketList的serial作用相同,但改变的时序稍微有点不同
SDL_mutex *mutex;//用于维持PacketQueue的多线程安全(SDL_mutex可以按pthread_mutex_t理解)
SDL_cond *cond;//用于读、写线程相互通知(SDL_cond可以按pthread_cond_t理解)
} PacketQueue;
这个结构体内定义了“队列”自身的属性。上面的注释对每个字段作了简单的介绍,接下来我们从队列的操作函数具体分析各个字段的含义。
PacketQueue操作提供以下方法:
- packet_queue_init:初始化
- packet_queue_destroy:销毁
- packet_queue_start:启用
- packet_queue_abort:中止
- packet_queue_get:获取一个节点
- packet_queue_put:存入一个节点
- packet_queue_put_nullpacket:存入一个空节点
- packet_queue_flush:清除队列内所有的节点
初始化用于初始各个字段的值,并创建mutex和cond:
static int packet_queue_init(PacketQueue *q) { memset(q, 0, sizeof(PacketQueue)); q->mutex = SDL_CreateMutex(); if (!q->mutex) { av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError()); return AVERROR(ENOMEM); } q->cond = SDL_CreateCond(); if (!q->cond) { av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError()); return AVERROR(ENOMEM); } q->abort_request = 1; return 0; }
相应的,销毁过程负责清理mutex和cond:
static void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q);//先清除所有的节点
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}
启用队列:
static void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt);//这里放入了一个flush_pkt
SDL_UnlockMutex(q->mutex);
}
flush_pkt定义是static AVPacket flush_pkt;,是一个特殊的packet,主要用来作为非连续的两端数据的“分界”标记。
中止队列:
static void packet_queue_abort(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);//释放一个条件信号
SDL_UnlockMutex(q->mutex);
}
这里SDL_CondSignal的作用在于确保当前等待该条件的线程能被激活并继续执行退出流程。
读、写是PacketQueue的主要方法。
先看写——往队列中放入一个节点:
static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
int ret;
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt);//主要实现在这里
SDL_UnlockMutex(q->mutex);
if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt);//放入失败,释放AVPacket
return ret;
}
主要实现在函数packet_queue_put_private,这里需要注意的是如果放入失败,需要释放AVPacket。
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
MyAVPacketList *pkt1;
if (q->abort_request)//如果已中止,则放入失败
return -1;
pkt1 = av_malloc(sizeof(MyAVPacketList));//分配节点内存
if (!pkt1)//内存不足,则放入失败
return -1;
pkt1->pkt = *pkt;//拷贝AVPacket(浅拷贝,AVPacket.data等内存并没有拷贝)
pkt1->next = NULL;
if (pkt == &flush_pkt)//如果放入的是flush_pkt,需要增加队列的序列号,以区分不连续的两段数据
q->serial++;
pkt1->serial = q->serial;//用队列序列号标记节点
//队列操作:如果last_pkt为空,说明队列是空的,新增节点为队头;否则,队列有数据,则让原队尾的next为新增节点。 最后将队尾指向新增节点
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
q->last_pkt = pkt1;
//队列属性操作:增加节点数、cache大小、cache总时长
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += pkt1->pkt.duration;
/* XXX: should duplicate packet data in DV case */
//发出信号,表明当前队列中有数据了,通知等待中的读线程可以取数据了
SDL_CondSignal(q->cond);
return 0;
}
对packet_queue_put_private笔者增加了详细注释,应该比较容易理解了。
主要完成3件事:
计算serial。serial标记了这个节点内的数据是何时的。一般情况下新增节点与上一个节点的serial是一样的,但当队列中加入一个flush_pkt后,后续节点的serial会比之前大1. 队列操作。经典的队列实现方式,不展开了。 队列属性操作。更新队列中节点的数目、占用字节数(含AVPacket.data的大小)及其时长。 再来看读——从队列中取一个节点:
/* return < 0 if aborted, 0 if no packet and > 0 if packet. */
//block: 调用者是否需要在没节点可取的情况下阻塞等待
//AVPacket: 输出参数,即MyAVPacketList.pkt
//serial: 输出参数,即MyAVPacketList.serial
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList *pkt1;
int ret;
SDL_LockMutex(q->mutex);
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
//......这里是省略的代码,取一个节点,然后break
}
SDL_UnlockMutex(q->mutex);
return ret;
}
函数较长,我们先省略for循环的主体部分,简单看下函数整体流程。整体流程比较清晰:加锁,进入循环,如果此时需要退出,则break,返回-1;否则,取一个节点,然后break。
这里for循环主要充当一个“壳”,以方便在一块多分支代码中可以通过break调到统一的出口。 对于加锁情况下的多分支return,这是一个不错的写法。但要小心这是一把双刃剑,没有仔细处理每个分支,容易陷入死循环。 然后看for的主体:
pkt1 = q->first_pkt;//MyAVPacketList *pkt1; 从队头拿数据
if (pkt1) {//队列中有数据
q->first_pkt = pkt1->next;//队头移到第二个节点
if (!q->first_pkt)
q->last_pkt = NULL;
q->nb_packets--;//节点数减1
q->size -= pkt1->pkt.size + sizeof(*pkt1);//cache大小扣除一个节点
q->duration -= pkt1->pkt.duration;//总时长扣除一个节点
*pkt = pkt1->pkt;//返回AVPacket,这里发生一次AVPacket结构体拷贝,AVPacket的data只拷贝了指针
if (serial)//如果需要输出serial,把serial输出
*serial = pkt1->serial;
av_free(pkt1);//释放节点内存
ret = 1;
break;
} else if (!block) {//队列中没有数据,且非阻塞调用
ret = 0;
break;
} else {//队列中没有数据,且阻塞调用
SDL_CondWait(q->cond, q->mutex);//这里没有break。for循环的另一个作用是在条件变量满足后重复上述代码取出节点
}
我们知道队列是一个先进先出的模型,所以从队头拿数据。对于没有取到数据的情况,根据block参数进行判断是否阻塞,如果阻塞,通过SDL_CondWait等待信号。
如果有取到数据,主要分3个步骤:
队列操作:转移队头、扣除大小。这里nb_packets和duration的运算较明显,size需要注意也要扣除AVPacket的size 给输出参数赋值:基本就是MyAVPacketList拍平传递给输出参数pkt和serial即可 释放节点内存:释放放入队列时申请的节点内存 最后是提供了几个”util”方法:
packet_queue_put_nullpacket放入“空包”。放入空包意味着流的结束,一般在视频读取完成的时候放入空包。该函数的实现很明了,构建一个空包,然后调用packet_queue_put:
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
packet_queue_flush用于将队列中的所有节点清除。比如用于销毁队列、seek操作等。
static void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList *pkt, *pkt1;
SDL_LockMutex(q->mutex);
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
av_freep(&pkt);
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}
函数主体的for循环是队列遍历,遍历过程释放节点和AVPacket。最后将PacketQueue的属性恢复为空队列状态。
至此,我们分析了PacketQueue的实现和主要的操作方法。
现在总结下两个关键的点:
第一,PacketQueue的内存管理:
MyAVPacketList的内存是完全由PacketQueue维护的,在put的时候malloc,在get的时候free。
AVPacket分两块,一部分是AVPacket结构体的内存,这部分从MyAVPacketList的定义可以看出是和MyAVPacketList共存亡的。另一部分是AVPacket字段指向的内存,这部分一般通过av_packet_unref函数释放。一般情况下,是在get后由调用者负责用av_packet_unref函数释放。特殊的情况是当碰到packet_queue_flush或put失败时,这时需要队列自己处理。
第二,serial的变化过程:
如上图所示,左边是队头,右边是队尾,从左往右标注了5个节点的serial,以及放入对应节点时queue的serial。
可以看到放入flush_pkt的时候后,serial增加了1.
要区分的是上图虽然看起来queue的serial和节点的serial是相等的,但这是放入时相等,在取出时是不等的。假设,现在要从队头取出一个节点,那么取出的节点是serial 1,而PacketQueue自身的queue已经增长到了2.
代码背后的设计思路:
设计一个多线程安全的队列,保存AVPacket,同时统计队列内已缓存的数据大小。(这个统计数据会用来后续设置要缓存的数据量) 引入serial的概念,区别前后数据包是否连续 设计了两类特殊的packet——flush_pkt和nullpkt,用于更细致的控制(类似用于多线程编程的事件模型——往队列中放入flush事件、放入null事件)