Mangos™ is an implementation in pure Go of the SP (Scalability Protocols) messaging system. These are colloquially known as nanomsg.

NOTE: The import path has changed! Please change any references to go.nanomsg.org/mangos/v3. The old v2 imports will still work for old applications, provided that a sufficiently modern version of Go is used. However, no further work will be done on earlier versions. Earlier versions will still inter-operate with this version, except that within the same process the inproc transport can only be used by consumers using the same version of mangos.

The modern C implementation of the SP protocols is available as NNG™.

The original implementation of the SP protocols is available as nanomsg™.

Generally (modulo a few caveats) all of these implementations can inter-operate.

The design is intended to make it easy to add new transports, as well as new topologies (protocols in SP parlance).

At present, all the Req/Rep, Pub/Sub, Pair, Bus, Push/Pull, and Surveyor/Respondent patterns are supported. This project also supports an experimental protocol called Star.

Supported transports include TCP, inproc, IPC, WebSocket, WebSocket/TLS and TLS.

Basic interoperability with nanomsg and NNG has been verified (you can do so yourself with nanocat and macat) for all protocols and transports that NNG and nanomsg support, except for the ZeroTier transport and the PAIRv1 protocol, which are only supported in NNG at this time.

There are a number of projects that use these products together.

Documentation

For API documentation, see https://pkg.go.dev/go.nanomsg.org/mangos/v3.

Testing

This package supports internal self tests, which can be run in the idiomatic Go way. (Note that most of the tests are in a test subdirectory.)

$ go test go.nanomsg.org/mangos/v3/...

There are also internal benchmarks available:

$ go test -bench=. go.nanomsg.org/mangos/v3/test

Commercial Support

Staysail Systems, Inc. offers commercial support for mangos.

Examples

Some examples are posted in the directories under examples/ in this project.

These examples are rewrites (in Go) of Tim Dysinger's Getting Started with Nanomsg.

Running go doc in the example directories will yield information about how to run each example program.

Enjoy!

mangos™, Nanomsg™ and NNG™ are trademarks of Garrett D'Amore.

Package mangos provides a pure Go implementation of the Scalability Protocols. These are more commonly known as "nanomsg", which is the C-based software package that is also their reference implementation.

These protocols facilitate the rapid creation of applications which rely on multiple participants in sometimes complex communications topologies, including Request/Reply, Publish/Subscribe, Push/Pull, Surveyor/Respondent, etc.

For more information, see www.nanomsg.org.

Index

  • Constants
  • func Device(s1 Socket, s2 Socket) error
  • type Context
  • type Dialer
  • type Listener
  • type Message
      • func NewMessage(sz int) *Message
      • func (m *Message) Clone()
      • func (m *Message) Dup() *Message
      • func (m *Message) Free()
      • func (m *Message) MakeUnique() *Message
  • type Pipe
  • type PipeEvent
  • type PipeEventHook
  • type ProtocolBase
  • type ProtocolContext
  • type ProtocolInfo
  • type ProtocolPipe
  • type Socket
  • type TranDialer
  • type TranListener
  • type TranPipe
  • type Transport

    Constants

    const (
    	ErrBadAddr     = errors.ErrBadAddr
    	ErrBadHeader   = errors.ErrBadHeader
    	ErrBadVersion  = errors.ErrBadVersion
    	ErrTooShort    = errors.ErrTooShort
    	ErrTooLong     = errors.ErrTooLong
    	ErrClosed      = errors.ErrClosed
    	ErrConnRefused = errors.ErrConnRefused
    	ErrSendTimeout = errors.ErrSendTimeout
    	ErrRecvTimeout = errors.ErrRecvTimeout
    	ErrProtoState  = errors.ErrProtoState
    	ErrProtoOp     = errors.ErrProtoOp
    	ErrBadTran     = errors.ErrBadTran
    	ErrBadProto    = errors.ErrBadProto
    	ErrBadOption   = errors.ErrBadOption
    	ErrBadValue    = errors.ErrBadValue
    	ErrGarbled     = errors.ErrGarbled
    	ErrAddrInUse   = errors.ErrAddrInUse
    	ErrBadProperty = errors.ErrBadProperty
    	ErrTLSNoConfig = errors.ErrTLSNoConfig
    	ErrTLSNoCert   = errors.ErrTLSNoCert
    	ErrNotRaw      = errors.ErrNotRaw
    	ErrCanceled    = errors.ErrCanceled
    	ErrNoContext   = errors.ErrNoContext
    	ErrNoPeers     = errors.ErrNoPeers
    )

    Various error codes.

    const (
    	// OptionRaw is used to test if the socket is in RAW mode.  The details
    	// of how this varies from normal mode vary from protocol to protocol,
    	// but RAW mode is generally minimal protocol processing, and
    	// stateless.  RAW mode sockets are constructed with a different
    	// protocol constructor.  Raw mode is generally used with Device()
    	// or similar proxy configurations.
    	OptionRaw = "RAW"
    	// OptionRecvDeadline is the time until the next Recv times out.  The
    	// value is a time.Duration.  Zero value may be passed to indicate that
    	// no timeout should be applied.  A negative value indicates a
    	// non-blocking operation.  By default there is no timeout.
    	OptionRecvDeadline = "RECV-DEADLINE"
    	// OptionSendDeadline is the time until the next Send times out.  The
    	// value is a time.Duration.  Zero value may be passed to indicate that
    	// no timeout should be applied.  A negative value indicates a
    	// non-blocking operation.  By default there is no timeout.
    	OptionSendDeadline = "SEND-DEADLINE"
    	// OptionRetryTime is used by REQ.  The argument is a time.Duration.
    	// When a request has not been replied to within the given duration,
    	// the request will automatically be resent to an available peer.
    	// This value should be longer than the maximum possible processing
    	// and transport time.  The value zero indicates that no automatic
    	// retries should be sent.  The default value is one minute.
    	// Note that changing this option is only guaranteed to affect requests
    	// sent after the option is set.  Changing the value while a request
    	// is outstanding may not have the desired effect.
    	OptionRetryTime = "RETRY-TIME"
    	// OptionSubscribe is used by SUB/XSUB.  The argument is a []byte.
    	// The application will receive messages that start with this prefix.
    	// Multiple subscriptions may be in effect on a given socket.  The
    	// application will not receive messages that do not match any current
    	// subscriptions.  (If there are no subscriptions for a SUB/XSUB
    	// socket, then the application will not receive any messages.  An
    	// empty prefix can be used to subscribe to all messages.)
    	OptionSubscribe = "SUBSCRIBE"
    	// OptionUnsubscribe is used by SUB/XSUB.  The argument is a []byte,
    	// representing a previously established subscription, which will be
    	// removed from the socket.
    	OptionUnsubscribe = "UNSUBSCRIBE"
    	// OptionSurveyTime is used to indicate the deadline for survey
    	// responses, when used with a SURVEYOR socket.  Messages arriving
    	// after this will be discarded.  Additionally, this will set the
    	// OptionRecvDeadline when starting the survey, so that attempts to
    	// receive messages fail with ErrRecvTimeout when the survey is
    	// concluded.  The value is a time.Duration.  Zero can be passed to
    	// indicate an infinite time.  Default is 1 second.
    	OptionSurveyTime = "SURVEY-TIME"
    	// OptionTLSConfig is used to supply TLS configuration details. It
    	// can be set using the ListenOptions or DialOptions.
    	// The parameter is a tls.Config pointer.
    	OptionTLSConfig = "TLS-CONFIG"
    	// OptionWriteQLen is used to set the size, in messages, of the write
    	// queue channel. By default, it's 128. This option cannot be set if
    	// Dial or Listen has been called on the socket.
    	OptionWriteQLen = "WRITEQ-LEN"
    	// OptionReadQLen is used to set the size, in messages, of the read
    	// queue channel. By default, it's 128. This option cannot be set if
    	// Dial or Listen has been called on the socket.
    	OptionReadQLen = "READQ-LEN"
    	// OptionKeepAlive is used to set TCP KeepAlive.  Value is a boolean.
    	// Default is true.
    	OptionKeepAlive = "KEEPALIVE"
    	// OptionKeepAliveTime is used to set the TCP KeepAlive time.
    	// Value is a time.Duration. Default is OS dependent.
    	OptionKeepAliveTime = "KEEPALIVETIME"
    	// OptionNoDelay is used to configure Nagle -- when true messages are
    	// sent as soon as possible, otherwise some buffering may occur.
    	// Value is a boolean.  Default is true.
    	OptionNoDelay = "NO-DELAY"
    	// OptionLinger is used to set the linger property.  This is the amount
    	// of time to wait for send queues to drain when Close() is called.
    	// Close() may block for up to this long if there is unsent data, but
    	// will return as soon as all data is delivered to the transport.
    	// Value is a time.Duration.  Default is one second.
    	OptionLinger = "LINGER"
    	// OptionTTL is used to set the maximum time-to-live for messages.
    	// Note that not all protocols can honor this at this time, but for
    	// those that do, if a message traverses more than this many devices,
    	// it will be dropped.  This is used to provide protection against
    	// loops in the topology.  The default is protocol specific.
    	OptionTTL = "TTL"
    	// OptionMaxRecvSize supplies the maximum receive size for inbound
    	// messages.  This option exists because the wire protocol allows
    	// the sender to specify the size of the incoming message, and
    	// if the size were overly large, a bad remote actor could perform a
    	// remote Denial-Of-Service by requesting ridiculously large message
    	// sizes and then stalling on send.  The default value is 1MB.
    	// A value of 0 removes the limit, but should not be used unless
    	// absolutely sure that the peer is trustworthy.
    	// Not all transports honor this limit.  For example, this limit
    	// makes no sense when used with inproc.
    	// Note that the size includes any Protocol specific header.  It is
    	// better to pick a value that is a little too big, than too small.
    	// This option is only intended to prevent gross abuse of the system,
    	// and not a substitute for proper application message verification.
    	// This option is type int.
    	OptionMaxRecvSize = "MAX-RCV-SIZE"
    	// OptionReconnectTime is the initial interval used for connection
    	// attempts.  If a connection attempt does not succeed, then the socket
    	// will wait this long before trying again.  An optional exponential
    	// backoff may cause this value to grow.  See OptionMaxReconnectTime
    	// for more details.  This is a time.Duration whose default value is
    	// 100msec.  This option must be set before starting any dialers.
    	OptionReconnectTime = "RECONNECT-TIME"
    	// OptionMaxReconnectTime is the maximum value of the time between
    	// connection attempts, when an exponential backoff is used.  If this
    	// value is zero, then exponential backoff is disabled, otherwise
    	// the value to wait between attempts is doubled until it hits this
    	// limit.  This value is a time.Duration, with initial value 0.
    	// This option must be set before starting any dialers.
    	OptionMaxReconnectTime = "MAX-RECONNECT-TIME"
    	// OptionBestEffort enables non-blocking send operations on the
    	// socket. Normally (for some socket types), a socket will block if
    	// there are no receivers, or the receivers are unable to keep up
    	// with the sender. (Multicast sockets types like Bus or Star do not
    	// behave this way.)  If this option is set, instead of blocking, the
    	// message will be silently discarded.  The value is a boolean, and
    	// defaults to False.
    	OptionBestEffort = "BEST-EFFORT"
    	// OptionLocalAddr expresses a local address.  For dialers, this is
    	// the (often random) address that was locally bound.  For listeners,
    	// it is usually the service address.  The value is a net.Addr.  This
    	// is generally a read-only value for pipes, though it might sometimes
    	// be available on dialers or listeners.
    	OptionLocalAddr = "LOCAL-ADDR"
    	// OptionRemoteAddr expresses a remote address.  For dialers, this is
    	// the service address.  For listeners, it's the address of the far
    	// end dialer.  The value is a net.Addr.  It is generally read-only
    	// and available only on pipes and dialers.
    	OptionRemoteAddr = "REMOTE-ADDR"
    	// OptionTLSConnState is used to supply TLS connection details. The
    	// value is a tls.ConnectionState.  It is only valid when TLS is used.
    	// This is available on pipes that are using TLS.
    	OptionTLSConnState = "TLS-STATE"
    	// OptionHTTPRequest conveys an *http.Request.  This read-only option
    	// only exists for Pipes using websocket connections.
    	OptionHTTPRequest = "HTTP-REQUEST"
    	// OptionDialAsynch (used on a Dialer) causes the Dial() operation
    	// to run in the background.  Further, the Dialer will always redial,
    	// even if the first attempt fails.  (Normally dialing is performed
    	// synchronously, so that if the remote peer is unavailable at first
    	// the caller can learn of the error and handle or report it.)
    	// Note that mangos v1 behavior is the same as if this option is
    	// set to true.
    	OptionDialAsynch = "DIAL-ASYNCH"
    	// OptionPeerPID is the peer process ID.  This is only implemented for
    	// transports that support it, and it is a read-only option for pipes
    	// only.  It may require cgo on some platforms.  The value is an int.
    	OptionPeerPID = "PEER-PID"
    	// OptionPeerUID is the effective peer user ID, typically obtained via
    	// SO_PEERCRED.  It is only available on transports that support it, and
    	// is a read-only option for pipes.  It may require cgo on some
    	// platforms.  The value is an int.
    	OptionPeerUID = "PEER-UID"
    	// OptionPeerGID is the effective peer group ID, typically obtained via
    	// SO_PEERCRED.  It is only available on transports that support it, and
    	// is a read-only option for pipes.  It may require cgo on some
    	// platforms.  The value is an int.
    	OptionPeerGID = "PEER-GID"
    	// OptionPeerZone is the peer's zone ID.  This is only supported on
    	// Solaris platforms at present, and only when cgo support is enabled.
    	// The value is an int.
    	OptionPeerZone = "PEER-ZONE"
    	// OptionFailNoPeers causes send or receive operations to fail
    	// immediately rather than waiting for a timeout if there are no
    	// connected peers.  This helps discriminate between cases involving
    	// flow control, from those where we have no peers.  Use of this
    	// option may make applications more brittle, as a temporary disconnect
    	// that may otherwise self-heal quickly will now create an immediate
    	// failure.  Applications using this should be prepared to deal with
    	// such failures.  Note that not all protocols respect this -- best
    	// effort protocols will particularly not support this.
    	OptionFailNoPeers = "FAIL-NO-PEERS"
    )

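
The prefix rule described for OptionSubscribe can be illustrated with a small standalone sketch. It is not the mangos implementation; the function name matches and the topic strings are hypothetical, and only the standard library is used. It shows the three cases the comment names: a matching prefix, no subscriptions at all, and the empty prefix.

```go
package main

import (
	"bytes"
	"fmt"
)

// matches reports whether body starts with at least one subscribed prefix.
// With no subscriptions nothing matches; an empty prefix matches everything.
// (Hypothetical helper for illustration, not part of mangos.)
func matches(subs [][]byte, body []byte) bool {
	for _, prefix := range subs {
		if bytes.HasPrefix(body, prefix) {
			return true
		}
	}
	return false
}

func main() {
	subs := [][]byte{[]byte("weather."), []byte("news.")}

	fmt.Println(matches(subs, []byte("weather.paris 21C"))) // prints "true"
	fmt.Println(matches(subs, []byte("sports.final")))      // prints "false"
	fmt.Println(matches(nil, []byte("weather.paris")))      // prints "false"
	fmt.Println(matches([][]byte{{}}, []byte("anything")))  // prints "true"
}
```

Because matching is done on raw bytes, a publisher that wants topic-style filtering would typically put the topic at the very start of the message body.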
    const (
    	// PipeEventAttaching is called before the Pipe is registered with the
    	// socket.  The intention is to permit the application to reject
    	// a pipe before it is attached.
    	PipeEventAttaching = iota
    	// PipeEventAttached occurs after the Pipe is attached.
    	// Consequently, it is possible to use the Pipe for delivering
    	// events to sockets, etc.
    	PipeEventAttached
    	// PipeEventDetached occurs after the Pipe has been detached
    	// from the socket.
    	PipeEventDetached
    )

    Useful constants for protocol numbers. Note that the major protocol number is stored in the upper 12 bits, and the minor (subprotocol) is located in the bottom 4 bits.
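
The bit layout described above (major number in the upper 12 bits, minor in the bottom 4 bits of a 16-bit value) can be sketched as follows. The helper names are hypothetical and the values 3 and 1 are purely illustrative; this is not code from the package.

```go
package main

import "fmt"

// protoNumber packs a major protocol number (upper 12 bits) and a minor
// subprotocol number (bottom 4 bits) into a single 16-bit value.
// (Hypothetical helper for illustration.)
func protoNumber(major, minor uint16) uint16 {
	return major<<4 | minor&0xF
}

// protoMajor extracts the upper 12 bits.
func protoMajor(p uint16) uint16 { return p >> 4 }

// protoMinor extracts the bottom 4 bits.
func protoMinor(p uint16) uint16 { return p & 0xF }

func main() {
	p := protoNumber(3, 1) // illustrative values only
	fmt.Println(p, protoMajor(p), protoMinor(p)) // prints "49 3 1"
}
```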

    Variables

    This section is empty.

    Functions

    func Device(s1 Socket, s2 Socket) error

    Device is used to create a forwarding loop between two sockets. If the same socket is listed (or either socket is nil), then a loopback device is established instead. Note that the single socket case is only valid for protocols where the underlying protocol can peer with itself (e.g. PAIR, or BUS, but not REQ/REP or PUB/SUB!)

    If the plumbing is successful, nil will be returned. Two goroutines will be established to forward messages in each direction. If either socket returns an error on receive or send, the goroutine doing the forwarding will exit. This means that closing either socket will generally cause the goroutines to exit. Apart from closing the socket(s), no further operations should be performed against the socket.

    Both sockets should be RAW; use of a "cooked" socket will result in ErrNotRaw.
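
The forwarding loop described above can be pictured with a minimal sketch that uses channels as stand-ins for sockets. The endpoint type and the forward and device functions are hypothetical and are not the mangos implementation; the sketch only shows the shape of "one goroutine per direction, each exiting when its source is closed".

```go
package main

import "fmt"

// endpoint is a hypothetical stand-in for a RAW socket: received messages
// appear on in, and messages to be sent are written to out.
type endpoint struct {
	in  chan []byte
	out chan []byte
}

// forward copies messages from src to dst until src.in is closed,
// then signals completion on done.
func forward(src, dst endpoint, done chan<- struct{}) {
	for m := range src.in {
		dst.out <- m
	}
	done <- struct{}{}
}

// device starts one forwarding goroutine per direction and returns a
// channel that receives one value for each goroutine that exits.
func device(a, b endpoint) <-chan struct{} {
	done := make(chan struct{}, 2)
	go forward(a, b, done)
	go forward(b, a, done)
	return done
}

func main() {
	a := endpoint{in: make(chan []byte, 1), out: make(chan []byte, 1)}
	b := endpoint{in: make(chan []byte, 1), out: make(chan []byte, 1)}
	done := device(a, b)

	a.in <- []byte("ping")
	fmt.Println(string(<-b.out)) // prints "ping"
	b.in <- []byte("pong")
	fmt.Println(string(<-a.out)) // prints "pong"

	// Closing both sources makes both goroutines exit.
	close(a.in)
	close(b.in)
	<-done
	<-done
}
```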

    type Context interface {
    	// Close closes the open Socket.  Further operations on the socket
    	// will return ErrClosed.
    	Close() error
    	// GetOption is used to retrieve an option for a socket.
    	GetOption(name string) (interface{}, error)
    	// SetOption is used to set an option for a socket.
    	SetOption(name string, value interface{}) error
    	// Send puts the message on the outbound send queue.  It blocks
    	// until the message can be queued, or the send deadline expires.
    	// If a queued message is later dropped for any reason,
    	// there will be no notification back to the application.
    	Send([]byte) error
    	// Recv receives a complete message.  The entire message is received.
    	Recv() ([]byte, error)
    	// SendMsg puts the message on the outbound send.  It works like Send,
    	// but allows the caller to supply message headers.  AGAIN, the Socket
    	// ASSUMES OWNERSHIP OF THE MESSAGE.
    	SendMsg(*Message) error
    	// RecvMsg receives a complete message, including the message header,
    	// which is useful for protocols in raw mode.
    	RecvMsg() (*Message, error)
    }

    Context is a protocol context, and represents the upper side operations that applications will want to use. Every socket has a default context, but only certain protocols will allow the creation of additional Context instances (only if separate stateful contexts make sense for a given protocol).

    type Dialer interface {
    	// Close closes the dialer, and removes it from any active socket.
    	// Further operations on the Dialer will return ErrClosed.
    	Close() error
    	// Dial starts connecting on the address.  If a connection fails,
    	// it will restart.
    	Dial() error
    	// Address returns the string (full URL) of the Dialer.
    	Address() string
    	// SetOption sets an option on the Dialer. Setting options
    	// can only be done before Dial() has been called.
    	SetOption(name string, value interface{}) error
    	// GetOption gets an option value from the Dialer.
    	GetOption(name string) (interface{}, error)
    }

    Dialer is an interface to the underlying dialer for a transport and address.
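
A dialer restarts after a failed connection, and the OptionReconnectTime and OptionMaxReconnectTime comments earlier describe how long it waits: the interval starts at the reconnect time and, when a maximum is set, doubles until it reaches that maximum. The sketch below implements only that arithmetic. The function name nextDelay is hypothetical, and the code is an illustration of the documented rule rather than the library's own logic.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns the wait to use after another failed attempt.
// A max of zero disables backoff, so the delay stays constant;
// otherwise the delay doubles and is capped at max.
// (Hypothetical helper for illustration.)
func nextDelay(cur, max time.Duration) time.Duration {
	if max == 0 {
		return cur
	}
	next := cur * 2
	if next > max {
		next = max
	}
	return next
}

func main() {
	d := 100 * time.Millisecond // the documented default reconnect time
	for i := 0; i < 5; i++ {
		fmt.Println(d)
		d = nextDelay(d, time.Second)
	}
	// prints 100ms, 200ms, 400ms, 800ms, 1s on separate lines
}
```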

    type Listener interface {
    	// Close closes the listener, and removes it from any active socket.
    	// Further operations on the Listener will return ErrClosed.
    	Close() error
    	// Listen starts listening for new connections on the address.
    	Listen() error
    	// Address returns the string (full URL) of the Listener.
    	Address() string
    	// SetOption sets an option on the Listener. Setting options
    	// can only be done before Listen() has been called.
    	SetOption(name string, value interface{}) error
    	// GetOption gets an option value from the Listener.
    	GetOption(name string) (interface{}, error)
    }

    Listener is an interface to the underlying listener for a transport and address.

    type Message struct {
    	// Header carries any protocol (SP) specific header.  Applications
    	// should not modify or use this unless they are using Raw mode.
    	// No user data may be placed here.
    	Header []byte
    	// Body carries the body of the message.  This can also be thought
    	// of as the message "payload".
    	Body []byte
    	// Pipe may be set on message receipt, to indicate the Pipe from
    	// which the Message was received.  There are no guarantees that the
    	// Pipe is still active, and applications should only use this for
    	// informational purposes.
    	Pipe Pipe
    	// contains filtered or unexported fields
    }

    Message encapsulates the messages that we exchange back and forth. The meaning of the Header and Body fields, and where the splits occur, will vary depending on the protocol. Note however that any headers applied by transport layers (including TCP/ethernet headers, and SP protocol independent length headers), are *not* included in the Header.

    func NewMessage(sz int) *Message

    NewMessage is the supported way to obtain a new Message. This makes use of a "cache" which greatly reduces the load on the garbage collector.

    func (m *Message) Clone()

    Clone bumps the reference count on the message, allowing it to be shared. Callers of this MUST ensure that the message is never modified. If a read-only copy needs to be made "unique", callers can do so by using the MakeUnique function.

    func (m *Message) Dup() *Message

    Dup creates a "duplicate" message. The message is made as a deep copy, so the resulting message is safe to modify.
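
The difference between sharing a message and deep-copying it can be shown with a small reference-counted type. The msg type and its methods below are hypothetical stand-ins written for this sketch; they are not the mangos Message and say nothing about its internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// msg is a hypothetical reference-counted message used for illustration.
type msg struct {
	body []byte
	refs int32
}

func newMsg(body string) *msg {
	return &msg{body: []byte(body), refs: 1}
}

// clone shares the same underlying message by bumping its reference
// count; holders of a shared message must treat it as read-only.
func (m *msg) clone() { atomic.AddInt32(&m.refs, 1) }

// dup returns a deep copy with its own reference count of one,
// so the copy is safe to modify.
func (m *msg) dup() *msg {
	return &msg{body: append([]byte(nil), m.body...), refs: 1}
}

func main() {
	m := newMsg("hello")
	m.clone()    // two holders now share m
	d := m.dup() // private copy
	d.body[0] = 'J'

	fmt.Println(string(m.body), string(d.body), atomic.LoadInt32(&m.refs))
	// prints "hello Jello 2"
}
```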

    func (m *Message) Free()

    Free releases the message to the pool from which it was allocated. While this is not strictly necessary thanks to GC, doing so allows for the resources to be recycled without engaging GC. This can have rather substantial benefits for performance.
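
The idea of recycling messages through a pool, so that the garbage collector sees fewer allocations, can be sketched with the standard library's sync.Pool. The message type, newMessage, and free below are hypothetical names for this illustration; how mangos actually organizes its cache is not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical pooled message used for illustration.
type message struct {
	Body []byte
}

var pool = sync.Pool{
	New: func() interface{} { return &message{} },
}

// newMessage returns a message with an empty body and room for at least
// sz bytes, reusing a pooled message when one is available.
func newMessage(sz int) *message {
	m := pool.Get().(*message)
	if cap(m.Body) < sz {
		m.Body = make([]byte, 0, sz)
	}
	m.Body = m.Body[:0]
	return m
}

// free returns the message to the pool; the caller must not use it again.
func (m *message) free() { pool.Put(m) }

func main() {
	m := newMessage(64)
	m.Body = append(m.Body, "payload"...)
	fmt.Println(len(m.Body), cap(m.Body) >= 64) // prints "7 true"
	m.free()

	n := newMessage(16) // may or may not reuse the buffer freed above
	fmt.Println(len(n.Body)) // prints "0"
}
```

sync.Pool gives no guarantee that a freed object is reused, so correctness must never depend on reuse; the pool only reduces allocation pressure.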

    func (m *Message) MakeUnique() *Message

    MakeUnique ensures that the message is not shared. If the reference count on the message is one, then the message is returned as is. Otherwise a new copy of the message is made, and the reference count on the original is dropped. Note that it is an error for the caller to use the original message after this function; the caller should always do `m = m.MakeUnique()`. This function should be called whenever the message is leaving the control of the caller, such as when passing it to a user program.

    Note that transports always should call this on their transmit path if they are going to modify the message. (Most do not.)

    type Pipe interface {
    	// ID returns the numeric ID for this Pipe.  This will be a
    	// 31 bit (bit 32 is clear) value for the Pipe, which is unique
    	// across all other Pipe instances in the application, while
    	// this Pipe exists.  (IDs are recycled on Close, but only after
    	// all other Pipe values are used.)
    	ID() uint32
    	// Address returns the address (URL form) associated with the Pipe.
    	// This matches the string passed to Dial() or Listen().
    	Address() string
    	// GetOption returns an arbitrary option.  The details will vary
    	// for different transport types.
    	GetOption(name string) (interface{}, error)
    	// Listener returns the Listener for this Pipe, or nil if none.
    	Listener() Listener
    	// Dialer returns the Dialer for this Pipe, or nil if none.
    	Dialer() Dialer
    	// Close closes the Pipe.  This does a disconnect, or something similar.
    	// Note that if a dialer is present and active, it will redial.
    	Close() error
    }

    Pipe represents the high level interface to a low level communications channel. There is one of these associated with a given TCP connection, for example. This interface is intended for application use.

    Note that applications cannot send or receive data on a Pipe directly.

    type PipeEventHook func(PipeEvent, Pipe)

    PipeEventHook is an application supplied function to be called when events occur relating to a Pipe.

    type ProtocolBase interface {
    	// ...
    	// AddPipe is called when a new Pipe is added to the socket.
    	// Typically, this is as a result of connect or accept completing.
    	// The pipe ID will be unique for the socket at this time.
    	// The implementation must not call back into the socket, but it
    	// may reject the pipe by returning a non-nil result.
    	AddPipe(ProtocolPipe) error
    	// RemovePipe is called when a Pipe is removed from the socket.
    	// Typically, this indicates a disconnected or closed connection.
    	// This is called exactly once, after the underlying transport pipe
    	// is closed.  The Pipe ID will still be valid.
    	RemovePipe(ProtocolPipe)
    	// OpenContext is a request to create a unique instance of the
    	// protocol state machine, allowing concurrent use of states on
    	// a given protocol socket.  Protocols that don't support this
    	// should return ErrProtoOp.
    	OpenContext() (ProtocolContext, error)
    }

    ProtocolBase provides the protocol-specific handling for sockets. This is the new style API for sockets, and is how protocols provide their specific handling.

    type ProtocolContext interface {
    	// ...
    	// SendMsg sends the message.  The message may be queued, or
    	// may be delivered immediately, depending on the nature of
    	// the protocol.  On success, the context assumes ownership
    	// of the message.  On error, the caller retains ownership,
    	// and may either resend the message or dispose of it otherwise.
    	SendMsg(*Message) error
    	// RecvMsg receives a complete message, including the message header,
    	// which is useful for protocols in raw mode.
    	RecvMsg() (*Message, error)
    	// GetOption is used to retrieve the current value of an option.
    	// If the protocol doesn't recognize the option, ErrBadOption should
    	// be returned.
    	GetOption(string) (interface{}, error)
    	// SetOption is used to set an option.  ErrBadOption is returned if
    	// the option name is not recognized, ErrBadValue if the value is
    	// invalid.
    	SetOption(string, interface{}) error
    }

    ProtocolContext is a "context" for a protocol, which contains the various stateful operations such as timers, etc. necessary for running the protocol. This is separable from the protocol itself as the protocol may permit the creation of multiple contexts.

    type ProtocolPipe interface {
    	// ID returns a unique 31-bit value associated with this.
    	// The value is unique for a given socket, at a given time.
    	ID() uint32
    	// Close does what you think.
    	Close() error
    	// SendMsg sends a message.  On success it returns nil. This is a
    	// blocking call.
    	SendMsg(*Message) error
    	// RecvMsg receives a message.  It blocks until the message is
    	// received.  On error, the pipe is closed and nil is returned.
    	RecvMsg() *Message
    	// SetPrivate is used to set protocol private data.
    	SetPrivate(interface{})
    	// GetPrivate returns the previously stored protocol private data.
    	GetPrivate() interface{}
    }

    ProtocolPipe represents the handle that a Protocol implementation has to the underlying stream transport. It can be thought of as one side of a TCP, IPC, or other type of connection.

    type Socket interface {
    	// Info returns information about the protocol (numbers and names)
    	// and peer protocol.
    	Info() ProtocolInfo
    	// Close closes the open Socket.  Further operations on the socket
    	// will return ErrClosed.
    	Close() error
    	// Send puts the message on the outbound send queue.  It blocks
    	// until the message can be queued, or the send deadline expires.
    	// If a queued message is later dropped for any reason,
    	// there will be no notification back to the application.
    	Send([]byte) error
    	// Recv receives a complete message.  The entire message is received.
    	Recv() ([]byte, error)
    	// SendMsg puts the message on the outbound send.  It works like Send,
    	// but allows the caller to supply message headers.  AGAIN, the Socket
    	// ASSUMES OWNERSHIP OF THE MESSAGE.
    	SendMsg(*Message) error
    	// RecvMsg receives a complete message, including the message header,
    	// which is useful for protocols in raw mode.
    	RecvMsg() (*Message, error)
    	// Dial connects a remote endpoint to the Socket.  The function
    	// returns immediately, and an asynchronous goroutine is started to
    	// establish and maintain the connection, reconnecting as needed.
    	// If the address is invalid, then an error is returned.
    	Dial(addr string) error
    	DialOptions(addr string, options map[string]interface{}) error
    	// NewDialer returns a Dialer object which can be used to get
    	// access to the underlying configuration for dialing.
    	NewDialer(addr string, options map[string]interface{}) (Dialer, error)
    	// Listen connects a local endpoint to the Socket.  Remote peers
    	// may connect (e.g. with Dial) and will each be "connected" to
    	// the Socket.  The accepter logic is run in a separate goroutine.
    	// The only error possible is if the address is invalid.
    	Listen(addr string) error
    	ListenOptions(addr string, options map[string]interface{}) error
    	NewListener(addr string, options map[string]interface{}) (Listener, error)
    	// GetOption is used to retrieve an option for a socket.
    	GetOption(name string) (interface{}, error)
    	// SetOption is used to set an option for a socket.
    	SetOption(name string, value interface{}) error
    	// OpenContext creates a new Context.  If a protocol does not
    	// support separate contexts, this will return an error.
    	OpenContext() (Context, error)
    	// SetPipeEventHook sets a PipeEventHook function to be called when a
    	// Pipe is added or removed from this socket (connect/disconnect).
    	// The previous hook is returned (nil if none.)  (Only one hook can
    	// be used at a time.)
    	SetPipeEventHook(PipeEventHook) PipeEventHook
    }

    Socket is the main access handle applications use to access the SP system. It is an abstraction of an application's "connection" to a messaging topology. Applications can have more than one Socket open at a time.

    type TranDialer interface {
    	// Dial is used to initiate a connection to a remote peer.
    	Dial() (TranPipe, error)
    	// SetOption sets a local option on the dialer.
    	// ErrBadOption can be returned for unrecognized options.
    	// ErrBadValue can be returned for incorrect value types.
    	SetOption(name string, value interface{}) error
    	// GetOption gets a local option from the dialer.
    	// ErrBadOption can be returned for unrecognized options.
    	GetOption(name string) (value interface{}, err error)
    }

    TranDialer represents the client side of a connection. Clients initiate the connection.

    TranDialer is only intended for use by transport implementors, and should not be directly used in applications.

    type TranListener interface {
    	// Listen actually begins listening on the interface.  It is
    	// called just prior to the Accept() routine normally. It is
    	// the socket equivalent of bind()+listen().
    	Listen() error
    	// Accept completes the server side of a connection.  Once the
    	// connection is established and initial handshaking is complete,
    	// the resulting connection is returned to the client.
    	Accept() (TranPipe, error)
    	// Close ceases any listening activity, and will specifically close
    	// any underlying file descriptor.  Once this is done, the only way
    	// to resume listening is to create a new Server instance.  Presumably
    	// this function is only called when the last reference to the server
    	// is about to go away.  Established connections are unaffected.
    	Close() error
    	// SetOption sets a local option on the listener.
    	// ErrBadOption can be returned for unrecognized options.
    	// ErrBadValue can be returned for incorrect value types.
    	SetOption(name string, value interface{}) error
    	// GetOption gets a local option from the listener.
    	// ErrBadOption can be returned for unrecognized options.
    	GetOption(name string) (value interface{}, err error)
    	// Address gets the local address.  The value may not be meaningful
    	// until Listen() has been called.
    	Address() string
    }

    TranListener represents the server side of a connection. Servers respond to a connection request from clients.

    TranListener is only intended for use by transport implementors, and should not be directly used in applications.
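    The call order implied by the comments above (Listen once, Accept in a loop, stop when Close makes Accept fail) can be sketched as follows. This is only an illustration under stated assumptions: `pipe` and `listener` are trimmed local stand-ins for TranPipe and TranListener so the program builds without the mangos module, and `memListener`, `memPipe`, `serve`, and `errClosed` are hypothetical names, with "connections" delivered over a channel rather than a socket.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pipe and listener are simplified local stand-ins for TranPipe and
// TranListener, declared here only so the sketch is self-contained.
type pipe interface{ Close() error }

type listener interface {
	Listen() error
	Accept() (pipe, error)
	Close() error
	Address() string
}

var errClosed = errors.New("listener closed")

// memPipe is a hypothetical do-nothing pipe.
type memPipe struct{ id int }

func (p *memPipe) Close() error { return nil }

// memListener is a hypothetical in-memory listener: pending "connections"
// arrive on a channel instead of an OS-level socket.
type memListener struct {
	addr    string
	pending chan pipe
	done    chan struct{}
	once    sync.Once
	bound   bool
}

func newMemListener(addr string) *memListener {
	return &memListener{addr: addr, pending: make(chan pipe, 8), done: make(chan struct{})}
}

func (l *memListener) Listen() error   { l.bound = true; return nil }
func (l *memListener) Address() string { return l.addr }

// Close is safe to call more than once; it unblocks any waiting Accept.
func (l *memListener) Close() error {
	l.once.Do(func() { close(l.done) })
	return nil
}

func (l *memListener) Accept() (pipe, error) {
	if !l.bound {
		return nil, errors.New("Accept called before Listen")
	}
	select {
	case p := <-l.pending:
		return p, nil
	case <-l.done:
		return nil, errClosed
	}
}

// serve binds the listener, then hands every accepted pipe to handle until
// Accept reports an error. It returns the number of pipes accepted.
func serve(l listener, handle func(pipe)) (int, error) {
	if err := l.Listen(); err != nil {
		return 0, err
	}
	n := 0
	for {
		p, err := l.Accept()
		if err != nil {
			return n, err
		}
		n++
		handle(p)
	}
}

func main() {
	l := newMemListener("mem://demo")
	for i := 0; i < 3; i++ {
		l.pending <- &memPipe{id: i}
	}
	seen := 0
	n, err := serve(l, func(p pipe) {
		seen++
		p.Close()
		if seen == 3 {
			l.Close() // stop listening after the third connection
		}
	})
	fmt.Println(l.Address(), n, err) // mem://demo 3 listener closed
}
```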

    type TranPipe interface {
    	// Send sends a complete message.  In the event of a partial send,
    	// the Pipe will be closed, and an error is returned.  For reasons
    	// of efficiency, we allow the message to be sent in a scatter/gather
    	// list.
    	Send(*Message) error
    	// Recv receives a complete message.  In the event that a
    	// complete message could not be received, an error is returned
    	// to the caller and the Pipe is closed.
    	// To mitigate Denial-of-Service attacks, we limit the max message
    	// size to 1M.
    	Recv() (*Message, error)
    	// Close closes the underlying transport.  Further operations on
    	// the Pipe will result in errors.  Note that messages that are
    	// queued in transport buffers may still be received by the remote
    	// peer.
    	Close() error
    	// GetOption returns an arbitrary transport specific option on a
    	// pipe.  Options for pipes are read-only and specific to that
    	// particular connection. If the property doesn't exist, then
    	// ErrBadOption should be returned.
    	GetOption(string) (interface{}, error)
    }

    TranPipe behaves like a full-duplex message-oriented connection between two peers. Callers may call operations on a Pipe simultaneously from different goroutines. (These are different from net.Conn because they provide message oriented semantics.)

    TranPipe is only intended for use by transport implementors, and should not be directly used in applications.

    type Transport interface {
    	// Scheme returns a string used as the prefix for SP "addresses".
    	// This is similar to a URI scheme.  For example, schemes can be
    	// "tcp" (for "tcp://xxx..."), "ipc", "inproc", etc.
    	Scheme() string
    	// NewDialer creates a new Dialer for this Transport.
    	NewDialer(url string, sock Socket) (TranDialer, error)
    	// NewListener creates a new TranListener for this Transport.
    	// This generally also arranges for an OS-level file descriptor to be
    	// opened, and bound to the given address, as well as establishing
    	// any "listen" backlog.
    	NewListener(url string, sock Socket) (TranListener, error)
    }

    Transport is the interface for transport suppliers to implement.
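    The role of Scheme as an address prefix can be shown with a small lookup sketch. The names here are assumptions made for illustration: `transport` is a local stand-in that keeps only the Scheme method, and `registry`, `schemeOnly`, `register`, and `lookup` are hypothetical; how mangos itself registers and resolves transports is not shown in this section.

```go
package main

import (
	"fmt"
	"strings"
)

// transport is a trimmed local stand-in for the Transport interface above;
// only Scheme is kept so the sketch builds without the mangos module.
type transport interface {
	Scheme() string
}

// schemeOnly is a hypothetical transport that knows nothing but its scheme.
type schemeOnly string

func (s schemeOnly) Scheme() string { return string(s) }

// registry is a hypothetical scheme-to-transport table.
type registry map[string]transport

func (r registry) register(t transport) { r[t.Scheme()] = t }

// lookup splits an address such as "tcp://127.0.0.1:5555" at "://" and
// returns the transport registered for that scheme, if any.
func (r registry) lookup(addr string) (transport, bool) {
	scheme, _, found := strings.Cut(addr, "://")
	if !found {
		return nil, false
	}
	t, ok := r[scheme]
	return t, ok
}

func main() {
	r := registry{}
	for _, s := range []string{"tcp", "ipc", "inproc"} {
		r.register(schemeOnly(s))
	}
	for _, addr := range []string{"tcp://127.0.0.1:5555", "inproc://demo", "udp://nope", "bare"} {
		if t, ok := r.lookup(addr); ok {
			fmt.Printf("%s -> %s transport\n", addr, t.Scheme())
		} else {
			fmt.Printf("%s -> no transport\n", addr)
		}
	}
}
```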

    Package errors just defines some constant error codes, and is intended to be directly imported.
    context implements a request/reply server that utilizes a pool of worker goroutines to service multiple requests simultaneously.
    pair implements a pair example.
    pipeline implements a one-way pipe example.
    pubsub implements a publish/subscribe example.
    raw implements an example concurrent request/reply server, using the raw server socket.
    reqrep implements a request/reply example.
    survey implements a survey example.
    websocket implements a simple websocket server for mangos, demonstrating how to multiplex multiple sockets on a single HTTP server instance.
    Package pull implements the PULL protocol, which is the read side of the pipeline pattern.
    Package push implements the PUSH protocol, which is the write side of the pipeline pattern.
    Package rep implements the REP protocol, which is the response side of the request/response pattern.
    Package req implements the REQ protocol, which is the request side of the request/response pattern.
    Package respondent implements the RESPONDENT protocol, which is the response side of the survey pattern.
    Package star implements a new, experimental protocol called "STAR".
    Package sub implements the SUB protocol.
    Package surveyor implements the SURVEYOR protocol.
    Package xbus implements the BUS protocol.
    Package xpair implements the PAIR protocol.
    Package xpair1 implements the PAIRv1 protocol in monogamous mode only.
    Package xpub implements the PUB protocol.
    Package xpull implements the PULL protocol.
    Package xpush implements the raw PUSH protocol.
    Package xrep implements the raw REP protocol, which is the response side of the request/response pattern.
    Package xreq implements the raw REQ protocol, which is the request side of the request/response pattern.
    Package xrespondent implements the raw RESPONDENT protocol, which is the response side of the survey pattern.
    Package xstar implements the experimental star protocol.
    Package xsub implements the raw SUB protocol.
    Package xsurveyor implements the SURVEYOR protocol.