1275 lines
43 KiB
Python
1275 lines
43 KiB
Python
from sentence_transformers import SentenceTransformer
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
from gpt4all import GPT4All
|
|
import os
|
|
import re
|
|
import numpy as np
|
|
import json
|
|
from pathlib import Path
|
|
import torch
|
|
import time
|
|
import sys
|
|
import gpt4all
|
|
|
|
# Prepend the CUDA 11.8 bin directory to PATH — presumably so CUDA DLLs can be
# located at runtime on Windows. NOTE(review): hard-coded machine-specific path;
# assumes CUDA v11.8 is installed at this exact location — confirm per machine.
os.environ["PATH"] = r"C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.8\bin" + ";" + os.environ["PATH"]
|
|
|
|
|
|
# -------------------------
|
|
# Knowledge base selection
|
|
# -------------------------
|
|
BOOK_DIR = 'Books/Music'
|
|
|
|
# Explore adding TYPE to headers. 4 types fact, rule, reference, pedagogical, the ordering ranks
|
|
#TYPE: fact | rule | reference | pedagogical
|
|
#DOMAIN: music_theory
|
|
#PRIORITY: high | medium | low
|
|
|
|
# cleaning text documents
|
|
# https://www.text-utils.com/remove-special-characters/
|
|
# https://cloudconvert.com/docx-to-txt
|
|
|
|
# Ask ChatGPT to describe narratives around tablature examples.
|
|
# I would like you to chunk this for my RAG system.
|
|
# Where you identify guitar tablature you are to replace it with a narrative
|
|
# describing the notes that are played in fine detail.
|
|
# Please describe the notes exactly including any bends, hammer-on, pull-off, legato, etc
|
|
# Please do not omit any of the original descriptive text except insofar as it may be confusing for a RAG system.
|
|
# You may use the existing text inform yourself and help narrate the notation.
|
|
# {paste the text with tablature}
|
|
|
|
|
|
# Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity
|
|
# Augmented — add that retrieved context to the prompt
|
|
# Generation — use the language model to generate an answer based on that context
|
|
|
|
# -------------------
|
|
# Embedding Cleaning
|
|
# -------------------
|
|
# del embeddings_cache.npz
|
|
# del embeddings_cache_meta.json
|
|
|
|
# -------------------
|
|
# TO-DO
|
|
# -----------------
|
|
# Better table handling
|
|
# Update requirements.txt with torch installation notes
|
|
# Domain-specific clean profiles
|
|
|
|
# ---------------
|
|
# Running
|
|
# --------------
|
|
# python Chartwell.py
|
|
|
|
# --------------------------
|
|
# GIT Configuration
|
|
# ---------------------------
|
|
# git config --global credential.helper wincred
|
|
# git config credential.helper store
|
|
# git config --global user.name "Sean"
|
|
# git config --global user.email "skessler1964@gmail.com"
|
|
|
|
|
|
# Chartwell.py now has both models on GPU:
|
|
#
|
|
# GPT4All (Llama 3) — GPU for inference
|
|
# SentenceTransformer — GPU for embeddings
|
|
|
|
# IMPORTANT SETUP STEPS FOR RE-CREATING THIS ENVIRONMENT
|
|
# 1) Install python
|
|
# 3.10.11
|
|
# 2) Create venv
|
|
# python -m venv .venv
|
|
# .venv/Scripts/activate
|
|
# 3) Install Dependencies
|
|
# pip install -r requirements.txt
|
|
# 4) Meta-Llama-3.1-8B-Instruct.Q4_0.gguf
|
|
# \Users\skess\.cache\gpt4all\Meta-Llama-3-8B-Instruct.Q4_0.gguf
|
|
# The model will auto-download on the first run and then switch to allow_download=False (see below)
|
|
# The model is about 4.5G. The download is quick.
|
|
# lm_model = GPT4All("Meta-Llama-3-8B-Instruct.Q4_0.gguf",model_path=r"C:\Users\skess\.cache\gpt4all",device="gpu",allow_download=False)
|
|
# 5) huggingface This is for the sentence transformer (sentence-transformers/all-MiniLM-L6-v2)
|
|
# \Users\skess\.cache\huggingface There is a folder structure under here.
|
|
# embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") This will automatically load the model if it is not already loaded
|
|
# so an internet connection would be required if running this from scratch
|
|
|
|
# IMPORTANT PYTHON NOTES - KEEP
|
|
# Python
|
|
# .venv/Scripts/Activate
|
|
# pip freeze > requirements.txt
|
|
# pip install -r requirements.txt
|
|
|
|
|
|
# Torch GPU version
|
|
# pip uninstall torch -y
|
|
# pip install torch --index-url https://download.pytorch.org/whl/cu124 --force-reinstall
|
|
# python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())"
|
|
# witness : 2.x.x+cu124 True for CUDA
|
|
|
|
# Still on the to-do list:
|
|
# Fix the enrichment length cap
|
|
# Semantic chunking
|
|
# Better table handling
|
|
|
|
# ----------------------------------
# Weights for chunk weighting system
# -----------------------------------
# Multiplicative score multipliers keyed by a chunk's TYPE header value.
# Values above 1.0 boost a chunk's retrieval score; below 1.0 demote it.
TYPE_WEIGHTS = {
    "fact": 1.10, # The "Oak" gets a small boost
    "rule": 1.05,
    "reference": 1.00, # The baseline
    "pedagogical": 0.95 # The "Undergrowth" is only slightly demoted
}

# Multiplicative score multipliers keyed by a chunk's PRIORITY header value.
PRIORITY_WEIGHTS = {
    "high": 1.10,
    "medium": 1.00,
    "low": 0.90
}
|
|
|
|
# ----------------------------------
# Operating modes
# -----------------------------------
# CURRENT_MODE selects which entry of MODES supplies the system-prompt text.
CURRENT_MODE = "research"

# Each mode maps to a banner message and the instruction block prepended to
# the LLM prompt. The instruction strings are runtime data — do not edit
# casually, as they directly shape model output.
MODES = {
    # Loose grounding: retrieved context is inspiration, not a hard constraint.
    "creative": {
        "print_msg": "Creative mode.",
        "prompt_instruction": (
            "You are a creative assistant. "
            "Use the provided context as inspiration. "
            "Be concise and original. "
            "End your response with a single period."
        )
    },

    # Strict grounding: answer only from the retrieved context, de-duplicated.
    "research": {
        "print_msg": "Research mode.",
        "prompt_instruction": (
            "You are a helpful research assistant. "
            "Restrict your response strictly to the provided context. "
            "If the source material is exhausted, stop writing. "
            "If a relationship or entity is not explicitly documented in the context, do not include it. "
            "Do not repeat the same information in different wording. "
            "If multiple context passages express the same idea, summarize it once. "
            "If the context contains repetitive legal or procedural text, merge it into a single concise statement. "
            "Do not list multiple similar verses. "
            "Prefer one coherent explanation over multiple extracted quotations. "
            "Do not infer, guess, or use external knowledge under any circumstances. "
            "Never repeat the context or instructions. "
            "Never echo the question. "
            "End your answer with a single period. "
        )
    },

    # Two-step output: tagged ANALYSIS sentences followed by a grounded summary.
    "advanced": {
        "print_msg": "Advanced mode.",
        "prompt_instruction": (
            "You are a highly capable analytical assistant. "
            "Base your response primarily on the provided context. "

            "OUTPUT FORMAT (strict):\n"
            "Step 1: ANALYSIS\n"
            "- Write sentences, each prefixed with:\n"
            " [C] = directly supported by the context\n"
            " [I] = inferred from the context\n"
            " [E] = not explicitly supported\n\n"

            "Step 2: FINAL ANSWER\n"
            "- Write ONE paragraph summary only\n"
            "- Must be fully supported by statements in ANALYSIS\n"
            "- Do NOT introduce new information\n\n"

            "RULES:\n"
            "- Do not repeat sentences\n"
            "- Do not create multiple sections beyond ANALYSIS and FINAL ANSWER\n"
            "- Minimize [E] usage\n"
            "- If context is insufficient, say so in FINAL ANSWER\n"
        )
    },

    # Domain mode: context takes precedence, standard music theory as fallback.
    "music": {
        "print_msg": "Music mode.",
        "prompt_instruction": (
            "You are a music theory assistant.\n"
            "\n"
            "You may use general music theory knowledge when the context does not explicitly define a rule.\n"
            "However, if the context provides a rule, table, or mapping, you MUST prioritize it over general knowledge.\n"
            "\n"
            "Do not invent programming code, functions, or data structures.\n"
            "Do not fabricate musical tables or mappings not present in the context.\n"
            "\n"
            "Reasoning rules:\n"
            "- Prefer context over general knowledge.\n"
            "- If context is missing critical information, fall back to standard Western music theory.\n"
            "- If the question is ambiguous, choose the most common theoretical interpretation.\n"
            "\n"
            "Output rules:\n"
            "- Return only the final answer.\n"
            "- No explanations unless explicitly requested.\n"
            "- End with a single period.\n"
        )
    }
}
|
|
|
|
CACHE_FILES = ['embeddings_cache.npz', 'embeddings_cache_meta.json']

# This ensures the cache is always saved INSIDE the folder you are pointing to
CACHE_FILE = os.path.join(BOOK_DIR, CACHE_FILES[0])  # 'embeddings_cache.npz'
CACHE_META = os.path.join(BOOK_DIR, CACHE_FILES[1])  # 'embeddings_cache_meta.json'

# Collect every readable file under BOOK_DIR (recursively), excluding the
# embeddings cache files themselves.
book_files = []

for f in Path(BOOK_DIR).rglob('*'):
    # Skip directories and the embeddings cache files from this list.
    # (Fix: the original repeated the f.is_file() check a second time —
    # dead code removed.)
    if not f.is_file() or f.name in CACHE_FILES:
        continue
    try:
        with open(f, 'rb'):  # just check file is readable
            pass
        book_files.append(str(f))
    except PermissionError:
        # Unreadable files are silently excluded from the corpus.
        continue

print(f"Found {len(book_files)} files")
|
|
|
|
# Overlap should be 10-20% of chunk size
CHUNK_SIZE = 700      # target maximum characters per chunk
CHUNK_OVERLAP = 100   # NOTE(review): passed as chunk_text()'s 'overlap' default but never read there — confirm intent
DEBUG = False         # enables extra diagnostics (e.g. CHUNK: SINGLE message in chunk_text)

MAX_HISTORY = 5       # NOTE(review): not referenced in this section — presumably caps conversation_history length
CURRENT_LEVEL = 10    # index into the LEVELS table below
SEARCH_FILTER = None  # None = search all books

# --------------------------------------------------------------------
# Toggle for whether we are using the model to enrich the corpus data
# --------------------------------------------------------------------
# When True, chunk_text() calls the local LLM per chunk to prepend context tags.
USE_ENRICHMENT = True

# -------------------------
# CONVERSATIONAL HISTORY
# -------------------------
# List of {"question": ...} exchanges consulted by the query-enrichment helpers.
conversation_history = []
|
|
|
|
# -------------------------
# LEVEL CONFIG
# -------------------------
# Quality/cost ladder: higher levels retrieve more chunks (top_k), allow more
# generated tokens and a longer context window, and (from level 7) enable
# LLM-based query expansion.
LEVELS = {
    1: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 500},
    2: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 600},
    3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700},
    4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800},
    5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000},
    6: {"expand": False, "top_k": 6, "max_tokens": 200*3, "context_len": 2000},
    7: {"expand": True, "top_k": 5, "max_tokens": 150*3, "context_len": 1400},
    8: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1600},
    9: {"expand": True, "top_k": 6, "max_tokens": 175*3, "context_len": 1800},
    10: {"expand": True, "top_k": 6, "max_tokens": 200*3, "context_len": 2000},
}
|
|
|
|
# -------------------------
# Load models
# -------------------------
# -----------------------------------
# Load the sentence transformer model
# -----------------------------------
print("Loading embedding model...")
# Embeddings fall back to CPU when no CUDA device is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Embedding model using: {device}")
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2",device=device)

# -----------------------------------
# Load the language model - if it does not exist in the download area then download it, otherwise use it.
# -----------------------------------
print("Loading language model...")
model_file = "Meta-Llama-3.1-8B-Instruct-q4_0.gguf"
model_path = r"C:\Users\skess\.cache\gpt4all"  # NOTE(review): hard-coded per-user path

full_path = os.path.join(model_path, model_file)

# First run downloads the model; subsequent runs stay fully offline.
if not os.path.exists(full_path):
    print("Model not found locally. Downloading...")
    allow_download = True
else:
    allow_download = False

lm_model = GPT4All(
    model_file,
    model_path=model_path,
    device="cuda",  # NOTE(review): unlike the embedder there is no CPU fallback here — confirm a GPU is always present
    allow_download=allow_download
)
|
|
|
|
# ----------------
|
|
# Table Narration
|
|
# ----------------
|
|
# This will detect and create narrations for table data in pipe form
|
|
# For example.
|
|
#| Year | Squadrons | Aircraft |
|
|
#|------|-----------|----------|
|
|
#| 1939 | 21 | 252 |
|
|
#| 1940 | 35 | 420 |
|
|
|
|
# If adding data to a corpus try to use this standard form for instance
|
|
#| Metric | Value | Context |
|
|
#|--------|-------|---------|
|
|
#| Standard deduction single 2025 | $15,750 | Under age 65 |
|
|
#| Standard deduction single 2025 | $17,750 | Age 65 or older |
|
|
#| Standard deduction MFJ 2025 | $31,500 | Both under 65 |
|
|
|
|
def narrate_table(text):
    """
    Detect pipe-delimited tables in *text* and rewrite each one as narrative
    prose ("In this record, <header> was <value>, ...") before chunking.
    Lines that are not part of a table pass through unchanged.
    """
    lines = text.split('\n')
    output = []
    idx = 0
    rows_converted = 0
    tables_found = 0

    while idx < len(lines):
        stripped = lines[idx].strip()

        # A line with at least two pipes starts a candidate table.
        if '|' in stripped and stripped.count('|') >= 2:
            # Gather every consecutive line that still contains a pipe.
            block = []
            while idx < len(lines) and '|' in lines[idx]:
                block.append(lines[idx].strip())
                idx += 1

            # Drop pure separator rows such as |----|----|.
            rows = [ln for ln in block
                    if not re.match(r'^[\|\-\s:]+$', ln)]

            # Need a header row plus at least one data row to narrate.
            if len(rows) >= 2:
                tables_found += 1
                headers = [cell.strip() for cell in rows[0].split('|')
                           if cell.strip()]

                sentences = []
                for data_row in rows[1:]:
                    cells = [cell.strip() for cell in data_row.split('|')
                             if cell.strip()]
                    # Only narrate rows whose cell count matches the header.
                    if len(cells) == len(headers):
                        pieces = [f"{headers[col]} was {cells[col]}"
                                  for col in range(len(headers))]
                        sentences.append("In this record, " + ", ".join(pieces) + ".")
                        rows_converted += 1

                output.append(" ".join(sentences))
            else:
                # Not really a table — keep the (stripped) lines as-is.
                output.extend(block)
        else:
            output.append(lines[idx])
            idx += 1

    if tables_found > 0:
        print(f" [Table narration: {tables_found} table(s) detected, "
              f"{rows_converted} row(s) converted]")

    return '\n'.join(output)
|
|
|
|
# -------------------------
|
|
# Clean text
|
|
# -------------------------
|
|
def clean_text(text):
    """
    Normalize raw document text for chunking: narrate tables, repair
    hyphenated line breaks, collapse excess blank lines, and strip common
    OCR/formatting artifacts. Substitutions run in a fixed order.
    """
    # Narrate tables before any other cleaning.
    text = narrate_table(text)

    # (pattern, replacement) pairs applied in sequence — order matters.
    substitutions = (
        (r'(\w+)-\n(\w+)', r'\1\2'),                 # word-\nword -> wordword
        (r'\n{3,}', '\n\n'),                         # keep at most one blank line
        (r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', ''),     # strip trailing footnote digits
        (r'[■•◆▪→]', ''),                            # bullet/arrow glyphs
        (r' +', ' '),                                # collapse runs of spaces
        (r'\[\d+\]', ''),                            # numeric citation markers
        (r'\[citation needed\]', ''),                # wiki-style markers
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    return text.strip()
|
|
|
|
# --------------------------------
# This is for the enrichment pipeline if it is enabled. It Uses the local LLM to extract key metadata from a chunk.
# --------------------------------

# Llama-3 chat-format prompt for one-line metadata tagging. The assistant turn
# is primed with "Tags: [" so the model completes the bracketed line directly;
# extract_context_tags() re-attaches the leading "[" to the response.
PROMPT_TEMPLATE = (
    "<|start_header_id|>user<|end_header_id|>\n"
    "TAGGING OPERATION. NOT A CONVERSATION. NO EXPLANATIONS.\n"
    "OUTPUT FORMAT IS FIXED. DO NOT DEVIATE.\n"
    "\n"
    "RULES:\n"
    "1. Output EXACTLY ONE LINE in this format: [Time: | Loc: | Entity: | Topic:]\n"
    "2. Fill every field. Use 'Unknown' if uncertain. Never leave a field empty.\n"
    "3. Entity: list up to 5 items, comma separated.\n"
    "4. No sentences. No explanation. No apology. No meta-commentary.\n"
    "5. Do not repeat these instructions. Do not acknowledge this prompt.\n"
    "6. Your entire response is the tag line and nothing else.\n"
    "\n"
    "Text: {text}\n"
    "<|eot_id|>\n"
    "<|start_header_id|>assistant<|end_header_id|>\n"
    "Tags: ["
)
|
|
|
|
|
|
def extract_context_tags(text_chunk):
    """
    Ask the local LLM for a one-line '[Time: | Loc: | Entity: | Topic:]'
    metadata tag describing *text_chunk*, normalizing the brackets that the
    primed prompt may have stripped. Prints the tag and elapsed time.
    """
    started = time.perf_counter()
    raw = lm_model.generate(
        PROMPT_TEMPLATE.format(text=text_chunk),
        max_tokens=60,
        temp=0.01,
        n_batch=512,
    )

    # If the model didn't provide the bracket because we 'pushed' it, add it back
    if "]" in raw:
        tag = raw.split(']')[0] + "]"
    else:
        tag = raw
    if not tag.startswith("["):
        tag = "[" + tag

    print(f"TAG:{tag}")
    print(f"Took : {time.perf_counter() - started:.4f} seconds")
    return tag
|
|
|
|
def is_empty_tag(tag):
    """Return True when every field of a '[Key: value | ...]' tag is blank."""
    fields = tag.strip("[]").split("|")
    return all(field.split(":")[-1].strip() == "" for field in fields)
|
|
|
|
# -------------------------
|
|
# Extract the CHUNK directive from the header
|
|
# -------------------------
|
|
def get_chunk_directive(text, header_lines=20):
    """
    Return the value of a 'CHUNK: <word>' directive (upper-cased) found in
    the first *header_lines* lines of *text*, or None when no directive is
    present in that header region.
    """
    header = "\n".join(text.splitlines()[:header_lines])
    found = re.search(r"^CHUNK:\s*(\w+)", header, re.IGNORECASE | re.MULTILINE)
    return found.group(1).strip().upper() if found else None
|
|
|
|
# -------------------------
|
|
# Chunk text with overlap
|
|
# -------------------------
|
|
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """
    Split cleaned document text into retrieval chunks.

    Behavior:
      * A 'CHUNK: SINGLE' directive in the header makes the whole file one chunk.
      * Header lines (TYPE:/PRIORITY:/DOMAIN:/...) found in the first 500
        characters are re-injected into chunks that lack them.
      * When USE_ENRICHMENT is True, each chunk is prefixed with LLM-extracted
        context tags via extract_context_tags().

    NOTE(review): the 'overlap' parameter is accepted but never read — overlap
    is instead approximated by carrying the previous split unit forward.
    """

    # Try to get the chunk directive if it is present
    directive = get_chunk_directive(text)

    if directive == "SINGLE":
        if DEBUG:
            print(" [CHUNK: SINGLE detected — bypassing chunking]")
        return [text.strip()]

    # 1. EXTRACT HEADERS (The "Metadata Inheritance" logic)
    header_patterns = [
        r"TYPE:.*",
        r"PRIORITY:.*",
        r"DOMAIN:.*",
        r"TITLE:.*",
        r"CONCEPTS:.*",
        r"SOURCE:.*",
        r"CHUNK:.*", # special pattern currently supports SINGLE so that the entire file will be chunked and not split across chunks
    ]
    header_lines = []
    # Only the first 500 characters are scanned for header metadata.
    top_of_file = text[:500]
    for pattern in header_patterns:
        match = re.search(pattern, top_of_file, re.IGNORECASE)
        if match:
            header_lines.append(match.group(0))

    header_prefix = "\n".join(header_lines) + "\n\n" if header_lines else ""

    # 2. SEMANTIC SPLITTING (Your original Step 1 & 2)
    # Paragraphs short enough become units directly; long paragraphs are
    # re-split at sentence boundaries.
    paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
    split_units = []
    for para in paragraphs:
        if len(para) <= chunk_size:
            split_units.append(para)
        else:
            sentences = re.split(r'(?<=[.!?])\s+', para)
            current = ""
            for sentence in sentences:
                if len(current) + len(sentence) <= chunk_size:
                    current += " " + sentence
                else:
                    if current:
                        split_units.append(current.strip())
                    current = sentence
            if current:
                split_units.append(current.strip())

    # 3. COMBINE & INJECT HEADERS (Step 3 with metadata injection)
    chunks = []
    current_chunk = ""
    prev_unit = ""

    for unit in split_units:
        # Check if adding this unit exceeds chunk_size
        if len(current_chunk) + len(unit) + 1 <= chunk_size:
            current_chunk += " " + unit
        else:
            if current_chunk:
                final_output = current_chunk.strip()
                # --- CONDITIONAL ENRICHMENT LOGIC ---
                # Only the first 600 characters are sent to the tagger.
                if USE_ENRICHMENT:
                    print(f" [Enriching chunk {len(chunks)+1}...]", end="\r")
                    tags = extract_context_tags(final_output[:600])
                    if not is_empty_tag(tags):
                        final_output = f"{tags} {final_output}"
                # ----------------------------
                # Add headers to all chunks except the first one (which already has them)
                if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
                    final_output = header_prefix + final_output
                chunks.append(final_output)

            # Overlap logic: seed the next chunk with the previous unit when
            # the pair still fits within chunk_size.
            if prev_unit and len(prev_unit) + len(unit) + 1 <= chunk_size:
                current_chunk = prev_unit + " " + unit
            else:
                current_chunk = unit
        prev_unit = unit

    # Flush the final partial chunk through the same enrichment/header path.
    if current_chunk:
        final_output = current_chunk.strip()
        if USE_ENRICHMENT:
            tags = extract_context_tags(final_output[:600])
            if not is_empty_tag(tags):
                final_output = f"{tags} {final_output}"
        if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
            final_output = header_prefix + final_output
        chunks.append(final_output)

    return chunks
|
|
|
|
# -------------------------
|
|
# Check if cache is valid
|
|
# -------------------------
|
|
def cache_is_valid():
    """
    Validate the on-disk embeddings cache against the current book files.

    Returns a status dict:
        valid              -- False forces a full rebuild
        added              -- files on disk but absent from the cached list
        modified           -- cached files whose size changed
        missing_embeddings -- cached files with no size metadata entry

    Note: 'added' and 'modified' alone leave valid=True; they are handled
    later by the incremental-update path rather than a full rebuild.
    """
    print(f"\nChecking for existing enriched cache in {BOOK_DIR}...")

    status = {
        "valid": True,
        "added": [],
        "modified": [],
        "missing_embeddings": []
    }

    # --- HARD FAIL: missing cache files ---
    if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META):
        print("X Missing cache or metadata → rebuild required")
        status["valid"] = False
        return status

    with open(CACHE_META, "r") as f:
        meta = json.load(f)

    cached_files = set(meta.get("book_files", []))
    current_files = set(book_files)

    # --- Detect NEW files ---
    status["added"] = list(current_files - cached_files)

    # --- Check EXISTING files ---
    for book_name in current_files:
        if not os.path.exists(book_name):
            continue

        # Skip new files (handled separately)
        if book_name not in cached_files:
            continue

        stored_size = meta.get("file_sizes", {}).get(book_name)
        actual_size = os.path.getsize(book_name)

        # Missing metadata entry → bad
        if stored_size is None:
            status["missing_embeddings"].append(book_name)
            continue

        # File changed → needs reprocessing
        # NOTE(review): byte-size equality is the only change check — an edit
        # preserving the exact byte count would go undetected.
        if stored_size != actual_size:
            status["modified"].append(book_name)

    # --- HARD FAIL CONDITIONS ---
    if status["missing_embeddings"]:
        print(f"\nX Missing embeddings for {len(status['missing_embeddings'])} file(s):")
        for f in sorted(status["missing_embeddings"]):
            print(f" * {f}")
        status["valid"] = False

    if status["modified"]:
        print(f"\nX {len(status['modified'])} file(s) changed:")
        for f in sorted(status["modified"]):
            print(f" * {f}")

    # --- SOFT WARNING ---
    if status["added"]:
        print(f"\n+ {len(status['added'])} new file(s) detected:")
        for f in sorted(status["added"]):
            print(f" + {f}")

    if status["valid"]:
        print("\n✓ Cache usable (incremental updates possible)")
    else:
        print("\n→ Full rebuild required")

    return status
|
|
|
|
# --------------------------------------------------
|
|
# Save updated cache file
|
|
# --------------------------------------------------
|
|
def save_updated_cache():
    """
    Persist the in-memory chunks, sources and embeddings to CACHE_FILE and
    write the book-file list plus per-file sizes to CACHE_META.
    """
    np.savez(
        CACHE_FILE,
        embeddings=chunk_embeddings,
        chunks=np.array(all_chunks, dtype=object),
        sources=np.array(all_sources, dtype=object),
    )

    sizes = {path: os.path.getsize(path)
             for path in book_files if os.path.exists(path)}

    payload = {"book_files": book_files, "file_sizes": sizes}
    with open(CACHE_META, "w") as handle:
        json.dump(payload, handle)

    print("Cache updated.")
|
|
|
|
# --------------------------------------------------
|
|
# Remove chunks from embeddings for specified files
|
|
# --------------------------------------------------
|
|
def remove_chunks_for_files(files_to_remove):
    """
    Drop every cached chunk, source entry and embedding row that came from
    one of *files_to_remove*. Mutates the module-level cache in place.
    """
    global all_chunks, all_sources, chunk_embeddings

    if not files_to_remove:
        return

    # Set gives O(1) membership tests instead of scanning the list per chunk.
    remove_set = set(files_to_remove)
    keep_indices = [
        i for i, src in enumerate(all_sources)
        if src not in remove_set
    ]

    all_chunks = [all_chunks[i] for i in keep_indices]
    all_sources = [all_sources[i] for i in keep_indices]
    chunk_embeddings = chunk_embeddings[keep_indices]

    print(f"Removed old chunks for {len(files_to_remove)} modified file(s)")
|
|
|
|
# -------------------------
|
|
# Process new and modified files
|
|
# -------------------------
|
|
def process_incremental_updates(status):
    """
    Re-chunk and embed the files reported by cache_is_valid() as added or
    modified, splice them into the in-memory cache, and save it to disk.
    """
    global all_chunks, all_sources, chunk_embeddings

    files_to_process = status["added"] + status["modified"]

    # Step 1 — remove outdated chunks (ONLY modified files)
    remove_chunks_for_files(status["modified"])

    new_chunks = []
    new_sources = []

    # Step 2 — process new + modified files
    for book_name in files_to_process:
        print(f"[Updating] {book_name}")

        with open(book_name, "rb") as f:
            raw = f.read()

        # Fix: was a bare 'except:' which also swallowed KeyboardInterrupt etc.
        # Narrowed to UnicodeDecodeError, matching the full-rebuild path.
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            text = raw.decode("cp1252")  # fallback for Windows-encoded text

        # Skip IGNORE files
        first_line = text.lstrip().splitlines()[0] if text.strip() else ""
        if first_line.strip().upper().startswith("# IGNORE"):
            print(f"Skipping {book_name} (marked IGNORE)")
            continue

        book_text = clean_text(text)
        chunks = chunk_text(book_text)

        new_chunks.extend(chunks)
        new_sources.extend([book_name] * len(chunks))

    # Step 3 — nothing to add
    if not new_chunks:
        print("No new chunks to add.")
        return

    # Step 4 — embed
    print(f"Embedding {len(new_chunks)} new chunks...")
    new_embeddings = embed_model.encode(new_chunks, convert_to_tensor=False)

    # Step 5 — append (an empty cache is replaced rather than extended)
    if len(all_chunks) == 0:
        all_chunks = new_chunks
        all_sources = new_sources
        chunk_embeddings = np.array(new_embeddings)
    else:
        all_chunks.extend(new_chunks)
        all_sources.extend(new_sources)
        chunk_embeddings = np.vstack([chunk_embeddings, new_embeddings])

    # Step 6 — save
    save_updated_cache()
|
|
|
|
# -------------------------
|
|
# Load or build embeddings
|
|
# -------------------------
|
|
# In-memory corpus state shared by the retrieval helpers below.
all_chunks = []
all_sources = []

status = cache_is_valid()

if status["valid"]:
    # Cache hit: load chunks/sources/embeddings from disk.
    print("Loading embeddings from cache...")
    data = np.load(CACHE_FILE, allow_pickle=True)
    chunk_embeddings = data["embeddings"]
    all_chunks = list(data["chunks"])
    all_sources = list(data["sources"])
    print(f"Total chunks loaded from cache: {len(all_chunks)}")

    # check if we have additions or modifications
    if status["added"] or status["modified"]:
        print("\n[Incremental update triggered]")
        process_incremental_updates(status)

else:
    # Cache miss: chunk and embed every book file from scratch.
    print("Building embeddings from scratch...")
    for book_name in book_files:
        if not os.path.exists(book_name):
            print(f"Warning: {book_name} not found, skipping...")
            continue
        print(f"Loading {book_name}...")
        with open(book_name, "rb") as f:
            raw = f.read()
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            print(f"[Encoding fallback] {book_name}")
            text = raw.decode("cp1252")  # fallback for Windows-encoded text

        # Skip files marked with "# IGNORE" on first line
        first_line = text.lstrip().splitlines()[0] if text.strip() else ""
        if first_line.strip().upper().startswith("# IGNORE"):
            print(f"Skipping {book_name} (marked IGNORE)")
            continue
        book_text = clean_text(text)

        book_chunks = chunk_text(book_text)
        all_chunks.extend(book_chunks)
        all_sources.extend([book_name] * len(book_chunks))
        print(f" -> {len(book_chunks)} chunks")
    print(f"Total chunks: {len(all_chunks)}")
    print("Embedding chunks (this may take a minute)...")
    chunk_embeddings = embed_model.encode(all_chunks, convert_to_tensor=False)

    # Persist both the embeddings archive and the size metadata so the next
    # run can validate and load incrementally.
    print("Saving embeddings cache...")
    np.savez(
        CACHE_FILE,
        embeddings=chunk_embeddings,
        chunks=np.array(all_chunks, dtype=object),
        sources=np.array(all_sources, dtype=object)
    )
    file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)}
    with open(CACHE_META, "w") as f:
        json.dump({"book_files": book_files, "file_sizes": file_sizes}, f)
    print("Cache saved.")
|
|
|
|
# -------------------------
|
|
# Book filter helper
|
|
# -------------------------
|
|
def get_filtered_indices(filter_term):
    """Return indices of chunks whose source filename contains filter_term."""
    if not filter_term:
        return list(range(len(all_chunks)))

    needle = filter_term.lower()
    matches = []
    for idx, src in enumerate(all_sources):
        if needle in os.path.basename(src).lower():
            matches.append(idx)
    return matches
|
|
|
|
def show_available_books():
    """Print a short list of available books with keywords."""
    print("\n--- Available books ---")
    for path in book_files:
        title = os.path.basename(path).replace('.txt', '')
        print(f" {title}")
    print("--- Use 'search <keyword>: your question' to filter ---\n")
|
|
|
|
# -------------------------
|
|
# Query expansion
|
|
# -------------------------
|
|
def expand_query(question):
    """
    Ask the local LLM for up to three rephrasings of *question* using the
    loaded documents' vocabulary. Returns [question] + kept alternatives.
    """
    titles = ', '.join(os.path.basename(b).replace('.txt', '') for b in book_files)

    prompt = (
        f"You are helping search a library containing these documents:\n"
        f"{titles}\n\n"
        f"Generate 3 alternative ways to ask the following question using "
        f"vocabulary, concepts, and terminology that would likely appear in "
        f"these specific documents. Do not reference authors or books not in this list. "
        f"The alternative questions must ask about the SAME specific fact as the original. "
        f"Do not broaden or change the subject of the question. "
        f"Return ONLY the 3 questions, one per line, no numbering, no explanation.\n\n"
        f"Question: {question}"
    )
    with lm_model.chat_session():
        raw = lm_model.generate(prompt, max_tokens=150)

    # Keep only plausible question lines: right length, contains '?', not the
    # original question, and not prefixed with a label like "Option:".
    candidates = [ln.strip() for ln in raw.strip().split('\n') if ln.strip()]
    alternatives = []
    for cand in candidates:
        if (15 < len(cand) < 200
                and '?' in cand
                and cand != question
                and ':' not in cand[:20]):
            alternatives.append(cand)
    alternatives = alternatives[:3]

    all_queries = [question] + alternatives
    print(f" [Expanded queries: {len(all_queries)}]")
    for q in all_queries:
        print(f" - {q}")
    return all_queries
|
|
|
|
# ----------------------
|
|
# Topic Detection
|
|
# ----------------------
|
|
# Stopwords for topic detection
# -------------------------
# Common function words excluded from the topic-overlap comparison in
# topics_are_related(). Note "his" appears twice — harmless in a set literal.
STOPWORDS = {
    "the","is","a","an","and","or","of","to","in","on","for","with",
    "what","which","who","how","when","where","can","i","you","it",
    "did","do","does","was","were","he","she","they","his","her",
    "him","them","his","its","be","been","have","has","had","will",
    "would","could","should","may","might","me","my","we","our"
}
|
|
|
|
|
|
def topics_are_related(question, history, lookback=3):
    """
    Return True when *question* appears to continue the topic of recent
    *history* entries: either it shares meaningful (non-stopword) words with
    the last *lookback* questions, or it is a short pronoun-heavy question,
    which is almost certainly a follow-up.
    """
    if not history:
        return False

    lowered = question.lower()

    # All tokens of the question, and the subset that carries meaning.
    tokens = set(lowered.replace('?', '').replace('.', '').split())
    meaningful = tokens - STOPWORDS

    # Meaningful words drawn from the most recent history questions.
    past_words = set()
    for exchange in history[-lookback:]:
        past_words.update(
            exchange["question"].lower().replace('?', '').replace('.', '').split()
        )
    past_words -= STOPWORDS

    # Pronoun follow-up check — only if history has meaningful content
    pronoun_followups = {
        "he","she","they","him","her","them","his","it",
        "this","that","these","those"
    }
    if len(tokens) <= 5 and tokens & pronoun_followups:
        if past_words:
            print(f" [Pronoun follow-up detected — enriching]")
            return True

    if not meaningful:
        return False

    # Check meaningful word overlap
    overlap = len(meaningful & past_words)
    print(f" [Topic overlap: {overlap} word(s)]")
    return overlap > 0
|
|
|
|
def enrich_query_with_history(question):
    """
    Prefix a short follow-up question with the last few conversation
    questions to improve retrieval. Returns the original question unchanged
    when there is no history, the question is already long, the topic has
    shifted, or the enriched query would be too long.
    """
    if not conversation_history:
        return question

    # Only enrich questions under 8 words.
    if len(question.split()) >= 8:
        return question

    # Check if topic has shifted.
    if not topics_are_related(question, conversation_history):
        print(f" [Topic shift detected — no enrichment]")
        return question

    # Look back up to 3 exchanges for context.
    prior = " ".join(ex["question"] for ex in conversation_history[-3:])
    enriched = f"{prior} {question}"

    # Don't enrich if result is too long.
    if len(enriched.split()) > 30:
        print(f" [Enriched query too long — using original]")
        return question

    print(f" [Enriched query: {enriched}]")
    return enriched
|
|
|
|
# --------------------------------------------
|
|
# Handles type extraction from chunk metadata
|
|
# --------------------------------------------
|
|
def extract_type(chunk_text):
    """
    Return the TYPE tag ('fact', 'rule', 'reference' or 'pedagogical')
    declared in *chunk_text*, defaulting to 'reference' when absent or when
    the declared value is not one of the four known types.
    """
    found = re.search(r"TYPE:\s*(fact|rule|reference|pedagogical)",
                      chunk_text, re.IGNORECASE)
    return found.group(1).lower() if found else "reference"
|
|
|
|
def extract_metadata(chunk):
    """
    Read TYPE and PRIORITY tags from a chunk header, case-insensitively.

    Returns a dict with keys 'type' and 'priority'; tags that are absent
    keep the safe, neutral defaults type='reference', priority='medium'.
    """
    meta = {"type": "reference", "priority": "medium"}

    # Each tag follows the same "LABEL: word" shape — scan them uniformly.
    for key, pattern in (("type", r"TYPE:\s*(\w+)"),
                         ("priority", r"PRIORITY:\s*(\w+)")):
        hit = re.search(pattern, chunk, re.IGNORECASE)
        if hit:
            meta[key] = hit.group(1).lower().strip()

    return meta
|
|
|
|
# -------------------------
|
|
# Retrieve top relevant chunks
|
|
# -------------------------
|
|
def get_top_chunks(question, filter_term=None):
    """
    Retrieve the top-k chunks most relevant to *question*.

    Pipeline: enrich the query with conversation history, optionally expand
    it into multiple query variants (per the current level config), restrict
    the search to books matching *filter_term*, score every chunk by
    embedding cosine similarity, min-max normalize, then re-weight using the
    chunks' TYPE/PRIORITY metadata before selecting the top-k.

    Returns a (chunks, sources) pair of equal-length lists, best first.

    Relies on module globals: LEVELS, CURRENT_LEVEL, all_chunks, all_sources,
    chunk_embeddings, embed_model, TYPE_WEIGHTS, PRIORITY_WEIGHTS, DEBUG.
    """
    level_cfg = LEVELS[CURRENT_LEVEL]

    # -------------------------
    # Query preparation
    # -------------------------
    retrieval_question = enrich_query_with_history(question)

    if level_cfg["expand"]:
        queries = expand_query(retrieval_question)
    else:
        queries = [retrieval_question]

    # -------------------------
    # Filter scope
    # -------------------------
    search_indices = get_filtered_indices(filter_term)

    # Empty filter result falls back to searching the whole corpus.
    if not search_indices:
        print(f" [Warning: no books matched filter '{filter_term}' — searching all]")
        search_indices = list(range(len(all_chunks)))

    sub_embeddings = chunk_embeddings[search_indices]
    sub_chunks = [all_chunks[i] for i in search_indices]
    sub_sources = [all_sources[i] for i in search_indices]

    if filter_term:
        matched_books = set(os.path.basename(s) for s in sub_sources)
        print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]")

    # -------------------------
    # Semantic scoring (pure signal)
    # -------------------------
    # Average the cosine similarity over all expanded query variants.
    semantic_scores = np.zeros(len(sub_chunks))

    for q in queries:
        query_emb = embed_model.encode([q])
        scores = cosine_similarity(query_emb, sub_embeddings)[0]
        semantic_scores += scores

    semantic_scores /= len(queries)

    # -------------------------
    # SAFE MIN-MAX NORMALIZATION
    # -------------------------
    min_s = semantic_scores.min()
    max_s = semantic_scores.max()
    range_s = max_s - min_s

    if range_s < 1e-6:
        # All scores basically identical → neutral signal
        semantic_scores = np.ones_like(semantic_scores)
    else:
        semantic_scores = (semantic_scores - min_s) / (range_s + 1e-9)

    # -------------------------
    # TYPE + PRIORITY WEIGHTING
    # -------------------------
    # Unknown types/priorities fall back to a neutral weight of 1.0
    # (log(1.0) == 0, so they do not move the final score).
    type_weights = np.zeros(len(sub_chunks))
    priority_weights = np.zeros(len(sub_chunks))

    for i, chunk in enumerate(sub_chunks):
        chunk_type = extract_type(chunk)
        type_weights[i] = TYPE_WEIGHTS.get(chunk_type, 1.0)

        meta = extract_metadata(chunk)
        priority_weights[i] = PRIORITY_WEIGHTS.get(meta["priority"], 1.0)

    # -------------------------
    # FINAL SCORE (composed signal)
    # -------------------------
    # log() turns the multiplicative weights into additive boosts on top of
    # the normalized similarity, with TYPE weighted 5x stronger than PRIORITY.
    # NOTE(review): a configured weight of 0 in TYPE_WEIGHTS/PRIORITY_WEIGHTS
    # would yield -inf here — assumes all configured weights are > 0; confirm.
    final_scores = (semantic_scores + 1.5 * np.log(type_weights) + 0.3 * np.log(priority_weights)
    )

    # -------------------------
    # DEBUG VIEW (optional but very useful)
    # -------------------------
    if DEBUG:
        debug_ranking = list(zip(
            [os.path.basename(s) for s in sub_sources],
            semantic_scores,
            type_weights,
            final_scores
        ))

        debug_ranking.sort(key=lambda x: x[3], reverse=True)

        print("\n--- TYPE-AWARE RANKING ---")
        for name, sem, tw, fs in debug_ranking[:15]:
            print(f"{name} | semantic similarity={sem:.4f} | type={tw:.2f} | final={fs:.4f}")
        print("--- END ---\n")

    # -------------------------
    # Top-k selection
    # -------------------------
    # argsort ascending, take the last top_k, reverse → best-first order.
    top_k = level_cfg["top_k"]
    top_indices = final_scores.argsort()[-top_k:][::-1]

    return (
        [sub_chunks[i] for i in top_indices],
        [sub_sources[i] for i in top_indices]
    )
|
|
|
|
|
|
# -------------------------
|
|
# Parse search filter from input
|
|
# -------------------------
|
|
def parse_input(user_input):
    """
    Detect the 'search keyword: question' syntax (case-insensitive).

    Returns a (question, filter_term) tuple; when the syntax is absent,
    the raw input is the question and the module-level SEARCH_FILTER
    default is the filter.
    """
    match = re.match(r'^search\s+(.+?):\s*(.+)$', user_input, re.IGNORECASE)
    if match is None:
        return user_input, SEARCH_FILTER
    return match.group(2).strip(), match.group(1).strip()
|
|
|
|
# --------------------------
|
|
# Truncate context at a sentence boundary to avoid feeding the LLM incomplete fragments
|
|
# -----------------------------
|
|
def truncate_at_sentence(text, max_chars):
    """
    Cut *text* down to at most *max_chars* characters, preferring to end on
    the last sentence terminator ('.', '!' or '?') inside the limit so the
    LLM never sees a half-finished sentence.

    Falls back to a hard character cut when no terminator is found.
    """
    if len(text) <= max_chars:
        return text

    clipped = text[:max_chars]
    # Position of the last sentence-ending punctuation inside the limit.
    cut = max(clipped.rfind(ch) for ch in ".!?")
    if cut > 0:
        return clipped[:cut + 1]
    return clipped
|
|
|
|
# -------------------------
|
|
# Ask question
|
|
# -------------------------
|
|
def ask_question(question, show_sources=False, filter_term=None):
    """
    Answer *question* via retrieval-augmented generation.

    Retrieves the top chunks, builds a prompt from conversation history plus
    the retrieved (possibly truncated) context, generates an answer with the
    local GPT4All model, strips runaway stop markers, records a condensed
    one-line answer in the conversation history, and optionally prints
    source/diagnostic info.

    Relies on module globals: LEVELS, CURRENT_LEVEL, MODES, CURRENT_MODE,
    lm_model, conversation_history, MAX_HISTORY, DEBUG.
    """
    global conversation_history

    level_cfg = LEVELS[CURRENT_LEVEL]
    top_chunks, sources = get_top_chunks(question, filter_term=filter_term)

    if DEBUG:
        print("\n--- Retrieved chunks ---")
        for i, chunk in enumerate(top_chunks):
            print(f"\nChunk {i+1}:")
            print(chunk[:300])
        print("--- End chunks ---\n")

    joined_chunks = " ".join(top_chunks)

    # If SINGLE chunk present, do NOT truncate
    if "CHUNK: SINGLE" in joined_chunks:
        if DEBUG:
            print(" [SINGLE chunk detected — skipping context truncation]")
        context = joined_chunks
    else:
        # Cap the context at a sentence boundary per the level's budget.
        context = truncate_at_sentence(
            joined_chunks,
            level_cfg["context_len"]
        )

    # Render recent history as a Q/A transcript for the prompt.
    history_text = ""
    if conversation_history:
        history_text = "Previous conversation:\n"
        for exchange in conversation_history[-MAX_HISTORY:]:
            history_text += f"Q: {exchange['question']}\n"
            history_text += f"A: {exchange['answer']}\n"
        history_text += "\n"

    # Grab instruction and print status based on the manual mode
    mode_cfg = MODES[CURRENT_MODE]
    print(mode_cfg["print_msg"])
    prompt_instruction = mode_cfg["prompt_instruction"]

    # Fresh chat session per question; history is injected via the prompt,
    # not via the session itself.
    with lm_model.chat_session(system_prompt=prompt_instruction):
        user_message = (
            f"{history_text}"
            f"CONTEXT:\n{context}\n\n"
            f"QUESTION: {question}\n\n"
            f"ANSWER:"
        )
        response = lm_model.generate(
            user_message,
            max_tokens=level_cfg["max_tokens"]
        )

    answer = response.strip()

    # Strip any runaway stop markers and everything after them
    # NOTE(review): the "####" entry is unreachable — any "####" also
    # contains "###", which is processed first and cuts the answer there.
    stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP", "]]>"]
    for marker in stop_markers:
        if marker in answer:
            answer = answer[:answer.index(marker)].strip()

    # WARNING: corrupted or truncated answers stored in conversation_history
    # will poison subsequent responses. Always store condensed_answer, not full response.
    # When storing to conversation_history, store condensed version
    condensed_answer = answer.split('\n')[0]  # just the first line
    conversation_history.append({
        "question": question,
        "answer": condensed_answer
    })

    # Keep only the most recent MAX_HISTORY exchanges (rebinds the global).
    if len(conversation_history) > MAX_HISTORY:
        conversation_history = conversation_history[-MAX_HISTORY:]

    if show_sources:
        unique_sources = list(set(sources))
        short_sources = [os.path.basename(s) for s in unique_sources]
        print(f" [Sources: {', '.join(short_sources)}]")
        print(f" [Level: {CURRENT_LEVEL} | "
              f"expand={'on' if level_cfg['expand'] else 'off'} | "
              f"top_k={level_cfg['top_k']} | "
              f"max_tokens={level_cfg['max_tokens']}]")
        print(f" [Memory: {len(conversation_history)} exchanges]")
        if filter_term:
            print(f" [Filter: '{filter_term}']")

    return answer
|
|
|
|
# -------------------------
|
|
# Interactive loop
|
|
# -------------------------
|
|
# Startup banner: announce readiness and list the interactive commands.
print("\nReady! Ask questions about your books")
print("Commands: 'exit', 'sources on/off', 'level 1-10',")
print(" 'memory clear', 'memory show', 'debug on/off'")
print(" 'books' — list available books")
print(" 'search <keyword>: question' — filter by book\n")

# Whether answers are followed by their source book names
# (toggled at runtime via 'sources on' / 'sources off').
show_sources = False
|
|
|
|
# Bot loop
|
|
# Bot loop: read a line, dispatch known commands, otherwise treat the line
# as a question and answer it via RAG. Commands use `continue` so they
# never fall through to the question path.
while True:
    user_input = input(f"[L{CURRENT_LEVEL}][{CURRENT_MODE}] You: ")

    if user_input.lower() in ["exit", "quit"]:
        break

    elif user_input.startswith("mode "):
        try:
            # Splits "mode advanced" and takes "advanced"
            new_mode = user_input.split(maxsplit=1)[1]

            if new_mode in MODES:
                CURRENT_MODE = new_mode
                print(MODES[CURRENT_MODE]["print_msg"])
            else:
                available = ", ".join(MODES.keys())
                print(f"Invalid mode. Available: {available}")
        except IndexError:
            # "mode " with nothing after it.
            print("Usage: mode [creative|research|advanced]")
        continue

    elif user_input.lower() == "memory clear":
        conversation_history.clear()
        print("Conversation memory cleared.")
        continue

    elif user_input.lower() == "memory show":
        if not conversation_history:
            print("No conversation history.")
        else:
            print(f"\n--- Last {len(conversation_history)} exchanges ---")
            for i, exchange in enumerate(conversation_history):
                print(f"\nQ{i+1}: {exchange['question']}")
                print(f"A{i+1}: {exchange['answer'][:100]}...")
            print("---\n")
        continue

    elif user_input.lower() == "debug on":
        DEBUG = True
        print("Debug mode enabled.")
        continue

    elif user_input.lower() == "debug off":
        DEBUG = False
        print("Debug mode disabled.")
        continue

    elif user_input.lower() == "sources on":
        show_sources = True
        print("Source display enabled.")
        continue

    elif user_input.lower() == "sources off":
        show_sources = False
        print("Source display disabled.")
        continue

    elif user_input.lower() == "books":
        show_available_books()
        continue

    elif user_input.lower().startswith("level "):
        try:
            lvl = int(user_input.split()[1])
            if 1 <= lvl <= 10:
                CURRENT_LEVEL = lvl
                cfg = LEVELS[CURRENT_LEVEL]
                print(f"Level set to {CURRENT_LEVEL} — "
                      f"expand={'on' if cfg['expand'] else 'off'}, "
                      f"top_k={cfg['top_k']}, "
                      f"max_tokens={cfg['max_tokens']}")
            else:
                print("Level must be between 1 and 10.")
        # Bug fix: was a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit. Catch only the two parse failures
        # this block can actually raise (missing argument / non-integer).
        except (IndexError, ValueError):
            print("Usage: level 1 through level 10")
        continue

    # Not a command — parse the optional 'search <keyword>:' prefix,
    # then answer the question.
    question, filter_term = parse_input(user_input)

    response = ask_question(question, show_sources=show_sources, filter_term=filter_term)
    print("Bot:", response)
|