mirror of
https://codeberg.org/crowci/crow.git
synced 2025-08-07 20:23:03 +03:00
Kubernetes refactor (#2794)
Kubernetes backend refactoring and tests --------- Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
@@ -30,7 +30,6 @@ import (
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@@ -46,16 +45,16 @@ const (
|
||||
EngineName = "kubernetes"
|
||||
)
|
||||
|
||||
var noContext = context.Background()
|
||||
var defaultDeleteOptions = newDefaultDeleteOptions()
|
||||
|
||||
type kube struct {
|
||||
ctx context.Context
|
||||
client kubernetes.Interface
|
||||
config *Config
|
||||
config *config
|
||||
goos string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
type config struct {
|
||||
Namespace string
|
||||
StorageClass string
|
||||
VolumeSize string
|
||||
@@ -68,10 +67,20 @@ type SecurityContextConfig struct {
|
||||
RunAsNonRoot bool
|
||||
}
|
||||
|
||||
func configFromCliContext(ctx context.Context) (*Config, error) {
|
||||
func newDefaultDeleteOptions() metav1.DeleteOptions {
|
||||
gracePeriodSeconds := int64(0) // immediately
|
||||
propagationPolicy := metav1.DeletePropagationBackground
|
||||
|
||||
return metav1.DeleteOptions{
|
||||
GracePeriodSeconds: &gracePeriodSeconds,
|
||||
PropagationPolicy: &propagationPolicy,
|
||||
}
|
||||
}
|
||||
|
||||
func configFromCliContext(ctx context.Context) (*config, error) {
|
||||
if ctx != nil {
|
||||
if c, ok := ctx.Value(types.CliContext).(*cli.Context); ok {
|
||||
config := Config{
|
||||
config := config{
|
||||
Namespace: c.String("backend-k8s-namespace"),
|
||||
StorageClass: c.String("backend-k8s-storage-class"),
|
||||
VolumeSize: c.String("backend-k8s-volume-size"),
|
||||
@@ -151,12 +160,7 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
|
||||
|
||||
for _, vol := range conf.Volumes {
|
||||
pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
|
||||
_, err := startVolume(ctx, e, vol.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -167,21 +171,10 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
|
||||
for _, stage := range conf.Stages {
|
||||
if stage.Alias == "services" {
|
||||
for _, step := range stage.Steps {
|
||||
stepName, err := dnsName(step.Name)
|
||||
svc, err := startService(ctx, e, step)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Trace().Str("pod-name", stepName).Msgf("Creating service: %s", step.Name)
|
||||
svc, err := Service(e.config.Namespace, step.Name, step.Ports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
svc, err = e.client.CoreV1().Services(e.config.Namespace).Create(ctx, svc, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
extraHosts = append(extraHosts, step.Networks[0].Aliases[0]+":"+svc.Spec.ClusterIP)
|
||||
}
|
||||
}
|
||||
@@ -199,13 +192,8 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
|
||||
|
||||
// Start the pipeline step.
|
||||
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
|
||||
pod, err := Pod(e.config.Namespace, step, e.config.PodLabels, e.config.PodAnnotations, e.goos, e.config.SecurityContext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("Creating pod: %s", pod.Name)
|
||||
_, err = e.client.CoreV1().Pods(e.config.Namespace).Create(ctx, pod, metav1.CreateOptions{})
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("Starting step: %s", step.Name)
|
||||
_, err := startPod(ctx, e, step)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -341,92 +329,38 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
|
||||
// return rc, nil
|
||||
}
|
||||
|
||||
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
|
||||
podName, err := dnsName(step.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping pod: %s", podName)
|
||||
|
||||
gracePeriodSeconds := int64(0) // immediately
|
||||
dpb := metav1.DeletePropagationBackground
|
||||
|
||||
deleteOpts := metav1.DeleteOptions{
|
||||
GracePeriodSeconds: &gracePeriodSeconds,
|
||||
PropagationPolicy: &dpb,
|
||||
}
|
||||
|
||||
if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(ctx, podName, deleteOpts); err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
func (e *kube) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
|
||||
err := stopPod(e.ctx, e, step, defaultDeleteOptions)
|
||||
return err
|
||||
}
|
||||
|
||||
// Destroy the pipeline environment.
|
||||
func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
|
||||
log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives")
|
||||
|
||||
gracePeriodSeconds := int64(0) // immediately
|
||||
dpb := metav1.DeletePropagationBackground
|
||||
|
||||
deleteOpts := metav1.DeleteOptions{
|
||||
GracePeriodSeconds: &gracePeriodSeconds,
|
||||
PropagationPolicy: &dpb,
|
||||
}
|
||||
|
||||
// Use noContext because the ctx sent to this function will be canceled/done in case of error or canceled by user.
|
||||
// Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps.
|
||||
// Trace log them in case the info could be useful when troubleshooting.
|
||||
|
||||
for _, stage := range conf.Stages {
|
||||
for _, step := range stage.Steps {
|
||||
stepName, err := dnsName(step.Name)
|
||||
err := stopPod(e.ctx, e, step, defaultDeleteOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Trace().Msgf("Deleting pod: %s", stepName)
|
||||
if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(noContext, stepName, deleteOpts); err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, stage := range conf.Stages {
|
||||
if stage.Alias == "services" {
|
||||
for _, step := range stage.Steps {
|
||||
log.Trace().Msgf("Deleting service: %s", step.Name)
|
||||
svc, err := Service(e.config.Namespace, step.Name, step.Ports)
|
||||
if step.Type == types.StepTypeService {
|
||||
err := stopService(e.ctx, e, step, defaultDeleteOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := e.client.CoreV1().Services(e.config.Namespace).Delete(noContext, svc.Name, deleteOpts); err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
log.Trace().Err(err).Msgf("Unable to delete service %s", svc.Name)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, vol := range conf.Volumes {
|
||||
pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx)
|
||||
err := stopVolume(e.ctx, e, vol.Name, defaultDeleteOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Delete(noContext, pvc.Name, deleteOpts)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
log.Trace().Err(err).Msgf("Unable to delete pvc %s", pvc.Name)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user