关于kurento的rtpendpoint 从客户端发起rpc调用后,在服务器内部如何从cpp层到c层进行相关实例化
需要先回顾以下每个可直接实例化对象的创建的流程 首先是JAVA客户端通过rpc调用在media-server工程中的ServerMethods.cpp中create方法;
void ServerMethods::create (const Json::Value ¶ms, Json::Value &response)
然后根据type获取到本次rpc获取到的对象TYPE类型来选择本次创建对象的工厂类型 我们本次是1个rtpendpoint,因此获取的工厂是rtpendpoint相应的工厂
factory = moduleManager.getFactory (type);
然后在通过factory来创建本次真正创建的对象 创建过程中,同时根据sesion把object放入到MediaSet中,方便后续使用。
object = std::dynamic_pointer_cast<MediaObjectImpl> ( factory->createObject (config, sessionId, params["constructorParams"]) );
这里则将根据type类型来创建相应的对象,所有端点都是继承与MediaObejctImpl,因此返回1个基于MediaObjectImpl的共享指针
std::shared_ptr< MediaObjectImpl > Factory::createObject (const boost::property_tree::ptree &conf, const std::string &session, const Json::Value ¶ms) const { std::shared_ptr< MediaObjectImpl > object; //createObjectPointer为具体对象的创建方法,传入形参则是全局配置和该对象的参数 object = MediaSet::getMediaSet()->ref (dynamic_cast <MediaObjectImpl *> (createObjectPointer (conf, params) ) );
MediaSet::getMediaSet()->ref (session, object); return object; }
kurento中所有对象的factory实现类都是在编译时候生成的,因此查看源代码需要在build下的文件夹找到相应的工厂实现类 以下这个方法则是RtpEndpointImplInternal.cpp文件中的创建对象真正的方法
MediaObjectImpl *RtpEndpointImplFactory::createObjectPointer (const boost::property_tree::ptree &conf, const Json::Value ¶ms) const { kurento::JsonSerializer s (false); RtpEndpointConstructor constructor;
s.JsonValue = params; constructor.Serialize (s); //constructor主要是将RPC传递过来的序列化字符创,反序列化为对象,然后交给createObject对新的RtpEndpoint进行构造初始化 //下面的creatteOBject则是调用到了RtpEndpointImpl.cpp中的createObject方法 return createObject (conf, constructor.getMediaPipeline(), constructor.getCrypto(), constructor.getUseIpv6() ); }
RtpEndpointImpl.cpp类中具体调用对象实例化的方法
MediaObjectImpl * RtpEndpointImplFactory::createObject (const boost::property_tree::ptree &conf, std::shared_ptr<MediaPipeline> mediaPipeline, std::shared_ptr<SDES> crypto, bool useIpv6) const { return new RtpEndpointImpl (conf, mediaPipeline, crypto, useIpv6); }
以下是具体的类的构造方法,肉眼可见的继承了BaseRtpEndpointImpl.cpp类
RtpEndpointImpl::RtpEndpointImpl (const boost::property_tree::ptree &conf, std::shared_ptr<MediaPipeline> mediaPipeline, std::shared_ptr<SDES> crypto, bool useIpv6) : BaseRtpEndpointImpl (conf, std::dynamic_pointer_cast<MediaObjectImpl> (mediaPipeline), FACTORY_NAME, useIpv6) { if (!crypto->isSetCrypto ()) { return; }
if (!crypto->isSetKey () && !crypto->isSetKeyBase64 ()) { /* Use random key */ g_object_set ( element, "crypto-suite", crypto->getCrypto ()->getValue (), NULL); return; }
gsize expect_size;
switch (crypto->getCrypto ()->getValue ()) { case CryptoSuite::AES_128_CM_HMAC_SHA1_32: case CryptoSuite::AES_128_CM_HMAC_SHA1_80: expect_size = KMS_SRTP_CIPHER_AES_CM_128_SIZE; break; case CryptoSuite::AES_256_CM_HMAC_SHA1_32: case CryptoSuite::AES_256_CM_HMAC_SHA1_80: expect_size = KMS_SRTP_CIPHER_AES_CM_256_SIZE; break; default: throw KurentoException ( MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Invalid crypto suite"); }
std::string key_b64; gsize key_data_size = 0;
if (crypto->isSetKey ()) { std::string tmp = crypto->getKey (); key_data_size = tmp.length ();
gchar *tmp_b64 = g_base64_encode ((const guchar *) tmp.data (), tmp.length ()); key_b64 = std::string (tmp_b64); g_free (tmp_b64); } else if (crypto->isSetKeyBase64 ()) { key_b64 = crypto->getKeyBase64 (); guchar *tmp_b64 = g_base64_decode (key_b64.data (), &key_data_size); if (!tmp_b64) { GST_ERROR_OBJECT (element, "Master key is not valid Base64"); throw KurentoException ( MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Master key is not valid Base64"); } g_free (tmp_b64); }
if (key_data_size != expect_size) { GST_ERROR_OBJECT (element, "Bad Base64-decoded master key size: got %lu, expected %lu", key_data_size, expect_size); throw KurentoException ( MEDIA_OBJECT_ILLEGAL_PARAM_ERROR, "Master key size is wrong"); }
g_object_set (element, "master-key", key_b64.data (), "crypto-suite", crypto->getCrypto ()->getValue (), NULL); }
读完所有代码,可以知道,没有任何对C层gobject对象(也就是gstream的模拟对象)的相关操作,因此基本上就可以推断,对C层的gobject对象的实例化是在更顶级的父类进行的 而且所有的endpoint都是这样的思路来构造,那gobject的构造肯定在最顶级的父类构造, 继续查看顶级父类的构造函数,可以证明这一点,最终gobject的构造是在最顶级的cpp父类
另外注意的是这个方法,每当这个对象被引用1次,将触发一次这个方法
void RtpEndpointImpl::postConstructor () { BaseRtpEndpointImpl::postConstructor (); //这里其实也仅仅是注册了1个事件监听回调 handlerOnKeySoftLimit = register_signal_handler (G_OBJECT (element), "key-soft-limit", std::function<void (GstElement *, gchar *)> (std::bind ( &RtpEndpointImpl::onKeySoftLimit, this, std::placeholders::_2)), std::dynamic_pointer_cast<RtpEndpointImpl> (shared_from_this ())); } //具体调用所有endpoint的postConstructor这个方法的代码如下 std::shared_ptr<MediaObjectImpl> MediaSet::ref (MediaObjectImpl *mediaObjectPtr) { std::unique_lock <std::recursive_mutex> lock (recMutex); std::shared_ptr<MediaObjectImpl> mediaObject;
if (mediaObjectPtr == nullptr) { throw KurentoException (MEDIA_OBJECT_NOT_FOUND, "Invalid object"); }
mediaObject = std::shared_ptr<MediaObjectImpl> (mediaObjectPtr, [this] ( MediaObjectImpl * obj) { // this will always exist because destructor is waiting for its threads this->releasePointer (obj); });
objectsMap[mediaObject->getId()] = std::weak_ptr<MediaObjectImpl> (mediaObject);
if (mediaObject->getParent() ) { std::shared_ptr<MediaObjectImpl> parent = std::dynamic_pointer_cast <MediaObjectImpl> (mediaObject->getParent() );
childrenMap[parent->getId()][mediaObject->getId()] = mediaObject; }
auto parent = mediaObject->getParent();
if (parent) { for (auto session : reverseSessionMap[parent->getId()]) { ref (session, mediaObject); } } //这里,如果ref一次,将调用1次这个方法 mediaObject->postConstructor ();
if (this->serverManager) { lock.unlock (); serverManager->signalObjectCreated (ObjectCreated (this->serverManager, std::dynamic_pointer_cast<MediaObject> (mediaObject) ) ); }
return mediaObject; }
继续看endpoint的顶级初始化的gobject的父类 继承关系如下: RtpEndpointImpl-->BaseRtpEndpointImpl-->SdpEndpointImpl-->SessionEndpointImpl-->EndpointImpl-->MediaElementImpl-->MediaObjectImpl
当然每1个父类构造方法中其实都会做一些这个层面的类需要做的一些事情 最终初始化gobject的对象是在MediaElementImpl的构造方法中处理的,MediaObjectImpl的构造方法中则是生成了对象创建时间和唯一ID
MediaObjectImpl.cpp的构造方法,没有太多需要关注的内容
MediaObjectImpl::MediaObjectImpl (const boost::property_tree::ptree &config, std::shared_ptr< MediaObject > parent) : config (config) { this->parent = parent; //all the object id is only ; creationTime = time(nullptr); initialId = createId(); this->sendTagsInEvents = false; }
MediaElementImpl.cpp的构造方法,是整个kurento的CPP层和c层互相联系的核心
MediaElementImpl::MediaElementImpl (const boost::property_tree::ptree &config, std::shared_ptr<MediaObjectImpl> parent, const std::string &factoryName) : MediaObjectImpl (config, parent) {
std::shared_ptr<MediaPipelineImpl> pipe; //这里会先获取到1个公共的Pipeline,当然这里就需要了解gstreamer编程原理,可以理解为1个顶级的容器,所有endpoint都需要在这个容器中,其之间的数据才可以相互流转 pipe = std::dynamic_pointer_cast<MediaPipelineImpl> (getMediaPipeline() );
//根据RPC中传递过来的工厂的名字,来生成gstreamer的C层element对象 //我们这里解析rtpendpoint,当然会生成rtpendpoint的c层的对象,这样就和gstreamer的C层代码关联了起来。 //在C层的kmsrtpendpoint.c文件中,我们可以看见#define PLUGIN_NAME "rtpendpoint"这个定义,也就是将kmsrtpendpoint.c定义为了名字为rtpendpoint的插件 //下面这个方法会根据传递过来的“rtpendpoint”这个名字,找到相关的gobject插件,然后进初始化 element = gst_element_factory_make(factoryName.c_str(), nullptr);
if (element == nullptr) { throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE, "Cannot create gstreamer element: " + factoryName); }
bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline () ) ); //注册事件回调方法 handlerId = g_signal_connect (bus, "message", G_CALLBACK (processBusMessage), this);
//注册又pad添加的事件回调方法,pad在gstreamer对象中类似于数据交互的端口 padAddedHandlerId = g_signal_connect (element, "pad_added", G_CALLBACK (_media_element_pad_added), this);
//引用element g_object_ref (element); //将当前这个element添加到pipe这个大容器中 pipe->addElement (element);
//read default configuration for output bitrate int bitrate = 0; //设置码率 if (getConfigValue<int, MediaElement> (&bitrate, "outputBitrate")) { GST_DEBUG ("Output bitrate configured to %d bps", bitrate); g_object_set (G_OBJECT (element), MIN_OUTPUT_BITRATE, bitrate, MAX_OUTPUT_BITRATE, bitrate, NULL); } }
接下来就是C层代码的处理 c层代码的处理就是直接调用gstreamer的相关模拟对象进行网络端点或者媒体端点的相关处理 首先我们还是直接查看rtpendpoint插件所在的c文件,kmsrtpendpoint.c 因为这是1个gobject的模拟对象,因此他会在该文件中的init方法和class_init方法中进行相关的初始化工作 为什么是两个方法进行初始化,这个需要自行了解gobject的相关对象模拟机制。
首先gst对象类型的初始化方法,gobject的对象模拟,创建对象时会创建1个类对象和实例对象,其实细想和java的对象创建也是一样的,只是java的类对象的创建是又虚拟机自动完成了。 各个对象需要共享的数据,将存放在这个模拟的类对象中,而如果是这个模拟对象单独所有的私有数据,则应该放在模拟的实例对象中 还有1个重要点,如果rtpednpoint这个模拟对象被new了多次,实际上class_init只会被初始化1次,并且在kurento中,这个是在kurento启动注册注册组件的时候就初始化了。而后面的实例对象才会被new多个,初始化多次
static void kms_rtp_endpoint_class_init (KmsRtpEndpointClass *klass) { //这里把所有rtpendpoint所需要的模拟父类类对象都进行了初始化。实际上就是把该对象的指针都关联给了相应的模拟父类 GObjectClass *gobject_class; KmsBaseSdpEndpointClass *base_sdp_endpoint_class; GstElementClass *gstelement_class;
//下面这个都是gobject的标准操作,初始化object的基类, gobject_class = G_OBJECT_CLASS (klass); //设置该类的set,get方法 gobject_class->set_property = kms_rtp_endpoint_set_property; gobject_class->get_property = kms_rtp_endpoint_get_property; //设置gobject的析构方法 gobject_class->finalize = kms_rtp_endpoint_finalize; gstelement_class = GST_ELEMENT_CLASS (klass); //给这个类设置一点信息,没有LUAN用 gst_element_class_set_details_simple (gstelement_class, "RtpEndpoint", "RTP/Stream/RtpEndpoint", "Rtp Endpoint element", "José Antonio Santos Cadenas <santoscadenas@kurento.com>"); //初始化日志
//这里也需要注意,如果在这个日志初始化前面使用gst的日志系统,将无法打印,可以使用printf直接打印 GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, PLUGIN_NAME, 0, PLUGIN_NAME); //初始化父类的类对象,同样指向当前对象的内存地址
base_sdp_endpoint_class = KMS_BASE_SDP_ENDPOINT_CLASS (klass);
//可以理解为当前对象实现l了父对象的虚方法,方法实现所有对象实例都是一样的
//因此在class中初始化,所有对象实例都会共享这个类对象的方法 base_sdp_endpoint_class->create_session_internal = kms_rtp_endpoint_create_session_internal; base_sdp_endpoint_class->start_transport_send = kms_rtp_endpoint_start_transport_send;
/* Media handler management */ base_sdp_endpoint_class->create_media_handler = kms_rtp_endpoint_create_media_handler;
base_sdp_endpoint_class->configure_media = kms_rtp_endpoint_configure_media; //给这个gobject的类对象安装属性,第一个参数是该类对象,第二个是属性的ID,一般是1个枚举值或者1个宏定义,第三方则是要安装的属性变量信息GParamSpec
//这里也就明白了,为什么gobject需要有1个模拟的类对象,因为属性和方法相关的元数据信息都在这里初始化或赋值
//而属性或者方法的具体的值或实现则才是 g_object_class_install_property (gobject_class, PROP_USE_SDES, g_param_spec_boolean ("use-sdes", "Use SDES", "Set if Session Description Protocol Decurity" " Description (SDES) is used", DEFAULT_USE_SDES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MASTER_KEY, g_param_spec_string ("master-key", "Master key", "Master key (either 30 or 46 bytes, depending on the" " crypto-suite used)", DEFAULT_MASTER_KEY, G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CRYPTO_SUITE, g_param_spec_enum ("crypto-suite", "Crypto suite", "Describes the encryption and authentication algorithms", KMS_TYPE_RTP_SDES_CRYPTO_SUITE, DEFAULT_CRYPTO_SUITE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); //其中注意g_param_spec_XXX函数,表示打包1个属性值,这个是个XXX类型 //第一个参数用于设定键名,第二个参数是键名的昵称,第三个参数是对这个键值对的详细描述,第四个参数用于表示键值的访问权限,G_PARAM_READWRITE 是指定属性即可读又可写,G_PARAM_CONSTRUCT 是设定属性可以在对象示例化之时被设置。
//创建1个信号事件并将信号ID放到obj_signals的集合中 //其中G_STRUCT_OFFSET (KmsRtpEndpointClass, key_soft_limit)可以理解获取KmsRtpEndpointClass成员中的key_soft_limit函数指针 obj_signals[SIGNAL_KEY_SOFT_LIMIT] = g_signal_new ("key-soft-limit", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (KmsRtpEndpointClass, key_soft_limit), NULL, NULL, g_cclosure_marshal_VOID__STRING, G_TYPE_NONE, 1, G_TYPE_STRING);
//这里,利用gobject的特性,将rtpednpoint所需要的相关私有属性添加到当前的对象中。 g_type_class_add_private (klass, sizeof (KmsRtpEndpointPrivate)); }
下面这个方法则是模拟对象实例的初始化
static void kms_rtp_endpoint_init (KmsRtpEndpoint *self) { //获取到他下面包含的结构体 self->priv = KMS_RTP_ENDPOINT_GET_PRIVATE (self); self->priv->sdes_keys = g_hash_table_new_full ( g_str_hash, g_str_equal, g_free, (GDestroyNotify) kms_ref_struct_unref);
//初始化了一个HASHTABLE , 用来存储RTP对应的KMSrtpconnection self->priv->comedia.rtp_conns = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); // 另外一张表signal_ids则是保存了connection的hash表和所有相关的信号量ID self->priv->comedia.signal_ids = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); //给自身的rtp对象设置相关的参数,其中max-video-recv-bandwidth为0表示不限制宽带 g_object_set (G_OBJECT (self), "bundle", FALSE, "rtcp-mux", FALSE, "rtcp-nack", TRUE, "rtcp-remb", TRUE, "max-video-recv-bandwidth", 0, NULL); }
直接再看rtpendpoint.h这个头文件的属性定义了写什么内容 首先定义了1个结构体,包含了base本身的结构体和一个rtp的私有结构体 这个是gobject模拟的对象实例,用于存放模拟对象实例的指针和私有数据
struct _KmsRtpEndpoint { KmsBaseRtpEndpoint parent;
KmsRtpEndpointPrivate *priv; };
再看这个私有的结构体定义的内容,则是定义了rtp这个类才会需要的一些私有数据
struct _KmsRtpEndpointPrivate { gboolean use_sdes; GHashTable *sdes_keys;
gchar *master_key; // SRTP Master Key, base64 encoded KmsRtpSDESCryptoSuite crypto;
/* COMEDIA (passive port discovery) */ //最关键的就是这个了,用于存放已有rtp的网络连接的集合,CO应该表示的是集合 KmsComedia comedia; };
//对象里面的两个hashmap
typedef struct _KmsComedia KmsComedia; struct _KmsComedia { GHashTable *rtp_conns; // GHashTable<RTPSession*, KmsIRtpConnection*> GHashTable *signal_ids; // GHashTable<RTPSession*, int> };
//这个是gobject的模拟对象的类对象,存放一些公共的数据,包括属性名、方法名
//只是这里 //定义了1继承1个父类的类对象和1个信号(函数指针),这个函数指正则在具体的调用类中进行实现
struct _KmsRtpEndpointClass { KmsBaseRtpEndpointClass parent_class;
/* signals */ void (*key_soft_limit) (KmsRtpEndpoint *obj, gchar *media); };
rtpendpoint本身的初始化就算是完成了
下一篇我们将解析rtpendpoint如何进行udp的数据通信和数据转发。