爱吹牛的人字拖 · 国际科学与工程大奖赛_百度百科· 1 周前 · |
奋斗的山楂 · 使用极限网关来处置 ...· 1 月前 · |
重情义的鸡蛋 · Serialization ...· 3 月前 · |
有情有义的汤圆 · 500 Server Error for ...· 4 月前 · |
publisher topic char zmq |
https://suntus.github.io/2015/03/02/zmq%E4%B8%AD%E6%96%87%E6%8C%87%E5%8D%97-5/ |
眉毛粗的硬盘
9 月前 |
在第3章-高级请求-回复模型和第4章-可靠的请求-回复模型中,我们讨论了zmq的高级请求-回复模型的使用。如果你已经全部掌握了,恭喜。本章我们会集中在发布-订阅模型和为了性能、可靠性、状态分发和监测使用更高级的模型对核心的pub-sub模型进行扩展。
我们会讲到:
Pub-Sub的优缺点
zmq的低级组合有它们各自不同的特性。pub-sub致力于解决一个消息传输的老问题: 广播 或 组播 。它有着跟zmq不同的一丝不苟的简单性和粗暴方式相结合的独特混合性质。它值得我们去了解pub-sub的优缺点,懂得怎么做会对我们有利,并且在需要的时候如何避免那些弊端。
首先,PUB会把每条message都发送给”所有的对端”,而不是PUSH和DEALER那样只发送给”所有对端中的某一个”。你不能简单的用PUB替换掉PUSH或者反过来又期望系统能正常工作。这个错误经常出现,因为人们真的经常建议这样做。
其次,pub-sub目标是可扩展性。这意味着大量数据快速发送给很多个接收者。如果你需要每秒给成千个节点发送百万级别的信息,你就非常需要pub-sub了。
为了可扩展性,pub-sub使用了跟push-pull相同的技巧,就是避免回复。也就是说接收者不会反过来跟发送者通信。也有些例外,比如,SUB socket会向PUB socket发送少量且匿名的订阅信息。
避免回复在实际的可扩展性中非常重要。使用pub-sub,它能清晰的映射到网络交换机处理的PGM多播协议。换句话说,订阅者并不会直接连接发布者,它们只是连接到交换机上的一个多播 组 上去,发布者会把信息发送到该组上。
因为避免了回复,我们所有的信息流都变得 非常 简单,这能让我们设计出更简单的API,更简单的协议,能让更多的人接触使用。但我们也移除了协同发送者和接收者的任何可能性。这意味着:
缺点是如果我们想设计可靠的多播这些就是必不可少的。zmq的pub-sub模型会在订阅者正在连接的时候、网络故障的时候或者订阅者或网络跟不上发布者发布速度的时候或者其他任意时候丢失message。
好处是仍有很多应用场景中 大致 可靠的多播就足够好了。当我们需要回复的时候,我们可以换成使用ROUTER-DEALER(对大多数常规应用场景中我推荐这种模型),或者为同步增加一个单独的通道(本章稍后会看到)。
pub-sub就像个广播:在加入前你会错过所有东西,然后你能获取多少东西依赖于你的接收器的品质。让人高兴的是,该模型很有用,因为它完美的反映出了真实世界中信息的分发。想想Facebook和Twitter,BBC世界新闻和体育结果。
就像在请求-发布模型中做的那样,我们来根据能发生的错误来定义 可靠性 。下面是pub-sub会发生的经典错误:
还有很多可能的故障,但这些是我们在实际系统中发现的比较典型的。从v3.x开始,zmq强制在内部缓冲区(被称作高水位或HWM)使用默认的限制,因此除非你把HWM设置为无限制,那发布者的崩溃会很少。
所有这些故障情况都有解决方案,尽管有些并不简单。对我们来说大多数时候并不需要很复杂的可靠性保证,这也是为什么zmq并不准备在外部提供可靠性保证的原因(即使可以有个全局的可靠性设计,但还是没有)。
pub-sub跟踪(浓缩咖啡模型 Espresso Pattern)
让我们从找到一种跟踪pub-sub网络的方法来开始本章内容。在第2章中我们已经看过了一个简单的proxy来做这些传输桥接。
zmq_proxy()
方法有三个参数:它桥接在一起的
frontend
和
backend
socket,和一个把所有信息都发过去的
capture
socket。
代码相当简单:
1 |
// espresso: Espresso Pattern in C |
Espresso模型通过创建一个监听线程读取一个PAIR socket的信息然后打印出来它拿到的所有东西来工作。该PAIR socket是一个管道的一端;另一端(另一个PAIR)是我们传递给
zmq_proxy()
的socket。实际中,你需要过滤一下来拿到自己希望跟踪的信息(印证了该模型的名字)。
订阅者线程订阅了”A”和”B”类型的信息,收到5条message,然后关闭它的socket。当你运行该例子,监听者打印出两条订阅信息,5条数据message,两个未订阅的信息,然后静默:
1 |
[002] 0141 |
该例子清楚的展示了发布者socket怎么在没有订阅者的时候停止发送数据的。发布线程仍然发送着信息,但该socket就静默地丢掉了这些信息。
最后值缓存
如果你已经用过了商业的pub-sub系统,你会发现在zmq的pub-sub模型中没有了一些功能。其中一个就是*最后值缓存 last value caching(LVC)*。这解决了一个新的订阅者在它加入网络的时候怎么跟上的问题。该理论就是刚一个新的订阅者加入并订阅某些特定信息的时候发布者会得到通知。然后发布者就会重新广播这类特定信息的最后一条message。
我已经解释过了为什么在有新的订阅者加入的时候发布者得不到通知了,就是因为在大型pub-sub系统中,大量的数据让这件事变得不可能。为了构建真实的大规模pub-sub网络,你需要像PGM那样的协议能提升以太网交换机到一个新高度来广播信息给成千个订阅者。用tcp的单播连接发布者到成千个订阅者不具备可扩展性。你会陷入不公平的分发(一些订阅者会在其他订阅者之前得到message),网络冲突和一些其他恼人的问题中。
PGM是单向协议:发布者向在交换机中的多播地址发送一条message,交换机会重新把该条message广播给所有对它感兴趣的订阅者。发布者永远不会看到订阅者什么时候加入或者离开:这些都发生在交换机中,我们也不想为这部分重新编码。
然而,在较少数据量少订阅者和有限个数的信息主题的网络中,我们可以使用TCP和XSUB、XPUB socket 真事的 跟每个订阅者通信,就像我们在Espresso模型中看到的那样。
能用zmq设计出一个LVC吗?答案是肯定的,只要我们在发布者和订阅者之间设计一个proxy:类似于PGM交换机的功能,但可以让我们自己控制。
我会从设计一个出现最坏情况的发布者和订阅者开始。该发布者是有缺陷的,它一启动就立即发送一千条不同主题的信息,然后每秒更新一条随机主题的信息。一个订阅者连接并订阅一个主题。没有LVC,一个订阅者可能必须等待平均500s才能得到一条数据。 To add some drama, let’s pretend there’s an escaped convict called Gregor threatening to rip the head off Roger the toy bunny if we can’t fix that 8.3 minutes’ delay.
下面是发布者的代码,注意它有连接到某个地址上的命令行选项,但同时也绑定到了一个地址上。我们会稍后在我们的LVC上使用这个东西:
1 |
// pathopub: Pathologic Publisher in C |
下面是订阅者:
1 |
// pathosub: Pathologic Subscriber in C |
试着编译运行这些东西:首先是订阅者,然后运行发布者。你会看到订阅者报告拿到了”Save Roger”:
1 |
./pathosub & |
当你运行第二个订阅者的时候你才会发现Roger的困境。你必须留给它足够长的时间它才能报告得到数据。因此,下面就是我们的最后值缓存系统。就像我说的那样,它是一个绑定两个socket然后在两端都处理数据的proxy:
1 |
// lvcache: Last Value Caching Proxy in C |
现在运行proxy,然后是发布者:
1 |
./lvcache & |
现在尽量多的运行订阅者实例,每次都连接proxy的端口5558:
1 |
./pathosub tcp://localhost:5558 |
每个订阅者都愉快的报告”Save Roger”,然后逃犯Gregor又潜回他的位子上吃饭喝茶,这是他最想做的事了。
一个要注意的:默认的,XPUB socket不会报告重复的订阅信息,这正是在你连接一个XPUB和一个XSUB的时候想要的。我们的例子偷偷绕过了这个,方法是使用随机主题,因此它不起作用的概率是百万分之一。在一个真实的LVC proxy中,你会想用
ZMQ_XPUB_VERBOSE
选项做个练习。
慢订阅者检测(慢性自杀模型) Slow Subscriber Detection (Suicidal Snail Pattern)
实际中你可能遇到的一个常见问题是慢订阅者。在一个理想的情况中,数据流全速在发布者到订阅者之间流通。但实际上,订阅者应用经常是用解释性语言编写的,或者要做很多工作,或者程序被写的很烂,导致它们跟不上发布者的速度。
我们怎么处理一个慢订阅者呢?理想的补救措施是让订阅者更快些,但那可能需要很多工作和时间。一些处理慢订阅者的传统步骤有:
这些传统的策略没一个适合的,因此我们需要创造一个新的。不是断开发布者,我们来劝劝订阅者自杀吧。这就是慢性自杀模型(Suicidal Snail pattern)。当一个订阅者检测到它运行的太慢的时候(这里”太慢”大致是个配置选项,意味着”如果你变得慢到一定程度,就大声喊出来,因为我需要知道,然后我能修复它!”),它就呜呼死掉。
订阅者怎么检测呢?一种方法是把信息顺序入队(按顺序编号)并在发布者那里设置一个HWM。现在,如果订阅者检测到一个缝隙(比如编号不连续),它就知道有些事情弄错了。然后我们调整HWM到”如果你到这个程度就自杀”的水平。
该方案有两个问题。一是如果我们有很多发布者,我们怎么顺序排列数据呢?解决办法是给每个发布者一个唯一的ID,然后在编号上加上该ID。二是如果订阅者使用
ZMQ_SUBSCRIBE
过滤,它们会天然带有缝隙。我们宝贵的序列就什么用都没了。
一些应用场景并不使用过滤器,那排序就有意义。但一个更普遍的做法是发布者给每条信息都打上时间戳。当订阅者收到一条信息,它就检查时间,如果相差超过比如说1s,它就做”呜呼死掉”的事情,可能首先是对这一些操作窗口报警。
自杀模型特别用于那些订阅者有它们自己的client和服务级协议(service-level agreement),并需要保证特定最小延迟的情况。中断订阅者并不像一个保证最小延迟的有效方法,而是一个断言模型。今天断掉,然后问题会被修复。让延迟的数据进入,在雷达上可能会造成更大的危害和更长时间的潜伏。
下面是一个很小的自杀模型的例子:
1 |
// suisnail: Suicidal Snail in C |
下面是关于慢性自杀模型例子的一些说明:
高速订阅者 High-Speed Subscribers (Black Box Pattern)
现在来看看一个让我们的订阅者变快的方法。pub-sub的一个常用场景是分发像股票变化的市场数据那样的大量数据流。典型的系统是有个连接股票变化的发布者,获取报价,然后发给很多订阅者。如果有比较少的订阅者,我们就可以使用TCP。如果有很多订阅者,就需要可靠的广播协议,例如PGM。
假设我们的订阅平均每秒有100,000条100btype的信息。这是经过过滤不需要的市场数据之后发送给订阅者的典型的速率。现在我们要记录一天的数据(可能在8小时中有250G),然后把它发给一个模拟网络,比如说一小组订阅者。尽管对zmq应用来说每秒100k条数据很简单,但我们还想 更快些 。
我们需要一组节点来构建我们的框架——一个用来做发布者,然后每个订阅者一个。这些都是定制的节点——订阅者的8核,发布者的12个核心。
在我们给订阅者发布数据的时候,需要注意两个事情:
首先我们要做的是把订阅者拆成一个多线程的设计,好让一个线程读取数据的时候另外的一组线程去处理数据。特别是,我们并不想按照相同方式处理每条数据。另外,订阅者可能根据前缀关键字来筛选一些message。当message符合某种标准的时候,订阅者会调用一个worker去处理它。在zmq的语境中,这意味着给一个工作线程发送message。
因此订阅者看起来像一个队列的装置。我们可以用很多socket去连接订阅者和worker,如果我们假设的是单向通信并且worker全是不同的,就可以用PUSH和PULL,把所有路由工作都交给zmq去完成。这是最简单和最快的实现方式。
订阅者跟发布者通信是基于TCP或PGM。订阅者跟它的workers通信是基于inproc://,它们都在同一个线程中。
现在来打破瓶颈。订阅者线程会达到100%的CPU使用率,因为它是单线程的,不能使用多个核心。单线程总会到达一个瓶颈,最多每秒处理2M、6M或者更多点的数据。我们希望能把工作分摊到多个并行运行的线程中。
这种解决方法已经被很多高性能产品使用,被称作 分片 sharding 。使用分片,我们能把工作分散到并行和独立的处理流中,比如说一半的主题关键字放到这个处理流中,另一半放另一个中。我们可以使用很多个处理流,但除非有多余的核心,否则性能得不到提升。让我们来看看怎么分成两个处理流。
要使用两个处理流,都全速工作,我们可以这样配置zmq:
理想状况下,我们希望框架中满负荷的线程数跟核心数相匹配。当线程开始竞争核心和CPU循环,增加更多的线程数带来的开销要大于收益。例如,创建更多的I/O线程就没有多大意义。
可靠的pub-sub Reliable Pub-Sub(Clone Pattern)
基于大量已经工作的例子,现在我们来设计一个可靠的pub-sub框架。我们会一步步来开发。目标是让一组应用能共享某些状态。下面是我们的技术挑战:
假设更新很小数据量,我们也并不打算做到实时。整个状态信息可以放进内存。一些可能的应用场景是:
中心式vs分布式 (Centralized Versus Decentralized)
我们首先必须要做的一个决定是是否需要一个中心服务器。最终设计会有很大区别。优缺点如下:
把状态表示成键-值对
我们会按步骤开发克隆模型,每次解决一个问题。首先,让我们看看怎么在一组client之间更新一个共享状态。我们需要决定怎么去表示我们的状态,也就是更新的信息。最简单的形式可能就是一个键-值对存储,每个键-值对代表共享状态改变的一个原子组合。
在第一章我们有个简单的pub-sub例子,天气预报server和client。我们来把server换成发送键-值对的数据,把client换成在一个hash表中存储这些数据。这样我们就可以使用传统的pub-sub模型从一个server向一组client发送更新。
一个更新可以是一个新的key-value对,一个对现有key修改的value或者一个删除的key。目前我们可以假设key-value对能整个放到内存中并且应用根据key来使用,比如使用一个hash表或字典。为了更大存储空间或持久化我们可以把状态放进数据库,但对这里没什么影响。
下面是server:
1 |
// clonesrv1: Clone server, Model One in C |
下面是client:
1 |
// clonecli1: Clone client, Model One in C |
对这第一个模型有一些说明:
下面是这个kvmsg类,暂时只是最简单的形式:
1 |
// kvsimple: Key-value message class in C |
以后我们会设计一个更复杂的kvmsg类来适应实际的工作。
server和client都会维持hash表,但这第一个模型只是在我们先启动所有的client然后启动server并且client从不会崩溃的情况下才能正常工作,这相当假。
获取一个外带快照 Getting an Out-of-Band Snapshot
现在我们就有了第二个问题:怎么解决慢接入的client和崩溃又重启的client。
为了让一个迟到的(或重新回来的)client跟上server,它必须获取一个server状态的快照。就像我们把”message”简化成”一个排好序的key-value对”,我们也可以把”状态”简化成”一个hash表”。为了拿到server的状态,一个client打开一个DEALER socekt然后明确地向server要这个状态。
要让它能正常工作,我们必须解决一个时间的问题。拿到一个状态的快照会花费一定的时间,如果快照很大时间会相当长。我们需要适当的更新快照。但server并不知道什么时候开始向我们发送更新。一个办法是开始接收,拿到第一个更新,然后再去要”更新N的状态”。这要求server为每个更新都存储一个快照,很不现实。
所以我们会在client中做同步,如下:
这是一个简单的模型,利用了zmq自身的内部队列。下面是server:
1 |
// clonesrv2: Clone server, Model Two in C |
下面是client:
1 |
// clonecli2: Clone client, Model Two in C |
关于这两个程序有些需要注意的:
现在,这两个程序并没做什么实际的工作,但它们正确的同步了状态。它是很好的例子来战士怎么去混合多种模型:PAIR-PAIR,PUB-SUB和ROUTER-DEALER。
从client从新发布更新 Rblishing Updates from Clients
在我们的第二个模型中,key-value存储的改变来自server本身。这是个中心式的模型,对比如说我们有个中心配置文件需要分发,并且每个节点都有本地缓存的状况很有用。另一个更有趣的模型从client获取更新,而不是server。server因此成为一个无状态broker。这会有如下好处:
要把更新从client发回给server,我们可以用很多种socket模型。最简单的就是PUSH-PULL组合。
为什么我们不让client直接向其他client发布更新呢?尽管这会减少延迟,但保证不了一致性。如果你允许更新的顺序根据是谁收到它们的而改变的话,就无法得到连续的共享状态。比如我们有两个client,改变两个不同的key的话,这会工作的很好,但如果这两个client想要在大致同一时间改变相同的key,它们会得到不同的值。
有几种不同的策略去保证在同一时间不同地方发生的变化的一致性。我们要使用的方法是把所有改变都集中起来。不管是client什么时候做出的更新,都会经过server被推送出去,这会强制让一个单独的序列按照它收到更新的顺序排列。
通过协调所有的更新,server就也能在所有的更新中添加一个唯一的序列编号了。有了唯一的序列,client就能检测严重故障了,包括网络冲突和队列溢出。如果一个client发现进入的数据流有个懂,它就会采取行动。client联系server去要缺失的数据看起来很合理,但在实际应用中却没什么用。如果存在”洞”,就是网络压力造成的,现在给网络增加更大的压力会让事情变得更糟。client能做的事就只是告诉用户”现在没办法继续”,停止,在有人手动检查过问题的原因之前不要重启。
现在我们就能在client保证状态更新了。下面是server:
1 |
// clonesrv3: Clone server, Model Three in C |
下面是client:
1 |
// clonecli3: Clone client, Model Three in C |
下面是关于第三版的一些说明:
用子树工作 Working with Subtrees
随着client数量的增加,我们的共享存储规模也会随之增大。向每个client都发送所有的信息会变得没有意义。这是pub-sub的一个传统问题:当你有很少的client的时候,你可以向所有的client发送所有的信息,随着规模的增长,这会变得很没效率。client在不同地方会很不一样。
因此即使当用一个共享的存储的时候,一些client也希望只用完整存储的一部分,我们称之为 子树 subtree 。client在它做出一个状态请求的时候必须请求该子树,并且当它订阅更新的时候也必须明确指定同一个子树。
对树来说有很多种常用的语法,一种就是*分层路径(path hierarchy) ,另一种是 主题树(topic tree)*。这些看起来像这样:
我们会用分层路径,并扩展client和server好让client能用一个单独的子树工作。一旦你学会了怎么处理一个单独的子树,就会自己扩展去管理多子树了,当然是你的应用里需要的话。
下面是实现的server的子树,一个模型三的变体:
1 |
// clonesrv4: Clone server, Model Four in C |
下面是相应的client:
1 |
// clonecli4: Clone client, Model Four in C |
临时值 Ephemeral Values
临时值是那些不被定期更新就会自动过期的值。如果你想把克隆模型用于注册服务,那临时值就能让你做动态估计。一个节点加入网络,发布它的地址,然后按规律更新。如果该节点挂掉,它的地址最终就会被移除。
临时值的一个常用方法是把它们附加到一个*会话(session) 中,当会话结束的时候删除它们。在克隆模型中,会话由client定义,如果client死掉会话就会结束。一个简单点儿的实现是附加一个 剩余时间(time to live, TTL)*给临时值,该剩余时间由server使用去删除那些没有在指定时间内更新的数据。
不管什么时候我都尽可能使用的一个很好的设计原则是 不要在非必须的时候发明概念 。如果我们有很多个临时值,那会话会提供更好的行能。如果我们只有很少的临时值,那最好给每个都设置一个TTL。如果我们使用了大量的临时值,把它们附加到会话中并且适时过期会更有效。在目前这并不是一个问题,并且可能永远都不会遇到,因此会话就可以先不考虑了。
现在我们来实现临时值。首先,我们需要个在key-value message中编码TTL的方法。可以增加个帧,但使用zmq帧结构的一个问题是每次我们想增加一个新属性,就必须改变message的结构。这破坏了兼容性。因此让我们给message增加个性质帧,写些能让我们存取属性值的代码。
其次,我们需要个方法,比如说”删除该key”。到目前为止,server和client都盲目的把新值插入或更新进它们的hash表。我们可以说如果value为空,就意味着”删除该key”。
下面是个更完整的kvmsg类的版本,它实现了属性帧(也增加了一个UUID帧,我们稍后会用到)。如果需要,它也处理了从hash表中删除key的任务:
1 |
// kvmsg: Key-value message class: full in C |
模型5的client跟模型4的差不多一样。它现在使用了完整版的kvmsg类,在每条message上都设置了一个随机的ttl属性(秒级):
1 |
kvmsg_set_prop(kvmsg, "ttl", "%d", randof(30)); |
使用反应器 Using a Reactor
到现在,我们在server使用了一个轮询循环。在下个模型的server中,我们会换成使用反应器。在C中,我们使用CZMQ的
zloop
类。使用反应器会让代码更多,但更易理解和构建,因为server的每个部分都在一个单独的反应器句柄中得到处理。
我们使用一个单独的线程,在反应器句柄之间传递一个server对象。我们可以把server组织成多线程,每个管理一个socket或者定时器,但多线程在没有共享数据的时候才会工作的更好。在本例中所有的工作都围绕这server的hash表,因此单线程更简单。
下面是三个反应器句柄:
1 |
// clonesrv5: Clone server, Model Five in C |
到目前位置我们研究的克隆模型还比较简单。现在我们就要进入相当复杂的部分了,这让我又需要再去拿一杯浓缩咖啡。你会发现要设计”可靠的”消息机制相当复杂,需要在设计之前就问自己”到底需不需要这个?”如果使用不可靠的或”足够”可靠的系统就能应付过去,那就可以少费很多功夫。当然,你也可能会时不时丢些数据。这经常是个需要权衡的问题。因为这杯咖啡相当好喝,我们就开始跳进这个深坑吧。
在你运行这最后一个模型的时候,你需要停止然后重启server。这看起来像是它恢复过来了,但当然是它把更新放到了一个空的状态存储而不是真正的当前状态存储中。任何新加入的client只会得到最近的更新而得不到完整的历史更新。
我们想要的就是一种让server从中断或崩溃中恢复过来的方法。我们也需要在server停止一定时间服务的时候提供备份。当某人要求”可靠性”的时候,问问他们想要处理哪些故障。在本例中,有这些故障:
第一步是要添加第二个server。我们可以用第四章的双子星模型,把这两个server组织成主server和备份server。双子星是个反应器,因此我们把最后这个server模型重构成反应器模式是很有用的。
我们需要保证如果主server挂掉的话没有丢失更新。最简单的方法就是把它们发送给两个server。备份server然后就像一个client那样工作,通过接收更新保持状态同步。它也会从client那里拿到新的更新。它还不能把这些更新放到自己的hash表中,但可以保持一端时间。
因此,模型6从模型5的基础上引入了如下改变:
把client的逻辑设计成一个有限状态机很有用,client循环经过三种状态:
client会永远循环。很可能在启动或者进行故障转移的时候一些client会试着连接主server而另一些尝试跟备份server通信。双子星状态机会处理这种情况,确切来说是希望能处理。很难保证软件的正确性:相反我们会不断锤炼它直到没有错。
故障转移会按照以下步骤进行:
当主server恢复过来,它会:
我们会做出如下的假设:
因此我们使用双子星模型的高可靠性server组的框架会有两个server和一组跟两台server都通信的client组成。
下面是克隆模型server的第六个也是最后一个模型:
1 |
// clonesrv6: Clone server, Model Six in C |
该模型只有几百行代码,但让它运行起来花费了很长时间。更准确来说,构建模型6花费了整整一个星期,”哦my上帝,这对一个例子来说真是太复杂了”。我们在这个小程序中使用了相当多的东西,有故障转移、临时值、子树等等。让我吃惊的是之前的设计相当准确。当然那么多socket流的构建和调试也是相当有挑战的。
基于反应器的设计去除了很多代码中的啰里啰嗦,剩下的就相当简单且容易理解了。我们使用了第四章的bstar反应器。整个server是一个线程,因此没有奇怪的线程间通信——只是一个结构体指针(self)在所有的句柄之间传递,该结构体指针就能愉快的完成所有的事。使用反应器模型的一个附带好处是代码被紧密组织到一个轮询循环,这就很容易重用了。模型6的很多部分是来自模型5的。
我是一部分一部分写的,写完一部分工作”正确”之后才去写下一部分。因为有四个或五个主要的socket流,这意味着大量的调试和测试。我是通过把message打印到控制台来调试的。不要在zmq程序中使用传统的调试器:你需要监控message流来知道到底发生了什么。
为了测试,我总是试着用Valgrind,它会检测内存泄露和无效的内存使用。在C中,这是一个很重要的方面,因为你没办法把内存管理委派给一个垃圾收集器。使用像kvmsg和CZMQ会很有用。
集群hashy映射协议 The Clustered Hashmap Protocol
尽管server只是一个先前模型和双子星模型的混搭,但client相当复杂。在我们详细介绍之前,先来看看最终的协议。我把它作为一个说明写在了zmo RFC网站,也就是 Clustered Hashmap Protocol 。
大致说来,有两种方法来设计一个像这个这么复杂的协议。一种是把每种数据流都分配到各自的一组socket上,这是我们常用的方法。优点是每个数据流都很简单且清晰。缺点是同时管理多个socket数据流会很复杂。使用反应器会简单点儿,但反应器模型会产生很多必须相互适配的可变部分。
第二种办法是用一个单独是socket来处理所有的事情。在本例中,我在server中使用了ROUTER,在client中使用了DEALER,然后在这个连接之上处理所有的事情。这样会产生一个更复杂的协议,但至少复杂性都集中在一个地方。在第七章我们会看到一个基于ROUTER-DEALER组合的协议例子。
现在我们来看看CHP协议。注意”SHOULD”,”MUST”和”MAY”是我们在协议中用来明确需求等级的关键字。】
目标 Goals
CHP用于为一组基于zmq网络的client之间可靠的pub-sub模型提供基础。它定义了一个包含key-value对的”hashmap”抽象概念。任一client都能在任何时候修改任一key-value对,并且更新会被递送到所有的client。一个client能在任一时刻加入网络。
架构 Architecture
CHP连接一组client应用和一组server。client连接到server。client不能连接彼此。client能随意加入和退出。
端口和连接 Ports and Connections
server必须(MUST)启动如下三个端口地址:
client应该(SHOULD)至少启动如下两个连接:
如果client想更新haspmap的话,它可以(MAY)启动第三个连接:
剩下的框架没在下面的命令中说明。
状态同步 State Synchronization
client必须(MUST)在启动的时候发送一个ICANHAZ命令到它的快照连接。该命令包括以下两个帧:
1 |
ICANHAZ command |
每个帧都是zmq字符串。子树说明可以(MAY)为空。如果不为空,它会包含一个通过反斜杠分开的一个个路径段,以反斜杠最后结束。
server必须(MUST)返回给ICANHAZ命令零个或多个KVSYNC命令到它的快照端口,然后跟一个KTHXBAI命令。server必须(MUST)在每个命令之前前缀上client的标识,该标识由ICANHAZ命令提供。KVSYNC命令定义了一个kay-value对:
1 |
KVSYNC command |
序列编号没有符号,可以是0.
KTHXBAI命令采用如下格式:
1 |
KTHXBAI command |
序列编号必须(MUST)是KVSYNC命令先前送过来的最大数字。
当client收到了一个KTHXBAI命令,它应该(SHOULD)开始去接收从它的订阅连接发来的数据然后处理它们。
Server-to-Client Updates
当server有一个hashmap的更新,它必须(MUST)把它作为一个KVPUB命令通过发布socket广播出去。KVPUB命令有如下格式:
1 |
KVPUB command |
序列编号必须(MUST)严格递增。client必须(MUST)丢掉那些序列编号没有严格大于上一条接收到的KTHXBAI或KVPUB命令的序列编号的任何KVPUB命令。
UUID是可选项并且frame 2可以(MAY)为空(长度为0)。属性字段组织成零个或多个”name=value”后跟一个换行符的字段的格式。如果该key-value对没有属性,那属性字段就为空。
如果value为空,client应该(SHOULD)应该删除指定key的key-value项。
在缺少更新的时候server应该(SHOULD)定期发送一个HUGZ命令,比如说每秒一次。HUGZ命令格式如下:
1 |
HUGZ command |
client可以(MAY)把HUGZ的缺失作为server已经挂掉的指示器(见下面可靠性)。
Client-to-Server Updates
当client有一个针对它自己的hashmap的更新,它可以(MAY)把它作为一个KVSET命令通过发布连接发送给server。KVSET命令格式如下:
1 |
KVSET command |
序列编号没有符号且可以为0。如果使用一个可靠的server架构的话,UUID应该(SHOULD)是个全局唯一标识。
如果value为空,server必须(MUST)删除该指定key的key-value项。
server应该(SHOULD)接收如下属性:
可靠性 Reliability
CHP可以配置成双server,如果主server失效那备份server就会接替工作。CHP没有定义故障转移的机制,不过可以参照双子星模型的定义。
要辅助server的可靠性设计,client可以(MAY):
扩展性和性能 Scalability and Performance
CHP被设计成能适应大规模(成千)client的架构,限制只存在于broker的系统资源。由于所有的更新都会通过一台单独的server,总体吞吐量会被限制在峰值为每秒几百万个更新,或者更少些。
安全 Security
CHP没有实现任何认证、接入控制或者密码机制,不应该在需要这些的场景中使用。
构建一个多线程栈和API Building a Multithreaded Stach and API
目前我们使用的client栈还不能正确处理该协议。在开始构建心跳的时候,我们需要一个能运行在后台线程的client栈。在第四章结尾的自由者模型中我们使用过一个多线程API但没有详细说明它。当你开始构建像CHP那样更复杂的zmq协议的时候多线程API就相当有用了。
如果你设计设计了一个很重要的协议并且期望应用正确实现它,那大多数开发者会在大部分时间弄错的。你会听到很多人抱怨协议太复杂、太脆弱、太难使用。即使你给它们一个很简单的API,也会很难让它们去买该系统。
我们的多线程API包括一个前端对象和一个后台代理,通过两个PAIR socket连接。像这样连接两个PAIR socket会很有用,你的高级绑定很可能跟CZMQ一样,就是封装一个”创建一个带有我可以用来向它发送信息的管道的新线程”的方法。
我们在本书中看到的多线程API全都采用相同的格式:
clone_new
)创建一个context并且启动一个带有管道的后台线程。它保存有该管道的一端以便能向后台线程发送命令。
zmq_poll
循环。
1 |
void |
recv
函数,它会在前端管道上等着message。
recv
方法都会阻塞该应用。
该克隆类有着跟第四章
flicliapi
类相同的结构,并且增加了最后一个克隆模型client的逻辑。没有zmq,这种多线程API的设计就需要几周时间的艰苦工作。使用了zmq的话,就差不多是一两天的工作了。
该克隆类的真实API相当简单:
1 |
// Create a new clone class instance |
下面是克隆client的模型6,它现在变成了使用该克隆类的一个瘦壳:
1 |
// clonecli6: Clone client, Model Six in C |
注意连接函数,它指定了一个server端点地址。在该函数下,我们实际上在跟三个端口通信。然而,就像CHP协议说的那样,这三个端口是三个连续的端口数字:
因此我们可以把这三个连接整合到一个逻辑操作中(我们实现为三个独立的zmq连接调用)。
我们以该克隆栈的源代码结束。这是段复杂的代码,但只要你把它分解为前端对象类和后台agent类就很容易理解了。前端向agent发送字符串命令(“SUBTREE”, “CONNECT”, “SET”, “GET”),agent会处理这些命令并更server(s)通信。下面是agent的逻辑:
下面是真正的克隆类实现:
1 |
// clone: Clone class in C |
爱吹牛的人字拖 · 国际科学与工程大奖赛_百度百科 1 周前 |