文章
问答
冒泡
containerd源码分析-[2]cri插件

containerd-v1.7.0

本篇正式开始分析 CRI 插件的启用（初始化）流程。

源码分析

初始化入口

pkg/cri/cri.go:42

// init registers the CRI service as a gRPC plugin with ID "cri".
//
// The plugin starts from the default CRI plugin configuration, declares the
// plugin types it requires (event, service and NRI API plugins), and is
// brought up by initCRIService.
func init() {
	defaultCfg := criconfig.DefaultConfig()

	registration := &plugin.Registration{
		Type:   plugin.GRPCPlugin,
		ID:     "cri",
		Config: &defaultCfg,
		// Plugin types this plugin requires.
		Requires: []plugin.Type{
			plugin.EventPlugin,
			plugin.ServicePlugin,
			plugin.NRIApiPlugin,
		},
		InitFn: initCRIService,
	}

	plugin.Register(registration)
}

CRIService 初始化流程

pkg/cri/cri.go:57

// initCRIService is the InitFn of the "cri" plugin.
//
// It validates the plugin configuration, assembles the full CRI config, and
// creates a containerd client backed by in-memory services. It then
// constructs either the experimental sandbox CRI server or the legacy one,
// depending on the ENABLE_CRI_SANDBOXES environment variable, and runs it
// in a background goroutine.
//
// NOTE: this is an abridged excerpt; each "..." marks code omitted by the article.
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
	...
	// Context handed down by the plugin init context.
	ctx := ic.Context
	// The plugin's configuration, asserted to *criconfig.PluginConfig.
	pluginConfig := ic.Config.(*criconfig.PluginConfig)
	// Reject an invalid plugin configuration up front.
	if err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil {
		return nil, fmt.Errorf("invalid plugin config: %w", err)
	}
	// Assemble the CRI config: the plugin config plus directories and the
	// endpoint taken from the init context. ContainerdRootDir is the parent
	// of the plugin's root directory.
	c := criconfig.Config{
		PluginConfig:       *pluginConfig,
		ContainerdRootDir:  filepath.Dir(ic.Root),
		ContainerdEndpoint: ic.Address,
		RootDir:            ic.Root,
		StateDir:           ic.State,
	}
	...
	// Build the containerd client.
	client, err := containerd.New(
		"",
		containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
		containerd.WithDefaultPlatform(platforms.Default()),
		// WithInMemoryServices is meant for using a containerd client from
		// inside another (in-memory) containerd plugin, such as CRI.
		containerd.WithInMemoryServices(ic),
	)
	...
	// Choose the CRI server implementation from ENABLE_CRI_SANDBOXES: any
	// non-empty value selects the experimental sandbox server (sbserver);
	// otherwise the legacy server is used.
	var s server.CRIService
	if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
		log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
		s, err = sbserver.NewCRIService(c, client, getNRIAPI(ic))
	} else {
		log.G(ctx).Info("using legacy CRI server")
		s, err = server.NewCRIService(c, client, getNRIAPI(ic))
	}
	...
	// Run the CRI service in the background. A Run error is logged at Fatal
	// level, which with a logrus-backed logger terminates the process.
	go func() {
		if err := s.Run(); err != nil {
			log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
		}
		// TODO(random-liu): Whether and how we can stop containerd.
	}()
	...
}

关于 NRI 的介绍，详见《NRI：下一代节点细粒度资源控制方案》。

构造 CRIService 服务

criService 结构体定义

pkg/cri/server/service.go:71

// criService implements CRIService. It holds the configuration, in-memory
// stores, network plugins and helper services used to serve CRI requests.
type criService struct {
	// config is the CRI configuration.
	config criconfig.Config
	// imageFSPath is the path of the image filesystem.
	imageFSPath string
	// os abstracts the OS-level operations the service performs.
	os osinterface.OS
	// sandboxStore stores resources associated with sandboxes.
	sandboxStore *sandboxstore.Store
	// sandboxNameIndex records every sandbox name to keep names unique.
	sandboxNameIndex *registrar.Registrar
	// containerStore stores resources associated with containers.
	containerStore *containerstore.Store
	// containerNameIndex records every container name to keep names unique.
	containerNameIndex *registrar.Registrar
	// imageStore stores resources associated with images.
	imageStore *imagestore.Store
	// snapshotStore stores information about all snapshots.
	snapshotStore *snapshotstore.Store
	// netPlugin is used to set up and tear down the network when a pod
	// sandbox is run or stopped, keyed by network plugin name.
	netPlugin map[string]cni.CNI
	// client is the containerd client instance.
	client *containerd.Client
	// streamServer serves container streaming requests.
	streamServer streaming.Server
	// eventMonitor watches containerd events.
	eventMonitor *eventMonitor
	// initialized reports whether all services have been initialized. Until
	// the server is initialized, every GRPC service must return an error.
	initialized atomic.Bool
	// cniNetConfMonitor reloads CNI network configuration when the files in
	// the network conf directory change in a relevant way.
	cniNetConfMonitor map[string]*cniNetConfSyncer
	// baseOCISpecs holds the OCI specs cached from Runtime.BaseRuntimeSpec.
	baseOCISpecs map[string]*oci.Spec
	// allCaps is the list of capabilities; when empty it is parsed from
	// /proc/self/status.
	allCaps []string
	// unpackDuplicationSuppressor ensures there is only one in-flight fetch
	// request or unpack handler for a given key.
	unpackDuplicationSuppressor kmutex.KeyedLocker
	// nri is used to call back into NRI while handling CRI requests.
	nri *nri.API
	// containerEventsChan captures container events and delivers them to
	// GetContainerEvents callers.
	containerEventsChan chan runtime.ContainerEventResponse
	// sandboxControllers maps each sandbox controller mode to the controller
	// that manages sandboxes in that mode (NewCRIService registers the pod
	// sandbox and shim controllers here). It must be allocated before use.
	sandboxControllers map[criconfig.SandboxControllerMode]sandbox.Controller
}

NewCRIService 构造

pkg/cri/server/service.go:123

// NewCRIService returns a new CRIService instance.
//
// It builds the in-memory stores, checks that the configured snapshotter is
// available, and runs platform-specific initialization. It creates the
// streaming server, the event monitor, and a CNI config syncer for every
// network plugin whose config directory is non-empty. Finally it preloads
// the base OCI specs, registers the pod-sandbox and shim sandbox
// controllers, and attaches the NRI API. Any failure aborts construction
// and is returned as an error.
func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
	var err error
	labels := label.NewStore()
	// sandboxControllers is allocated here because controllers are stored
	// into it by key further down; assigning into a nil map panics.
	c := &criService{
		config:                      config,
		client:                      client,
		os:                          osinterface.RealOS{},
		sandboxStore:                sandboxstore.NewStore(labels),
		containerStore:              containerstore.NewStore(labels),
		imageStore:                  imagestore.NewStore(client),
		snapshotStore:               snapshotstore.NewStore(),
		sandboxNameIndex:            registrar.NewRegistrar(),
		containerNameIndex:          registrar.NewRegistrar(),
		initialized:                 atomic.NewBool(false),
		netPlugin:                   make(map[string]cni.CNI),
		unpackDuplicationSuppressor: kmutex.New(),
		sandboxControllers:          make(map[criconfig.SandboxControllerMode]sandbox.Controller),
	}

	// TODO: figure out a proper channel size.
	c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)

	// Fail early if the configured snapshotter is not available.
	if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
		return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
	}

	// The image filesystem path is derived from the containerd root
	// directory and the snapshotter name.
	c.imageFSPath = imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
	logrus.Infof("Get image filesystem path %q", c.imageFSPath)

	// Platform-specific initialization. NOTE(review): presumably this is
	// also where c.netPlugin is populated, since the CNI loop below
	// iterates it — confirm per platform.
	if err := c.initPlatform(); err != nil {
		return nil, fmt.Errorf("initialize platform: %w", err)
	}

	c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("failed to create stream server: %w", err)
	}

	c.eventMonitor = newEventMonitor(c)

	// Create one CNI config syncer per network plugin. The default plugin
	// watches the global NetworkPluginConfDir. Any other plugin watches the
	// NetworkPluginConfDir of the runtime with the same name, falling back
	// to the global directory when no such runtime is configured. A plugin
	// whose resolved directory is empty gets no syncer.
	c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer)
	for name, i := range c.netPlugin {
		path := c.config.NetworkPluginConfDir
		if name != defaultNetworkPlugin {
			if rc, ok := c.config.Runtimes[name]; ok {
				path = rc.NetworkPluginConfDir
			}
		}
		if path != "" {
			m, err := newCNINetConfSyncer(path, i, c.cniLoadOptions())
			if err != nil {
				return nil, fmt.Errorf("failed to create cni conf monitor for %s: %w", name, err)
			}
			c.cniNetConfMonitor[name] = m
		}
	}

	// Preload the base OCI specs.
	c.baseOCISpecs, err = loadBaseOCISpecs(&config)
	if err != nil {
		return nil, err
	}

	// Register the sandbox controllers (pod sandbox controller and remote
	// shim controller). The pod sandbox controller receives the preloaded
	// base OCI specs, so this must come after loadBaseOCISpecs.
	c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs)
	c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()

	c.nri = nri

	return c, nil
}

关于 SELinux Label 的介绍，详见《SELinux-Label 详解》。

启动 CRI Service

pkg/cri/server/service.go:207

// Run starts the CRI service and blocks until one of its long-running
// components stops: the event monitor, the streaming server, or a CNI config
// syncer.
//
// It then closes the service and waits for the event monitor and the
// streaming server to finish. It returns the first non-nil component error,
// checking the event monitor, then the streaming server, then the CNI
// syncers. If there is none, it returns nil.
func (c *criService) Run() error {
	logrus.Info("Start subscribing containerd event")
	c.eventMonitor.subscribe(c.client)

	// Rebuild system state from containerd and status checkpoints before
	// serving any request.
	logrus.Infof("Start recovering state")
	if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
		return fmt.Errorf("failed to recover state: %w", err)
	}

	logrus.Info("Start event monitor")
	monitorErrCh := c.eventMonitor.start()

	// The snapshot stats syncer is never stopped.
	logrus.Info("Start snapshots syncer")
	statsSyncer := newSnapshotsSyncer(
		c.snapshotStore,
		c.client.SnapshotService(c.config.ContainerdConfig.Snapshotter),
		time.Duration(c.config.StatsCollectPeriod)*time.Second,
	)
	statsSyncer.start()

	// Start one goroutine per CNI config syncer. The channel is buffered so
	// every syncer can report without blocking. It is closed once all of
	// them have returned. With no syncers it is neither written to nor
	// closed, so the select below can never choose it.
	cniErrCh := make(chan error, len(c.cniNetConfMonitor))
	var cniWG sync.WaitGroup
	for name, confSyncer := range c.cniNetConfMonitor {
		cniWG.Add(1)
		logrus.Infof("Start cni network conf syncer for %s", name)
		go func(s *cniNetConfSyncer) {
			defer cniWG.Done()
			cniErrCh <- s.syncLoop()
		}(confSyncer)
	}
	if len(c.cniNetConfMonitor) != 0 {
		go func() {
			cniWG.Wait()
			close(cniErrCh)
		}()
	}

	// The streaming server sends at most one error and always closes its
	// channel on exit; http.ErrServerClosed is not reported as an error.
	logrus.Info("Start streaming server")
	streamErrCh := make(chan error)
	go func() {
		defer close(streamErrCh)
		err := c.streamServer.Start(true)
		if err == nil || err == http.ErrServerClosed {
			return
		}
		logrus.WithError(err).Error("Failed to start streaming server")
		streamErrCh <- err
	}()

	// Register the CRI domain with NRI.
	if err := c.nri.Register(&criImplementation{c}); err != nil {
		return fmt.Errorf("failed to set up NRI for CRI service: %w", err)
	}

	// Mark the server initialized; GRPC services stop rejecting requests.
	c.initialized.Set()

	// Block until any component stops.
	var monitorErr, streamErr, cniErr error
	select {
	case monitorErr = <-monitorErrCh:
	case streamErr = <-streamErrCh:
	case cniErr = <-cniErrCh:
	}
	if err := c.Close(); err != nil {
		return fmt.Errorf("failed to stop cri service: %w", err)
	}

	// Wait for the event monitor and the streaming server to exit. If a
	// channel already delivered its error above, it is expected to be closed
	// by now (assumed for the event monitor's channel). Those receives then
	// yield nil and the earlier value is kept.
	if err := <-monitorErrCh; err != nil {
		monitorErr = err
	}
	logrus.Info("Event monitor stopped")
	if err := <-streamErrCh; err != nil {
		streamErr = err
	}
	logrus.Info("Stream server stopped")

	switch {
	case monitorErr != nil:
		return fmt.Errorf("event monitor error: %w", monitorErr)
	case streamErr != nil:
		return fmt.Errorf("stream server error: %w", streamErr)
	case cniErr != nil:
		return fmt.Errorf("cni network conf monitor error: %w", cniErr)
	}
	return nil
}

流程架构图整理

CRI-work-flow


关于作者

Kirago
个人站点 https://kiragoo.github.io/
获得点赞
文章被阅读