Files
common/full-text-search/elasticsearch/client.go

119 lines
2.8 KiB
Go
Raw Normal View History

2025-12-09 17:55:08 +08:00
package elasticsearch
import (
"context"
"sync"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/olivere/elastic/v7"
)
var (
// client is the package-level ES client shared by the functions in this
// file. It is assigned by Init and stays nil until Init succeeds.
client *elastic.Client
// clientOnce guards the initialization body in Init so it runs only once.
clientOnce sync.Once
)
// Config describes the connection settings for an ES cluster.
type Config struct {
	// Addresses is the list of ES node URLs.
	Addresses []string
	// Username is the user name for authentication.
	Username string
	// Password is the password for authentication.
	Password string
}
// Init 初始化 ES 客户端(单例)
func Init(ctx context.Context) (err error) {
clientOnce.Do(func() {
addresses := g.Cfg().MustGet(ctx, "elasticsearch.addresses", []string{"http://localhost:9200"}).Strings()
username := g.Cfg().MustGet(ctx, "elasticsearch.username", "").String()
password := g.Cfg().MustGet(ctx, "elasticsearch.password", "").String()
options := []elastic.ClientOptionFunc{
elastic.SetURL(addresses...),
elastic.SetSniff(false), // 禁用嗅探,避免容器环境问题
}
if username != "" && password != "" {
options = append(options, elastic.SetBasicAuth(username, password))
}
client, err = elastic.NewClient(options...)
if err != nil {
glog.Errorf(ctx, "ES 客户端初始化失败: %v", err)
return
}
// 测试连接
2025-12-20 11:27:10 +08:00
info, code, testErr := client.Ping(addresses[0]).Do(ctx)
if testErr != nil {
glog.Warningf(ctx, "ES 连接测试失败(月度归档功能不可用): %v", testErr)
client = nil // 清空客户端
err = testErr
2025-12-09 17:55:08 +08:00
return
}
glog.Infof(ctx, "ES 连接成功 - 版本: %s, 状态码: %d", info.Version.Number, code)
})
return
}
// GetClient returns the package-level ES client.
// The result is nil when Init has not been called or when its connection
// test failed, so callers should check for nil before use.
func GetClient() *elastic.Client {
return client
}
// BulkIndex writes docs into indexName with a single bulk request.
//
// It returns an error when the client has not been initialized, when the
// bulk request itself fails, or when at least one document was rejected by
// ES. Each rejected item is logged individually, and a summary line with the
// success and failure counts is logged after every completed request. An
// empty docs slice is a no-op and returns nil.
func BulkIndex(ctx context.Context, indexName string, docs []interface{}) (err error) {
	if client == nil {
		return gerror.New("ES 客户端未初始化")
	}
	// The bulk service rejects a request that contains no actions, so treat
	// an empty batch as "nothing to do" instead of surfacing that error.
	if len(docs) == 0 {
		return nil
	}

	bulk := client.Bulk().Index(indexName)
	for _, doc := range docs {
		bulk.Add(elastic.NewBulkIndexRequest().Doc(doc))
	}

	resp, err := bulk.Do(ctx)
	if err != nil {
		return err
	}

	failed := resp.Failed()
	for _, item := range failed {
		// Error details may be absent on a failed item; avoid a nil dereference.
		reason := ""
		if item.Error != nil {
			reason = item.Error.Reason
		}
		glog.Errorf(ctx, "ES 写入失败 - Index: %s, Error: %s", item.Index, reason)
	}
	glog.Infof(ctx, "ES 批量写入完成 - 索引: %s, 成功: %d, 失败: %d",
		indexName, len(resp.Succeeded()), len(failed))

	// Report partial failures to the caller instead of only logging them.
	if len(failed) > 0 {
		return gerror.Newf("ES 批量写入部分失败 - 索引: %s, 失败: %d/%d", indexName, len(failed), len(docs))
	}
	return nil
}
// CreateIndexIfNotExists makes sure indexName exists. When the existence
// check reports the index as missing, it is created using mapping as the
// request body; otherwise nothing is changed. An error is returned when the
// client has not been initialized or when either ES request fails.
func CreateIndexIfNotExists(ctx context.Context, indexName, mapping string) (err error) {
	if client == nil {
		return gerror.New("ES 客户端未初始化")
	}

	found, err := client.IndexExists(indexName).Do(ctx)
	switch {
	case err != nil:
		return err
	case found:
		// Already present: nothing to create.
		return nil
	}

	if _, err = client.CreateIndex(indexName).BodyString(mapping).Do(ctx); err != nil {
		return err
	}
	glog.Infof(ctx, "ES 索引创建成功: %s", indexName)
	return nil
}
// Close stops the package-level ES client. It does nothing when no client
// is set.
func Close() {
	if client == nil {
		return
	}
	client.Stop()
}