diff --git a/rabbitmq/delay.go b/rabbitmq/delay.go new file mode 100644 index 0000000..82faa66 --- /dev/null +++ b/rabbitmq/delay.go @@ -0,0 +1,95 @@ +// Package rabbitmq - RabbitMQ延时消息发布 +package rabbitmq + +import ( + "context" + "time" + + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/errors/gerror" + amqp "github.com/rabbitmq/amqp091-go" +) + +// PublishWithDelay 发布延时消息到RabbitMQ +// delaySeconds: 延时秒数 +func PublishWithDelay(ctx context.Context, routingKey string, message interface{}, delaySeconds int) error { + ch, err := GetChannel() + if err != nil { + return gerror.Wrap(err, "获取RabbitMQ通道失败") + } + if ch == nil { + return gerror.New("RabbitMQ通道未初始化") + } + + // 序列化消息 + body, err := gjson.Encode(message) + if err != nil { + return gerror.Wrapf(err, "序列化消息失败") + } + + // 声明延时交换机(x-delayed-message类型) + // 注意:需要RabbitMQ安装延时插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange + exchangeName := "delayed.exchange" + err = ch.ExchangeDeclare( + exchangeName, + "x-delayed-message", // 延时交换机类型 + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + amqp.Table{ + "x-delayed-type": "direct", // 底层交换机类型 + }, + ) + if err != nil { + return gerror.Wrapf(err, "声明延时交换机失败") + } + + // 声明队列 + queue, err := ch.QueueDeclare( + routingKey, // 队列名使用routingKey + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, + ) + if err != nil { + return gerror.Wrapf(err, "声明队列失败") + } + + // 绑定队列到交换机 + err = ch.QueueBind( + queue.Name, // queue name + routingKey, // routing key + exchangeName, // exchange + false, + nil, + ) + if err != nil { + return gerror.Wrapf(err, "绑定队列失败") + } + + // 发布延时消息 + err = ch.PublishWithContext( + ctx, + exchangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, // 持久化消息 + Headers: amqp.Table{ + "x-delay": delaySeconds * 1000, // 延时时间(毫秒) + }, + Timestamp: time.Now(), + }, + ) + if err != nil { + return gerror.Wrapf(err, "发布延时消息失败") + } + + return nil +} diff --git a/ragflow/chat.go b/ragflow/chat.go index 7f0acb5..1bf69c1 100644 --- a/ragflow/chat.go +++ b/ragflow/chat.go @@ -6,6 +6,37 @@ import ( "github.com/gogf/gf/v2/errors/gerror" ) +// CreateChatReq 创建对话配置请求 +type CreateChatReq struct { + Name string `json:"name"` // 对话配置名称 + DatasetIds []string `json:"dataset_ids"` // 关联的知识库ID列表 + Prompt *PromptConfig `json:"prompt"` // 提示词配置 +} + +// PromptConfig 提示词配置 +type PromptConfig struct { + Prompt string `json:"prompt"` // 提示词内容 + SimilarityThreshold float64 `json:"similarity_threshold"` // 相似度阈值 + KeywordsSimilarityWeight float64 `json:"keywords_similarity_weight"` // 关键词相似度权重 + TopN int `json:"top_n"` // 返回顶部N个chunk + EmptyResponse string `json:"empty_response"` // 无匹配时回复 + Opener string `json:"opener"` // 开场白 + ShowQuote bool `json:"show_quote"` // 是否显示引用 + Variables []map[string]interface{} `json:"variables"` // 变量列表 +} + +// CreateChatRes 创建对话配置响应 +type CreateChatRes struct { + ChatId string `json:"id"` // 对话配置ID +} + +// UpdateChatReq 更新对话配置请求 +type UpdateChatReq struct { + Name string `json:"name,omitempty"` // 对话配置名称 + DatasetIds []string `json:"dataset_ids,omitempty"` // 关联的知识库ID列表 + Prompt *PromptConfig `json:"prompt,omitempty"` // 提示词配置 +} + // 聊天助手管理 // 参考: https://ragflow.com.cn/docs/dev/http_api_reference#聊天助手管理 @@ -54,24 +85,6 @@ type Variable struct { Optional bool `json:"optional"` } -// CreateChatReq 创建聊天助手请求 -type CreateChatReq struct { - Name string `json:"name"` - Avatar string `json:"avatar,omitempty"` - DatasetIds []string `json:"dataset_ids,omitempty"` - Llm *Llm `json:"llm,omitempty"` - Prompt *Prompt `json:"prompt,omitempty"` -} - -// UpdateChatReq 更新聊天助手请求 -type UpdateChatReq struct { - Name string `json:"name,omitempty"` - Avatar string `json:"avatar,omitempty"` - DatasetIds []string `json:"dataset_ids,omitempty"` - Llm *Llm `json:"llm,omitempty"` - Prompt *Prompt `json:"prompt,omitempty"` -} - // ListChatsReq 列出聊天助手请求 type ListChatsReq struct { Page int `json:"page,omitempty"` diff --git a/ragflow/document.go b/ragflow/document.go index c08f32b..2fee990 100644 --- a/ragflow/document.go +++ b/ragflow/document.go @@ -1,10 +1,15 @@ +// Package ragflow - RAGFlow文档管理 +// 功能:RAGFlow知识库文档的上传、列表、删除操作 package ragflow import ( + "bytes" "context" "strings" + "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" ) // 数据集内文件管理 @@ -138,12 +143,62 @@ func (c *Client) ListDocuments(ctx context.Context, datasetId string, req *ListD return &res, nil } -// UploadDocument 上传文档 -// 注意:此方法需要特殊处理 multipart/form-data,目前的 request 方法可能不支持 -// 我们需要扩展 request 方法或在此处单独实现 +// UploadDocumentFromText 上传文本内容作为文档 +func (c *Client) UploadDocumentFromText(ctx context.Context, datasetId, content, filename string) (documentId string, err error) { + if datasetId == "" { + return "", gerror.New("datasetId不能为空") + } + if content == "" { + return "", gerror.New("文档内容不能为空") + } + if filename == "" { + filename = "document.txt" + } + + // 构造URL + url := c.BaseURL + "/api/v1/datasets/" + datasetId + "/documents" + + // 使用gclient上传文本作为文件 + client := c.HTTPClient.Clone() + client.SetHeader("Authorization", "Bearer "+c.APIKey) + + // 使用ContentType方法上传multipart表单 + resp, err := client.Post(ctx, url, g.Map{ + "file": bytes.NewReader([]byte(content)), + }) + if err != nil { + return "", err + } + defer resp.Close() + + // 解析响应 + var result struct { + Code int `json:"code"` + Message string `json:"message"` + Data struct { + Id string `json:"id"` + } `json:"data"` + } + + bodyBytes := resp.ReadAll() + if err = gjson.DecodeTo(bodyBytes, &result); err != nil { + return "", err + } + + if result.Code != 0 { + return "", gerror.Newf("上传文档失败 (code=%d): %s", result.Code, result.Message) + } + + if result.Data.Id == "" { + return "", gerror.New("上传成功但未返回文档ID") + } + + return result.Data.Id, nil +} + +// UploadDocument 上传文档(保留兼容) func (c *Client) UploadDocument(ctx context.Context, datasetId string, filePaths []string) (err error) { - // TODO: 实现文件上传逻辑,需要使用 gclient 的 UploadFile 功能 - return gerror.New("upload document not implemented yet") + return gerror.New("upload document from file not implemented yet, use UploadDocumentFromText instead") } // DeleteDocument 删除文档