ijkPlayer源码分析 PacketQueue分析

本文介绍PacketQueue,相对于FrameQueue来说比较简单,可以类比Android中的MessageQueue 。
PacketQueue总体介绍

  1. 单向链表结构 。first_pkt、last_pkt,是链表的起点和终点结点;recycle_pkt链表用于节点复用;
  2. 是一个多线程安全队列,靠等待唤醒机制保证线程安全;
  3. 当遇到flush_pkt时,serial加1自增,标志着流序列变化,区分是否是连续的流;
typedef struct MyAVPacketList {AVPacket pkt;struct MyAVPacketList *next;int serial;} MyAVPacketList;typedef struct PacketQueue {MyAVPacketList *first_pkt, *last_pkt;int nb_packets;int size;int64_t duration;int abort_request;int serial;SDL_mutex *mutex;SDL_cond *cond;MyAVPacketList *recycle_pkt;int recycle_count;int alloc_count;int is_buffer_indicator;SDL_ProfilervideoBufferProfiler;SDL_ProfileraudioBufferProfiler;void *ffp;} PacketQueue; 
PacketQueue API介绍packet_queue_init:初始化;
packet_queue_start:启动队列,设置abort_request为0,先放一个flush_pkt;
packet_queue_put:存入一个节点,;
packet_queue_put_nullpacket:存入一个空节点;
packet_queue_put_private:存入一个节点,后唤醒packet_queue_get等待锁;
packet_queue_get:获取一个节点;
packet_queue_get_or_buffering:去缓冲等待水位后获取一个节点;
packet_queue_abort:中止,设置abort_request=1后唤醒packet_queue_get等待锁;
packet_queue_flush:清除队列内所有的节点;
packet_queue_destroy:销毁;
 
初始化
【ijkPlayer源码分析 PacketQueue分析】static int packet_queue_init(PacketQueue *q) {memset(q, 0, sizeof(PacketQueue));q->mutex = SDL_CreateMutex();q->cond = SDL_CreateCond();q->abort_request = 1;return 0;}static void packet_queue_start(PacketQueue *q) {SDL_LockMutex(q->mutex);q->abort_request = 0;packet_queue_put_private(q, &flush_pkt);SDL_UnlockMutex(q->mutex);} 
put操作
/** 存入null结点,eof和error时候存入,表示流结束*/static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index) {AVPacket pkt1, *pkt = &pkt1;av_init_packet(pkt);pkt->data = https://www.isolves.com/it/cxkf/bk/2021-07-13/NULL;pkt->size = 0;pkt->stream_index = stream_index;return packet_queue_put(q, pkt);}static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {int ret;SDL_LockMutex(q->mutex);ret = packet_queue_put_private(q, pkt);SDL_UnlockMutex(q->mutex);if (pkt != &flush_pkt && ret < 0)av_packet_unref(pkt);return ret;}static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {if (q->abort_request) {return -1;}// 如果有已经回收的就复用该回收的结点,没有就申请一个;MyAVPacketList *pkt1 = q->recycle_pkt;if (pkt1) {q->recycle_pkt = pkt1->next; // 移动到下一个q->recycle_count++;} else {q->alloc_count++;pkt1 = av_malloc(sizeof(MyAVPacketList));}if (!pkt1) {return -1;}pkt1->pkt = *pkt;pkt1->next = NULL;// 遇到flush_pkt就升级serial序列号,标志刚开始或进行了seekif (pkt == &flush_pkt) {q->serial++;}pkt1->serial = q->serial;//赋值first_pkt和last_pkt,定义链表的起点和终点;if (!q->last_pkt) { // 条件判断同 !q->first_pktq->first_pkt = pkt1;} else {q->last_pkt->next = pkt1;}q->last_pkt = pkt1;q->nb_packets++;q->size += pkt1->pkt.size + sizeof(*pkt1);q->duration += FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);/* XXX: should duplicate packet data in DV case */SDL_CondSignal(q->cond);return 0;} 
get操作
/** block: 是否阻塞* 返回1表示获取到了,返回值<=0表示没获取到*/static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial) {MyAVPacketList *pkt1;int ret;// 加锁SDL_LockMutex(q->mutex);for (;;) {if (q->abort_request) {ret = -1;break;}pkt1 = q->first_pkt;if (pkt1) {q->first_pkt = pkt1->next;if (!q->first_pkt) {// 说明只有一个结点q->last_pkt = NULL;}q->nb_packets--;q->size -= pkt1->pkt.size + sizeof(*pkt1);q->duration -= FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);*pkt = pkt1->pkt;if (serial) {*serial = pkt1->serial;}// 把pkt1持有的pkt给出去后进行回收,放到recycle_pkt链表头部pkt1->next = q->recycle_pkt;q->recycle_pkt = pkt1;ret = 1;break;} else if (!block) {ret = 0;break;} else {// wait阻塞,等待put唤醒SDL_CondWait(q->cond, q->mutex);}}SDL_UnlockMutex(q->mutex);return ret;}/* *阻塞等待直到退出或者有AVPacket数据 *>= 0 即取到值; */static int packet_queue_get_or_buffering(FFPlayer *ffp, PacketQueue *q, AVPacket *pkt, int *serial,int *finished) {if (!ffp->packet_buffering)return packet_queue_get(q, pkt, 1, serial); // queue为空时会阻塞等待while (1) {int new_packet = packet_queue_get(q, pkt, 0, serial); // 非阻塞,直接返回if (new_packet < 0) {// abort_request了return -1;} else if (new_packet == 0) {// 队列为空,去缓冲if (q->is_buffer_indicator && !*finished) {ffp_toggle_buffering(ffp, 1);}// 再阻塞获取,等待水位填充满new_packet = packet_queue_get(q, pkt, 1, serial);if (new_packet < 0) {// abort_request了return -1;}}if (*finished == *serial) {av_packet_unref(pkt);continue;} else {break;}}return 1;}


推荐阅读