Streaming Media Protocol Analysis: the WebRTC Protocol as Implemented in the SRS Server

Date: 2022-05-02 06:18:15

1. Signaling: the SDP exchange.

listen_udp: registers the UDP listeners.

listen_api: registers the HTTP API endpoints used for signaling.

#ifdef SRS_RTC
    _srs_hybrid->register_server(new RtcServerAdapter());
#endif

srs_error_t RtcServerAdapter::run(SrsWaitGroup* wg)
{
    srs_error_t err = srs_success;

    if ((err = rtc->listen_udp()) != srs_success) {
        return srs_error_wrap(err, "listen udp");
    }

    if ((err = rtc->listen_api()) != srs_success) {
        return srs_error_wrap(err, "listen api");
    }

    if ((err = _srs_rtc_manager->start()) != srs_success) {
        return srs_error_wrap(err, "start manager");
    }

    return err;
}

srs_error_t SrsRtcServer::listen_udp()
{
    srs_error_t err = srs_success;

    if (!_srs_config->get_rtc_server_enabled()) {
        return err;
    }

    int port = _srs_config->get_rtc_server_listen();
    if (port <= 0) {
        return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
    }

    string ip = srs_any_address_for_listener();
    srs_assert(listeners.empty());

    int nn_listeners = _srs_config->get_rtc_server_reuseport();
    for (int i = 0; i < nn_listeners; i++) {
        SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port);

        if ((err = listener->listen()) != srs_success) {
            srs_freep(listener);
            return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
        }

        srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd());
        listeners.push_back(listener);
    }

    return err;
}

srs_error_t SrsRtcServer::listen_api()
{
    srs_error_t err = srs_success;

    // TODO: FIXME: Fetch api from hybrid manager, not from SRS.
    ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();

    if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
        return srs_error_wrap(err, "handle play");
    }

    if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
        return srs_error_wrap(err, "handle publish");
    }

    // Generally, WHIP is a publishing protocol, but it can be also used as playing.
    if ((err = http_api_mux->handle("/rtc/v1/whip/", new SrsGoApiRtcWhip(this))) != srs_success) {
        return srs_error_wrap(err, "handle whip");
    }

    // We create another mount, to support play with the same query string as publish.
    if ((err = http_api_mux->handle("/rtc/v1/whip-play/", new SrsGoApiRtcWhip(this))) != srs_success) {
        return srs_error_wrap(err, "handle whip play");
    }

#ifdef SRS_SIMULATOR
    if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {
        return srs_error_wrap(err, "handle nack");
    }
#endif

    return err;
}

The SrsGoApiRtcPlay and SrsGoApiRtcPublish classes each handle their requests in a serve_http function. The flow is as follows.

Call stack leading to serve_http:

SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, ISrsProtocolReadWriter* fd, ISrsHttpServeMux* m, string cip, int cport)
new SrsSTCoroutine("http", this, _srs_context->get_id());  // creates the coroutine, which runs SrsSTCoroutine::cycle()
SrsHttpConn::cycle()
SrsHttpConn::do_cycle()
SrsHttpConn::process_requests
SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, int rid)
srs_error_t SrsHttpCorsMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
next->serve_http(w, r);  // SrsHttpCorsMux::initialize points next at the SrsHttpServeMux object
srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
SrsHttpServeMux::find_handler(ISrsHttpMessage* r, ISrsHttpHandler** ph)  // finds the handler registered for the request path
h->serve_http
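The last step of this call stack, find_handler, maps a request path to one of the handlers registered in listen_api. The sketch below illustrates that idea under the assumption that the longest registered pattern that prefixes the path wins. TinyMux and its members are hypothetical names made up for this illustration; this is not the SrsHttpServeMux code.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical illustration only; not the real SrsHttpServeMux.
class TinyMux {
public:
    using Handler = std::function<std::string(const std::string& path)>;

    // Register a handler for a pattern such as "/rtc/v1/play/".
    void handle(const std::string& pattern, Handler h) {
        handlers_[pattern] = std::move(h);
    }

    // Return the handler whose pattern is the longest prefix of path,
    // or nullptr when no registered pattern matches.
    const Handler* find_handler(const std::string& path) const {
        const Handler* best = nullptr;
        std::size_t best_len = 0;
        for (const auto& kv : handlers_) {
            const std::string& pattern = kv.first;
            bool is_prefix = path.compare(0, pattern.size(), pattern) == 0;
            if (is_prefix && pattern.size() > best_len) {
                best = &kv.second;
                best_len = pattern.size();
            }
        }
        return best;
    }

private:
    std::map<std::string, Handler> handlers_;
};
```

With the patterns from listen_api registered, a request for a path under /rtc/v1/whip-play/ resolves to the whip-play handler and not to the /rtc/v1/whip/ one, because the two patterns differ before the trailing slash.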

serve_http in SrsGoApiRtcPublish creates three objects: an SrsRtcConnection, an SrsRtcPublishStream, and an SrsRtcSource.

SrsGoApiRtcPublish::serve_http
SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
srs_error_t SrsGoApiRtcPublish::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc)
server_->create_session(ruc, local_sdp, &session)
SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection** psession)
SrsRtcConnection* session = new SrsRtcConnection(this, cid);
SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection* session)
session->add_publisher
SrsRtcConnection::add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local_sdp)
SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDescription* stream_desc)
SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this, _srs_context->get_id());
_srs_rtc_sources->fetch_or_create(req, &source)
SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps)
source = new SrsRtcSource();

SrsRtcSource::on_rtp then hands each received RTP packet to every consumer:

srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket* pkt)
{
    srs_error_t err = srs_success;

    // If circuit-breaker is dying, drop packet.
    if (_srs_circuit_breaker->hybrid_dying_water_level()) {
        _srs_pps_aloss2->sugar += (int64_t)consumers.size();
        return err;
    }

    for (int i = 0; i < (int)consumers.size(); i++) {
        SrsRtcConsumer* consumer = consumers.at(i);
        if ((err = consumer->enqueue(pkt->copy())) != srs_success) {
            return srs_error_wrap(err, "consume message");
        }
    }

    if (bridge_ && (err = bridge_->on_rtp(pkt)) != srs_success) {
        return srs_error_wrap(err, "bridge consume message");
    }

    return err;
}
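The key point of on_rtp is the fan-out: every consumer gets its own copy of the packet, so one slow player cannot hold on to data shared with the others. A minimal sketch of that pattern follows. Packet, Consumer, and Source are hypothetical stand-ins for SrsRtpPacket, SrsRtcConsumer, and SrsRtcSource, and the sketch leaves out the circuit breaker and the bridge.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for SrsRtpPacket.
struct Packet {
    std::string payload;
};

// Hypothetical stand-in for SrsRtcConsumer: a per-player packet queue.
class Consumer {
public:
    void enqueue(std::unique_ptr<Packet> pkt) { queue_.push_back(std::move(pkt)); }
    std::size_t size() const { return queue_.size(); }
    const Packet& front() const { return *queue_.front(); }

private:
    std::deque<std::unique_ptr<Packet>> queue_;
};

// Hypothetical stand-in for SrsRtcSource.
class Source {
public:
    void add_consumer(Consumer* c) { consumers_.push_back(c); }

    // Fan-out: each consumer receives and owns its own copy of the packet.
    void on_rtp(const Packet& pkt) {
        for (Consumer* c : consumers_) {
            c->enqueue(std::unique_ptr<Packet>(new Packet(pkt)));
        }
    }

private:
    std::vector<Consumer*> consumers_;
};
```

Copying per consumer costs memory, but it keeps ownership simple: each queue can free its packets independently.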

serve_http in SrsGoApiRtcPlay creates three objects: an SrsRtcConnection, an SrsRtcPlayStream, and an SrsRtcConsumer.

SrsGoApiRtcPlay::serve_http
SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc)
server_->create_session(ruc, local_sdp, &session)
SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection** psession)
SrsRtcConnection* session = new SrsRtcConnection(this, cid);
SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection* session)
session->add_player
SrsRtcConnection::add_player(SrsRtcUserConfig* ruc, SrsSdp& local_sdp)
SrsRtcConnection::create_player(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations)
SrsRtcPlayStream* player = new SrsRtcPlayStream(this, _srs_context->get_id());
SrsRtcPlayStream::start()  // execution starts here
trd_ = new SrsFastCoroutine("rtc_sender", this, cid_);  // creates a coroutine that runs SrsRtcPlayStream::cycle()
trd_->start()

srs_error_t SrsRtcPlayStream::cycle()
{
    srs_error_t err = srs_success;

    SrsRtcSource* source = source_;

    SrsRtcConsumer* consumer = NULL;
    SrsAutoFree(SrsRtcConsumer, consumer);
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
    }

    srs_assert(consumer);
    consumer->set_handler(this);

    // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
    if ((err = source->consumer_dumps(consumer)) != srs_success) {
        return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());
    }

    realtime = _srs_config->get_realtime_enabled(req_->vhost, true);
    mw_msgs = _srs_config->get_mw_msgs(req_->vhost, realtime, true);

    // TODO: FIXME: Add cost in ms.
    SrsContextId cid = source->source_id();
    srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d", req_->get_stream_url().c_str(),
        cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs);

    SrsErrorPithyPrint* epp = new SrsErrorPithyPrint();
    SrsAutoFree(SrsErrorPithyPrint, epp);

    while (true) {
        if ((err = trd_->pull()) != srs_success) {
            return srs_error_wrap(err, "rtc sender thread");
        }

        // Wait for amount of packets.
        SrsRtpPacket* pkt = NULL;
        consumer->dump_packet(&pkt);
        if (!pkt) {
            // TODO: FIXME: We should check the quit event.
            consumer->wait(mw_msgs);
            continue;
        }

        // Send-out the RTP packet and do cleanup
        // @remark Note that the pkt might be set to NULL.
        if ((err = send_packet(pkt)) != srs_success) {
            uint32_t nn = 0;
            if (epp->can_print(err, &nn)) {
                srs_warn("play send packets=%u, nn=%u/%u, err: %s", 1, epp->nn_count, nn, srs_error_desc(err).c_str());
            }
            srs_freep(err);
        }

        // Free the packet.
        // @remark Note that the pkt might be set to NULL.
        srs_freep(pkt);
    }
}

The flows above are triggered by the SDP exchange:

SDP exchange and ICE connection establishment

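The steps above only create the key objects of the WebRTC service. Next we look at how a publishing or playing client connects to the port the WebRTC service listens on (8000). WebRTC clients and servers connect in a P2P-style NAT traversal manner. Its most distinctive trait is that a client starting a connection does not know the server's IP address and port in advance, so establishing a WebRTC connection generally takes two phases:

1) The WebRTC client and the server exchange SDP (Session Description Protocol) messages as an offer and an answer, each carrying the sender's IP address and port.

2) The WebRTC client reads the server's IP address and port from the server's SDP and uses ICE (Interactive Connectivity Establishment) to set up the connection that will carry the audio and video data.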

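Plenty of material on SDP and ICE is available online; consult it as needed:

/a/1190000038272539 WebRTC SDP explained and analyzed

/a/1190000020794391?utm_source=sf-similar-article The WebRTC Session Description Protocol (SDP) in detail

/p/60684464 A brief look at ICE in WebRTC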

Below is the offer SDP that the browser sends to the SRS server. Because trickle ICE is in use, the SDP does not contain the client's IP address, which does not prevent the connection from being established.

v=0
o=- 6308787264381624235 2 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE 0 1
m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 110 112 113 126
a=ice-options:trickle
a=sendonly
m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 35 36 124 119 123
a=ice-options:trickle
a=sendonly

Below is the answer SDP returned by the SRS server. Its candidate attribute describes the SRS server's IP address and port (192.168.9.102 8000), and the server uses ice-lite mode to simplify the ICE negotiation.

v=0
o=SRS/4.0.140(Leo) 32138128 2 IN IP4 0.0.0.0
s=SRSPublishSession
t=0 0
a=ice-lite
a=group:BUNDLE 0 1
m=audio 9 UDP/TLS/RTP/SAVPF 111
a=recvonly
a=candidate:0 1 udp 2130706431 192.168.9.102 8000 typ host generation 0
m=video 9 UDP/TLS/RTP/SAVPF 125 124
a=recvonly
a=candidate:0 1 udp 2130706431 192.168.9.102 8000 typ host generation 0
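To make the role of the candidate attribute concrete, here is a small sketch of how a client could pull the server address out of one candidate line of the answer. The Candidate struct and the parse_candidate helper are hypothetical names for this illustration; SRS and browsers use their own SDP parsers, and this sketch only covers the field layout seen in the answer above.

```cpp
#include <sstream>
#include <string>

// Hypothetical result type for this illustration.
struct Candidate {
    std::string transport;  // e.g. "udp"
    std::string ip;         // e.g. "192.168.9.102"
    int port = 0;           // e.g. 8000
    std::string type;       // e.g. "host"
};

// Parses a line shaped like
//   a=candidate:<foundation> <component> <transport> <priority> <ip> <port> typ <type> ...
// Returns false when the line does not follow that layout.
bool parse_candidate(const std::string& line, Candidate& out) {
    const std::string prefix = "a=candidate:";
    if (line.compare(0, prefix.size(), prefix) != 0) {
        return false;
    }

    std::istringstream in(line.substr(prefix.size()));
    std::string foundation;
    std::string typ_keyword;
    int component = 0;
    unsigned long priority = 0;
    if (!(in >> foundation >> component >> out.transport >> priority
             >> out.ip >> out.port >> typ_keyword >> out.type)) {
        return false;
    }

    // The literal keyword "typ" must precede the candidate type.
    return typ_keyword == "typ" && out.port > 0 && out.port <= 65535;
}
```

Applied to the candidate line of the answer above, the helper yields the address 192.168.9.102 and port 8000, which is where the client then sends its connectivity checks.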

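Next, the browser sends a STUN Binding Request to port 8000 of the SRS server, and the server answers with a Binding Success Response. At that point the publishing or playing client is connected to the SRS server (port 8000).

2. WebRTC transport: the SDP exchange has told the client which UDP port to use.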
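The Binding Request and the Binding Success Response can be told apart from the fixed 20-byte STUN header defined in RFC 5389: a 2-byte message type (0x0001 for a Binding Request, 0x0101 for a Binding Success Response), a 2-byte attribute length, the magic cookie 0x2112A442, and a 12-byte transaction ID. The following sketch checks just that header. StunKind and classify_stun are hypothetical names for this illustration; SRS decodes STUN with its own SrsStunPacket class, which also parses the attributes.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical classification result for this illustration.
enum class StunKind { NotStun, BindingRequest, BindingSuccess, Other };

// Inspects only the fixed 20-byte STUN header (RFC 5389); attributes are ignored.
StunKind classify_stun(const std::uint8_t* data, std::size_t size) {
    if (size < 20) {
        return StunKind::NotStun;
    }
    // The two most significant bits of a STUN message are always zero.
    if ((data[0] & 0xC0) != 0) {
        return StunKind::NotStun;
    }

    std::uint32_t cookie = (static_cast<std::uint32_t>(data[4]) << 24) |
                           (static_cast<std::uint32_t>(data[5]) << 16) |
                           (static_cast<std::uint32_t>(data[6]) << 8) |
                           static_cast<std::uint32_t>(data[7]);
    if (cookie != 0x2112A442u) {
        return StunKind::NotStun;
    }

    std::uint16_t type = static_cast<std::uint16_t>((data[0] << 8) | data[1]);
    std::uint16_t length = static_cast<std::uint16_t>((data[2] << 8) | data[3]);
    // Attributes are padded to 4 bytes and must fit in the datagram.
    if (length % 4 != 0 || 20u + length > size) {
        return StunKind::NotStun;
    }

    if (type == 0x0001) return StunKind::BindingRequest;
    if (type == 0x0101) return StunKind::BindingSuccess;
    return StunKind::Other;
}
```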

listen_udp
SrsUdpMuxListener->listen
trd = new SrsSTCoroutine("udp", this, cid);  // creates a coroutine that runs SrsUdpMuxListener::cycle()
SrsUdpMuxListener::cycle()
skt.recvfrom
handler->on_udp_packet(&skt);  // handled by SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)

srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
{
    srs_error_t err = srs_success;

    SrsRtcConnection* session = NULL;
    char* data = skt->data(); int size = skt->size();
    bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)data, size);
    bool is_rtcp = srs_is_rtcp((uint8_t*)data, size);

    uint64_t fast_id = skt->fast_id();
    // Try fast id first, if not found, search by long peer id.
    if (fast_id) {
        session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id);
    }
    if (!session) {
        string peer_id = skt->peer_id();
        session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id);
    }

    if (session) {
        // When got any packet, the session is alive now.
        session->alive();
    }

    // For STUN, the peer address may change.
    if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) {
        ++_srs_pps_rstuns->sugar;
        string peer_id = skt->peer_id();

        // TODO: FIXME: Should support ICE renomination, to switch network between candidates.
        SrsStunPacket ping;
        if ((err = ping.decode(data, size)) != srs_success) {
            return srs_error_wrap(err, "decode stun packet failed");
        }
        if (!session) {
            session = find_session_by_username(ping.get_username());
        }
        if (session) {
            session->switch_to_context();
        }

        printf("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d\n",
            peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());

        // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
        if (!session) {
            return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64,
                ping.get_username().c_str(), peer_id.c_str(), fast_id);
        }

        // For each binding request, update the UDP socket.
        if (ping.is_binding_request()) {
            session->udp()->update_sendonly_socket(skt);
        }

        return session->udp()->on_stun(&ping, data, size);
    }

    printf("recv stun packet fast=%" PRId64 " \n",fast_id);

    // For DTLS, RTCP or RTP, which does not support peer address changing.
    if (!session) {
        string peer_id = skt->peer_id();
        printf("%s %d\n",__FUNCTION__,__LINE__);
        return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id);
    }

    // Note that we don't(except error) switch to the context of session, for performance issue.
    if (is_rtp_or_rtcp && !is_rtcp) {
        ++_srs_pps_rrtps->sugar;

        err = session->udp()->on_rtp(data, size);
        if (err != srs_success) {
            session->switch_to_context();
        }
        return err;
    }

    session->switch_to_context();
    if (is_rtp_or_rtcp && is_rtcp) {
        ++_srs_pps_rrtcps->sugar;

        return session->udp()->on_rtcp(data, size);
    }
    if (srs_is_dtls((uint8_t*)data, size)) {
        ++_srs_pps_rstuns->sugar;

        return session->udp()->on_dtls(data, size);
    }

    return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
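Since STUN, DTLS, RTP, and RTCP all arrive on the same UDP port, on_udp_packet has to classify every datagram before dispatching it. The sketch below shows one common way to do that, following the first-byte ranges of RFC 7983 (0 to 3 for STUN, 20 to 63 for DTLS, 128 to 191 for RTP or RTCP) and treating a second byte of 192 to 223 as RTCP. PacketKind and classify_packet are hypothetical names; the SRS helpers srs_is_stun, srs_is_dtls, srs_is_rtp_or_rtcp, and srs_is_rtcp may differ in detail, for example in their minimum-length checks.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical classification result for this illustration.
enum class PacketKind { Stun, Dtls, Rtp, Rtcp, Unknown };

// Classifies a datagram received on a multiplexed WebRTC port
// by its first two bytes only.
PacketKind classify_packet(const std::uint8_t* data, std::size_t size) {
    if (size < 2) {
        return PacketKind::Unknown;
    }

    std::uint8_t b0 = data[0];
    if (b0 <= 3) {
        return PacketKind::Stun;
    }
    if (b0 >= 20 && b0 <= 63) {
        return PacketKind::Dtls;
    }
    if (b0 >= 128 && b0 <= 191) {
        // RTP and RTCP share the version bits; the second byte separates them.
        std::uint8_t b1 = data[1];
        return (b1 >= 192 && b1 <= 223) ? PacketKind::Rtcp : PacketKind::Rtp;
    }
    return PacketKind::Unknown;
}
```

A dispatcher built on this would mirror the branches above: STUN goes to on_stun, RTP to on_rtp, RTCP to on_rtcp, and DTLS to on_dtls.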

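In summary, this function dispatches the four stages of a WebRTC session:

1) The client and the server establish the connection through the STUN protocol and the ICE mechanism.

2) The client and the server complete security authentication through DTLS messages and derive the keys needed for SRTP encryption and decryption.

3) The client and the server encrypt and decrypt RTP packets with SRTP.

4) The client and the server handle QoS for the audio and video data through RTCP packets.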
