百度统计
一面之猿网
让这个世界,因为我,有一点点的不一样
纯序员给你介绍图化框架的简单实现——消息机制

大家好,我是不会写代码的纯序员——Chunel Feng。大概一年之前,有位自动驾驶行业的朋友在github上看到CGraph这个项目,然后联系到我,问我小破图执行的时候,能否执行完节点A之后,后面的节点B开始执行的同时,A节点继续执行。后来,又有一位百度做视频流处理的朋友,找我问到这个问题。

我当时,跟他们解释了CGraph中的参数传递机制。虽然参数传递极为高效简洁(复杂度为O1),但很遗憾无法支持这种很常见的诉求。接下来的这一年,我曾尝试多种实现思路去解决这个问题,最终都觉得不够巧妙和优雅。于是,也就没有把代码push上去。

image-1668185448270

巧了,后来我转行,进入了自动驾驶领域的公司,工作的内容也跟视频流处理有一定关系,进而了解了一些其中的招式和套路。基于此,再结合CGraph原有的设计思路,我们推出了 Message(消息机制)这个功能。Review代码的时候,我发现这是我自己设计的最深、最复杂的模板嵌套逻辑,也是我写过的最复杂的代码。但对外暴露的接口,却极其简单。相比于之前功能,我想说:这是一个绝杀

今天,就跟大家聊一下CGraph中的消息机制。顺带对比一下,项目中涉及到的三种 param 的区别和应用场景。

我们还是照例,先上代码连接:CGraph github源码

消息机制

消息机制,对应于源码中的 GraphMessage 文件夹下的内容。设计的目的有三个:一个是为了丰富数据传递的机制;二是为了提高dag整体(可以是由多个pipeline组成)的并发度;三是避免内部资源不可控的过度消耗

我们来一个一个解释一下:

1. 丰富数据传输机制
在之前的文章中,我们提到过CGraph的 参数传递 机制。通过在任意位置,写入GParam内容,就可以在另外的任意位置(包括 node、group、aspect、daemon)中获取,并且提供了并发控制机制。看似很无敌,但是也带来了一个问题。这些都是建立在,上述内容被注册到同一个pipeline中的。如果两个节点,被注册到不同的pipeline中,那该如何进行参数互通呢?

image-1668187025198

为此,我们先实现了一个 线程安全的环形队列,又设计了一个 GMessage 类,其中包含一个环形队列,并且限定 其中仅能传入 GMessageParam 的子类对象。

然后,又设计了一层 GMessageManager 结构。在其中,包含了一个 map<topic, GMessage< T >>结构。外部只要设定 topic名称和 GMessageParam 子类的类型,就可以创建一个相关的 GMessage了。进而,你就可以不断的往里面去写具体的内容了。

同时,另一个地方,就可以在从环形队列的头部,不断的去获取最新写入的参数了。从而实现了一个简单的不同类型参数的 pub/sub 的逻辑。

我们来看一下几个相关的接口哈:

CGRAPH_CREATE_MESSAGE_TOPIC(MyMessageParam, "test", 10);    // 创建消息
CGRAPH_PUB_MPARAM(MyMessageParam, "test", mp);    // 写入消息
CGRAPH_SUB_MPARAM(MyMessageParam, "test", mp);    // 获取消息
CGRAPH_REMOVE_MESSAGE_TOPIC("test")     // 删除一条消息
CGRAPH_CLEAR_MESSAGES();    // 清空所有消息

见名知意,相当简洁是不是。

2. 提高dag整体的并发度
参数值pipeline内全局共享,get/set是快了。但也注定了pipeline执行完一圈之后,才能从头继续执行。否则,数据值就对不上了。

而上面提到的message的这个结构,其实可以很好的解耦这个问题。将原先的pipeline,分成两个部分,pipeline1 处理好一部分数据,直接将结果发送到message中,然后就继续处理自己的下一轮询了。

而pipeline2 在message中有 MessageParam 的时候,则sub出来数据,然后继续做自己的流程。这样,就算 pipeline2中处理稍慢,导致略有积压,也能一定程度上,减少整体时间的损耗了。

image-1668185573992

而这,唯一要付出的代价,就是 MessageParam 中,有一次参数的copy。

说到内存拷贝,插入一条广告。近期工作中,遇到的内存管理相关内容比较多,自己对这一块的理解也比较捉鸡。有熟悉这部分内容的大佬,请联系我一下,以便今后随时请教和交流。

3. 避免资源不可控的过度消耗
接着说上面的那种情景。如果,pipeline1处理很快,而pipeline2处理的很慢。那会导致一个情况,messsage的队列中,被疯狂写入,而来不及消费,最终甚至导致OOM问题。或者,pipeline1中的任务一直在长时间占用cpu,导致pipeline2中的内容,少有被执行的机会。

问了解决这个问题,在创建 message的时候,我们设定了一个 size 的参数,表明 环形队列的尺寸。如果其中的数据达到这个值,那pub流程将会被阻塞,避免过度写入。

三种参数对比

CGraph-v2.2.0版本中,一共有三种参数类型,分别是 GParamGPassedParamGMessageParam。我们再来一次对比介绍一下:

image-1668182947751

  • GParam

GParam 主要用于同一个pipeline中,各种值的传递。可以在 同一个pipeline的任何地方获取,比如 node、group、aspect、daemon 等。这里的获取的时间复杂度是O(1)的,而且是通过 指针传递,效率很高。也提供了控制互斥的语法,不过需要自己调用。

  • GPassedParam

GPassedParam 有好几个重命名,其实目的只有一个,就是点对点的传递参数,解决的是每个类的细节区别问题。

using GAspectParam = GPassedParam;
using GDaemonParam = GPassedParam;
using GElementParam = GPassedParam;

比如,注册n个节点,功能就是 print 信息,但是又想每次输出的内容均不同。那,这个时候,输出的具体信息,就适合用 GElementParam(也就是 GPassedParam)传入,而不是写三个不同的 print节点。

  • GMessageParam

最后,再说一下配合消息机制而推出的 GMessageParam。这个就是主要用于在不同 pipeline之间传递数据,和做一些负载控制。

image-1667745098110

希望大家在使用的时候,合理做出选择和分配。从以上总结也可以看出,目前CGraph已经支持了各种形式的进程内通信,但是并不支持跨进程、跨机器/机房 通信。如果需要的话,大家可以尝试找一些三方库来完成对应的功能。

我们接下来也会考虑,是否会推出基线版本层面的支持。需要权衡的因素,主要是一些系统基础库的不同,和暂时不太想引入三方库的缘故。

入选 awesome cpp榜单

对于入选 awesome cpp 榜单,我自己也是比较意外的。有一天晚上,也不记得当时咋想的了,就给项目组提了pr,当时真心是不抱着任何希望的哈。

image-1667750320156

一方面,我们自己开发的东东,跟行业大佬和众多大厂名项目,在质量和完整度上有不小的差距。另一方面,也从来没有出现过readme仅用纯中文写的repo,被收录之中——毕竟是一个世界范围的推荐榜单。结果,第二天早上起来的时候,发现pr已经被负责人 approval 了。当时还是激动了好几秒的,不过很快冷静下来。因为马上也还要去工地搬砖。

image-1667750252259

我想:作为一个C++开发者,你不一定调用过 awesome cpp 项目。但是你调用的项目,一定调用过 awesome cpp 里的项目。我之前一直是看这个榜单的,一些常用的基础组件,比如 json库、log库也会从这里推荐的选。真的没想到,有一天自己写的东西,也有幸能够入选。真是自己职业生涯中,浓墨重彩的一笔吧。当然,如果因此能帮助到更多有需要的朋友,我会更加高兴的。

本章小结

我是从去年五一节吧,开始写CGraph这个工程。第一个版本就已经实现了依赖和解依赖的图执行功能,从开始设计到跑通功能,再到完成自测,一共也只花了三天。之后就一直在做各种功能迭代和优化了。其中,有很多过程和功能,并非在开始的时候,我能够预见到的。到现在,无法说小有所成,但的确收获颇丰。

有一个很重大的收获,就是小破图被收录在了 awesome-cpphello-github 等业内最知名的推荐榜单上,获得了更高的曝光机会,也算是得到了业内大佬的一些肯定。更重要的,还是我因为写开源代码,有幸和正在看这篇文章的你,建立起了原本不会有的连接,进而发生了影响我职业生涯和生活的种种。

image-1668176589453

这一路,得益于各位大佬的指点、童鞋的意见,项目自身也在快速的更新和优化。同时有了几位贡献者,并且开始支持个别研究课题。我自己,也在正式工作之余有所成长,同工作形成了一定的互补。

回顾我的成长,有两件事情对我性格的形成,有很大的影响。第一件事情,就是我小时候学弹琴——那是很多年前的事情了,几乎伴随了我整个童年。还有就是,近几年开始在开源社区写代码。我是大概5年前,就有了类似的想法,会零零散散写点东西发布到 github 上。然后最近2年吧,随着看的东西稍多一些,思考也会多了一些,开始比较频繁的更新。很幸运,能被人看到,有人star和fork,也有幸支持了一些团队。两件事情,都坚持了多年,填充了我大量的业余时间。虽然内容截然不同,但想想都是一个人坐着敲键盘,有种修身养性的感觉

image-1668218660672

前面的文章也提到,前阵子换了工作,进入了自己从来没有接触过的自动驾驶领域。换行业的原因很单纯,一方面是这个行业近期火起来,还有一方面是工作内容偏技术导向。初来乍到,有很多内容不了解,需要同事事无巨细的指导。每每走到一些无人区的时候,都能在一些技术交流群里,遇到很懂行的大佬,交流指导后得以解决。现在看来,当年进入开源社区,也是一件幸事,也是一种沉淀。

今天的内容就聊到这里吧,双十一晚上下班回来,就顾着写文章了。不知不觉已经是第二天了,啥也没买,有点遗憾了。不过,又想到反正我也没啥钱,也就不遗憾了,哈哈。

mmqrcode1602771241876

                                                   [2022.11.12 by Chunel]

推荐阅读


个人信息

微信: ChunelFeng
邮箱: chunel@foxmail.com
个人网站:www.chunel.cn
github地址: https://github.com/ChunelFeng

image