Beehive是一个基于go channel的消息框架,它用于沟通KubeEdge的各个模块。在Beehive中注册的模块可以通过已经注册的模块的group名或者该模块的名字与之进行通信。Beehive支持如下的操作:
1、添加module
2、将module添加到某个group
3、CleanUp(从beehive core和所有groups中删除一个module)
Beehive支持对消息进行如下的操作:
1、发送到一个module/group
2、被某个module接收
3、发送sync到一个module/group
4、发送对sync的response
消息由三部分组成:
1、Header:
1)ID:消息ID,string类型
2)ParentID:出现在对sync的response消息中,string类型
3)TimeStamp:消息生成的时间,int类型
4)Sync:用于说明消息是否为Sync类型的标志位,bool类型
2、Route:
1)Source:消息源,string类型
2)Group:消息被广播到的group,string类型
3)Operation:对资源的操作?,string类型
4)Resource:指定了要操作的资源,string类型
3、Content:消息的内容,interface{}类型
1、edgecore在启动的同时,各个模块就开始尝试向beehive core注册自己;
2、beehive core有一个map,用于保存注册的module,其中module的name是键,module的实现接口是值;
3、当一个模块尝试向beehive core注册自己时,beehive从已经加载的module.yaml配置文件检测该模块是否启用。如果该模块被启用,则将其加入modules map,否则将其加入disabled modules map。
此部分对于理解beehive的操作重要。
1、channels:channels是一个map[string]chan message类型的map,其中module的名字是键值,消息channel,该channel可以用于发送消息到各个模块;
2、chsLock:控制channels字典的锁
3、typeChannels:typeChannels是map[string]string类型的字典,group name是键,map[string]chan message是值(group中每个module的name是键,与对应module连接的channel是值)
4、typeChsLock:控制typeChannels字典的锁
5、anonChannels:anonChannels是map[string] chan message类型的map,其中parentid是键,消息通道是值,该字段的作用是作为sync类型消息的response
6、anonChsLock:控制anonChannels字典的锁
// ChannelContext is object for Context channel
type ChannelContext struct {
//ConfigFactory goarchaius.ConfigurationFactory
channels map[string]chan model.Message
chsLock sync.RWMutex
typeChannels map[string]map[string]chan model.Message
typeChsLock sync.RWMutex
anonChannels map[string]chan model.Message
anonChsLock sync.RWMutex
}1、添加module的操作首先会创建一个chan message类型的channel;
2、然后module的名字作为键,上一步创建的channel作为值会被添加到channels中(map[string] chan message)
e.g.添加edged module
coreContext.Addmodule("edged")// AddModule adds module into module context
func (ctx *ChannelContext) AddModule(module string) {
channel := ctx.newChannel()
ctx.addChannel(module, channel)
}
// addChannel return chan
func (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message) {
ctx.chsLock.Lock()
defer ctx.chsLock.Unlock()
ctx.channels[module] = moduleCh
}1、添加module到group的操作首先会从channels字典字段得到一个模块的channel;
2、然后module和上一步得到的channel到typeChannels字典中,其中group name是键,map[string]chan message类型的字典是值
e.g.添加edged module到edged group中。下面的例子中,参数1是模块的名字,参数2是group的名字
coreContext.AddModuleGroup("edged","edged")// SendToGroup send msg to modules. Todo: do not stuck
func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message) {
// avoid exception because of channel closing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendToGroup message, exception: %+v", exception)
}
}()
send := func(ch chan model.Message) {
select {
case ch <- message:
default:
klog.Warningf("the message channel is full, message: %+v", message)
select {
case ch <- message:
}
}
}
if channelList := ctx.getTypeChannel(moduleType); channelList != nil {
for _, channel := range channelList {
go send(channel)
}
return
}
klog.Warningf("Get bad module type:%s when sendToGroup message, do nothing", moduleType)
}1、CleanUp从channels map中删除此module,并且从所用groups中将其删除(typeChannels字典);
2、与此module进行通信的channel被关闭
e.d. CleanUp edged module
coreContext.CleanUp("edged")// Cleanup close modules
func (ctx *ChannelContext) Cleanup(module string) {
if channel := ctx.getChannel(module); channel != nil {
ctx.delChannel(module)
// decrease probable exception of channel closing
time.Sleep(20 * time.Millisecond)
close(channel)
}
}
// deleteChannel by module name
func (ctx *ChannelContext) delChannel(module string) {
// delete module channel from channels map
ctx.chsLock.Lock()
_, exist := ctx.channels[module]
if !exist {
klog.Warningf("Failed to get channel, module:%s", module)
return
}
delete(ctx.channels, module)
ctx.chsLock.Unlock()
// delete module channel from typechannels map
ctx.typeChsLock.Lock()
for _, moduleMap := range ctx.typeChannels {
if _, exist := moduleMap[module]; exist {
delete(moduleMap, module)
break
}
}
ctx.typeChsLock.Unlock()
}1、从channels字典中得到与要通信模块的对应channel;
2、消息被发送到channel中
e.g. 发送消息到edged module
coreContext.Send("edged",message)// Send send msg to a module. Todo: do not stuck
func (ctx *ChannelContext) Send(module string, message model.Message) {
// avoid exception because of channel closing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when send message, exception: %+v", exception)
}
}()
if channel := ctx.getChannel(module); channel != nil {
channel <- message
return
}
klog.Warningf("Get bad module name :%s when send message, do nothing", module)
}
// getChannel return chan
func (ctx *ChannelContext) getChannel(module string) chan model.Message {
ctx.chsLock.RLock()
defer ctx.chsLock.RUnlock()
if _, exist := ctx.channels[module]; exist {
return ctx.channels[module]
}
klog.Warningf("Failed to get channel, type:%s", module)
return nil
}1、SendToGroup从typeChannels字典中得到所有模块的channels字典;
2、在得到的字典中迭代,然后向每个模块发送消息
e.g. 发送消息到edged group的所有模块
coreContext.SendToGroup("edged",message)// SendToGroup send msg to modules. Todo: do not stuck
func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message) {
// avoid exception because of channel closing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendToGroup message, exception: %+v", exception)
}
}()
send := func(ch chan model.Message) {
select {
case ch <- message:
default:
klog.Warningf("the message channel is full, message: %+v", message)
select {
case ch <- message:
}
}
}
if channelList := ctx.getTypeChannel(moduleType); channelList != nil {
for _, channel := range channelList {
go send(channel)
}
return
}
klog.Warningf("Get bad module type:%s when sendToGroup message, do nothing", moduleType)
}
// getTypeChannel return chan
func (ctx *ChannelContext) getTypeChannel(moduleType string) map[string]chan model.Message {
ctx.typeChsLock.RLock()
defer ctx.typeChsLock.RUnlock()
if _, exist := ctx.typeChannels[moduleType]; exist {
return ctx.typeChannels[moduleType]
}
klog.Warningf("Failed to get type channel, type:%s", moduleType)
return nil
}1、Receive得到从channels字典中得到一个module的channel;
2、然后它就等待在这个channel上,以期消息的到达并返回。同时如果有Error也会返回。
e.g. 从edged module中接收消息
msg, err:=coreContext.Receive("edged")// Receive msg from channel of module
func (ctx *ChannelContext) Receive(module string) (model.Message, error) {
if channel := ctx.getChannel(module); channel != nil {
content := <-channel
return content, nil
}
klog.Warningf("Failed to get channel for module:%s when receive message", module)
return model.Message{}, fmt.Errorf("failed to get channel for module(%s)", module)
}
// getChannel return chan
func (ctx *ChannelContext) getChannel(module string) chan model.Message {
ctx.chsLock.RLock()
defer ctx.chsLock.RUnlock()
if _, exist := ctx.channels[module]; exist {
return ctx.channels[module]
}
klog.Warningf("Failed to get channel, type:%s", module)
return nil
}1、SendSync接收3个参数(module, message, timeout duration);
2、SendSync首先从channels字典中得到module的通信channel;
3、messages被发送到channel;
4、一个新的用于发送消息的channel被创建并且被添加到annoChannels中,其中messageID是键;
5、在超时之前它都会在自己创建的annoChannel等待接收response;
6、如果在超时之前收到了消息,则消息被返回,err为nil否则返回time out error。
e.g. 给edged module发送sync类型的消息, 超时间隔设为60s
response, err:=coreContext.SendSync("edged", message, 60*time.Second)// SendSync sends message in a sync way
func (ctx *ChannelContext) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) {
// avoid exception because of channel closing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendsync message, exception: %+v", exception)
}
}()
if timeout <= 0 {
timeout = MessageTimeoutDefault
}
deadline := time.Now().Add(timeout)
// make sure to set sync flag
message.Header.Sync = true
// check req/resp channel
reqChannel := ctx.getChannel(module)
if reqChannel == nil {
return model.Message{}, fmt.Errorf("bad request module name(%s)", module)
}
sendTimer := time.NewTimer(timeout)
select {
case reqChannel <- message:
case <-sendTimer.C:
return model.Message{}, errors.New("timeout to send message")
}
sendTimer.Stop()
// new anonymous channel for response
anonChan := make(chan model.Message)
anonName := getAnonChannelName(message.GetID())
ctx.anonChsLock.Lock()
ctx.anonChannels[anonName] = anonChan
ctx.anonChsLock.Unlock()
defer func() {
ctx.anonChsLock.Lock()
delete(ctx.anonChannels, anonName)
close(anonChan)
ctx.anonChsLock.Unlock()
}()
var resp model.Message
respTimer := time.NewTimer(time.Until(deadline))
select {
case resp = <-anonChan:
case <-respTimer.C:
return model.Message{}, errors.New("timeout to get response")
}
respTimer.Stop()
return resp, nil
}
// getChannel return chan
func (ctx *ChannelContext) getChannel(module string) chan model.Message {
ctx.chsLock.RLock()
defer ctx.chsLock.RUnlock()
if _, exist := ctx.channels[module]; exist {
return ctx.channels[module]
}
klog.Warningf("Failed to get channel, type:%s", module)
return nil
}
func getAnonChannelName(msgID string) string {
return msgID
}1、从group的typeChannels字典中得到modules的列表;
2、创建用于发送消息的channel,channel的缓冲区大小设置成和group中的modules的个数一致,并且被插入anonChannels字典中作为值,键为messageID;
3、在所有modules的channels上发送消息;
4、在超时之间等待。如果anonChannel的长度和group中modules的长度都不一致,则会检查channel中的message是否patentID=messageID。如果不是则返回error否则返回nil error;
5、如果超时则返回timeout error
e.g. 向edged group发送sync类型的消息,超时间隔设置为60s
err:=coreContext.SendToGroupSync("edged",message,60*time.Second)// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel
// check timeout and the size of anon channel
func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {
// avoid exception because of channel closing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)
}
}()
if timeout <= 0 {
timeout = MessageTimeoutDefault
}
deadline := time.Now().Add(timeout)
channelList := ctx.getTypeChannel(moduleType)
if channelList == nil {
return fmt.Errorf("failed to get module type(%s) channel list", moduleType)
}
// echo module must sync a response,
// let anonchan size be module number
channelNumber := len(channelList)
anonChan := make(chan model.Message, channelNumber)
anonName := getAnonChannelName(message.GetID())
ctx.anonChsLock.Lock()
ctx.anonChannels[anonName] = anonChan
ctx.anonChsLock.Unlock()
cleanup := func() error {
ctx.anonChsLock.Lock()
delete(ctx.anonChannels, anonName)
close(anonChan)
ctx.anonChsLock.Unlock()
var uninvitedGuests int
// cleanup anonchan and check parentid for resp
for resp := range anonChan {
if resp.GetParentID() != message.GetID() {
uninvitedGuests++
}
}
if uninvitedGuests != 0 {
klog.Errorf("Get some unexpected:%d resp when sendToGroupsync message", uninvitedGuests)
return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests)
}
return nil
}
// make sure to set sync flag before sending
message.Header.Sync = true
var timeoutCounter int32
send := func(ch chan model.Message) {
sendTimer := time.NewTimer(time.Until(deadline))
select {
case ch <- message:
sendTimer.Stop()
case <-sendTimer.C:
atomic.AddInt32(&timeoutCounter, 1)
}
}
for _, channel := range channelList {
go send(channel)
}
sendTimer := time.NewTimer(time.Until(deadline))
ticker := time.NewTicker(TickerTimeoutDefault)
for {
// annonChan is full
if len(anonChan) == channelNumber {
break
}
select {
case <-ticker.C:
case <-sendTimer.C:
cleanup()
if timeoutCounter != 0 {
errInfo := fmt.Sprintf("timeout to send message, several %d timeout when send", timeoutCounter)
return fmt.Errorf(errInfo)
}
klog.Error("Timeout to sendToGroupsync message")
return fmt.Errorf("Timeout to send message")
}
}
return cleanup()
}
// getTypeChannel return chan
func (ctx *ChannelContext) getTypeChannel(moduleType string) map[string]chan model.Message {
ctx.typeChsLock.RLock()
defer ctx.typeChsLock.RUnlock()
if _, exist := ctx.typeChannels[moduleType]; exist {
return ctx.typeChannels[moduleType]
}
klog.Warningf("Failed to get type channel, type:%s", moduleType)
return nil
}
func getAnonChannelName(msgID string) string {
return msgID
}1、SendResp被用于向sync类型的消息发送response;
2、response的parentID必须设置的和sync消息的messageID相同;
3、当SendResp被调用时,首先会检查对于response消息的parentID,是否有anonChannels类型的通道存在;
4、如果有channel,response消息被发送到对应的channel上。
5、否则其他类型的error被记录下。
coreContext.SendResp(respMessage)// SendResp send resp for this message when using sync mode
func (ctx *ChannelContext) SendResp(message model.Message) {
anonName := getAnonChannelName(message.GetParentID())
ctx.anonChsLock.RLock()
defer ctx.anonChsLock.RUnlock()
if channel, exist := ctx.anonChannels[anonName]; exist {
channel <- message
return
}
klog.V(4).Infof("Get bad anonName:%s when sendresp message, do nothing", anonName)
}
func getAnonChannelName(msgID string) string {
return msgID
}