223 lines
6.6 KiB
Go
223 lines
6.6 KiB
Go
package k3sconfig
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"os"
|
||
"time"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gcfg"
|
||
"github.com/r3labs/diff/v2"
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
"k8s.io/client-go/kubernetes"
|
||
"k8s.io/client-go/rest"
|
||
"k8s.io/client-go/tools/clientcmd"
|
||
)
|
||
|
||
type K8sConfig struct {
|
||
ApiServer string `json:"apiServer"`
|
||
Token string `json:"token"`
|
||
Kubeconfig string `json:"kubeconfig"`
|
||
Namespace string `json:"namespace"`
|
||
}
|
||
|
||
func init() {
|
||
ctx := context.Background()
|
||
|
||
consulAddr := g.Cfg().MustGet(ctx, "consul.address").String()
|
||
if consulAddr != "" {
|
||
g.Log().Debug(ctx, "📄 [K3sConfig] 检测到 consul.address 配置,使用 Consul,跳过 K3s 加载")
|
||
return
|
||
}
|
||
|
||
loadFromK3sCluster(ctx)
|
||
}
|
||
|
||
func loadFromK3sCluster(ctx context.Context) {
|
||
serviceName := g.Cfg().MustGet(ctx, "server.name", "").String()
|
||
|
||
if serviceName == "" {
|
||
panic("❌ [K3sConfig] 配置文件中未设置 server.name")
|
||
}
|
||
|
||
k8sConfig := getK8sConfig(ctx)
|
||
namespace := k8sConfig.Namespace
|
||
if namespace == "" {
|
||
namespace = "default"
|
||
}
|
||
|
||
configMapName := fmt.Sprintf("%s-config", serviceName)
|
||
g.Log().Info(ctx, "🔗 [K3sConfig] 从 K3s 集群加载配置:", configMapName)
|
||
|
||
var config *rest.Config
|
||
var err error
|
||
|
||
if k8sConfig.ApiServer != "" && k8sConfig.Token != "" {
|
||
g.Log().Infof(ctx, "📍 [K3sConfig] 使用远程 K8s API Server: %s", k8sConfig.ApiServer)
|
||
config = &rest.Config{
|
||
Host: k8sConfig.ApiServer,
|
||
BearerToken: k8sConfig.Token,
|
||
TLSClientConfig: rest.TLSClientConfig{
|
||
Insecure: true,
|
||
},
|
||
}
|
||
} else if k8sConfig.Kubeconfig != "" {
|
||
g.Log().Infof(ctx, "📍 [K3sConfig] 使用 Kubeconfig 文件: %s", k8sConfig.Kubeconfig)
|
||
config, err = clientcmd.BuildConfigFromFlags("", k8sConfig.Kubeconfig)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
|
||
}
|
||
} else {
|
||
kubeconfigEnv := os.Getenv("KUBECONFIG")
|
||
if kubeconfigEnv != "" {
|
||
g.Log().Infof(ctx, "📍 [K3sConfig] 使用环境变量 KUBECONFIG: %s", kubeconfigEnv)
|
||
config, err = clientcmd.BuildConfigFromFlags("", kubeconfigEnv)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
|
||
}
|
||
} else {
|
||
home, _ := os.UserHomeDir()
|
||
defaultKubeconfig := fmt.Sprintf("%s/.kube/config", home)
|
||
g.Log().Infof(ctx, "📍 [K3sConfig] 使用默认 Kubeconfig: %s", defaultKubeconfig)
|
||
config, err = clientcmd.BuildConfigFromFlags("", defaultKubeconfig)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
|
||
}
|
||
}
|
||
}
|
||
|
||
g.Log().Infof(ctx, "🔌 [K3sConfig] K8s API Server: %s", config.Host)
|
||
|
||
clientset, err := kubernetes.NewForConfig(config)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 创建 K8s 客户端失败: %v", err))
|
||
}
|
||
|
||
cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{})
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 获取 ConfigMap 失败: %v", err))
|
||
}
|
||
|
||
g.Log().Infof(ctx, "📦 [K3sConfig] ConfigMap 信息 - Name: %s, Namespace: %s, ResourceVersion: %s",
|
||
cm.Name, cm.Namespace, cm.ResourceVersion)
|
||
|
||
configData, ok := cm.Data["config.yml"]
|
||
if !ok {
|
||
g.Log().Debugf(ctx, "📋 [K3sConfig] ConfigMap 可用键: %v", getMapKeys(cm.Data))
|
||
panic("❌ [K3sConfig] ConfigMap 中未找到 config.yml 键")
|
||
}
|
||
|
||
g.Log().Infof(ctx, "📄 [K3sConfig] 获取到的配置内容长度: %d 字节", len(configData))
|
||
adapter, err := gcfg.NewAdapterContent()
|
||
if err != nil {
|
||
panic(fmt.Sprintf("❌ [K3sConfig] 创建配置适配器失败: %v", err))
|
||
}
|
||
|
||
adapter.SetContent(configData)
|
||
g.Cfg().SetAdapter(adapter)
|
||
g.Log().Infof(ctx, "✅ [K3sConfig] 成功从 K3s 加载配置: %s/%s", namespace, configMapName)
|
||
|
||
go watchK3sConfig(ctx, clientset, namespace, configMapName, cm.ResourceVersion)
|
||
}
|
||
|
||
func getK8sConfig(ctx context.Context) K8sConfig {
|
||
return K8sConfig{
|
||
ApiServer: g.Cfg().MustGet(ctx, "k8s.apiServer", "").String(),
|
||
Token: g.Cfg().MustGet(ctx, "k8s.token", "").String(),
|
||
Kubeconfig: g.Cfg().MustGet(ctx, "k8s.kubeconfig", "").String(),
|
||
Namespace: g.Cfg().MustGet(ctx, "k8s.namespace", "default").String(),
|
||
}
|
||
}
|
||
|
||
func watchK3sConfig(ctx context.Context, clientset *kubernetes.Clientset, namespace, configMapName string, lastResourceVersion string) {
|
||
g.Log().Info(ctx, "👀 [K3sConfig] 开始监听 ConfigMap 变更...")
|
||
|
||
for {
|
||
time.Sleep(10 * time.Second)
|
||
|
||
cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{
|
||
ResourceVersion: "0",
|
||
})
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ [K3sConfig] 监听 ConfigMap 失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
if cm.ResourceVersion == lastResourceVersion {
|
||
continue
|
||
}
|
||
|
||
g.Log().Infof(ctx, "🔔 [K3sConfig] 检测到 ConfigMap 变更, Old Version: %s, New Version: %s",
|
||
lastResourceVersion, cm.ResourceVersion)
|
||
|
||
configData, ok := cm.Data["config.yml"]
|
||
if !ok {
|
||
g.Log().Error(ctx, "❌ [K3sConfig] ConfigMap 中未找到 config.yml 键")
|
||
continue
|
||
}
|
||
|
||
updateK3sConfig(configData)
|
||
lastResourceVersion = cm.ResourceVersion
|
||
}
|
||
}
|
||
|
||
func updateK3sConfig(content string) {
|
||
ctx := context.Background()
|
||
|
||
oldConfig, _ := g.Cfg().Data(ctx)
|
||
|
||
adapter, err := gcfg.NewAdapterContent()
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ [K3sConfig] 创建配置适配器失败: %v", err)
|
||
return
|
||
}
|
||
adapter.SetContent(content)
|
||
g.Cfg().SetAdapter(adapter)
|
||
|
||
newConfig, _ := g.Cfg().Data(ctx)
|
||
|
||
changelog, err := diff.Diff(oldConfig, newConfig)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ [K3sConfig] 配置对比失败: %v", err)
|
||
return
|
||
}
|
||
|
||
if len(changelog) == 0 {
|
||
g.Log().Info(ctx, "✅ [K3sConfig] 配置已热更新成功(内容无实质变化)")
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "=== [K3sConfig] 检测到配置变更 (%d 项) ===", len(changelog))
|
||
for _, change := range changelog {
|
||
switch change.Type {
|
||
case "create":
|
||
g.Log().Infof(ctx, "[+] 新增: %s = %v", change.Path, change.To)
|
||
case "update":
|
||
g.Log().Infof(ctx, "[~] 修改: %s: %v -> %v", change.Path, change.From, change.To)
|
||
case "delete":
|
||
g.Log().Infof(ctx, "[-] 删除: %s (原值: %v)", change.Path, change.From)
|
||
}
|
||
}
|
||
g.Log().Info(ctx, "=================================")
|
||
}
|
||
|
||
func getMapKeys(m map[string]string) []string {
|
||
keys := make([]string, 0, len(m))
|
||
for k := range m {
|
||
keys = append(keys, k)
|
||
}
|
||
return keys
|
||
}
|
||
|
||
func loadConfigFromFile(ctx context.Context, configPath string) {
|
||
adapter, err := gcfg.NewAdapterFile(configPath)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ [K3sConfig] 创建文件适配器失败: %v", err)
|
||
return
|
||
}
|
||
|
||
g.Cfg().SetAdapter(adapter)
|
||
g.Log().Info(ctx, "✅ [K3sConfig] 已成功加载配置:", configPath)
|
||
}
|