|
|
|
|
@ -1,11 +1,28 @@
|
|
|
|
|
package org.dromara.ai.process.provider.processor.impl;
|
|
|
|
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
|
import com.fasterxml.jackson.core.JsonParser;
|
|
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
|
import com.github.yulichang.toolkit.JoinWrappers;
|
|
|
|
|
import com.github.yulichang.wrapper.MPJLambdaWrapper;
|
|
|
|
|
import org.dromara.ai.domain.AiChatMessage;
|
|
|
|
|
import org.dromara.ai.domain.AiChatMessageDetail;
|
|
|
|
|
import org.dromara.ai.domain.AiTokenUsage;
|
|
|
|
|
import org.dromara.ai.mapper.AiChatMessageDetailMapper;
|
|
|
|
|
import org.dromara.ai.mapper.AiChatMessageMapper;
|
|
|
|
|
import org.dromara.ai.mapper.AiTokenUsageMapper;
|
|
|
|
|
import org.dromara.ai.process.dto.AIMessage;
|
|
|
|
|
import org.dromara.ai.process.dto.AIRequest;
|
|
|
|
|
import org.dromara.ai.process.dto.AIResponse;
|
|
|
|
|
import org.dromara.ai.process.dto.TokenUsage;
|
|
|
|
|
import org.dromara.ai.process.enums.AIChatMessageTypeEnum;
|
|
|
|
|
import org.dromara.ai.process.provider.processor.IUnifiedAIProviderProcessor;
|
|
|
|
|
import org.dromara.ai.test.ChatRequest;
|
|
|
|
|
import org.dromara.common.constant.HwMomAiConstants;
|
|
|
|
|
import org.dromara.common.satoken.utils.LoginHelper;
|
|
|
|
|
import org.dromara.system.api.model.LoginUser;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
|
|
import org.springframework.http.MediaType;
|
|
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
|
@ -14,6 +31,7 @@ import reactor.core.publisher.Mono;
|
|
|
|
|
import reactor.util.retry.Retry;
|
|
|
|
|
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.regex.Matcher;
|
|
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
|
|
|
|
@ -26,6 +44,12 @@ public abstract class BaseAIProviderProcessor implements IUnifiedAIProviderProce
|
|
|
|
|
|
|
|
|
|
protected final ObjectMapper objectMapper;
|
|
|
|
|
protected final WebClient webClient;
|
|
|
|
|
@Autowired
|
|
|
|
|
private AiChatMessageMapper aiChatMessageMapper;
|
|
|
|
|
@Autowired
|
|
|
|
|
private AiChatMessageDetailMapper aiChatMessageDetailMapper;
|
|
|
|
|
@Autowired
|
|
|
|
|
private AiTokenUsageMapper aiTokenUsageMapper;
|
|
|
|
|
|
|
|
|
|
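// Pattern used to pick JSON objects out of a streamed chunk (matches brace-delimited objects with up to three levels of nested braces)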
|
|
|
|
|
private static final Pattern JSON_PATTERN = Pattern.compile("\\{(?:[^{}]|\\{(?:[^{}]|\\{[^{}]*\\})*\\})*\\}");
|
|
|
|
|
@ -50,7 +74,7 @@ public abstract class BaseAIProviderProcessor implements IUnifiedAIProviderProce
|
|
|
|
|
/**
|
|
|
|
|
* Processes one JSON chunk of the streaming response
|
|
|
|
|
*/
|
|
|
|
|
protected String parseStreamChunk(String jsonChunk) {
|
|
|
|
|
protected String parseStreamChunkContent(String jsonChunk) {
|
|
|
|
|
try {
|
|
|
|
|
return extractContentFromStreamJson(jsonChunk);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
@ -116,7 +140,7 @@ public abstract class BaseAIProviderProcessor implements IUnifiedAIProviderProce
|
|
|
|
|
/**
|
|
|
|
|
* Executes the streaming HTTP request
|
|
|
|
|
*/
|
|
|
|
|
protected Flux<String> executeStreamRequest(String url, String requestBody, String apiKey) {
|
|
|
|
|
protected Flux<String> executeStreamRequestContent(String url, String requestBody, String apiKey) {
|
|
|
|
|
// String prompt = "你好";
|
|
|
|
|
// requestBody = String.format(
|
|
|
|
|
// "{\"model\":\"deepseek-chat\",\"stream\":true,\"messages\":[{\"role\":\"user\",\"content\":\"%s\"}]}",
|
|
|
|
|
@ -135,12 +159,216 @@ public abstract class BaseAIProviderProcessor implements IUnifiedAIProviderProce
|
|
|
|
|
.filter(chunk -> chunk != null && !chunk.isBlank())
|
|
|
|
|
.filter(chunk -> !chunk.equals("[DONE]"))
|
|
|
|
|
.filter(chunk -> chunk.startsWith("{") && chunk.endsWith("}"))
|
|
|
|
|
.map(this::parseStreamChunk)
|
|
|
|
|
.map(this::parseStreamChunkContent)
|
|
|
|
|
.filter(content -> content != null && !content.isEmpty())
|
|
|
|
|
|
|
|
|
|
.onErrorResume(e -> Flux.error(new RuntimeException("流式请求失败: " + e.getMessage())));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理流式响应的JSON块,返回包含内容和token信息的对象
|
|
|
|
|
*/
|
|
|
|
|
protected StreamChunkResult parseStreamChunk(String jsonChunk) {
|
|
|
|
|
try {
|
|
|
|
|
return extractContentAndTokensFromStreamJson(jsonChunk);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
return new StreamChunkResult(null, null);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
* Extracts the text content and the token information from one streaming JSON chunk
* (implemented by subclasses).
*
* @param jsonChunk the JSON chunk to extract from
* @return a {@code StreamChunkResult} holding the extracted content and token usage
* @throws Exception if the chunk cannot be processed; {@code parseStreamChunk} catches it
*         and substitutes a result with no content and no token usage
*/
|
|
|
|
|
protected abstract StreamChunkResult extractContentAndTokensFromStreamJson(String jsonChunk) throws Exception;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 执行流式HTTP请求,返回包含内容和token信息的Flux
|
|
|
|
|
*/
|
|
|
|
|
protected Flux<StreamChunkResult> executeStreamRequest(String url, String requestBody, String apiKey) {
|
|
|
|
|
return webClient.post()
|
|
|
|
|
.uri(url)
|
|
|
|
|
.header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
|
|
|
|
|
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
|
|
|
|
|
.header(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE)
|
|
|
|
|
.bodyValue(requestBody)
|
|
|
|
|
.retrieve()
|
|
|
|
|
.bodyToFlux(String.class)
|
|
|
|
|
.timeout(Duration.ofSeconds(60))
|
|
|
|
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
|
|
|
|
.flatMap(this::extractJsonChunks)
|
|
|
|
|
.filter(chunk -> chunk != null && !chunk.isBlank())
|
|
|
|
|
.filter(chunk -> !chunk.equals("[DONE]"))
|
|
|
|
|
.filter(chunk -> chunk.startsWith("{") && chunk.endsWith("}"))
|
|
|
|
|
.map(this::parseStreamChunk)
|
|
|
|
|
.filter(result -> result.hasContent() || result.hasTokenUsage())
|
|
|
|
|
.onErrorResume(e -> Flux.error(new RuntimeException("流式请求失败: " + e.getMessage())));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected void saveChatMessage(AIRequest request, String fullResponse, TokenUsage tokenUsage, LoginUser loginUser) {
|
|
|
|
|
try {
|
|
|
|
|
String sessionId = request.getSessionId();
|
|
|
|
|
AiChatMessage aiChatMessage = aiChatMessageMapper
|
|
|
|
|
.selectOne(new LambdaQueryWrapper<AiChatMessage>()
|
|
|
|
|
.eq(AiChatMessage::getSessionId, sessionId));
|
|
|
|
|
List<AIMessage> messages = request.getMessages();
|
|
|
|
|
if (aiChatMessage == null) {
|
|
|
|
|
aiChatMessage = new AiChatMessage();
|
|
|
|
|
aiChatMessage.setSessionId(request.getSessionId());
|
|
|
|
|
aiChatMessage.setMessageTopic(objectMapper.writeValueAsString(request.getMessageTopic()));
|
|
|
|
|
aiChatMessage.setMessageType(AIChatMessageTypeEnum.AI_CHAT.getCode());
|
|
|
|
|
aiChatMessage.setModelId(request.getModelId());
|
|
|
|
|
aiChatMessage.setKnowledgeBaseId(request.getKnowledgeBaseId());
|
|
|
|
|
// aiChatMessage.setTotalToken();
|
|
|
|
|
aiChatMessage.setTenantId(loginUser.getTenantId());
|
|
|
|
|
aiChatMessage.setCreateBy(loginUser.getUserId());
|
|
|
|
|
aiChatMessage.setCreateDept(loginUser.getDeptId());
|
|
|
|
|
|
|
|
|
|
aiChatMessageMapper.insert(aiChatMessage);
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
saveTokenUsage(HwMomAiConstants.AI_CHAT_MESSAGE_DETAIL_TYPE_QUESTION, objectMapper.writeValueAsString(request.getQuestionContent()),
|
|
|
|
|
objectMapper.writeValueAsString(fullResponse), tokenUsage, request.getModelId(), request.getKnowledgeBaseId(), null,
|
|
|
|
|
aiChatMessage.getChatMessageId(),request.getSessionId(),request.getCarryHistoryFlag(),"1", loginUser.getUserId());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// AiChatMessageDetail aiChatMessageDetail = new AiChatMessageDetail();
|
|
|
|
|
// aiChatMessageDetail.setChatMessageId(aiChatMessage.getChatMessageId());
|
|
|
|
|
// aiChatMessageDetail.setSessionId(request.getSessionId());
|
|
|
|
|
// aiChatMessageDetail.setQuestionContent(objectMapper.writeValueAsString(request.getQuestionContent()));
|
|
|
|
|
// aiChatMessageDetail.setAnswerContent(objectMapper.writeValueAsString(fullResponse));
|
|
|
|
|
// // 设置token使用信息
|
|
|
|
|
// if (tokenUsage != null) {
|
|
|
|
|
// aiChatMessageDetail.setPromptToken((long) tokenUsage.getPromptTokens());
|
|
|
|
|
// aiChatMessageDetail.setCompletionToken((long) tokenUsage.getCompletionTokens());
|
|
|
|
|
// aiChatMessageDetail.setTotalToken((long) tokenUsage.getTotalTokens());
|
|
|
|
|
// }
|
|
|
|
|
// aiChatMessageDetail.setModelId(request.getModelId());
|
|
|
|
|
// aiChatMessageDetail.setKnowledgeBaseId(request.getKnowledgeBaseId());
|
|
|
|
|
// aiChatMessageDetail.setTakeFlag(request.getCarryHistoryFlag());
|
|
|
|
|
// aiChatMessageDetail.setCompleteFlag("1");
|
|
|
|
|
// aiChatMessageDetail.setTenantId(loginUser.getTenantId());
|
|
|
|
|
// aiChatMessageDetail.setCreateBy(loginUser.getUserId());
|
|
|
|
|
// aiChatMessageDetail.setCreateDept(loginUser.getDeptId());
|
|
|
|
|
//
|
|
|
|
|
// aiChatMessageDetailMapper.insert(aiChatMessageDetail);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException("保存聊天记录失败", e);
|
|
|
|
|
}
|
|
|
|
|
// log.info("聊天记录已保存,ID: {}", record.getId());
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 保存token使用情况
|
|
|
|
|
* @param messageDetailType
|
|
|
|
|
* @param questionContent
|
|
|
|
|
* @param answerContent
|
|
|
|
|
* @param tokenUsage
|
|
|
|
|
* @param modelId
|
|
|
|
|
* @param knowledgeBaseId
|
|
|
|
|
* @param knowledgeContentId
|
|
|
|
|
* @param chatMessageId
|
|
|
|
|
* @param sessionId
|
|
|
|
|
* @param takeFlag
|
|
|
|
|
* @param completeFlag
|
|
|
|
|
* @param userId
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public void saveTokenUsage(String messageDetailType, String questionContent, String answerContent, TokenUsage tokenUsage,
|
|
|
|
|
Long modelId, Long knowledgeBaseId, Long knowledgeContentId,
|
|
|
|
|
Long chatMessageId,String sessionId,String takeFlag,String completeFlag,Long userId) {
|
|
|
|
|
Long promptToken = tokenUsage!=null ? tokenUsage.getPromptToken():null;
|
|
|
|
|
Long completionToken = tokenUsage!=null ? tokenUsage.getCompletionToken():null;
|
|
|
|
|
Long totalToken = tokenUsage!=null ? tokenUsage.getTotalToken():null;
|
|
|
|
|
|
|
|
|
|
AiChatMessageDetail aiChatMessageDetail = new AiChatMessageDetail();
|
|
|
|
|
aiChatMessageDetail.setMessageDetailType(messageDetailType);
|
|
|
|
|
aiChatMessageDetail.setQuestionContent(questionContent);
|
|
|
|
|
aiChatMessageDetail.setAnswerContent(answerContent);
|
|
|
|
|
aiChatMessageDetail.setPromptToken(promptToken);
|
|
|
|
|
aiChatMessageDetail.setCompletionToken(completionToken);
|
|
|
|
|
aiChatMessageDetail.setTotalToken(totalToken);
|
|
|
|
|
aiChatMessageDetail.setModelId(modelId);
|
|
|
|
|
aiChatMessageDetail.setKnowledgeBaseId(knowledgeBaseId);
|
|
|
|
|
aiChatMessageDetail.setKnowledgeContentId(knowledgeContentId);
|
|
|
|
|
aiChatMessageDetail.setTakeFlag(takeFlag);
|
|
|
|
|
aiChatMessageDetail.setCompleteFlag(completeFlag);
|
|
|
|
|
aiChatMessageDetail.setChatMessageId(chatMessageId);
|
|
|
|
|
aiChatMessageDetail.setSessionId(sessionId);
|
|
|
|
|
aiChatMessageDetailMapper.insert(aiChatMessageDetail);
|
|
|
|
|
|
|
|
|
|
MPJLambdaWrapper<AiTokenUsage> lqw = JoinWrappers.lambda(AiTokenUsage.class)
|
|
|
|
|
.selectAll(AiTokenUsage.class)
|
|
|
|
|
.eq(userId != null, AiTokenUsage::getUserId, userId)
|
|
|
|
|
.eq(modelId != null, AiTokenUsage::getModelId, modelId);
|
|
|
|
|
|
|
|
|
|
AiTokenUsage aiTokenUsage = aiTokenUsageMapper.selectOne(lqw);
|
|
|
|
|
if (aiTokenUsage == null) {
|
|
|
|
|
aiTokenUsage = new AiTokenUsage();
|
|
|
|
|
aiTokenUsage.setPromptToken(promptToken);
|
|
|
|
|
aiTokenUsage.setCompletionToken(completionToken);
|
|
|
|
|
aiTokenUsage.setTotalToken(totalToken);
|
|
|
|
|
aiTokenUsage.setModelId(modelId);
|
|
|
|
|
aiTokenUsage.setUserId(userId);
|
|
|
|
|
aiTokenUsageMapper.insert(aiTokenUsage);
|
|
|
|
|
} else {
|
|
|
|
|
if (promptToken != null) {
|
|
|
|
|
Long currentPromptToken = aiTokenUsage.getPromptToken() == null ? 0L : aiTokenUsage.getPromptToken();
|
|
|
|
|
aiTokenUsage.setPromptToken(currentPromptToken + promptToken);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (completionToken != null) {
|
|
|
|
|
Long currentCompletionToken = aiTokenUsage.getCompletionToken() == null ? 0L : aiTokenUsage.getCompletionToken();
|
|
|
|
|
aiTokenUsage.setCompletionToken(currentCompletionToken + completionToken);
|
|
|
|
|
}
|
|
|
|
|
aiTokenUsage.setTotalToken(aiTokenUsage.getTotalToken() + totalToken);
|
|
|
|
|
|
|
|
|
|
aiTokenUsageMapper.updateById(aiTokenUsage);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 流式chunk结果类
|
|
|
|
|
*/
|
|
|
|
|
protected static class StreamChunkResult {
|
|
|
|
|
private final String content;
|
|
|
|
|
private final TokenUsage tokenUsage;
|
|
|
|
|
|
|
|
|
|
public StreamChunkResult(String content, TokenUsage tokenUsage) {
|
|
|
|
|
this.content = content;
|
|
|
|
|
this.tokenUsage = tokenUsage;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getters
|
|
|
|
|
public String getContent() { return content; }
|
|
|
|
|
public TokenUsage getTokenUsage() { return tokenUsage; }
|
|
|
|
|
|
|
|
|
|
public boolean hasContent() {
|
|
|
|
|
return content != null && !content.isEmpty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public boolean hasTokenUsage() {
|
|
|
|
|
return tokenUsage != null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// /**
|
|
|
|
|
// * 流式回复
|
|
|
|
|
// * @param request
|
|
|
|
|
|