Kurento 源码解析系列(3)- RtpEndpoint端点初始化解析

    科技2025-06-19  5

    关于kurento的rtpendpoint 从客户端发起rpc调用后,在服务器内部如何从cpp层到c层进行相关实例化

     需要先回顾以下每个可直接实例化对象的创建的流程  首先是JAVA客户端通过rpc调用在media-server工程中的ServerMethods.cpp中create方法;

    void ServerMethods::create (const Json::Value &params,                        Json::Value &response)

    然后根据type获取到本次rpc获取到的对象TYPE类型来选择本次创建对象的工厂类型 我们本次是1个rtpendpoint,因此获取的工厂是rtpendpoint相应的工厂

    factory = moduleManager.getFactory (type);

     

    然后在通过factory来创建本次真正创建的对象 创建过程中,同时根据sesion把object放入到MediaSet中,方便后续使用。

        object = std::dynamic_pointer_cast<MediaObjectImpl> (                factory->createObject (config, sessionId, params["constructorParams"]) );

    这里则将根据type类型来创建相应的对象,所有端点都是继承与MediaObejctImpl,因此返回1个基于MediaObjectImpl的共享指针

     

    std::shared_ptr< MediaObjectImpl > Factory::createObject (const boost::property_tree::ptree &conf,                        const std::string &session, const Json::Value &params) const {   std::shared_ptr< MediaObjectImpl > object; //createObjectPointer为具体对象的创建方法,传入形参则是全局配置和该对象的参数   object = MediaSet::getMediaSet()->ref (dynamic_cast <MediaObjectImpl *>                                          (createObjectPointer (conf, params) ) );

      MediaSet::getMediaSet()->ref (session, object);      return object; }

    kurento中所有对象的factory实现类都是在编译时候生成的,因此查看源代码需要在build下的文件夹找到相应的工厂实现类 以下这个方法则是RtpEndpointImplInternal.cpp文件中的创建对象真正的方法

    MediaObjectImpl *RtpEndpointImplFactory::createObjectPointer (const boost::property_tree::ptree &conf, const Json::Value &params) const {   kurento::JsonSerializer s (false);   RtpEndpointConstructor constructor;

      s.JsonValue = params;   constructor.Serialize (s); //constructor主要是将RPC传递过来的序列化字符创,反序列化为对象,然后交给createObject对新的RtpEndpoint进行构造初始化 //下面的creatteOBject则是调用到了RtpEndpointImpl.cpp中的createObject方法   return createObject (conf, constructor.getMediaPipeline(), constructor.getCrypto(), constructor.getUseIpv6() ); }

    RtpEndpointImpl.cpp类中具体调用对象实例化的方法

    MediaObjectImpl * RtpEndpointImplFactory::createObject (const boost::property_tree::ptree &conf,     std::shared_ptr<MediaPipeline> mediaPipeline, std::shared_ptr<SDES> crypto,     bool useIpv6) const {   return new RtpEndpointImpl (conf, mediaPipeline, crypto, useIpv6); }

    以下是具体的类的构造方法,肉眼可见的继承了BaseRtpEndpointImpl.cpp类

    RtpEndpointImpl::RtpEndpointImpl (const boost::property_tree::ptree &conf,     std::shared_ptr<MediaPipeline> mediaPipeline, std::shared_ptr<SDES> crypto,     bool useIpv6)     : BaseRtpEndpointImpl (conf,         std::dynamic_pointer_cast<MediaObjectImpl> (mediaPipeline),         FACTORY_NAME, useIpv6) {   if (!crypto->isSetCrypto ()) {     return;   }

      if (!crypto->isSetKey () && !crypto->isSetKeyBase64 ()) {     /* Use random key */     g_object_set (         element, "crypto-suite", crypto->getCrypto ()->getValue (), NULL);     return;   }

      gsize expect_size;

      switch (crypto->getCrypto ()->getValue ()) {   case CryptoSuite::AES_128_CM_HMAC_SHA1_32:   case CryptoSuite::AES_128_CM_HMAC_SHA1_80:     expect_size = KMS_SRTP_CIPHER_AES_CM_128_SIZE;     break;   case CryptoSuite::AES_256_CM_HMAC_SHA1_32:   case CryptoSuite::AES_256_CM_HMAC_SHA1_80:     expect_size = KMS_SRTP_CIPHER_AES_CM_256_SIZE;     break;   default:     throw KurentoException (         MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Invalid crypto suite");   }

      std::string key_b64;   gsize key_data_size = 0;

      if (crypto->isSetKey ()) {     std::string tmp = crypto->getKey ();     key_data_size = tmp.length ();

        gchar *tmp_b64 =         g_base64_encode ((const guchar *) tmp.data (), tmp.length ());     key_b64 = std::string (tmp_b64);     g_free (tmp_b64);   } else if (crypto->isSetKeyBase64 ()) {     key_b64 = crypto->getKeyBase64 ();     guchar *tmp_b64 = g_base64_decode (key_b64.data (), &key_data_size);     if (!tmp_b64) {       GST_ERROR_OBJECT (element, "Master key is not valid Base64");       throw KurentoException (           MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Master key is not valid Base64");     }     g_free (tmp_b64);   }

      if (key_data_size != expect_size) {     GST_ERROR_OBJECT (element,         "Bad Base64-decoded master key size: got %lu, expected %lu",         key_data_size, expect_size);     throw KurentoException (         MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Master key size is wrong");   }

      g_object_set (element, "master-key", key_b64.data (), "crypto-suite",       crypto->getCrypto ()->getValue (), NULL); }

    读完所有代码,可以知道,没有任何对C层gobject对象(也就是gstream的模拟对象)的相关操作,因此基本上就可以推断,对C层的gobject对象的实例化是在更顶级的父类进行的 而且所有的endpoint都是这样的思路来构造,那gobject的构造肯定在最顶级的父类构造, 继续查看顶级父类的构造函数,可以证明这一点,最终gobject的构造是在最顶级的cpp父类

    另外注意的是这个方法,每当这个对象被引用1次,将触发一次这个方法

    void RtpEndpointImpl::postConstructor () {   BaseRtpEndpointImpl::postConstructor (); //这里其实也仅仅是注册了1个事件监听回调   handlerOnKeySoftLimit =       register_signal_handler (G_OBJECT (element), "key-soft-limit",           std::function<void (GstElement *, gchar *)> (std::bind (               &RtpEndpointImpl::onKeySoftLimit, this, std::placeholders::_2)),           std::dynamic_pointer_cast<RtpEndpointImpl> (shared_from_this ())); } //具体调用所有endpoint的postConstructor这个方法的代码如下 std::shared_ptr<MediaObjectImpl> MediaSet::ref (MediaObjectImpl *mediaObjectPtr) {   std::unique_lock <std::recursive_mutex> lock (recMutex);   std::shared_ptr<MediaObjectImpl> mediaObject;

      if (mediaObjectPtr == nullptr) {     throw KurentoException (MEDIA_OBJECT_NOT_FOUND, "Invalid object");   }

      mediaObject =  std::shared_ptr<MediaObjectImpl> (mediaObjectPtr, [this] (   MediaObjectImpl * obj) {     // this will always exist because destructor is waiting for its threads     this->releasePointer (obj);   });

      objectsMap[mediaObject->getId()] = std::weak_ptr<MediaObjectImpl> (mediaObject);

      if (mediaObject->getParent() ) {     std::shared_ptr<MediaObjectImpl> parent = std::dynamic_pointer_cast         <MediaObjectImpl> (mediaObject->getParent() );

        childrenMap[parent->getId()][mediaObject->getId()] = mediaObject;   }

      auto parent = mediaObject->getParent();

      if (parent) {     for (auto session : reverseSessionMap[parent->getId()]) {       ref (session, mediaObject);     }   } //这里,如果ref一次,将调用1次这个方法   mediaObject->postConstructor ();

      if (this->serverManager) {     lock.unlock ();     serverManager->signalObjectCreated (ObjectCreated (this->serverManager,                                         std::dynamic_pointer_cast<MediaObject> (mediaObject) ) );   }

      return mediaObject; }

    继续看endpoint的顶级初始化的gobject的父类 继承关系如下: RtpEndpointImpl-->BaseRtpEndpointImpl-->SdpEndpointImpl-->SessionEndpointImpl-->EndpointImpl-->MediaElementImpl-->MediaObjectImpl

    当然每1个父类构造方法中其实都会做一些这个层面的类需要做的一些事情 最终初始化gobject的对象是在MediaElementImpl的构造方法中处理的,MediaObjectImpl的构造方法中则是生成了对象创建时间和唯一ID

    MediaObjectImpl.cpp的构造方法,没有太多需要关注的内容

    MediaObjectImpl::MediaObjectImpl (const boost::property_tree::ptree &config,                                   std::shared_ptr< MediaObject > parent) : config (config) {   this->parent = parent; //all the object id is only ;   creationTime = time(nullptr);   initialId = createId();   this->sendTagsInEvents = false; }

    MediaElementImpl.cpp的构造方法,是整个kurento的CPP层和c层互相联系的核心

    MediaElementImpl::MediaElementImpl (const boost::property_tree::ptree &config,                                     std::shared_ptr<MediaObjectImpl> parent,                                     const std::string &factoryName) : MediaObjectImpl (config, parent) {

      std::shared_ptr<MediaPipelineImpl> pipe; //这里会先获取到1个公共的Pipeline,当然这里就需要了解gstreamer编程原理,可以理解为1个顶级的容器,所有endpoint都需要在这个容器中,其之间的数据才可以相互流转   pipe = std::dynamic_pointer_cast<MediaPipelineImpl> (getMediaPipeline() );

    //根据RPC中传递过来的工厂的名字,来生成gstreamer的C层element对象 //我们这里解析rtpendpoint,当然会生成rtpendpoint的c层的对象,这样就和gstreamer的C层代码关联了起来。 //在C层的kmsrtpendpoint.c文件中,我们可以看见#define PLUGIN_NAME "rtpendpoint"这个定义,也就是将kmsrtpendpoint.c定义为了名字为rtpendpoint的插件 //下面这个方法会根据传递过来的“rtpendpoint”这个名字,找到相关的gobject插件,然后进初始化   element = gst_element_factory_make(factoryName.c_str(), nullptr);

      if (element == nullptr) {     throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE,                             "Cannot create gstreamer element: " + factoryName);   }

      bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline () ) ); //注册事件回调方法   handlerId = g_signal_connect (bus, "message",                                 G_CALLBACK (processBusMessage), this);

    //注册又pad添加的事件回调方法,pad在gstreamer对象中类似于数据交互的端口   padAddedHandlerId = g_signal_connect (element, "pad_added",                                         G_CALLBACK (_media_element_pad_added), this);

    //引用element   g_object_ref (element); //将当前这个element添加到pipe这个大容器中   pipe->addElement (element);

      //read default configuration for output bitrate   int bitrate = 0; //设置码率   if (getConfigValue<int, MediaElement> (&bitrate, "outputBitrate")) {     GST_DEBUG ("Output bitrate configured to %d bps", bitrate);     g_object_set (G_OBJECT (element), MIN_OUTPUT_BITRATE, bitrate,                   MAX_OUTPUT_BITRATE, bitrate, NULL);   } }

     

    接下来就是C层代码的处理 c层代码的处理就是直接调用gstreamer的相关模拟对象进行网络端点或者媒体端点的相关处理 首先我们还是直接查看rtpendpoint插件所在的c文件,kmsrtpendpoint.c 因为这是1个gobject的模拟对象,因此他会在该文件中的init方法和class_init方法中进行相关的初始化工作 为什么是两个方法进行初始化,这个需要自行了解gobject的相关对象模拟机制。

    首先gst对象类型的初始化方法,gobject的对象模拟,创建对象时会创建1个类对象和实例对象,其实细想和java的对象创建也是一样的,只是java的类对象的创建是又虚拟机自动完成了。 各个对象需要共享的数据,将存放在这个模拟的类对象中,而如果是这个模拟对象单独所有的私有数据,则应该放在模拟的实例对象中 还有1个重要点,如果rtpednpoint这个模拟对象被new了多次,实际上class_init只会被初始化1次,并且在kurento中,这个是在kurento启动注册注册组件的时候就初始化了。而后面的实例对象才会被new多个,初始化多次

    static void kms_rtp_endpoint_class_init (KmsRtpEndpointClass *klass) { //这里把所有rtpendpoint所需要的模拟父类类对象都进行了初始化。实际上就是把该对象的指针都关联给了相应的模拟父类   GObjectClass *gobject_class;   KmsBaseSdpEndpointClass *base_sdp_endpoint_class;   GstElementClass *gstelement_class;

    //下面这个都是gobject的标准操作,初始化object的基类,   gobject_class = G_OBJECT_CLASS (klass); //设置该类的set,get方法   gobject_class->set_property = kms_rtp_endpoint_set_property;   gobject_class->get_property = kms_rtp_endpoint_get_property; //设置gobject的析构方法   gobject_class->finalize = kms_rtp_endpoint_finalize;   gstelement_class = GST_ELEMENT_CLASS (klass); //给这个类设置一点信息,没有LUAN用   gst_element_class_set_details_simple (gstelement_class, "RtpEndpoint",       "RTP/Stream/RtpEndpoint", "Rtp Endpoint element",       "José Antonio Santos Cadenas <santoscadenas@kurento.com>"); //初始化日志

    //这里也需要注意,如果在这个日志初始化前面使用gst的日志系统,将无法打印,可以使用printf直接打印   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, PLUGIN_NAME, 0, PLUGIN_NAME); //初始化父类的类对象,同样指向当前对象的内存地址 

    base_sdp_endpoint_class = KMS_BASE_SDP_ENDPOINT_CLASS (klass);

    //可以理解为当前对象实现l了父对象的虚方法,方法实现所有对象实例都是一样的

    //因此在class中初始化,所有对象实例都会共享这个类对象的方法   base_sdp_endpoint_class->create_session_internal =       kms_rtp_endpoint_create_session_internal;   base_sdp_endpoint_class->start_transport_send =       kms_rtp_endpoint_start_transport_send;

    /* Media handler management */  base_sdp_endpoint_class->create_media_handler =       kms_rtp_endpoint_create_media_handler;

     base_sdp_endpoint_class->configure_media = kms_rtp_endpoint_configure_media; //给这个gobject的类对象安装属性,第一个参数是该类对象,第二个是属性的ID,一般是1个枚举值或者1个宏定义,第三方则是要安装的属性变量信息GParamSpec

    //这里也就明白了,为什么gobject需要有1个模拟的类对象,因为属性和方法相关的元数据信息都在这里初始化或赋值

    //而属性或者方法的具体的值或实现则才是   g_object_class_install_property (gobject_class, PROP_USE_SDES,       g_param_spec_boolean ("use-sdes", "Use SDES",           "Set if Session Description Protocol Decurity"           " Description (SDES) is used",           DEFAULT_USE_SDES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

      g_object_class_install_property (gobject_class, PROP_MASTER_KEY,       g_param_spec_string ("master-key", "Master key",           "Master key (either 30 or 46 bytes, depending on the"           " crypto-suite used)",           DEFAULT_MASTER_KEY,           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));

      g_object_class_install_property (gobject_class, PROP_CRYPTO_SUITE,       g_param_spec_enum ("crypto-suite", "Crypto suite",           "Describes the encryption and authentication algorithms",           KMS_TYPE_RTP_SDES_CRYPTO_SUITE, DEFAULT_CRYPTO_SUITE,           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); //其中注意g_param_spec_XXX函数,表示打包1个属性值,这个是个XXX类型 //第一个参数用于设定键名,第二个参数是键名的昵称,第三个参数是对这个键值对的详细描述,第四个参数用于表示键值的访问权限,G_PARAM_READWRITE 是指定属性即可读又可写,G_PARAM_CONSTRUCT 是设定属性可以在对象示例化之时被设置。

    //创建1个信号事件并将信号ID放到obj_signals的集合中  //其中G_STRUCT_OFFSET (KmsRtpEndpointClass, key_soft_limit)可以理解获取KmsRtpEndpointClass成员中的key_soft_limit函数指针   obj_signals[SIGNAL_KEY_SOFT_LIMIT] = g_signal_new ("key-soft-limit",       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,       G_STRUCT_OFFSET (KmsRtpEndpointClass, key_soft_limit), NULL, NULL,       g_cclosure_marshal_VOID__STRING, G_TYPE_NONE, 1, G_TYPE_STRING);

    //这里,利用gobject的特性,将rtpednpoint所需要的相关私有属性添加到当前的对象中。   g_type_class_add_private (klass, sizeof (KmsRtpEndpointPrivate)); }

     

     

    下面这个方法则是模拟对象实例的初始化

    static void kms_rtp_endpoint_init (KmsRtpEndpoint *self) {   //获取到他下面包含的结构体   self->priv = KMS_RTP_ENDPOINT_GET_PRIVATE (self);   self->priv->sdes_keys = g_hash_table_new_full (       g_str_hash, g_str_equal, g_free, (GDestroyNotify) kms_ref_struct_unref);

    //初始化了一个HASHTABLE , 用来存储RTP对应的KMSrtpconnection   self->priv->comedia.rtp_conns =       g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);   // 另外一张表signal_ids则是保存了connection的hash表和所有相关的信号量ID   self->priv->comedia.signal_ids =       g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); //给自身的rtp对象设置相关的参数,其中max-video-recv-bandwidth为0表示不限制宽带   g_object_set (G_OBJECT (self), "bundle", FALSE, "rtcp-mux", FALSE,       "rtcp-nack", TRUE, "rtcp-remb", TRUE, "max-video-recv-bandwidth", 0,       NULL); }

    直接再看rtpendpoint.h这个头文件的属性定义了写什么内容 首先定义了1个结构体,包含了base本身的结构体和一个rtp的私有结构体 这个是gobject模拟的对象实例,用于存放模拟对象实例的指针和私有数据

    struct _KmsRtpEndpoint {   KmsBaseRtpEndpoint parent;

      KmsRtpEndpointPrivate *priv; };

    再看这个私有的结构体定义的内容,则是定义了rtp这个类才会需要的一些私有数据

    struct _KmsRtpEndpointPrivate {   gboolean use_sdes;   GHashTable *sdes_keys;

      gchar *master_key; // SRTP Master Key, base64 encoded   KmsRtpSDESCryptoSuite crypto;

      /* COMEDIA (passive port discovery) */ //最关键的就是这个了,用于存放已有rtp的网络连接的集合,CO应该表示的是集合   KmsComedia comedia; };

    //对象里面的两个hashmap

    typedef struct _KmsComedia KmsComedia; struct _KmsComedia {   GHashTable *rtp_conns;  // GHashTable<RTPSession*, KmsIRtpConnection*>   GHashTable *signal_ids; // GHashTable<RTPSession*, int> };

    //这个是gobject的模拟对象的类对象,存放一些公共的数据,包括属性名、方法名

    //只是这里 //定义了1继承1个父类的类对象和1个信号(函数指针),这个函数指正则在具体的调用类中进行实现

    struct _KmsRtpEndpointClass {   KmsBaseRtpEndpointClass parent_class;

      /* signals */   void (*key_soft_limit) (KmsRtpEndpoint *obj, gchar *media); };

    rtpendpoint本身的初始化就算是完成了

    下一篇我们将解析rtpendpoint如何进行udp的数据通信和数据转发。  

    Processed: 0.010, SQL: 8