k8s-client-go源码剖析(三)

四颗咖啡豆 2021-01-23 01:28:37
源码 剖析 client k8s-client-go


云原生社区活动---Kubernetes源码剖析第一期第三周作业, 也是最后一周作业.

本文主要讲述下client-go中workqueue, 看一下client-go的一个整体数据走向.如下图:

而workqueue主要是在listener这里引用,listener使用chan获取到数据之后将数据放入到工作队列进行处理。主要是由于chan过于简单,已经无法满足K8S的场景,所以衍生出了workqueue,

特性


  1. 有序
  2. 去重
  3. 并发
  4. 延迟处理
  5. 限速

当前有三种workqueue


  1. 基本队列
  2. 延迟队列
  3. 限速队列

其中延迟队列是基于基本队列实现的,而限流队列基于延迟队列实现

基本队列


看一下基本队列的接口

// client-go源码路径util/workqueue/queue.go
type Interface interface {
//新增元素 可以是任意对象
Add(item interface{})
//获取当前队列的长度
Len() int
// 阻塞获取头部元素(先入先出) 返回元素以及队列是否关闭
Get() (item interface{}, shutdown bool)
// 显示标记完成元素的处理
Done(item interface{})
//关闭队列
ShutDown()
//队列是否处于关闭状态
ShuttingDown() bool
}

看一下基本队列的数据结构,只看三个重点处理的,其他的没有展示出来

type Type struct {
//含有所有元素的元素的队列 保证有序
queue []t
//所有需要处理的元素 set是基于map以value为空struct实现的结构,保证去重
dirty set
//当前正在处理中的元素
processing set
...
}
type empty struct{}
type t interface{}
type set map[t]empty

基本队列的hello world也很简单

 wq := workqueue.New()
wq.Add("hello")
v, _ := wq.Get()

基本队列Add


func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
//如果当前处于关闭状态,则不再新增元素
if q.shuttingDown {
return
}
//如果元素已经在等待处理中,则不再新增
if q.dirty.has(item) {
return
}
//添加到metrics
q.metrics.add(item)
//加入等待处理中
q.dirty.insert(item)
//如果目前正在处理该元素 就不将元素添加到队列
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}

基本队列Get


func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
//如果当前没有元素并且不处于关闭状态,则阻塞
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
...
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
//把元素添加到正在处理队列中
q.processing.insert(item)
//把队列从等待处理队列中删除
q.dirty.delete(item)
return item, false
}

基本队列实例化


func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
clock: c,
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
//启动一个协程 定时更新metrics
go t.updateUnfinishedWorkLoop()
return t
}
func (q *Type) updateUnfinishedWorkLoop() {
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C() {
if !func() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown {
q.metrics.updateUnfinishedWork()
return true
}
return false
}() {
return
}
}
}

延迟队列


延迟队列的实现思路主要是使用优先队列存放需要延迟添加的元素,每次判断最小延迟的元素书否已经达到了加入队列的要求(延迟的时间到了),如果是则判断下一个元素,直到没有元素或者元素还需要延迟为止。

看一下延迟队列的数据结构

type delayingType struct {
Interface
...
//放置延迟添加的元素
waitingForAddCh chan *waitFor
...
}

主要是使用chan来保存延迟添加的元素,而具体实现是通过一个实现了一个AddAfter方法,看一下具体的内容

//延迟队列的接口
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
...
//如果延迟实现小于等于0 直接添加到队列
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
//添加到chan,下面会讲一下这个chan的处理
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}

延迟元素的处理

func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{}
//这里是初始化一个优先队列 具体实现有兴趣的同学可以研究下
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
//看一下第一个元素是否已经到达延迟的时间了
if entry.readyAt.After(now) {
break
}
//时间到了,将元素添加到工作的队列,并且从延迟的元素中移除
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
//如果还有需要延迟的元素,计算第一个元素的延迟时间(最小延迟的元素)
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
//定时检查下是否有元素达到延迟的时间
case <-nextReadyAt:
//这里是上面计算出来的时间,时间到了,处理到达延迟时间的元素
case waitEntry := <-q.waitingForAddCh:
//检查是否需要延迟,如果需要延迟就加入到延迟等待
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
//如果不需要延迟就直接添加到队列
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh: 

上面waitingLoop 是在实例化延迟队列的时候调用的,看一下实例化时候的逻辑

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
//实例化一个数据结构
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
//放到一个协程中处理延迟元素
go ret.waitingLoop()
return ret
}

限速队列


当前限速队列支持4中限速模式

  1. 令牌桶算法限速
  2. 排队指数限速
  3. 计数器模式
  4. 混合模式(多种限速算法同时使用)

限速队列的底层实际上还是通过延迟队列来进行限速,通过计算出元素的限速时间作为延迟时间

来看一下限速接口

type RateLimiter interface {
//
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}

看一下限速队列的数据结构

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
//实际上底层还是调用的延迟队列,通过计算出元素的延迟时间 进行限速
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
//通过when方法计算延迟加入队列的时间
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法


client-go中的令牌桶限速是通过 golang.org/x/time/rat包来实现的

可以通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来使用令牌桶限速算法,其中第一个参数qps表示每秒补充多少token,burst表示总token上限为多少。

排队指数算法


排队指数可以通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来使用。

这个算法有两个参数:

  1. baseDelay 基础限速时间
  2. maxDelay 最大限速时间

举个例子来理解一下这个算法,例如快速插入5个相同元素,baseDelay设置为1秒,maxDelay设置为10秒,都在同一个限速期内。第一个元素会在1秒后加入到队列,第二个元素会在2秒后加入到队列,第三个元素会在4秒后加入到队列,第四个元素会在8秒后加入到队列,第五个元素会在10秒后加入到队列(指数计算的结果为16,但是最大值设置了10秒)。

来看一下源码的计算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
//第一次为0
exp := r.failures[item]
//累加1
r.failures[item] = r.failures[item] + 1
//通过当前计数和baseDelay计算指数结果 baseDelay*(2的exp次方)
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}

计数器模式


计数器模式可以通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)来使用,有三个参数

  1. fastDelay 快限速时间
  2. slowDelay 慢限速时间
  3. maxFastAttempts 快限速元素个数

原理是这样的,假设fastDelay设置为1秒,slowDelay设置为10秒,maxFastAttempts设置为3,同样在一个限速周期内快速插入5个相同的元素。前三个元素都是以1秒的限速时间加入到队列,添加第四个元素时开始使用slowDelay限速时间,也就是10秒后加入到队列,后面的元素都将以10秒的限速时间加入到队列,直到限速周期结束。

来看一下源码

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
//添加一次就计数一次
r.failures[item] = r.failures[item] + 1
//计数小于maxFastAttempts都以fastDelay为限速时间,否则以slowDelay为限速时间
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}

混合模式


最后一种是混合模式,可以组合使用不同的限速算法实例化限速队列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}

总结


在k8s-client-go的源码中可以看到,大量的接口组合运用,将各种功能拆分成各个细小的库,是一种非常值得学习的代码风格以及思路。

始发于 四颗咖啡豆,转载请声明出处.
关注公粽号->[四颗咖啡豆] 获取最新内容
版权声明
本文为[四颗咖啡豆]所创,转载请带上原文链接,感谢
https://segmentfault.com/a/1190000039065952

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云