At long last, here it is. This function is extremely long, so let's work through its implementation logic step by step.

Because the native IPVS implementation in the Linux kernel only performs DNAT and does not do SNAT, IPVS still has to rely on iptables rules in the following scenarios:

  • kube-proxy is started with --masquerade-all=true, meaning every packet that passes through kube-proxy in the cluster is SNATed.
  • kube-proxy is started with the --cluster-cidr=<cidr> parameter.
  • LoadBalancer-type Services, where iptables enforces the whitelist (loadBalancerSourceRanges).
  • NodePort-type Services, where iptables applies MASQUERADE.
  • Services with externalIPs.

Even so, with kube-proxy in IPVS mode the number of iptables rules is fixed, no matter how many Pods or Services there are.
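As a quick way to check that claim on a node, the standalone sketch below (not part of kube-proxy; it assumes a Linux host with iptables installed and sufficient privileges) dumps the nat table with iptables-save and counts the KUBE-prefixed rules; the count should stay essentially flat as Services are added.

package main

import (
  "fmt"
  "os/exec"
  "strings"
)

// Count the nat-table rules owned by kube-proxy (rules that reference "KUBE-" chains or sets).
// In IPVS mode this number should stay essentially constant as Services are added.
func main() {
  out, err := exec.Command("iptables-save", "-t", "nat").CombinedOutput()
  if err != nil {
    fmt.Println("iptables-save failed:", err)
    return
  }
  count := 0
  for _, line := range strings.Split(string(out), "\n") {
    if strings.HasPrefix(line, "-A") && strings.Contains(line, "KUBE-") {
      count++
    }
  }
  fmt.Printf("kube-proxy nat rules: %d\n", count)
}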

The s.Proxier.SyncLoop() function

There is far too much code here to take in at once, so let's go through it in several steps.

Check whether the Proxier has finished initializing; if not, return from the function immediately.

Record how long the sync takes.
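The timing itself is the ordinary defer-and-observe pattern. A stripped-down, standalone sketch of the same idea with client_golang is shown below; the metric name and helper function here are made up for illustration and are not kube-proxy's real metric.

package main

import (
  "time"

  "github.com/prometheus/client_golang/prometheus"
)

// syncDuration is a stand-in for metrics.SyncProxyRulesLatency.
var syncDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
  Name:    "sync_proxy_rules_duration_seconds", // hypothetical name
  Help:    "Time spent in one syncProxyRules run.",
  Buckets: prometheus.DefBuckets,
})

func init() { prometheus.MustRegister(syncDuration) }

func syncOnce() {
  start := time.Now()
  // The deferred observation runs after the sync body returns,
  // so it always records the full duration of this call.
  defer func() {
    syncDuration.Observe(time.Since(start).Seconds())
  }()

  // ... rule-syncing work would go here ...
}

func main() { syncOnce() }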

Merge the pending changes into the proxier.serviceMap object so that it always holds the latest state. Entries have the form {"namespace/service name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"}. Stale port entries are removed from the map at the same time, and the IPs of stale UDP-protocol ports are collected for later use.
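As a mental model, the shape of that map can be pictured with the simplified types below; these structs are illustrative stand-ins only, since the real proxier keys the map by proxy.ServicePortName and stores richer ServicePort values.

package main

import "fmt"

// ServicePortKey is a simplified stand-in for proxy.ServicePortName:
// the namespace/name of the Service plus the port name and protocol.
type ServicePortKey struct {
  NamespacedName string // e.g. "default/nginx"
  PortName       string // e.g. "http"
  Protocol       string // e.g. "TCP"
}

// ServicePortInfo is a simplified stand-in for the stored service info.
type ServicePortInfo struct {
  ClusterIP string
  Port      int
  Protocol  string
}

func main() {
  serviceMap := map[ServicePortKey]ServicePortInfo{
    {NamespacedName: "default/nginx", PortName: "http", Protocol: "TCP"}: {
      ClusterIP: "172.16.55.10", Port: 1234, Protocol: "TCP",
    },
  }
  for k, v := range serviceMap {
    fmt.Printf("%s:%s/%s -> %s:%d/%s\n",
      k.NamespacedName, k.PortName, k.Protocol, v.ClusterIP, v.Port, v.Protocol)
  }
}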

Work out which entries have become stale; these are used to drive the cleanup steps later on.
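Part of that later cleanup is flushing conntrack entries for stale UDP Services, since UDP has no connection teardown that would invalidate them. A rough sketch of what that amounts to on the node is shown below; kube-proxy goes through its own conntrack helper rather than this hand-rolled exec call, and the IP used here is just an example.

package main

import (
  "fmt"
  "os/exec"
)

// clearUDPConntrack deletes conntrack entries whose original destination is ip,
// roughly what kube-proxy does for stale UDP service IPs (requires the conntrack tool and root).
func clearUDPConntrack(ip string) error {
  out, err := exec.Command("conntrack", "-D", "--orig-dst", ip, "-p", "udp").CombinedOutput()
  if err != nil {
    return fmt.Errorf("conntrack -D failed: %v: %s", err, out)
  }
  return nil
}

func main() {
  if err := clearUDPConntrack("10.96.0.10"); err != nil {
    fmt.Println(err)
  }
}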

Perform the iptables preparation work and make sure the prerequisite chains exist (a small standalone sketch of this check-then-insert idiom follows).
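For a feel of what "ensure the chain and the jump rule exist" means in practice, here is a hand-rolled sketch using the iptables CLI. It is an illustration only: the helper name is invented, it needs root, it really does touch the nat table, and it is not how createAndLinkKubeChain() is implemented internally.

package main

import (
  "fmt"
  "os/exec"
)

// ensureChainAndJump creates a chain if missing and makes sure a jump rule into it exists.
func ensureChainAndJump(table, fromChain, toChain, comment string) {
  // -N fails if the chain already exists; that is fine for this sketch.
  _ = exec.Command("iptables", "-t", table, "-N", toChain).Run()

  check := []string{"-t", table, "-C", fromChain,
    "-m", "comment", "--comment", comment, "-j", toChain}
  if err := exec.Command("iptables", check...).Run(); err != nil {
    // Rule is missing, insert it at the top of the source chain.
    insert := []string{"-t", table, "-I", fromChain,
      "-m", "comment", "--comment", comment, "-j", toChain}
    if out, err := exec.Command("iptables", insert...).CombinedOutput(); err != nil {
      fmt.Printf("insert failed: %v: %s\n", err, out)
    }
  }
}

func main() {
  // Example: the PREROUTING -> KUBE-SERVICES link described in the rule comments below.
  ensureChainAndJump("nat", "PREROUTING", "KUBE-SERVICES", "kubernetes service portals")
}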

Make sure the dummy interface (kube-ipvs0 by default) and the default ipset lists have been created on the node. Why create a dummy interface? Because the IPVS netfilter DNAT hook is attached to the INPUT chain: when a ClusterIP is accessed, binding the ClusterIP to the dummy interface makes the kernel treat that IP as a local address, so the packet enters the INPUT chain and is then forwarded towards POSTROUTING by the ip_vs_in hook. The ClusterIP is therefore bound to the dummy interface, and for each ClusterIP an IPVS virtual server and its real servers are created, corresponding to the Service and its Endpoints.
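Outside of kube-proxy, the same effect can be reproduced by hand: create a dummy link and bind an address to it, after which the kernel accepts that address as local and the IPVS INPUT hook can pick up the traffic. A minimal sketch using the vishvananda/netlink library follows; the interface name and address are arbitrary examples and the program needs root.

package main

import (
  "fmt"

  "github.com/vishvananda/netlink"
)

func main() {
  // Create (or reuse) a dummy interface similar to kube-ipvs0.
  dummy := &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "demo-ipvs0"}}
  if err := netlink.LinkAdd(dummy); err != nil {
    fmt.Println("LinkAdd:", err) // "file exists" is fine for this demo
  }
  link, err := netlink.LinkByName("demo-ipvs0")
  if err != nil {
    fmt.Println("LinkByName:", err)
    return
  }
  // Bind an example ClusterIP to the dummy device so the kernel treats it as local.
  addr, err := netlink.ParseAddr("10.96.0.100/32")
  if err != nil {
    fmt.Println("ParseAddr:", err)
    return
  }
  if err := netlink.AddrAdd(link, addr); err != nil {
    fmt.Println("AddrAdd:", err)
    return
  }
  fmt.Println("bound 10.96.0.100/32 to demo-ipvs0")
}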

func (proxier *Proxier) syncProxyRules() {
  proxier.mu.Lock()
  defer proxier.mu.Unlock()
  // Check whether the Proxier has finished initializing; if not, return immediately.
  if !proxier.isInitialized() {
    klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
    return
  // Record the sync duration.
  start := time.Now()
  defer func() {
    metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
    klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
  // We assume that if this was called, we really want to sync them,
  // even if nothing changed in the meantime. In other words, callers are
  // responsible for detecting no-op changes and not calling this function.
  // Merge pending changes into proxier.serviceMap so it always holds the latest state; entries have the
  // form {"ns/svc name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"}. Stale port entries are removed,
  // the IPs of stale UDP ports are collected for later use, and the pending changes are then cleared.
  serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
  // Same as above, for the endpoints map
  endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
  staleServices := serviceUpdateResult.UDPStaleClusterIP
  // merge stale services gathered from updateEndpointsMap
  // Work out the stale entries; they drive the cleanup steps later on.
  for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
    if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
      klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
      staleServices.Insert(svcInfo.ClusterIP().String())
      for _, extIP := range svcInfo.ExternalIPStrings() {
        staleServices.Insert(extIP)
      for _, extIP := range svcInfo.LoadBalancerIPStrings() {
        staleServices.Insert(extIP)
  klog.V(3).InfoS("Syncing ipvs Proxier rules")
  // Begin install iptables
  // iptables preparation work
  // Reset all buffers used later.
  // This is to avoid memory reallocations and thus improve performance.
  proxier.natChains.Reset()
  proxier.natRules.Reset()
  proxier.filterChains.Reset()
  proxier.filterRules.Reset()
  // Write table headers.
  utilproxy.WriteLine(proxier.filterChains, "*filter")
  utilproxy.WriteLine(proxier.natChains, "*nat")
  // Make sure the prerequisite chains exist
  // Check that the KUBE-MARK-DROP chain exists in the nat table, without modifying any rules
  // Check that the nat-table chains KUBE-SERVICES, KUBE-POSTROUTING, KUBE-FIREWALL, KUBE-NODE-PORT, KUBE-LOAD-BALANCER and KUBE-MARK-MASQ exist and load them into the buffer
  // Check that the filter-table chains KUBE-FORWARD and KUBE-NODE-PORT exist and load them into the buffer
  // Check the following rules and create them if they do not exist
  // -I nat OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
  // -I nat PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
  // -I nat POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
  // -I filter FORWARD -m comment --comment "kubernetes forwarding rules" -j KUBE-FORWARD
  // -I filter INPUT -m comment --comment "kubernetes health check rules" -j KUBE-NODE-PORT
  proxier.createAndLinkKubeChain()
  // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
  // Make sure the dummy interface (kube-ipvs0 by default) exists on the node. Why a dummy interface?
  // The IPVS netfilter DNAT hook is attached to the INPUT chain, so when a ClusterIP is accessed, binding
  // the ClusterIP to the dummy interface makes the kernel treat it as a local IP; the packet then enters
  // the INPUT chain and is forwarded towards POSTROUTING by the ip_vs_in hook. The ClusterIP is bound to
  // the dummy interface, and for each ClusterIP IPVS virtual servers and real servers are created,
  // corresponding to the service and its endpoints.
  _, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  if err != nil {
    klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
    return
  // make sure ip sets exists in the system.
  // Check that the ipset sets exist
  // KUBE-LOOP-BACK hash:ip,port,ip "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose"
  // KUBE-CLUSTER-IP hash:ip,port "Kubernetes service cluster ip + port for masquerade purpose"
  // KUBE-EXTERNAL-IP hash:ip,port "Kubernetes service external ip + port for masquerade and filter purpose"
  // KUBE-EXTERNAL-IP-LOCAL hash:ip,port "Kubernetes service external ip + port with externalTrafficPolicy=local"
  // KUBE-LOAD-BALANCER hash:ip,port "Kubernetes service lb portal"
  // KUBE-LOAD-BALANCER-FW hash:ip,port "Kubernetes service load balancer ip + port for load balancer with sourceRange"
  // KUBE-LOAD-BALANCER-LOCAL hash:ip,port "Kubernetes service load balancer ip + port with externalTrafficPolicy=local"
  // KUBE-LOAD-BALANCER-SOURCE-IP hash:ip,port,ip "Kubernetes service load balancer ip + port + source IP for packet filter purpose"
  // KUBE-LOAD-BALANCER-SOURCE-CIDR hash:ip,port,net "Kubernetes service load balancer ip + port + source cidr for packet filter purpose"
  // KUBE-NODE-PORT-TCP bitmap:port "Kubernetes nodeport TCP port for masquerade purpose"
  // KUBE-NODE-PORT-LOCAL-TCP bitmap:port "Kubernetes nodeport TCP port with externalTrafficPolicy=local"
  // KUBE-NODE-PORT-UDP bitmap:port "Kubernetes nodeport UDP port for masquerade purpose"
  // KUBE-NODE-PORT-LOCAL-UDP bitmap:port "Kubernetes nodeport UDP port with externalTrafficPolicy=local"
  // KUBE-NODE-PORT-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port for masquerade purpose with type 'hash ip:port'"
  // KUBE-NODE-PORT-LOCAL-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port with externalTrafficPolicy=local with type 'hash ip:port'"
  // KUBE-HEALTH-CHECK-NODE-PORT bitmap:port "Kubernetes health check node port"
  for _, set := range proxier.ipsetList {
    if err := ensureIPSet(set); err != nil {
      return
    // Reset set.activeEntries
    set.resetEntries()
  // activeIPVSServices represents IPVS service successfully created in this round of sync
  activeIPVSServices := map[string]bool{}
  // currentIPVSServices represent IPVS services listed from the system
  currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
  // activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
  activeBindAddrs := map[string]bool{}
  // Get the IPs bound on this machine, equivalent to running: $ ip route show table local type local proto kernel dev kube-ipvs0
  // 10.0.0.1  scope host  src 10.0.0.1
  // 10.0.0.10  scope host  src 10.0.0.10
  // Extract the unique source-IP field,
  // --> result set: [10.0.0.1, 10.0.0.10]
  bindedAddresses, err := proxier.ipGetter.BindedIPs()
  if err != nil {
    klog.ErrorS(err, "error listing addresses binded to dummy interface")
  // Check whether any service uses a NodePort
  hasNodePort := false
  for _, svc := range proxier.serviceMap {
    svcInfo, ok := svc.(*serviceInfo)
    if ok && svcInfo.NodePort() != 0 {
      hasNodePort = true
      break
  // Both nodeAddresses and nodeIPs can be reused for all nodePort services
  // and only need to be computed if we have at least one nodePort service.
  var (
    // List of node addresses to listen on if a nodePort is set.
    nodeAddresses []string
    // List of node IP addresses to be used as IPVS services if nodePort is set.
    nodeIPs []net.IP
  // Get the host node IPs
  if hasNodePort {
    nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
    if err != nil {
      klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
    } else {
      nodeAddresses = nodeAddrSet.List()
      for _, address := range nodeAddresses {
        a := net.ParseIP(address)
        if a.IsLoopback() {
          continue
        if utilproxy.IsZeroCIDR(address) {
          nodeIPs, err = proxier.ipGetter.NodeIPs()
          if err != nil {
            klog.ErrorS(err, "Failed to list all node IPs from host")
          break
        nodeIPs = append(nodeIPs, a)
  // Keep only the IPs that match the proxier's IP family
  idx := 0
  for _, nodeIP := range nodeIPs {
    if (proxier.ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6(nodeIP) {
      nodeIPs[idx] = nodeIP
      idx++
  // reset slice to filtered entries
  nodeIPs = nodeIPs[:idx]

Configuring the ipset/IPVS rules

Bind IPVS rules for every Service.

First, populate the ipset sets (sets 2.1 to 2.4 below; a standalone sketch of the equivalent ipset commands follows this list).

2.1 Configure the loopback and ClusterIP sets: KUBE-LOOP-BACK, KUBE-CLUSTER-IP.

2.2 Configure the external-IP sets: KUBE-EXTERNAL-IP-LOCAL, KUBE-EXTERNAL-IP.

2.3 Configure the load-balancer sets: KUBE-LOAD-BALANCER, KUBE-LOAD-BALANCER-LOCAL, KUBE-LOAD-BALANCER-FW, KUBE-LOAD-BALANCER-SOURCE-CIDR, KUBE-LOAD-BALANCER-SOURCE-IP.

2.4 Configure the NodePort sets: KUBE-NODE-PORT-TCP, KUBE-NODE-PORT-UDP, KUBE-NODE-PORT-SCTP-HASH, KUBE-NODE-PORT-LOCAL-TCP, KUBE-NODE-PORT-LOCAL-UDP, KUBE-NODE-PORT-LOCAL-SCTP-HASH.
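To make those sets more concrete, the sketch below reproduces by hand what an entry in a hash:ip,port set such as KUBE-CLUSTER-IP looks like, by shelling out to the ipset CLI. The set name and sample address are illustrative, the program needs root plus the ipset tool installed, and kube-proxy itself goes through its own ipset utility package instead.

package main

import (
  "fmt"
  "os/exec"
)

func run(args ...string) {
  if out, err := exec.Command("ipset", args...).CombinedOutput(); err != nil {
    fmt.Printf("ipset %v failed: %v: %s\n", args, err, out)
  }
}

func main() {
  // A demo set with the same type as KUBE-CLUSTER-IP (hash:ip,port).
  run("create", "DEMO-CLUSTER-IP", "hash:ip,port", "-exist")
  // Entries use the "IP,proto:port" form, e.g. a ClusterIP plus its service port.
  run("add", "DEMO-CLUSTER-IP", "172.16.55.10,tcp:1234", "-exist")
  // Show the resulting set; iptables rules then match it with -m set --match-set.
  out, _ := exec.Command("ipset", "list", "DEMO-CLUSTER-IP").CombinedOutput()
  fmt.Println(string(out))
}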

  // Bind IPVS rules for every service
  for svcName, svc := range proxier.serviceMap {
    svcInfo, ok := svc.(*serviceInfo)
    if !ok {
      klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
      continue
    isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
    localPortIPFamily := utilnet.IPv4
    if isIPv6 {
      localPortIPFamily = utilnet.IPv6
    protocol := strings.ToLower(string(svcInfo.Protocol()))
    // Precompute svcNameString; with many services the many calls
    // to ServicePortName.String() show up in CPU profiles.
    svcNameString := svcName.String()
    // Handle traffic that loops back to the originator with SNAT.
    // Populate the KUBE-LOOP-BACK ipset
    for _, e := range proxier.endpointsMap[svcName] {
      ep, ok := e.(*proxy.BaseEndpointInfo)
      if !ok {
        klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
        continue
      if !ep.IsLocal {
        continue
      epIP := ep.IP()
      epPort, err := ep.Port()
      // Error parsing this endpoint has been logged. Skip to next endpoint.
      if epIP == "" || err != nil {
        continue
      // Assemble the ipset entry
      entry := &utilipset.Entry{
        IP:       epIP,
        Port:     epPort,
        Protocol: protocol,
        IP2:      epIP,
        SetType:  utilipset.HashIPPortIP,
      if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
        klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
        continue
      // Insert the entry into the KUBE-LOOP-BACK set
      proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
    // Capture the clusterIP.
    // Use the clusterIP to populate the ipset sets
    // ipset call
    entry := &utilipset.Entry{
      IP:       svcInfo.ClusterIP().String(),
      Port:     svcInfo.Port(),
      Protocol: protocol,
      SetType:  utilipset.HashIPPort,
    // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
    // The service IP:Port is added to the kubeServiceAccess ip set to handle the hairpin case(?)
    // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) 
    if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
      klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
      continue
    // Insert the entry into the KUBE-CLUSTER-IP set
    proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
    // ipvs call: define the IPVS virtual server
    serv := &utilipvs.VirtualServer{
      Address:   svcInfo.ClusterIP(),
      Port:      uint16(svcInfo.Port()),
      Protocol:  string(svcInfo.Protocol()),
      Scheduler: proxier.ipvsScheduler,
    // Set session affinity flag and timeout for IPVS service
    if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
      serv.Flags |= utilipvs.FlagPersistent
      serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
    // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
    // Sync the IPVS rule: create it if it does not exist, otherwise check for changes and update it; the VIP does not go down during the update
    if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
      activeIPVSServices[serv.String()] = true
      activeBindAddrs[serv.Address.String()] = true
      // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
      // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
      // Check whether the ServiceInternalTrafficPolicy feature is enabled, which routes internal traffic only to endpoints on the same node as the caller
      internalNodeLocal := false
      if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
        internalNodeLocal = true
      if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
        klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
    } else {
      klog.ErrorS(err, "Failed to sync service", "service", serv.String())
    // Capture externalIPs.
    // Use the externalIPs to populate the ipset sets
    for _, externalIP := range svcInfo.ExternalIPStrings() {
      // ipset call
      entry := &utilipset.Entry{
        IP:       externalIP,
        Port:     svcInfo.Port(),
        Protocol: protocol,
        SetType:  utilipset.HashIPPort,
      // Check whether the external traffic policy is Local
      if svcInfo.NodeLocalExternal() {
        if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
          continue
        // Add to the KUBE-EXTERNAL-IP-LOCAL ip set
        proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
      } else {
        // We have to SNAT packets to external IPs.
        if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
          continue
        // Add to the KUBE-EXTERNAL-IP ip set
        proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
      // ipvs call
      // Create the IPVS rule
      serv := &utilipvs.VirtualServer{
        Address:   net.ParseIP(externalIP),
        Port:      uint16(svcInfo.Port()),
        Protocol:  string(svcInfo.Protocol()),
        Scheduler: proxier.ipvsScheduler,
      if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
        serv.Flags |= utilipvs.FlagPersistent
        serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
      if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
        activeIPVSServices[serv.String()] = true
        activeBindAddrs[serv.Address.String()] = true
        if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
          klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
      } else {
        klog.ErrorS(err, "Failed to sync service", "service", serv.String())
    // Capture load-balancer ingress.
    // Use the load-balancer ingress IPs to populate the ipset sets
    for _, ingress := range svcInfo.LoadBalancerIPStrings() {
      if ingress != "" {
        // ipset call
        entry = &utilipset.Entry{
          IP:       ingress,
          Port:     svcInfo.Port(),
          Protocol: protocol,
          SetType:  utilipset.HashIPPort,
        // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
        // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
        // If we are proxying globally, we need to masquerade in case we cross nodes.
        // If we are proxying only locally, we can retain the source IP.
        if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
          continue
        // Add to the KUBE-LOAD-BALANCER ip set
        proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
        // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
        // externalTrafficPolicy=Local
        if svcInfo.NodeLocalExternal() {
          if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
            continue
          proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
        if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
          // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
          // This currently works for loadbalancers that preserves source ips.
          // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
          // KUBE-LOAD-BALANCER-FW ip set
          if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
            continue
          proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
          allowFromNode := false
          for _, src := range svcInfo.LoadBalancerSourceRanges() {
            // ipset call
            entry = &utilipset.Entry{
              IP:       ingress,
              Port:     svcInfo.Port(),
              Protocol: protocol,
              Net:      src,
              SetType:  utilipset.HashIPPortNet,
            // enumerate all white list source cidr KUBE-LOAD-BALANCER-SOURCE-CIDR
            if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
              continue
            proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
            // ignore error because it has been validated
            _, cidr, _ := net.ParseCIDR(src)
            if cidr.Contains(proxier.nodeIP) {
              allowFromNode = true
          // generally, ip route rule was added to intercept request to loadbalancer vip from the
          // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
          // Need to add the following rule to allow request on host.
          // Allow requests from the node itself when it falls inside the LB whitelist
          if allowFromNode {
            entry = &utilipset.Entry{
              IP:       ingress,
              Port:     svcInfo.Port(),
              Protocol: protocol,
              IP2:      ingress,
              SetType:  utilipset.HashIPPortIP,
            // enumerate all white list source ip KUBE-LOAD-BALANCER-SOURCE-IP
            if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
              continue
            proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
        // ipvs call
        serv := &utilipvs.VirtualServer{
          Address:   net.ParseIP(ingress),
          Port:      uint16(svcInfo.Port()),
          Protocol:  string(svcInfo.Protocol()),
          Scheduler: proxier.ipvsScheduler,
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
          serv.Flags |= utilipvs.FlagPersistent
          serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
          activeIPVSServices[serv.String()] = true
          activeBindAddrs[serv.Address.String()] = true
          if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
            klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
        } else {
          klog.ErrorS(err, "Failed to sync service", "service", serv)
    // Populate the ipset sets for the NodePort case
    if svcInfo.NodePort() != 0 {
      if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
        // Skip nodePort configuration since an error occurred when
        // computing nodeAddresses or nodeIPs.
        continue
      var lps []utilnet.LocalPort
      for _, address := range nodeAddresses {
        lp := utilnet.LocalPort{
          Description: "nodePort for " + svcNameString,
          IP:          address,
          IPFamily:    localPortIPFamily,
          Port:        svcInfo.NodePort(),
          Protocol:    utilnet.Protocol(svcInfo.Protocol()),
        if utilproxy.IsZeroCIDR(address) {
          // Empty IP address means all
          lp.IP = ""
          lps = append(lps, lp)
          // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
          break
        lps = append(lps, lp)
      // For ports on node IPs, open the actual port and hold it.
      for _, lp := range lps {
        if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == utilnet.UDP {
          conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
      // Nodeports need SNAT, unless they're local.
      // The return path needs SNAT handling
      // ipset call
      var (
        nodePortSet *IPSet
        entries     []*utilipset.Entry
      switch protocol {
      case utilipset.ProtocolTCP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
        entries = []*utilipset.Entry{{
          // No need to provide ip info
          Port:     svcInfo.NodePort(),
          Protocol: protocol,
          SetType:  utilipset.BitmapPort,
      case utilipset.ProtocolUDP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
        entries = []*utilipset.Entry{{
          // No need to provide ip info
          Port:     svcInfo.NodePort(),
          Protocol: protocol,
          SetType:  utilipset.BitmapPort,
      case utilipset.ProtocolSCTP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
        // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
        entries = []*utilipset.Entry{}
        for _, nodeIP := range nodeIPs {
          entries = append(entries, &utilipset.Entry{
            IP:       nodeIP.String(),
            Port:     svcInfo.NodePort(),
            Protocol: protocol,
            SetType:  utilipset.HashIPPort,
      default:
        // It should never hit
        klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
      if nodePortSet != nil {
        entryInvalidErr := false
        for _, entry := range entries {
          if valid := nodePortSet.validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
            entryInvalidErr = true
            break
          nodePortSet.activeEntries.Insert(entry.String())
        if entryInvalidErr {
          continue
      // Add externaltrafficpolicy=local type nodeport entry
      if svcInfo.NodeLocalExternal() {
        var nodePortLocalSet *IPSet
        switch protocol {
        case utilipset.ProtocolTCP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
        case utilipset.ProtocolUDP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
        case utilipset.ProtocolSCTP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
        default:
          // It should never hit
          klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
        if nodePortLocalSet != nil {
          entryInvalidErr := false
          for _, entry := range entries {
            if valid := nodePortLocalSet.validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
              entryInvalidErr = true
              break
            nodePortLocalSet.activeEntries.Insert(entry.String())
          if entryInvalidErr {
            continue
      // Build ipvs kernel routes for each node ip address
      // Configure IPVS rules for each node IP
      for _, nodeIP := range nodeIPs {
        // ipvs call
        // Create the IPVS rule
        serv := &utilipvs.VirtualServer{
          Address:   nodeIP,
          Port:      uint16(svcInfo.NodePort()),
          Protocol:  string(svcInfo.Protocol()),
          Scheduler: proxier.ipvsScheduler,
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
          serv.Flags |= utilipvs.FlagPersistent
          serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
        if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
          activeIPVSServices[serv.String()] = true
          if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
            klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
        } else {
          klog.ErrorS(err, "Failed to sync service", "service", serv)
    if svcInfo.HealthCheckNodePort() != 0 {
      nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
      entry := &utilipset.Entry{
        // No need to provide ip info
        Port:     svcInfo.HealthCheckNodePort(),
        Protocol: "tcp",
        SetType:  utilipset.BitmapPort,
      if valid := nodePortSet.validateEntry(entry); !valid {
        klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
        continue
      nodePortSet.activeEntries.Insert(entry.String())

writeIptablesRules() decides, scenario by scenario, whether to create the iptables rules that correspond to each ipset. Untangling that by hand is fairly painful; while digging through references I found that someone had already written it up nicely, so the scenario descriptions below (2.1 to 2.5) are borrowed from that write-up.

Sync the ipset entries, writing them to the node.

Based on the ipsets populated above, configure the iptables rules (a minimal sketch of the resulting iptables-restore payload follows the code below).

  // sync ipset entries
  // Sync the ipset entries and write them to the node
  for _, set := range proxier.ipsetList {
    set.syncIPSetEntries()
  // Tail call iptables rules for ipset, make sure only call iptables once
  // in a single loop per ip set.
  // The iptables rules that reference the ipsets are written only once per sync loop; see https://www.qikqiak.com/post/how-to-use-ipvs-in-kubernetes/
  proxier.writeIptablesRules()
  // Sync iptables rules.
  // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
  proxier.iptablesData.Reset()
  proxier.iptablesData.Write(proxier.natChains.Bytes())
  proxier.iptablesData.Write(proxier.natRules.Bytes())
  proxier.iptablesData.Write(proxier.filterChains.Bytes())
  proxier.iptablesData.Write(proxier.filterRules.Bytes())
  klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
  err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  if err != nil {
    klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
    metrics.IptablesRestoreFailuresTotal.Inc()
    return
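The four buffers concatenated above are nothing more than an iptables-restore payload. The sketch below builds and prints a hand-written, minimal example of such a payload for the ClusterIP masquerade case; it is only a tiny subset of what kube-proxy actually generates.

package main

import (
  "bytes"
  "fmt"
)

func main() {
  var buf bytes.Buffer
  // nat table: declare the chains, then the rules that match the ipsets.
  buf.WriteString("*nat\n")
  buf.WriteString(":KUBE-SERVICES - [0:0]\n")
  buf.WriteString(":KUBE-MARK-MASQ - [0:0]\n")
  buf.WriteString("-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000\n")
  buf.WriteString("-A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ\n")
  buf.WriteString("COMMIT\n")

  // kube-proxy feeds a much larger buffer of this shape to
  // `iptables-restore --noflush --counters`, so chains it does not own are left alone.
  fmt.Print(buf.String())
}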

2.1 If kube-proxy is started with --masquerade-all=true, the IPVS proxier masquerades all traffic that accesses a Service's Cluster IP, which matches the behaviour of the iptables proxier. The iptables rules added by the IPVS proxier look like this:

# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst

2.2 If kube-proxy is started with --cluster-cidr=<cidr>, the IPVS proxier masquerades off-cluster traffic that accesses a Service's Cluster IP, again matching the iptables proxier. Assuming kube-proxy is given the cluster CIDR 10.244.16.0/24, the iptables rules added by the IPVS proxier look like this:

# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (3 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  -- !10.244.16.0/24       0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst

2.3 For a LoadBalancer-type Service, the IPVS proxier installs iptables rules that match the KUBE-LOAD-BALANCER ipset. In particular, when the Service's LoadBalancerSourceRanges is specified or externalTrafficPolicy=Local is set, the IPVS proxier also creates the ipsets KUBE-LOAD-BALANCER-LOCAL, KUBE-LOAD-BALANCER-FW and KUBE-LOAD-BALANCER-SOURCE-CIDR and adds the corresponding iptables rules, as shown below:

# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
Chain KUBE-FIREWALL (1 references)
target     prot opt source               destination
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-SOURCE-CIDR dst,dst,src
KUBE-MARK-DROP  all  --  0.0.0.0/0            0.0.0.0/0
Chain KUBE-LOAD-BALANCER (1 references)
target     prot opt source               destination
KUBE-FIREWALL  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-FW dst,dst
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-LOCAL dst,dst
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0
Chain KUBE-MARK-DROP (1 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x8000
Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-LOAD-BALANCER  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER dst,dst

2.4 For a NodePort-type Service, the IPVS proxier installs iptables rules that match the KUBE-NODE-PORT-TCP/KUBE-NODE-PORT-UDP ipsets. When externalTrafficPolicy=Local is specified, it also creates the KUBE-NODE-PORT-LOCAL-TCP/KUBE-NODE-PORT-LOCAL-UDP ipsets and installs the corresponding iptables rules, as shown below (assuming the Service uses a TCP NodePort):

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
Chain KUBE-NODE-PORT (1 references)
target     prot opt source               destination
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-NODE-PORT-LOCAL-TCP dst
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0
Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-NODE-PORT  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-NODE-PORT-TCP dst

2.5 For a Service with externalIPs specified, the IPVS proxier installs iptables rules that match the KUBE-EXTERNAL-IP ipset. Assuming we have a Service with externalIPs, the iptables rules should look like this:

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL
Finally, clean up legacy IPVS rules and stale conntrack entries; a small sketch for inspecting leftover IPVS virtual servers follows the code below.

  for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
        for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
          latency := metrics.SinceInSeconds(lastChangeTriggerTime)
          metrics.NetworkProgrammingLatency.Observe(latency)
          klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
      // Get legacy bind address
      // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
      currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
      if err != nil {
        klog.ErrorS(err, "Failed to get bind address")
      legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
      // Clean up legacy IPVS services and unbind addresses
      appliedSvcs, err := proxier.ipvs.GetVirtualServers()
      if err == nil {
        for _, appliedSvc := range appliedSvcs {
          currentIPVSServices[appliedSvc.String()] = appliedSvc
      } else {
        klog.ErrorS(err, "Failed to get ipvs service")
      proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
      if proxier.healthzServer != nil {
        proxier.healthzServer.Updated()
      metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
      // Update service healthchecks.  The endpoints list might include services that are
      // not "OnlyLocal", but the services list will not, and the serviceHealthServer
      // will just drop those endpoints.
      if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
        klog.ErrorS(err, "Error syncing healthcheck services")
      if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
        klog.ErrorS(err, "Error syncing healthcheck endpoints")
      // Finish housekeeping.
      // TODO: these could be made more consistent.
      for _, svcIP := range staleServices.UnsortedList() {
        if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
          klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
      proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
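The cleanup step compares the IPVS services programmed in this round (activeIPVSServices) against what the kernel currently has and removes the difference. To look at the kernel-side view yourself, the sketch below lists the current virtual servers via the ipvsadm CLI; kube-proxy itself talks to IPVS over netlink rather than shelling out, so this is purely for inspection.

package main

import (
  "fmt"
  "os/exec"
  "strings"
)

func main() {
  // -L lists the virtual server table, -n keeps addresses and ports numeric.
  out, err := exec.Command("ipvsadm", "-Ln").CombinedOutput()
  if err != nil {
    fmt.Println("ipvsadm failed:", err)
    return
  }
  for _, line := range strings.Split(string(out), "\n") {
    // Virtual-server lines start with the protocol; real servers are indented beneath them.
    if strings.HasPrefix(line, "TCP") || strings.HasPrefix(line, "UDP") || strings.HasPrefix(line, "SCTP") {
      fmt.Println(line)
    }
  }
}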
    

The code implementing the proxy's core logic is quite long, and reading it fluently takes some background in iptables and ipvs. Things like ipset I only picked up while reading the code, so my understanding there is not that deep. I did read the rule-sync part in full, but I found it hard to put into my own words, so that part quotes someone else's write-up.