import { webSocket, WebSocketSubject } from 'rxjs/webSocket'
class MessageService {
private ws: WebSocketSubject< any > ;
connect ( ) {
this . ws = webSocket ( {
url: 'ws://localhost:8080/ws'
} ) ;
send ( type : string , data? : any ) {
this . ws. next ( {
type ,
} ) ;
class AppComponent {
constructor ( private messageService: MessageService)
this . messageService. subscribe ( data => {
switch ( data. type ) {
case 'TYPE_1' :
case 'TYPE_2' :
shend1 ( ) {
this . messageService. send ( 'TYPE_1' , { ... } ) ;
shend2 ( ) {
this . messageService. send ( 'TYPE_2' , { ... } ) ;
这样的好处是使用非常灵活,几乎就是把 Rxjs 的 webSocket
返回直接提供给了开发者,不过缺点也很明显,就是上文提到的无法将消息类型和数据类型对应起来,尤其是在大型的 Websocket 应用中尤为突出。
为了解决这些问题,接下来我们会利用 Typescript 的一些高阶用法重新定义一个类型安全的 Websocket 应用。
在开始之前我们需要先解释下文会用到的两个词的意思,避免混淆。
消息类型(用于区分不同行为的消息, 与 Typescript 无关)
数据类型(真正意义上的 Typescript 类型,对应每个消息的数据类型)
为了区分不同行为的消息我们需要先定义消息类型的枚举,其中 Receive
是接收类型,Send
是发送类型。其实一个枚举就可以满足这个需求,不过为了覆盖更多在项目中可能会出现的情况这里声明了两个枚举。
export enum Receive {
CONNECT = 'CONNECT' ,
USER_LIST = 'USER_LIST' ,
MESSAGE = 'MESSAGE' ,
JOINED = 'JOINED' ,
LEAVE = 'LEAVE' ,
RENAME = 'RENAME' ,
export enum Send {
JOINED = 'JOINED' ,
LEAVE = 'LEAVE' ,
RENAME = 'RENAME' ,
MESSAGE = 'MESSAGE' ,
GET_USER_LIST = 'GET_USER_LIST'
将枚举初始为字符串可以让我们直接获取到对应字符串
出于演示目的,这里的数据类型仅满足最小可用度,同时也是为了便于理解。
export type User = string ;
export interface Rename {
user: User;
newName: User;
export interface ChatMessage {
form: User;
content: string ;
time: number ;
不同的消息类型对应了不同的数据, 这里我们再添加两个 interface 用于映射消息类型和数据类型的关系,其中 key
是消息类型 value
是与之对应的数据类型。
需要注意的是有些 value
的类型为 never
, 这意味着它不需要要发送数据。
export interface MessageReceiveData {
[ Receive. CONNECT ] : never ;
[ Receive. USER_LIST ] : User[ ] ;
[ Receive. MESSAGE ] : ChatMessage;
[ Receive. JOINED ] : User;
[ Receive. LEAVE ] : User;
[ Receive. RENAME ] : Rename;
export interface MessageSendData {
[ Send. MESSAGE ] : ChatMessage;
[ Send. GET_USER_LIST ] : never ;
[ Send. JOINED ] : User;
[ Send. LEAVE ] : User;
[ Send. RENAME ] : Rename;
接下来定义我们约定的 Websocket 消息格式的类型
type DataType< T extends ( Send | Receive) > = T extends Send ? MessageSendData[ Send] : MessageReceiveData[ Receive] ;
export interface MessageBody < T extends ( Send | Receive) > {
type : T ;
data: DataType< T > ;
这里的 T
是范型,通过 extends
将它限制在了 Send
和 Receive
两个枚举之间。
DataType
是一个条件类型,用于确定在不同消息类型时返回正确的数据类型,理解这个简单的工具类型对阅读下面的内容很有帮助,这里举例稍做解释。
当 T
为 Receive.USER_LIST
时,将会返回如下类型
MessageBody< Receive. USER_LIST >
type : 'USER_LIST' ,
data: User[ ]
当 T
为 Receive.CONNECT
时,将会返回如下类型。注意!这里因为对应的数据类型是 never
所以没有 data
属性。
MessageBody< Receive. CONNECT >
type : 'CONNECT'
Websocket 服务
重新修改最初版本的服务,为 WebSocketSubject
定义明确的类型,并添加一个 receive
方法通过一个 Subject
来分发消息。
class MessageService {
private ws: WebSocketSubject< MessageBody< Receive | Send>> ;
private received$ = new Subject < MessageBody< Receive>> ( ) ;
connect ( ) {
this . ws = webSocket ( {
url: 'ws://localhost:8080/ws'
} ) ;
this . ws. subscribe ( data => this . received$. next ( data as MessageBody< Receive> ) ) ;
receive< K extends Receive > ( type : K ) : Observable< MessageReceiveData[ K ] > {
return this . received$. pipe (
filter ( message => message. type === type ) ,
map ( message => message. data)
) as Observable< MessageReceiveData[ K ] > ;
这里的 receive
方法同样接受一个范型,同时也是消息类型,并且返回在 MessageReceiveData
中对应的数据类型,这样我们便可以安全的使用 receive
方法来订阅消息在保证类型安全同时还能获得高级编辑器的智能提示功能。
class AppComponent {
constructor ( private messageService: MessageService)
this . messageService
. receive ( Receive. CONNECT )
. subscribe ( ( ) => {
this . messageService
. receive ( Receive. MESSAGE )
. subscribe ( data => {
console . log ( data. content)
console . log ( data. a)
熟悉 Rxjs
的同学知道这里还需要做取消订阅处理,稍后我们会有统一的方案处理这个问题。
接受消息的问题解决了,接下来需要解决发送数据的问题了,不同的消息类型需要发送不同的数据,而比如上面的 Send.GET_USER_LIST
类型这不需要发送数据,只需要发送对于的消息类型即可,要做到这一点我们希望 send
方法能根据类型识别需要的数据类型,同时还能支持无数据的消息类型。在开始之前我们先看看之前的 send
方法。
class MessageService {
send ( type : string , data? : any ) {
this . ws. next ( {
type ,
} ) ;
可以看到这里的 data
参数是可选的,就是为了应对无数据的消息类型,现在我们希望 Typescript 在知道 type
的情况下能明确告诉我们是否需要 data
,以及什么类型的 data
。请考虑下面这种实现方式。
send< K extends Send > ( type : K , data? : MessageSendData[ K ] ) {
this . ws. next ( {
type ,
} ) ;
send< Send. GET_USER_LIST > ( Send. GET_USER_LIST ) ;
send< Send. MESSAGE > ( Send. MESSAGE , message) ;
send< Send. RENAME > ( Send. RENAM ) ;
这样的确可以知道 data
的类型,不过由于有些消息类型不需要 data
,所以类型是可选的,这导致我们无法确切的知道一个消息类型是否需要 data
。 有没有什么办法可以同时兼顾着两需求呢?请再考虑下面的实现方式。
type ArgumentsType< T > = T extends ( ... args: infer U ) => void ? U : never ;
type SendArgumentsType< T extends keyof MessageSendData> =
MessageSendData[ T ] extends never
? ArgumentsType< ( type : T ) => void >
: ArgumentsType< ( type : T , data: MessageSendData[ T ] ) => void > ;
send< K extends Send > ( ... args: SendArgumentsType< K > ) {
const [ type , data] = args;
this . ws. next (
type ,
这段代码看上去感觉很复杂,也许还有些没见过的操作符,我们首先来看看 ArgumentsType
类型。
这里它接受一个范型,如果是个方法则返回 U
,那么这个 U
是哪里来的呢?是通过 infer
操作符推导出来的,这里推导的是传入方法的参数,而且是通过参数展开操作符(…) 展开后的数组,实际使用的效果是这样的。
type ArgumentsType< T > = T extends ( ... args: infer U ) => void ? U : never ;
ArgumentsType< ( string , number ) => void > ;
ArgumentsType< ( boolean ) => void > ;
ArgumentsType< ( ) => void > ;
ArgumentsType< string >
现在再来看SendArgumentsType
类型,首先判断 MessageSendData[T]
类型是不是 naver
, 如果是的话则通过 ArgumentsType
来返回无数据的参数类型,否则就通过 ArgumentsType
来返回对应消息类型的参数类型。
export type SendArgumentsType< T extends keyof MessageSendData> =
MessageSendData[ T ] extends never
? ArgumentsType< ( type : T ) => void >
: ArgumentsType< ( type : T , data: MessageSendData[ T ] ) => void > ;
SendArgumentsType< Send. MESSAGE >
SendArgumentsType< Send. RENAME >
SendArgumentsType< Send. GET_USER_LIST >
现在我们利用参数展开操作符(…),将 shend
方法的参数展开成数组,再利用 SendArgumentsType
类型就能实现我们想要的功能了。
send< Send. GET_USER_LIST > ( Send. GET_USER_LIST ) ;
send< Send. MESSAGE > ( Send. MESSAGE , message) ;
send< Send. RENAME > ( Send. RENAM ) ;
现在我们发送/接受消息都能正确提示和约束类型了。
class AppComponent {
username = '' ;
constructor ( private messageService: MessageService)
this . messageService
. receive ( Receive. CONNECT )
. subscribe ( ( ) => {
this . getUserList ( ) ;
this . messageService
. receive ( Receive. MESSAGE )
. subscribe ( data => {
getUserList ( ) {
this . messageService. send< Send. GET_USER_LIST > ( Send. GET_USER_LIST ) ;
join ( username: string ) {
this . username = username;
this . messageService. send< Send. JOINED > ( Send. JOINED , this . username) ;
rename ( username: string ) {
const data = {
user: this . username,
newName: username
this . messageService. send< Send. RENAME > ( Send. RENAME , data) ;
sendMessage ( content: string ) {
const message = {
content,
form: this . username,
time: Date. now ( )
this . messageService. send< Send. MESSAGE > ( Send. MESSAGE , message) ;
看上去我们的问题已经解决了,不过订阅消息的方式还是显得很臃肿,而且还有取消订阅的问题没有解决,如果应用中很多组件都订阅了消息,那么光是维护订阅也会花费不少的时间了精力。所以接下来我们还要继续改造我们的应用,添加一个订阅管理器已经订阅装饰器,让我们能够像下面这样订阅消息,并且不用关心取消订阅的问题。
class AppComponent {
username = '' ;
constructor ( private messageService: MessageService)
this . messageService
. receive ( Receive. CONNECT )
. subscribe ( ( ) => {
this . messageService
. receive ( Receive. MESSAGE )
. subscribe ( data => {
this . messageService
. receive ( Receive. USER_LIST )
. subscribe ( data => {
上面的代码依然存在两个问题:
所有的订阅都集中在一个方法里,显得这个方法过于臃肿
还需要花费不少的精力管理取消订阅
所以现在我们要着手解决这两个问题,让一个方法负责一个类型的消息,同时不再需要关心取消订阅的问题,就像下面这样。
export class AppComponent extends MessageListenersManager {
constructor ( private messageService: MessageService) {
super ( messageService) ;
@MessageListener ( Receive. CONNECT )
onConnect ( ) {
@MessageListener ( Receive. MESSAGE )
addMessage ( message: ChatMessage ) {
@MessageListener ( Receive. USER_LIST )
updateUserList ( users: User[ ] ) {
订阅管理器
我们先完成 MessageListenersManager
类,它的目的主要有两个一是确保派生类注入 MessageService
实例,二是通过一些方法管理 Subscription
并在合适的时机取消订阅,它的实现是这样的。
export class MessageListenersManager {
static __messageListeners__: Function [ ] = [ ] ;
readonly __messageListenersTakeUntilDestroy$__ = new Subject < void > ( ) ;
constructor ( public messageService: MessageService) {
while ( MessageListenersManager. __messageListeners__. length > 0 ) {
const fun = MessageListenersManager. __messageListeners__. pop ( ) ;
fun . apply ( this ) ;
ngOnDestroy ( ) : void {
this . __messageListenersTakeUntilDestroy$__. next ( ) ;
this . __messageListenersTakeUntilDestroy$__. complete ( ) ;
首先定义一个静态属性 __messageListeners__
,用于存放创建订阅的方法,然后再 constructor 里面依次执行它们以创建订阅,但是为什么要用静态属性呢?因为在成员装饰器被调用的时候类还没有被构造,无法访问内部的属性,只有静态属性可以在构造前被访问。之后再创建一个名为 __messageListenersTakeUntilDestroy$__
的 Subject
再订阅时放在 Rxjs 操作符 takeUntil
里用于取消订阅,最后在 ngOnDestroy
生命周期里调用以取消订阅。
订阅装饰器
接下来创建一个成员装饰器,创建指定消息类型的订阅然后将方法推入 __messageListeners__
,这里的 ReceiveArgumentsType
类型是用来约束被装饰方法的参数类型。
export type ReceiveArgumentsType<
T extends keyof MessageReceiveData
> = MessageReceiveData[ T ] extends undefined
? ( ) => void
: ( data? : MessageReceiveData[ T ] ) => void ;
export function MessageListener< T extends keyof MessageReceiveData> ( type : T ) {
return (
target: MessageListenersManager,
propertyKey: string ,
descriptor: TypedPropertyDescriptor< ReceiveArgumentsType< T >> ) => {
const constructor = Object. getPrototypeOf ( target) . constructor ;
if ( constructor && constructor . __messageListeners__) {
constructor . __messageListeners__. push ( function ( ) {
this . messageService. receive ( type )
. pipe ( takeUntil ( this . __messageListenersTakeUntilDestroy$__) )
. subscribe ( data => {
descriptor. value . call ( this , data) ;
} ) ;
} ) ;
return descriptor;
现在我们就可以像下面这样订阅和发送 Websocket 消息了:
export class AppComponent extends MessageListenersManager {
username = '' ;
constructor ( private messageService: MessageService) {
super ( messageService) ;
@MessageListener ( Receive. CONNECT )
onConnect ( ) {
@MessageListener ( Receive. MESSAGE )
addMessage ( message: ChatMessage ) {
@MessageListener ( Receive. USER_LIST )
updateUserList ( users: User[ ] ) {
getUserList ( ) {
this . messageService. send< Send. GET_USER_LIST > ( Send. GET_USER_LIST ) ;
join ( username: string ) {
this . username = username;
this . messageService. send< Send. JOINED > ( Send. JOINED , this . username) ;
rename ( username: string ) {
const data = {
user: this . username,
newName: username
this . messageService. send< Send. RENAME > ( Send. RENAME , data) ;
sendMessage ( content: string ) {
const message = {
content,
form: this . username,
time: Date. now ( )
this . messageService. send< Send. MESSAGE > ( Send. MESSAGE , message) ;