At long last we arrive at this function. It is a particularly long one, so let's walk through its implementation logic step by step.
Because the native IPVS implementation in the Linux kernel only supports DNAT and not SNAT, ipvs mode still relies on iptables rules in the following scenarios:
When kube-proxy is started with --masquerade-all=true, so that every packet passing through kube-proxy in the cluster is SNATed.
When kube-proxy is started with the --cluster-cidr= flag.
For Services of type LoadBalancer, to enforce the source-IP whitelist.
For Services of type NodePort, to apply MASQUERADE.
For Services with externalIPs. Even so, with an ipvs-mode kube-proxy the number of iptables rules stays fixed no matter how many pods or Services exist.
The s.Proxier.SyncLoop() function
There is a great deal of content here, so we will break it down into several steps.
1. Check whether the Proxier has finished initializing; if not, return from the function immediately.
2. Record the sync duration.
3. Merge pending changes into the proxier.serviceMap object so that it always reflects the latest state; entries are stored as {"ns/svc name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"}. Stale port entries are removed from the map, and the cluster IPs of stale UDP ports are collected for later use (a simplified sketch of this key/value shape follows this list).
4. Compute which entries have become stale; they drive the cleanup work later on.
5. Run the iptables preamble and make sure the prerequisite chains exist.
6. Make sure the dummy interface (kube-ipvs0 by default) and the default ipset lists have been created on the node. Why a dummy interface? The IPVS netfilter DNAT hook is attached to the INPUT chain, so when a ClusterIP is accessed, binding that ClusterIP to the dummy interface makes the kernel treat it as a local IP; the packet then enters the INPUT chain and is forwarded to POSTROUTING by the ip_vs_in hook. The ClusterIP is bound to the dummy interface, and for every ClusterIP an IPVS virtual server and its real servers are created, corresponding to the Service and its Endpoints.
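To make the serviceMap shape described in step 3 concrete, here is a minimal sketch with simplified stand-in types; the real proxier keys the map on proxy.ServicePortName and stores proxy.ServicePort implementations, so the type names below are purely illustrative:

```go
package main

import "fmt"

// servicePortKey mirrors the {namespace/name, port name, protocol} key
// described above; the real proxier uses proxy.ServicePortName.
type servicePortKey struct {
	NamespacedName string // e.g. "default/nginx"
	PortName       string // e.g. "http"
	Protocol       string // e.g. "TCP"
}

// servicePortInfo mirrors the {clusterIP, port, protocol} value side.
type servicePortInfo struct {
	ClusterIP string
	Port      int
	Protocol  string
}

func main() {
	// A simplified serviceMap: one entry per service port.
	serviceMap := map[servicePortKey]servicePortInfo{
		{NamespacedName: "default/nginx", PortName: "http", Protocol: "TCP"}: {
			ClusterIP: "172.16.55.10", Port: 1234, Protocol: "TCP",
		},
	}
	for k, v := range serviceMap {
		fmt.Printf("%s:%s/%s -> %s:%d/%s\n",
			k.NamespacedName, k.PortName, k.Protocol, v.ClusterIP, v.Port, v.Protocol)
	}
}
```

With that shape in mind, let's look at the function itself.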
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
// Check whether the Proxier has finished initializing; if not, return immediately.
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
return
// Record the sync duration.
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
// Merge pending changes into proxier.serviceMap so it always reflects the latest state; entries are stored as {"ns/svc name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"}. Stale port entries are removed, the cluster IPs of stale UDP ports are collected for later use, and the pending changes are then cleared.
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
// Same as above, but for proxier.endpointsMap.
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
// Compute which entries have become stale; they drive the cleanup work later on.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
for _, extIP := range svcInfo.LoadBalancerIPStrings() {
staleServices.Insert(extIP)
klog.V(3).InfoS("Syncing ipvs Proxier rules")
// Begin install iptables
// iptables preamble
// Reset all buffers used later.
// This is to avoid memory reallocations and thus improve performance.
proxier.natChains.Reset()
proxier.natRules.Reset()
proxier.filterChains.Reset()
proxier.filterRules.Reset()
// Write table headers.
utilproxy.WriteLine(proxier.filterChains, "*filter")
utilproxy.WriteLine(proxier.natChains, "*nat")
// Make sure the prerequisite chains exist:
// check that the KUBE-MARK-DROP chain exists in the nat table (no rules are modified here);
// check that the KUBE-SERVICES KUBE-POSTROUTING KUBE-FIREWALL KUBE-NODE-PORT KUBE-LOAD-BALANCER KUBE-MARK-MASQ chains exist in the nat table and load them into the buffer;
// check that the KUBE-FORWARD KUBE-NODE-PORT chains exist in the filter table and load them into the buffer;
// check the following rules and create them if they do not exist:
// -I nat OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
// -I nat PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
// -I nat POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
// -I filter FORWARD -m comment --comment "kubernetes forwarding rules" -j KUBE-FORWARD
// -I filter INPUT -m comment --comment "kubernetes health check rules" -j KUBE-NODE-PORT
proxier.createAndLinkKubeChain()
// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
// Make sure the dummy interface (kube-ipvs0 by default) exists on the node. Why a dummy interface? The IPVS netfilter DNAT hook is attached to the INPUT chain; binding the ClusterIP to the dummy interface makes the kernel treat it as a local IP, so the packet enters the INPUT chain and is forwarded to POSTROUTING by the ip_vs_in hook. The ClusterIP is bound to the dummy interface, and for every ClusterIP an IPVS virtual server plus real servers are created, corresponding to the Service and its Endpoints.
_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
return
// make sure ip sets exists in the system.
// Make sure the following ipset sets exist on the node:
// KUBE-LOOP-BACK hash:ip,port,ip "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose"
// KUBE-CLUSTER-IP hash:ip,port "Kubernetes service cluster ip + port for masquerade purpose"
// KUBE-EXTERNAL-IP hash:ip,port "Kubernetes service external ip + port for masquerade and filter purpose"
// KUBE-EXTERNAL-IP-LOCAL hash:ip,port "Kubernetes service external ip + port with externalTrafficPolicy=local"
// KUBE-LOAD-BALANCER hash:ip,port "Kubernetes service lb portal"
// KUBE-LOAD-BALANCER-FW hash:ip,port "Kubernetes service load balancer ip + port for load balancer with sourceRange"
// KUBE-LOAD-BALANCER-LOCAL hash:ip,port "Kubernetes service load balancer ip + port with externalTrafficPolicy=local"
// KUBE-LOAD-BALANCER-SOURCE-IP hash:ip,port,ip "Kubernetes service load balancer ip + port + source IP for packet filter purpose"
// KUBE-LOAD-BALANCER-SOURCE-CIDR hash:ip,port,net "Kubernetes service load balancer ip + port + source cidr for packet filter purpose"
// KUBE-NODE-PORT-TCP bitmap:port "Kubernetes nodeport TCP port for masquerade purpose"
// KUBE-NODE-PORT-LOCAL-TCP bitmap:port "Kubernetes nodeport TCP port with externalTrafficPolicy=local"
// KUBE-NODE-PORT-UDP bitmap:port "Kubernetes nodeport UDP port for masquerade purpose"
// KUBE-NODE-PORT-LOCAL-UDP bitmap:port "Kubernetes nodeport UDP port with externalTrafficPolicy=local"
// KUBE-NODE-PORT-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port for masquerade purpose with type 'hash ip:port'"
// KUBE-NODE-PORT-LOCAL-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port with externalTrafficPolicy=local with type 'hash ip:port'"
// KUBE-HEALTH-CHECK-NODE-PORT bitmap:port "Kubernetes health check node port"
for _, set := range proxier.ipsetList {
if err := ensureIPSet(set); err != nil {
return
// Reset set.activeEntries
set.resetEntries()
// activeIPVSServices represents IPVS service successfully created in this round of sync
activeIPVSServices := map[string]bool{}
// currentIPVSServices represent IPVS services listed from the system
currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
activeBindAddrs := map[string]bool{}
// Get the IPs bound on this node, equivalent to running $ ip route show table local type local proto kernel dev kube-ipvs0
// 10.0.0.1 scope host src 10.0.0.1
// 10.0.0.10 scope host src 10.0.0.10
// and extracting the unique source IP field,
// --> result set: [10.0.0.1, 10.0.0.10]
bindedAddresses, err := proxier.ipGetter.BindedIPs()
if err != nil {
klog.ErrorS(err, "error listing addresses binded to dummy interface")
// Check whether any service uses a NodePort
hasNodePort := false
for _, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if ok && svcInfo.NodePort() != 0 {
hasNodePort = true
break
// Both nodeAddresses and nodeIPs can be reused for all nodePort services
// and only need to be computed if we have at least one nodePort service.
var (
// List of node addresses to listen on if a nodePort is set.
nodeAddresses []string
// List of node IP addresses to be used as IPVS services if nodePort is set.
nodeIPs []net.IP
// Get the node's IP addresses
if hasNodePort {
nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
} else {
nodeAddresses = nodeAddrSet.List()
for _, address := range nodeAddresses {
a := net.ParseIP(address)
if a.IsLoopback() {
continue
if utilproxy.IsZeroCIDR(address) {
nodeIPs, err = proxier.ipGetter.NodeIPs()
if err != nil {
klog.ErrorS(err, "Failed to list all node IPs from host")
break
nodeIPs = append(nodeIPs, a)
// Keep only the IPs that match the proxier's IP family
idx := 0
for _, nodeIP := range nodeIPs {
if (proxier.ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6(nodeIP) {
nodeIPs[idx] = nodeIP
idx++
// reset slice to filtered entries
nodeIPs = nodeIPs[:idx]
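As a standalone illustration of the in-place filtering idiom above, here is a minimal sketch that keeps only the addresses of one IP family using just the standard library; the real code uses utilnet.IsIPv6 and the proxier's configured ipFamily, so the helper below is only an approximation:

```go
package main

import (
	"fmt"
	"net"
)

// isIPv6 reports whether ip is an IPv6 address (i.e. has no IPv4 form).
func isIPv6(ip net.IP) bool {
	return ip != nil && ip.To4() == nil
}

func main() {
	nodeIPs := []net.IP{
		net.ParseIP("192.168.1.10"),
		net.ParseIP("fd00::10"),
		net.ParseIP("10.0.0.5"),
	}
	wantIPv6 := false // pretend the proxier is running in IPv4 mode

	// Filter in place: keep entries whose family matches, then truncate.
	idx := 0
	for _, ip := range nodeIPs {
		if isIPv6(ip) == wantIPv6 {
			nodeIPs[idx] = ip
			idx++
		}
	}
	nodeIPs = nodeIPs[:idx]

	fmt.Println(nodeIPs) // [192.168.1.10 10.0.0.5]
}
```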
Configuring the ipset/ipvs rules
1. Bind ipvs rules for every service.
2. First populate the ipset sets (a sketch of the entry format follows this list):
2.1 Populate the loopback and clusterIP sets: KUBE-LOOP-BACK, KUBE-CLUSTER-IP.
2.2 Populate the external-IP sets: KUBE-EXTERNAL-IP-LOCAL, KUBE-EXTERNAL-IP.
2.3 Populate the load-balancer sets: KUBE-LOAD-BALANCER, KUBE-LOAD-BALANCER-LOCAL, KUBE-LOAD-BALANCER-FW, KUBE-LOAD-BALANCER-SOURCE-CIDR, KUBE-LOAD-BALANCER-SOURCE-IP.
2.4 Populate the NodePort sets: KUBE-NODE-PORT-TCP, KUBE-NODE-PORT-UDP, KUBE-NODE-PORT-SCTP-HASH, KUBE-NODE-PORT-LOCAL-TCP, KUBE-NODE-PORT-LOCAL-UDP, KUBE-NODE-PORT-LOCAL-SCTP-HASH.
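To make the members of these sets more tangible, below is a hedged sketch of how an entry for a hash:ip,port set (e.g. KUBE-CLUSTER-IP) and for a hash:ip,port,ip set (e.g. KUBE-LOOP-BACK) can be rendered, assuming the ipset member syntax ip,proto:port[,ip]; this is not the proxier's own utilipset.Entry.String() implementation, just a simplified stand-in:

```go
package main

import (
	"fmt"
	"strings"
)

// hashIPPortEntry renders a member for a hash:ip,port set, e.g. KUBE-CLUSTER-IP.
func hashIPPortEntry(ip, proto string, port int) string {
	return fmt.Sprintf("%s,%s:%d", ip, strings.ToLower(proto), port)
}

// hashIPPortIPEntry renders a member for a hash:ip,port,ip set, e.g. KUBE-LOOP-BACK,
// where the endpoint IP appears both as destination and as source (the hairpin case).
func hashIPPortIPEntry(ip, proto string, port int, ip2 string) string {
	return fmt.Sprintf("%s,%s:%d,%s", ip, strings.ToLower(proto), port, ip2)
}

func main() {
	// Roughly what would be added with: ipset add KUBE-CLUSTER-IP 172.16.55.10,tcp:1234
	fmt.Println(hashIPPortEntry("172.16.55.10", "TCP", 1234))
	// Roughly what would be added with: ipset add KUBE-LOOP-BACK 10.244.1.5,tcp:8080,10.244.1.5
	fmt.Println(hashIPPortIPEntry("10.244.1.5", "TCP", 8080, "10.244.1.5"))
}
```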
// Build ipset/ipvs rules for every service
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
continue
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := utilnet.IPv4
if isIPv6 {
localPortIPFamily = utilnet.IPv6
protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String()
// Handle traffic that loops back to the originator with SNAT.
// Populate the KUBE-LOOP-BACK ipset
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
continue
if !ep.IsLocal {
continue
epIP := ep.IP()
epPort, err := ep.Port()
// Error parsing this endpoint has been logged. Skip to next endpoint.
if epIP == "" || err != nil {
continue
// Assemble the ipset entry
entry := &utilipset.Entry{
IP: epIP,
Port: epPort,
Protocol: protocol,
IP2: epIP,
SetType: utilipset.HashIPPortIP,
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
continue
// Insert the entry into the KUBE-LOOP-BACK set
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
// Capture the clusterIP.
// Use the clusterIP to populate the ipset
// ipset call
entry := &utilipset.Entry{
IP: svcInfo.ClusterIP().String(),
Port: svcInfo.Port(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// The service IP is added to the kubeServiceAccess ipset to handle the hairpin case(?)
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
continue
// Insert the entry into the KUBE-CLUSTER-IP set
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
// ipvs call: define the ipvs virtual server
serv := &utilipvs.VirtualServer{
Address: svcInfo.ClusterIP(),
Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
// Set session affinity flag and timeout for IPVS service
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
// Sync the ipvs rule: create it if it does not exist, otherwise update it if anything changed; the VIP does not go down during the update
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
// Check whether the ServiceInternalTrafficPolicy feature is enabled, i.e. route internal traffic only to endpoints on the same node as the client
internalNodeLocal := false
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
internalNodeLocal = true
if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
// Capture externalIPs.
// Use the externalIPs to populate the ipsets
for _, externalIP := range svcInfo.ExternalIPStrings() {
// ipset call
entry := &utilipset.Entry{
IP: externalIP,
Port: svcInfo.Port(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
// Check whether externalTrafficPolicy is Local
if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
continue
// Insert into the KUBE-EXTERNAL-IP-LOCAL ipset
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
} else {
// We have to SNAT packets to external IPs.
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
continue
// Insert into the KUBE-EXTERNAL-IP ipset
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
// ipvs call
// Create the ipvs virtual server
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(externalIP),
Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
// Capture load-balancer ingress.
// Use the load-balancer ingress IPs to populate the ipsets
for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress != "" {
// ipset call
entry = &utilipset.Entry{
IP: ingress,
Port: svcInfo.Port(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
continue
// Insert into the KUBE-LOAD-BALANCER ipset
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
// externalTrafficPolicy=Local
if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
continue
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
// KUBE-LOAD-BALANCER-FW ip set
if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
continue
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() {
// ipset call
entry = &utilipset.Entry{
IP: ingress,
Port: svcInfo.Port(),
Protocol: protocol,
Net: src,
SetType: utilipset.HashIPPortNet,
// enumerate all white list source cidr KUBE-LOAD-BALANCER-SOURCE-CIDR
if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
continue
proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
// ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) {
allowFromNode = true
// generally, ip route rule was added to intercept request to loadbalancer vip from the
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host.
// Allow requests from the node itself to the load-balancer IP
if allowFromNode {
entry = &utilipset.Entry{
IP: ingress,
Port: svcInfo.Port(),
Protocol: protocol,
IP2: ingress,
SetType: utilipset.HashIPPortIP,
// enumerate all white list source ip KUBE-LOAD-BALANCER-SOURCE-IP
if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
continue
proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
// ipvs call
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress),
Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv)
// Populate the ipsets for the NodePort case
if svcInfo.NodePort() != 0 {
if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
// Skip nodePort configuration since an error occurred when
// computing nodeAddresses or nodeIPs.
continue
var lps []utilnet.LocalPort
for _, address := range nodeAddresses {
lp := utilnet.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: utilnet.Protocol(svcInfo.Protocol()),
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
lps = append(lps, lp)
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == utilnet.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
// Nodeports need SNAT, unless they're local.
// i.e. return traffic needs SNAT handling
// ipset call
var (
nodePortSet *IPSet
entries []*utilipset.Entry
switch protocol {
case utilipset.ProtocolTCP:
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
entries = []*utilipset.Entry{{
// No need to provide ip info
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.BitmapPort,
case utilipset.ProtocolUDP:
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
entries = []*utilipset.Entry{{
// No need to provide ip info
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.BitmapPort,
case utilipset.ProtocolSCTP:
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
entries = []*utilipset.Entry{}
for _, nodeIP := range nodeIPs {
entries = append(entries, &utilipset.Entry{
IP: nodeIP.String(),
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
default:
// It should never hit
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
if nodePortSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
entryInvalidErr = true
break
nodePortSet.activeEntries.Insert(entry.String())
if entryInvalidErr {
continue
// Add externaltrafficpolicy=local type nodeport entry
if svcInfo.NodeLocalExternal() {
var nodePortLocalSet *IPSet
switch protocol {
case utilipset.ProtocolTCP:
nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
case utilipset.ProtocolUDP:
nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
case utilipset.ProtocolSCTP:
nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
default:
// It should never hit
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
if nodePortLocalSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
entryInvalidErr = true
break
nodePortLocalSet.activeEntries.Insert(entry.String())
if entryInvalidErr {
continue
// Build ipvs kernel routes for each node ip address
// Build ipvs rules for each node IP
for _, nodeIP := range nodeIPs {
// ipvs call
// Create the ipvs virtual server
serv := &utilipvs.VirtualServer{
Address: nodeIP,
Port: uint16(svcInfo.NodePort()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv)
if svcInfo.HealthCheckNodePort() != 0 {
nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
entry := &utilipset.Entry{
// No need to provide ip info
Port: svcInfo.HealthCheckNodePort(),
Protocol: "tcp",
SetType: utilipset.BitmapPort,
if valid := nodePortSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
continue
nodePortSet.activeEntries.Insert(entry.String())
writeIptablesRules() decides, scenario by scenario, whether to create the iptables rules that match each ipset. Working this out was fairly painful; while digging through references I found that someone had already summarized it well, so I borrow that write-up here.
Sync the ipset entries and write them to the node.
Then, based on the ipsets populated above, write the iptables rules (a minimal sketch of the iptables-restore format used for this follows).
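The rules are accumulated in in-memory buffers in iptables-restore format and applied in one shot. Below is a minimal sketch of what such a buffer could look like for the KUBE-CLUSTER-IP case, using plain strings instead of the proxier's chain/rule buffers; the rule text is modeled on the dumps shown later in this article, and the real writeIptablesRules() emits far more than this:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	var natRules bytes.Buffer

	// Table header and chain declarations (iptables-restore format).
	natRules.WriteString("*nat\n")
	natRules.WriteString(":KUBE-SERVICES - [0:0]\n")
	natRules.WriteString(":KUBE-MARK-MASQ - [0:0]\n")

	// Mark traffic destined for any ClusterIP:Port listed in the KUBE-CLUSTER-IP ipset.
	natRules.WriteString(`-A KUBE-SERVICES -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ` + "\n")
	natRules.WriteString("-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000\n")
	natRules.WriteString("COMMIT\n")

	// kube-proxy feeds a buffer like this to iptables-restore.
	fmt.Print(natRules.String())
}
```

kube-proxy then applies such a buffer with iptables-restore using --noflush, so that non-Kubernetes chains in the table are left untouched.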
// sync ipset entries
// Sync the ipset entries and write them to the node.
for _, set := range proxier.ipsetList {
set.syncIPSetEntries()
// Tail call iptables rules for ipset, make sure only call iptables once
// in a single loop per ip set.
// The iptables rules for each ipset are written only once per sync loop; see https://www.qikqiak.com/post/how-to-use-ipvs-in-kubernetes/
proxier.writeIptablesRules()
// Sync iptables rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
metrics.IptablesRestoreFailuresTotal.Inc()
return
2.1 If kube-proxy is started with --masquerade-all=true, ipvs masquerades all traffic destined for Service cluster IPs, which matches the iptables-mode behavior. The iptables rules added by ipvs then look like this:
# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target prot opt source destination
KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target prot opt source destination
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-CLUSTER-IP dst,dst
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-CLUSTER-IP dst,dst
2.2 If kube-proxy is started with --cluster-cidr=<cidr>, ipvs masquerades all external traffic destined for Service cluster IPs, again matching the iptables-mode behavior. Assuming the cluster CIDR passed to kube-proxy is 10.244.16.0/24, the iptables rules added by ipvs look like this:
# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target prot opt source destination
KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (3 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target prot opt source destination
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-MARK-MASQ all -- !10.244.16.0/24 0.0.0.0/0 match-set KUBE-CLUSTER-IP dst,dst
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-CLUSTER-IP dst,dst
2.3 For Services of type LoadBalancer, ipvs installs iptables rules that match the KUBE-LOAD-BALANCER ipset. In particular, when the Service specifies LoadBalancerSourceRanges or sets externalTrafficPolicy=local, ipvs creates the KUBE-LOAD-BALANCER-LOCAL / KUBE-LOAD-BALANCER-FW / KUBE-LOAD-BALANCER-SOURCE-CIDR ipsets and adds the corresponding iptables rules, as shown below:
# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target prot opt source destination
KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
Chain KUBE-FIREWALL (1 references)
target prot opt source destination
RETURN all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER-SOURCE-CIDR dst,dst,src
KUBE-MARK-DROP all -- 0.0.0.0/0 0.0.0.0/0
Chain KUBE-LOAD-BALANCER (1 references)
target prot opt source destination
KUBE-FIREWALL all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER-FW dst,dst
RETURN all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER-LOCAL dst,dst
KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0
Chain KUBE-MARK-DROP (1 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x8000
Chain KUBE-MARK-MASQ (2 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target prot opt source destination
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-LOAD-BALANCER all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER dst,dst
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER dst,dst
2.4 For Services of type NodePort, ipvs installs iptables rules that match the KUBE-NODE-PORT-TCP / KUBE-NODE-PORT-UDP ipsets. When externalTrafficPolicy=local is specified, ipvs creates the KUBE-NODE-PORT-LOCAL-TCP / KUBE-NODE-PORT-LOCAL-UDP ipsets and installs the corresponding iptables rules, shown below (assuming the Service uses a TCP nodePort):
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target prot opt source destination
KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000
Chain KUBE-NODE-PORT (1 references)
target prot opt source destination
RETURN all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-NODE-PORT-LOCAL-TCP dst
KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0
Chain KUBE-POSTROUTING (1 references)
target prot opt source destination
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-NODE-PORT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-NODE-PORT-TCP dst
2.5 For Services with externalIPs specified, ipvs installs iptables rules that match the KUBE-EXTERNAL-IP ipset. Assuming a Service with externalIPs, the iptables rules look like this:
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target prot opt source destination
KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target prot opt source destination
MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target prot opt source destination
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL
Clean up leftover ipvs rules and stale conntrack entries (a standalone sketch of the conntrack call appears after this code).
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
// Get legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Failed to get bind address")
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
// Clean up legacy IPVS services and unbind addresses
appliedSvcs, err := proxier.ipvs.GetVirtualServers()
if err == nil {
for _, appliedSvc := range appliedSvcs {
currentIPVSServices[appliedSvc.String()] = appliedSvc
} else {
klog.ErrorS(err, "Failed to get ipvs service")
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
if proxier.healthzServer != nil {
proxier.healthzServer.Updated()
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
// Update service healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
// Finish housekeeping.
// TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
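The stale-UDP cleanup above shells out to the conntrack command via conntrack.ClearEntriesForIP. As a rough standalone sketch, assuming the conntrack CLI is installed and accepts the -D/--orig-dst/-p flags (an assumption; check your conntrack-tools version), the call looks roughly like this:

```go
package main

import (
	"fmt"
	"os/exec"
)

// clearUDPEntriesForIP deletes conntrack entries whose original destination is
// svcIP and whose protocol is UDP, roughly what kube-proxy does for stale UDP
// services. Assumes the conntrack binary is on PATH.
func clearUDPEntriesForIP(svcIP string) error {
	out, err := exec.Command("conntrack", "-D", "--orig-dst", svcIP, "-p", "udp").CombinedOutput()
	if err != nil {
		// conntrack exits non-zero when no entries matched; callers may want
		// to treat that case as a no-op rather than an error.
		return fmt.Errorf("clearing conntrack entries for %s: %v (output: %s)", svcIP, err, out)
	}
	return nil
}

func main() {
	if err := clearUDPEntriesForIP("172.16.55.10"); err != nil {
		fmt.Println(err)
	}
}
```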
The code behind the proxy's core logic is quite long, and reading it fluently requires some background in both iptables and ipvs. Topics like ipset I picked up on the fly while reading, so my understanding there is not as deep. I did read through the rule-sync part in full as well, but I found it hard to put into my own words, so that part quotes someone else's write-up.