171 lines
4.4 KiB
Python
171 lines
4.4 KiB
Python
import renpy
|
|
import persistent
|
|
|
|
from renpy import _
|
|
from .anita import SYSTEM_PROMPT
|
|
from .constants_ren import SYNONYMS
|
|
|
|
"""renpy
|
|
default last_response_id = None
|
|
|
|
init python:
|
|
"""
|
|
|
|
import re
|
|
|
|
|
|
def translated_system_prompt() -> str:
|
|
return renpy.translate_string(SYSTEM_PROMPT)
|
|
|
|
|
|
EMOTION_REGEX = re.compile(r"EMOTION:\w+")
|
|
EMOTION_TOKEN_REGEX = re.compile(rf"{EMOTION_REGEX.pattern} ?")
|
|
|
|
EMOJI_REGEX = re.compile(
|
|
"["
|
|
"\U0001f1e6-\U0001f1ff" # flags
|
|
"\U0001f300-\U0001f5ff" # symbols and pictographs
|
|
"\U0001f600-\U0001f64f" # emoticons
|
|
"\U0001f680-\U0001f6ff" # transport and map
|
|
"\U0001f900-\U0001f9ff" # supplemental symbols and pictographs
|
|
"\U0001fa70-\U0001faff" # symbols and pictographs extended
|
|
"\U00002702-\U000027b0" # dingbats
|
|
"\U0001f3fb-\U0001f3ff" # skin tone modifiers
|
|
"\u200d" # zero-width joiner
|
|
"\ufe0f" # emoji variation selector
|
|
"]+",
|
|
flags=re.UNICODE,
|
|
)
|
|
|
|
EMOTIONS = [
|
|
"happy",
|
|
"sad",
|
|
"surprised",
|
|
"embarrassed",
|
|
"flirty",
|
|
"angry",
|
|
"thinking",
|
|
"confused",
|
|
]
|
|
|
|
|
|
def sanitize_speech(text):
|
|
text_without_emotion_tokens = EMOTION_TOKEN_REGEX.sub("", text)
|
|
|
|
return EMOJI_REGEX.sub("", text_without_emotion_tokens)
|
|
|
|
|
|
def parse_emotion(line):
|
|
def _normalize_emotion(em):
|
|
# If not a valid emotion, then search for a match in the
|
|
# table of synonyms.
|
|
if em not in EMOTIONS:
|
|
for i in SYNONYMS.keys():
|
|
if em in SYNONYMS[i]:
|
|
return i
|
|
|
|
# If all searches failed, return emotion as is.
|
|
return em
|
|
|
|
try:
|
|
m = EMOTION_REGEX.match(line)
|
|
|
|
if m is not None:
|
|
emotion = m.group().split(":")[1]
|
|
text = line[m.span()[1] :]
|
|
sanitized = sanitize_speech(text)
|
|
|
|
return _normalize_emotion(emotion), sanitized
|
|
|
|
return None, line
|
|
|
|
except Exception as e:
|
|
return None, str(e)
|
|
|
|
|
|
def set_model_capabilities() -> bool:
|
|
"""
|
|
LM Studio throws Bad Request if the reasoning flag is set for a model
|
|
that doesn't support it. This method tries to determine if the currently
|
|
configured model supports reasoning to signal to the fetch_llm function
|
|
disable it.
|
|
"""
|
|
|
|
try:
|
|
headers = {"Authorization": f"Bearer {persistent.api_key}"}
|
|
data = {
|
|
"model": persistent.model,
|
|
"input": renpy.translate_string("Start the conversation."),
|
|
"reasoning": "off",
|
|
"system_prompt": translated_system_prompt(),
|
|
}
|
|
|
|
renpy.fetch(
|
|
f"{persistent.base_url}/api/v1/chat",
|
|
headers=headers,
|
|
json=data,
|
|
result="json",
|
|
)
|
|
|
|
except renpy.FetchError as fe:
|
|
# renpy.fetch returned a BadRequest, assume this means LM Studio
|
|
# rejected the request because the model doesn't support the
|
|
# reasoning setting in chat.
|
|
if hasattr(fe, "status_code") and fe.status_code == 400:
|
|
persistent.disable_reasoning = False
|
|
|
|
return True, None
|
|
|
|
else:
|
|
return False, str(fe)
|
|
|
|
except Exception as e:
|
|
# Something else happened.
|
|
return False, str(e)
|
|
|
|
else:
|
|
# The fetch worked, so the reasoning setting is available.
|
|
persistent.disable_reasoning = True
|
|
|
|
return True, None
|
|
|
|
|
|
def fetch_llm(message: str) -> str:
|
|
"""
|
|
Queries the chat with a model endpoint of the configured LM Studio server.
|
|
"""
|
|
|
|
global last_response_id
|
|
|
|
try:
|
|
# Set request data.
|
|
headers = {"Authorization": f"Bearer {persistent.api_key}"}
|
|
data = {
|
|
"model": persistent.model,
|
|
"input": message,
|
|
"system_prompt": translated_system_prompt(),
|
|
}
|
|
|
|
if persistent.disable_reasoning:
|
|
data["reasoning"] = "off"
|
|
|
|
# Add the previous response ID if any to continue the conversation.
|
|
if last_response_id is not None:
|
|
data["previous_response_id"] = last_response_id
|
|
|
|
# Fetch from LM Studio and parse the response.
|
|
response = renpy.fetch(
|
|
f"{persistent.base_url}/api/v1/chat",
|
|
headers=headers,
|
|
json=data,
|
|
result="json",
|
|
)
|
|
|
|
last_response_id = response["response_id"]
|
|
text = response["output"][0]["content"]
|
|
|
|
return text.split("\n")
|
|
|
|
except Exception as e:
|
|
return [f"Failed to fetch with error: {e}"]
|