Kubernetes任务调用Job与CronJob及源码分析( 三 )


  • Allow , 这也是默认情况 , 这意味着这些 Job 可以同时存在;
  • Forbid , 这意味着不会创建新的 Pod , 该创建周期被跳过;
  • Replace , 这意味着新产生的 Job 会替换旧的、没有执行完的 Job 。
如果某一次 Job 创建失败 , 这次创建就会被标记为“miss” 。 当在指定的时间窗口内 , miss 的数目达到 100 时 , 那么 CronJob 会停止再创建这个 Job 。
spec.startingDeadlineSeconds可以指定这个时间窗口 。 startingDeadlineSeconds=200意味着过去 200 s 里 , 如果 miss 的数目达到了 100 次 , 那么这个 Job 就不会被创建执行了 。
cronjob源码分析CronJob的源码在cronjob_controller.go中 , 主要实现是在Controller的syncAll方法中 。
下面我们看看CronJob是在源码中如何创建运行的:
Controller#syncAll
func (jm *Controller) syncAll() {//列出所有的jobjobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)}js := make([]batchv1.Job, 0)//遍历jobListFunc然后将状态正常的job放入到js集合中err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {jobTmp, ok := object.(*batchv1.Job)if !ok {return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)}js = append(js, *jobTmp)return nil})...//列出所有的cronJobscronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)}//遍历所有的jobs , 根据ObjectMeta.OwnerReference字段确定该job是否由cronJob所创建//key为uid , value为job集合jobsByCj := groupJobsByParent(js)klog.V(4).Infof("Found %d groups", len(jobsByCj))//遍历cronJobserr = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {cj, ok := object.(*batchv1beta1.CronJob)if !ok {return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)}//进行同步syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)//清理所有已经完成的jobscleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)return nil})if err != nil {utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))return}}syncAll方法会列出所有job以及对应的cronJobs , 然后按照cronJobs来进行归类 , 然后遍历这个列表调用syncOne方法进行同步 , 之后再调用cleanupFinishedJobs清理所有已经完成的jobs 。
然后我们再看看syncOne是具体怎么处理job的:
syncOne
func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)childrenJobs := make(map[types.UID]bool)//遍历job列表for _, j := range js {childrenJobs[j.ObjectMeta.UID] = true//查看这个job是否是在Active列表中found := inActiveList(*cj, j.ObjectMeta.UID)//如果这个job不是在Active列表中 , 并且这个job还没有跑完 , 发送一个异常事件 。if !found!found {recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)deleteFromActiveList(cj, j.UID)}}//上面做了cronJob的Active列表的修改 , 所以需要更新一下状态updatedCJ, err := cjc.UpdateStatus(cj)if err != nil {klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)return}*cj = *updatedCJ//cronJob已经被删除了 , 直接返回if cj.DeletionTimestamp != nil {return}//cronJob处于suspend , 直接返回if cj.Spec.Suspend != nilerr != nil {klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)}return}


推荐阅读