# Chartwell/Chartwell.py — local RAG (retrieval-augmented generation) assistant.
# (Original paste carried file-listing chrome here: "Files", "1195 lines",
# "41 KiB", "Python" — preserved as a comment so the module stays importable.)
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from gpt4all import GPT4All
import os
import re
import numpy as np
import json
from pathlib import Path
import torch
import time
import sys
import gpt4all
os.environ["PATH"] = r"C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.8\bin" + ";" + os.environ["PATH"]
# -------------------------
# Knowledge base selection
# -------------------------
BOOK_DIR = 'Books/Science'
# Explore adding TYPE to headers. 4 types fact, rule, reference, pedagogical, the ordering ranks
#TYPE: fact | rule | reference | pedagogical
#DOMAIN: music_theory
#PRIORITY: high | medium | low
# cleaning text documents
# https://www.text-utils.com/remove-special-characters/
# https://cloudconvert.com/docx-to-txt
# Ask ChatGPT to describe narratives around tablature examples.
# I would like you to chunk this for my RAG system.
# Where you identify guitar tablature you are to replace it with a narrative
# describing the notes that are played in fine detail.
# Please describe the notes exactly including any bends, hammer-on, pull-off, legato, etc
# Please do not omit any of the original descriptive text except insofar as it may be confusing for a RAG system.
# You may use the existing text to inform yourself and help narrate the notation.
# {paste the text with tablature}
# Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity
# Augmented — add that retrieved context to the prompt
# Generation — use the language model to generate an answer based on that context
# -------------------
# Embedding Cleaning
# -------------------
# del embeddings_cache.npz
# del embeddings_cache_meta.json
# -------------------
# TO-DO
# -----------------
# Better table handling
# Update requirements.txt with torch installation notes
# Domain-specific clean profiles
# ---------------
# Running
# --------------
# python Chartwell.py
# --------------------------
# GIT Configuration
# ---------------------------
# git config --global credential.helper wincred
# git config credential.helper store
# git config --global user.name "Sean"
# git config --global user.email "skessler1964@gmail.com"
# Chartwell.py now has both models on GPU:
#
# GPT4All (Llama 3) — GPU for inference
# SentenceTransformer — GPU for embeddings
# IMPORTANT SETUP STEPS FOR RE-CREATING THIS ENVIRONMENT
# 1) Install python
# 3.10.11
# 2) Create venv
# python -m venv .venv
# .venv/Scripts/activate
# 3) Install Dependencies
# pip install -r requirements.txt
# 4) Meta-Llama-3.1-8B-Instruct.Q4_0.gguf
# \Users\skess\.cache\gpt4all\Meta-Llama-3-8B-Instruct.Q4_0.gguf
# The model will auto-download on the first run and then switch to allow_download=False (see below)
# The model is about 4.5G. The download is quick.
# lm_model = GPT4All("Meta-Llama-3-8B-Instruct.Q4_0.gguf",model_path=r"C:\Users\skess\.cache\gpt4all",device="gpu",allow_download=False)
# 5) huggingface This is for the sentence transformer (sentence-transformers/all-MiniLM-L6-v2)
# \Users\skess\.cache\huggingface There is a folder structure under here.
# embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") This will automatically load the model if it is not already loaded
# so an internet connection would be required if running this from scratch
# IMPORTANT PYTHON NOTES - KEEP
# Python
# .venv/Scripts/Activate
# pip freeze > requirements.txt
# pip install -r requirements.txt
# Torch GPU version
# pip uninstall torch -y
# pip install torch --index-url https://download.pytorch.org/whl/cu124 --force-reinstall
# python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())"
# witness : 2.x.x+cu124 True for CUDA
# Still on the to-do list:
# Fix the enrichment length cap
# Semantic chunking
# Better table handling
# ----------------------------------
# Weights for chunk weighting system
# -----------------------------------
# Multiplicative boost applied per chunk based on its TYPE: header.
# Values > 1.0 promote, < 1.0 demote; combined in log-space in get_top_chunks.
TYPE_WEIGHTS = {
    "fact": 1.10,          # The "Oak" gets a small boost
    "rule": 1.05,
    "reference": 1.00,     # The baseline
    "pedagogical": 0.95    # The "Undergrowth" is only slightly demoted
}
# Multiplicative boost applied per chunk based on its PRIORITY: header.
PRIORITY_WEIGHTS = {
    "high": 1.10,
    "medium": 1.00,
    "low": 0.90
}
# ----------------------------------
# Operating modes
# -----------------------------------
# Active key into MODES; changed at runtime via the 'mode <name>' command.
CURRENT_MODE = "research"
# Each mode supplies a status line ("print_msg") plus the system prompt
# ("prompt_instruction") handed to the LLM chat session in ask_question().
MODES = {
    "creative": {
        "print_msg": "Creative mode.",
        "prompt_instruction": (
            "You are a creative assistant. "
            "Use the provided context as inspiration. "
            "Be concise and original. "
            "End your response with a single period."
        )
    },
    # Previous (stricter, shorter) research prompt kept for reference:
    # "research": {
    #     "print_msg": "Research mode.",
    #     "prompt_instruction": (
    #         "You are a helpful research assistant. "
    #         "Restrict your response strictly to the provided context. "
    #         "If the source material is exhausted, stop writing. "
    #         "If a relationship or entity is not explicitly documented in the context, do not include it. "
    #         # "Do not infer, supplement, or use external training knowledge. "
    #         "Be direct and concise. "
    #         "Never repeat the context or instructions. "
    #         "Never echo the question. "
    #         "End your answer with a single period. "
    #     )
    # },
    "research": {
        "print_msg": "Research mode.",
        "prompt_instruction": (
            "You are a helpful research assistant. "
            "Restrict your response strictly to the provided context. "
            "If the source material is exhausted, stop writing. "
            "If a relationship or entity is not explicitly documented in the context, do not include it. "
            "Do not repeat the same information in different wording. "
            "If multiple context passages express the same idea, summarize it once. "
            "If the context contains repetitive legal or procedural text, merge it into a single concise statement. "
            "Do not list multiple similar verses. "
            "Prefer one coherent explanation over multiple extracted quotations. "
            "Do not infer, supplement, or use external training knowledge. "
            # "Be direct and concise. "
            "Never repeat the context or instructions. "
            "Never echo the question. "
            "End your answer with a single period. "
        )
    },
    # Alternate retrieval-only prompt idea, kept for reference:
    # You are a retrieval-only QA assistant.
    # Rules:
    # - Use only the provided context.
    # - Do not use external knowledge.
    # - If the answer is not explicitly stated in the context, respond: "Not found in context."
    # - Do not explain reasoning or rules.
    # - Do not repeat the question.
    # - Output must be one short paragraph.
    "advanced": {
        "print_msg": "Advanced mode.",
        "prompt_instruction": (
            "You are adept at mathematics and computer programming. "
            "You are a linguist able to put together complex ideas and work with formulations and workflows."
        )
    },
    "music": {
        "print_msg": "Music mode.",
        "prompt_instruction": (
            "You are a music theory assistant.\n"
            "\n"
            "You may use general music theory knowledge when the context does not explicitly define a rule.\n"
            "However, if the context provides a rule, table, or mapping, you MUST prioritize it over general knowledge.\n"
            "\n"
            "Do not invent programming code, functions, or data structures.\n"
            "Do not fabricate musical tables or mappings not present in the context.\n"
            "\n"
            "Reasoning rules:\n"
            "- Prefer context over general knowledge.\n"
            "- If context is missing critical information, fall back to standard Western music theory.\n"
            "- If the question is ambiguous, choose the most common theoretical interpretation.\n"
            "\n"
            "Output rules:\n"
            "- Return only the final answer.\n"
            "- No explanations unless explicitly requested.\n"
            "- End with a single period.\n"
            "Before answering any music question, state: root=X index=Y interval=Z target=W note=Result\n"
        )
    }
}
# Names of the two cache artifacts; they live inside BOOK_DIR so every
# corpus folder keeps its own embeddings.
CACHE_FILES = ['embeddings_cache.npz', 'embeddings_cache_meta.json']
# This ensures the cache is always saved INSIDE the folder you are pointing to
CACHE_FILE = os.path.join(BOOK_DIR, CACHE_FILES[0])  # 'embeddings_cache.npz'
CACHE_META = os.path.join(BOOK_DIR, CACHE_FILES[1])  # 'embeddings_cache_meta.json'

# Discover every readable corpus file under BOOK_DIR (recursive), excluding
# the embeddings cache files themselves.
book_files = []
_book_root = Path(BOOK_DIR)
# Guard: a missing corpus directory yields zero files instead of crashing.
if _book_root.is_dir():
    for f in _book_root.rglob('*'):
        # Skip directories and the embeddings cache files from this list.
        # (The original repeated the is_file() test twice; one test suffices.)
        if not f.is_file() or f.name in CACHE_FILES:
            continue
        try:
            with open(f, 'rb'):  # just check file is readable
                pass
            book_files.append(str(f))
        except PermissionError:
            continue
print(f"Found {len(book_files)} files")

# Chunking parameters — overlap should be 10-20% of chunk size.
CHUNK_SIZE = 700
CHUNK_OVERLAP = 100
DEBUG = False            # toggled at runtime via 'debug on/off'
MAX_HISTORY = 5          # max retained conversation exchanges
CURRENT_LEVEL = 10       # default retrieval/generation level (see LEVELS)
SEARCH_FILTER = None     # None = search all books
# --------------------------------------------------------------------
# Toggle for whether we are using the model to enrich the corpus data
# --------------------------------------------------------------------
# When True, chunk_text() asks the local LLM to prepend a metadata tag line
# to each chunk (slow: one LLM call per chunk during cache rebuild).
USE_ENRICHMENT = True
# -------------------------
# CONVERSATIONAL HISTORY
# -------------------------
# List of {"question": str, "answer": str} dicts, capped at MAX_HISTORY.
conversation_history = []
# -------------------------
# LEVEL CONFIG
# -------------------------
# Per-level retrieval/generation budget:
#   expand      - whether to LLM-expand the query into paraphrases
#   top_k       - number of chunks retrieved
#   max_tokens  - LLM generation budget
#   context_len - max characters of context fed to the LLM
LEVELS = {
    1: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 500},
    2: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 600},
    3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700},
    4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800},
    5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000},
    6: {"expand": False, "top_k": 5, "max_tokens": 150*3, "context_len": 1200},
    7: {"expand": True, "top_k": 5, "max_tokens": 150*3, "context_len": 1400},
    8: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1600},
    9: {"expand": True, "top_k": 6, "max_tokens": 175*3, "context_len": 1800},
    10: {"expand": True, "top_k": 6, "max_tokens": 200*3, "context_len": 2000},
}
# -------------------------
# Load models
# -------------------------
# -----------------------------------
# Load the sentence transformer model
# -----------------------------------
print("Loading embedding model...")
# Prefer CUDA when available; SentenceTransformer accepts a torch device string.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Embedding model using: {device}")
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2",device=device)
# -----------------------------------
# Load the language model - If it does not exist in the download area then download it, otherwise use it.
# -----------------------------------
print("Loading language model...")
model_file = "Meta-Llama-3.1-8B-Instruct-q4_0.gguf"
model_path = r"C:\Users\skess\.cache\gpt4all"
full_path = os.path.join(model_path, model_file)
# Only allow a network download when the .gguf is not already on disk.
if not os.path.exists(full_path):
    print("Model not found locally. Downloading...")
    allow_download = True
else:
    allow_download = False
# NOTE(review): device="cuda" — some gpt4all releases expect "gpu" instead;
# confirm against the installed gpt4all version.
lm_model = GPT4All(
    model_file,
    model_path=model_path,
    device="cuda",
    allow_download=allow_download
)
# ----------------
# Table Narration
# ----------------
# This will detect and create narrations for table data in pipe form
# For example.
#| Year | Squadrons | Aircraft |
#|------|-----------|----------|
#| 1939 | 21 | 252 |
#| 1940 | 35 | 420 |
# If adding data to a corpus try to use this standard form for instance
#| Metric | Value | Context |
#|--------|-------|---------|
#| Standard deduction single 2025 | $15,750 | Under age 65 |
#| Standard deduction single 2025 | $17,750 | Age 65 or older |
#| Standard deduction MFJ 2025 | $31,500 | Both under 65 |
def narrate_table(text):
    """
    Detect pipe-delimited tables and convert each data row into a narrative
    sentence before chunking.

    A table is a run of consecutive lines containing '|' whose first line has
    at least two pipes. Separator rows (only |, -, :, whitespace) are dropped.
    Rows whose cell count does not match the header row are skipped.

    Returns the text with each table replaced by prose; all other lines pass
    through unchanged.
    """
    lines = text.split('\n')
    result = []
    i = 0
    narrative_count = 0
    table_count = 0
    while i < len(lines):
        line = lines[i].strip()
        if '|' in line and line.count('|') >= 2:
            # Collect the full run of table lines (i advances past the table)
            table_lines = []
            while i < len(lines) and '|' in lines[i]:
                table_lines.append(lines[i].strip())
                i += 1
            # Drop separator rows like |----|----|
            data_lines = [l for l in table_lines
                          if not re.match(r'^[\|\-\s:]+$', l)]
            if len(data_lines) >= 2:
                table_count += 1
                headers = [h.strip() for h in data_lines[0].split('|')
                           if h.strip()]
                narratives = []
                for row_line in data_lines[1:]:
                    values = [v.strip() for v in row_line.split('|')
                              if v.strip()]
                    # Only narrate rows that line up with the header width
                    if len(values) == len(headers):
                        parts = [f"{headers[j]} was {values[j]}"
                                 for j in range(len(headers))]
                        sentence = "In this record, " + ", ".join(parts) + "."
                        narratives.append(sentence)
                        narrative_count += 1
                if narratives:
                    result.append(" ".join(narratives))
                else:
                    # BUGFIX: previously an empty narratives list was joined
                    # into "" and the whole table silently vanished from the
                    # output. Keep the original lines instead.
                    result.extend(table_lines)
            else:
                result.extend(table_lines)
        else:
            result.append(lines[i])
            i += 1
    if table_count > 0:
        print(f" [Table narration: {table_count} table(s) detected, "
              f"{narrative_count} row(s) converted]")
    return '\n'.join(result)
# -------------------------
# Clean text
# -------------------------
def clean_text(text):
    """Normalize raw book text: narrate tables, then strip layout artifacts."""
    # Tables must be narrated before whitespace normalization disturbs them
    cleaned = narrate_table(text)
    # Re-join words hyphenated across line breaks (word-\nword -> wordword)
    cleaned = re.sub(r'(\w+)-\n(\w+)', r'\1\2', cleaned)
    # Collapse 3+ newlines to a double newline (preserves paragraph breaks)
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
    # Drop footnote-style digits wedged between a word and a capital
    cleaned = re.sub(r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', '', cleaned)
    # Remove bullet/arrow glyphs left over from page layout
    cleaned = re.sub(r'[■•◆▪→]', '', cleaned)
    # Squeeze runs of spaces
    cleaned = re.sub(r' +', ' ', cleaned)
    # Strip citation markers like [12] and [citation needed]
    cleaned = re.sub(r'\[\d+\]', '', cleaned)
    cleaned = re.sub(r'\[citation needed\]', '', cleaned)
    return cleaned.strip()
# --------------------------------
# This is for the enrichment pipeline if it is enabled. It Uses the local LLM to extract key metadata from a chunk.
# --------------------------------
# PROMPT_TEMPLATE = (
# "<|start_header_id|>user<|end_header_id|>\n"
# "Extract tags from the text below. Respond ONLY in this exact format: "
# "[Time: | Loc: | Entity: | Topic:]\n"
# # "[Time: | Loc: | Entity: | Theme:]\n"
# "ALL fields must be filled. If uncertain, make a best guess.\n"
# "Limit to 5 most important entities. No explanation.\n"
# "Text: {text}\n"
# "Tags: [<|eot_id|>\n"
# "<|start_header_id|>assistant<|end_header_id|>\n"
# )
# Llama-3 chat template used by extract_context_tags(). The prompt ends with
# "Tags: [" so the model is pushed directly into the bracketed tag format.
PROMPT_TEMPLATE = (
    "<|start_header_id|>user<|end_header_id|>\n"
    "TAGGING OPERATION. NOT A CONVERSATION. NO EXPLANATIONS.\n"
    "OUTPUT FORMAT IS FIXED. DO NOT DEVIATE.\n"
    "\n"
    "RULES:\n"
    "1. Output EXACTLY ONE LINE in this format: [Time: | Loc: | Entity: | Topic:]\n"
    "2. Fill every field. Use 'Unknown' if uncertain. Never leave a field empty.\n"
    "3. Entity: list up to 5 items, comma separated.\n"
    "4. No sentences. No explanation. No apology. No meta-commentary.\n"
    "5. Do not repeat these instructions. Do not acknowledge this prompt.\n"
    "6. Your entire response is the tag line and nothing else.\n"
    "\n"
    "Text: {text}\n"
    "<|eot_id|>\n"
    "<|start_header_id|>assistant<|end_header_id|>\n"
    "Tags: ["
)
def extract_context_tags(text_chunk):
    """
    Use the local LLM to produce a one-line metadata tag of the form
    '[Time: ... | Loc: ... | Entity: ... | Topic: ...]' for a chunk.

    Prints the tag and elapsed time, and returns the tag string.
    """
    start_time = time.perf_counter()
    response = lm_model.generate(
        PROMPT_TEMPLATE.format(text=text_chunk),
        max_tokens=60,
        temp=0.01,    # near-deterministic tagging
        n_batch=512,
    )
    # If the model didn't provide the bracket because we 'pushed' it, add it back
    tag = response.split(']')[0] + "]" if "]" in response else response
    if not tag.startswith("["):
        tag = "[" + tag
    print(f"TAG:{tag}")
    print(f"Took : {time.perf_counter() - start_time:.4f} seconds")
    return tag
def is_empty_tag(tag):
    """Return True when every field in a '[K: v | ...]' tag line is blank."""
    inner = tag.strip("[]")
    field_values = (segment.split(":")[-1].strip() for segment in inner.split("|"))
    return all(value == "" for value in field_values)
# -------------------------
# Extract the CHUNK directive from the header
# -------------------------
def get_chunk_directive(text, header_lines=20):
    """
    Extract a 'CHUNK: <value>' directive from the top of a file.

    Only the first `header_lines` lines are inspected. Returns the directive
    value upper-cased, or None when absent.
    """
    header = "\n".join(text.splitlines()[:header_lines])
    found = re.search(r"^CHUNK:\s*(\w+)", header, re.IGNORECASE | re.MULTILINE)
    return found.group(1).strip().upper() if found else None
# -------------------------
# Chunk text with overlap
# -------------------------
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """
    Split `text` into overlapping chunks of roughly `chunk_size` characters.

    Honors a 'CHUNK: SINGLE' directive (whole file as one chunk), inherits
    TYPE:/PRIORITY:/etc. header lines into every chunk, and — when
    USE_ENRICHMENT is on — prepends an LLM-generated metadata tag line.

    NOTE(review): the `overlap` parameter is accepted but unused; overlap is
    achieved by carrying the previous unit forward instead.
    """
    # Try to get the chunk directive if it is present
    directive = get_chunk_directive(text)
    if directive == "SINGLE":
        if DEBUG:
            print(" [CHUNK: SINGLE detected — bypassing chunking]")
        return [text.strip()]
    # 1. EXTRACT HEADERS (The "Metadata Inheritance" logic)
    header_patterns = [
        r"TYPE:.*",
        r"PRIORITY:.*",
        r"DOMAIN:.*",
        r"TITLE:.*",
        r"CONCEPTS:.*",
        r"SOURCE:.*",
        r"CHUNK:.*",  # special pattern; currently supports SINGLE so the entire file is one chunk
    ]
    header_lines = []
    top_of_file = text[:500]  # headers are only honored near the top of the file
    for pattern in header_patterns:
        match = re.search(pattern, top_of_file, re.IGNORECASE)
        if match:
            header_lines.append(match.group(0))
    header_prefix = "\n".join(header_lines) + "\n\n" if header_lines else ""
    # 2. SEMANTIC SPLITTING: paragraphs first, then sentences for long paragraphs
    paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
    split_units = []
    for para in paragraphs:
        if len(para) <= chunk_size:
            split_units.append(para)
        else:
            sentences = re.split(r'(?<=[.!?])\s+', para)
            current = ""
            for sentence in sentences:
                if len(current) + len(sentence) <= chunk_size:
                    current += " " + sentence
                else:
                    if current:
                        split_units.append(current.strip())
                    current = sentence
            if current:
                split_units.append(current.strip())
    # 3. COMBINE & INJECT HEADERS (greedy packing with one-unit overlap)
    chunks = []
    current_chunk = ""
    prev_unit = ""
    for unit in split_units:
        # Check if adding this unit exceeds chunk_size
        if len(current_chunk) + len(unit) + 1 <= chunk_size:
            current_chunk += " " + unit
        else:
            if current_chunk:
                final_output = current_chunk.strip()
                # --- CONDITIONAL ENRICHMENT LOGIC ---
                if USE_ENRICHMENT:
                    print(f" [Enriching chunk {len(chunks)+1}...]", end="\r")
                    # Only the first 600 chars are tagged to bound LLM latency
                    tags = extract_context_tags(final_output[:600])
                    if not is_empty_tag(tags):
                        final_output = f"{tags} {final_output}"
                # ----------------------------
                # Add headers to all chunks except those that already carry them
                if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
                    final_output = header_prefix + final_output
                chunks.append(final_output)
            # Overlap logic: seed the next chunk with the previous unit
            if prev_unit and len(prev_unit) + len(unit) + 1 <= chunk_size:
                current_chunk = prev_unit + " " + unit
            else:
                current_chunk = unit
        prev_unit = unit
    # Flush the trailing chunk (same enrichment/header treatment as above)
    if current_chunk:
        final_output = current_chunk.strip()
        if USE_ENRICHMENT:
            tags = extract_context_tags(final_output[:600])
            if not is_empty_tag(tags):
                final_output = f"{tags} {final_output}"
        if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
            final_output = header_prefix + final_output
        chunks.append(final_output)
    return chunks
# -------------------------
# Check if cache is valid
# -------------------------
# def cache_is_valid():
# print (f"Checking for existing enriched cache in {BOOK_DIR}...")
# if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META):
# return False
# with open(CACHE_META, "r") as f:
# meta = json.load(f)
# if meta.get("book_files") != book_files:
# return False
# for book_name in book_files:
# if not os.path.exists(book_name):
# continue
# stored_size = meta.get("file_sizes", {}).get(book_name)
# actual_size = os.path.getsize(book_name)
# if stored_size != actual_size:
# return False
# return True
def cache_is_valid():
    """
    Decide whether the saved embeddings cache can be reused.

    Hard failures (returns False): cache files missing, a previously cached
    file now lacks size metadata, or a cached file's size changed on disk.
    Soft condition (still returns True): brand-new files exist that are not
    yet embedded — they are reported but ignored.
    """
    print(f"\nChecking for existing enriched cache in {BOOK_DIR}...")
    # --- HARD FAIL: missing cache ---
    if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META):
        print("X Missing cache or metadata → rebuild required")
        return False
    with open(CACHE_META, "r") as f:
        meta = json.load(f)
    cached_files = set(meta.get("book_files", []))
    current_files = set(book_files)
    added = current_files - cached_files
    missing_embeddings = []
    modified_files = []
    for book_name in current_files:
        if not os.path.exists(book_name):
            continue
        # NEW FILE → ignore for now
        if book_name not in cached_files:
            continue
        stored_size = meta.get("file_sizes", {}).get(book_name)
        actual_size = os.path.getsize(book_name)
        # EXISTING FILE but missing metadata → BAD
        if stored_size is None:
            missing_embeddings.append(book_name)
            continue
        # EXISTING FILE but changed → BAD (size is the cheap change proxy)
        if stored_size != actual_size:
            modified_files.append(book_name)
    # --- HARD FAIL CONDITIONS ---
    if missing_embeddings:
        print(f"\nX Missing embeddings for {len(missing_embeddings)} file(s):")
        for f in sorted(missing_embeddings):
            print(f" * {f}")
        print("→ Rebuild required")
        return False
    if modified_files:
        print(f"\nX {len(modified_files)} file(s) changed:")
        for f in sorted(modified_files):
            print(f" * {f}")
        print("→ Rebuild required")
        return False
    # --- SOFT WARNING ---
    if added:
        print(f"\n+ {len(added)} new file(s) detected (not yet embedded):")
        for f in sorted(added):
            print(f" + {f}")
        print("→ Continuing with existing cache (new files will be ignored)")
    print("\n✓ Cache usable")
    return True
# -------------------------
# Load or build embeddings
# -------------------------
# Either restores chunks/sources/embeddings from the npz cache, or rebuilds
# them from every file in book_files and writes the cache + size metadata.
all_chunks = []
all_sources = []
if cache_is_valid():
    print("Loading embeddings from cache...")
    data = np.load(CACHE_FILE, allow_pickle=True)
    chunk_embeddings = data["embeddings"]
    all_chunks = list(data["chunks"])
    all_sources = list(data["sources"])
    print(f"Total chunks loaded from cache: {len(all_chunks)}")
else:
    print("Building embeddings from scratch...")
    for book_name in book_files:
        if not os.path.exists(book_name):
            print(f"Warning: {book_name} not found, skipping...")
            continue
        print(f"Loading {book_name}...")
        # Read as bytes first so the encoding fallback can re-decode
        with open(book_name, "rb") as f:
            raw = f.read()
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            print(f"[Encoding fallback] {book_name}")
            text = raw.decode("cp1252")  # fallback for Windows-encoded text
        # Skip files marked with "# IGNORE" on first line
        first_line = text.lstrip().splitlines()[0] if text.strip() else ""
        if first_line.strip().upper().startswith("# IGNORE"):
            print(f"Skipping {book_name} (marked IGNORE)")
            continue
        book_text = clean_text(text)
        book_chunks = chunk_text(book_text)
        all_chunks.extend(book_chunks)
        # One source entry per chunk keeps the two lists index-aligned
        all_sources.extend([book_name] * len(book_chunks))
        print(f" -> {len(book_chunks)} chunks")
    print(f"Total chunks: {len(all_chunks)}")
    print("Embedding chunks (this may take a minute)...")
    chunk_embeddings = embed_model.encode(all_chunks, convert_to_tensor=False)
    print("Saving embeddings cache...")
    np.savez(
        CACHE_FILE,
        embeddings=chunk_embeddings,
        chunks=np.array(all_chunks, dtype=object),
        sources=np.array(all_sources, dtype=object)
    )
    # Record per-file sizes so cache_is_valid() can detect changes later
    file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)}
    with open(CACHE_META, "w") as f:
        json.dump({"book_files": book_files, "file_sizes": file_sizes}, f)
    print("Cache saved.")
# -------------------------
# Book filter helper
# -------------------------
def get_filtered_indices(filter_term):
    """Return indices of chunks whose source filename contains filter_term."""
    if not filter_term:
        return list(range(len(all_chunks)))
    needle = filter_term.lower()
    return [idx for idx, src in enumerate(all_sources)
            if needle in os.path.basename(src).lower()]
def show_available_books():
    """Print a short list of available books with keywords."""
    print("\n--- Available books ---")
    for path in book_files:
        print(f" {os.path.basename(path).replace('.txt', '')}")
    print("--- Use 'search <keyword>: your question' to filter ---\n")
# -------------------------
# Query expansion
# -------------------------
def expand_query(question):
    """
    Ask the local LLM for up to 3 paraphrases of `question` phrased in the
    vocabulary of the loaded documents.

    Returns [question] + up to 3 filtered alternatives, for multi-query
    retrieval in get_top_chunks().
    """
    book_titles = ', '.join([os.path.basename(b).replace('.txt', '') for b in book_files])
    prompt = (
        f"You are helping search a library containing these documents:\n"
        f"{book_titles}\n\n"
        f"Generate 3 alternative ways to ask the following question using "
        f"vocabulary, concepts, and terminology that would likely appear in "
        f"these specific documents. Do not reference authors or books not in this list. "
        f"The alternative questions must ask about the SAME specific fact as the original. "
        f"Do not broaden or change the subject of the question. "
        f"Return ONLY the 3 questions, one per line, no numbering, no explanation.\n\n"
        f"Question: {question}"
    )
    with lm_model.chat_session():
        response = lm_model.generate(prompt, max_tokens=150)
    lines = [line.strip() for line in response.strip().split('\n') if line.strip()]
    # Keep only plausible question lines: sane length, contains '?', not the
    # original, and no leading 'Label:' style prefix.
    alternatives = [
        l for l in lines
        if len(l) > 15
        and len(l) < 200
        and '?' in l
        and l != question
        and ':' not in l[:20]
    ][:3]
    all_queries = [question] + alternatives
    print(f" [Expanded queries: {len(all_queries)}]")
    for q in all_queries:
        print(f" - {q}")
    return all_queries
# ----------------------
# Topic Detection
# ----------------------
# Stopwords for topic detection
# -------------------------
# Common function words ignored when measuring topical overlap between a
# question and recent conversation history.
# (Fix: the original set literal listed "his" twice.)
STOPWORDS = {
    "the","is","a","an","and","or","of","to","in","on","for","with",
    "what","which","who","how","when","where","can","i","you","it",
    "did","do","does","was","were","he","she","they","his","her",
    "him","them","its","be","been","have","has","had","will",
    "would","could","should","may","might","me","my","we","our"
}
def topics_are_related(question, history, lookback=3):
    """
    Return True if `question` shares meaningful (non-stopword) words with
    the last `lookback` questions in `history`.

    Also returns True for very short pronoun-heavy questions ("what did he
    do next?") since those are almost certainly follow-ups. Returns False
    when history is empty.
    """
    if not history:
        return False
    q_lower = question.lower()
    # Get meaningful words from current question
    q_words = set(q_lower.replace('?','').replace('.','').split()) - STOPWORDS
    # Get words from recent history questions
    recent = history[-lookback:]
    history_words = set()
    for exchange in recent:
        history_words.update(
            exchange["question"].lower().replace('?','').replace('.','').split()
        )
    history_words -= STOPWORDS
    # Pronoun follow-up check — only if history has meaningful content
    pronoun_followups = {
        "he","she","they","him","her","them","his","it",
        "this","that","these","those"
    }
    q_words_all = set(q_lower.replace('?','').replace('.','').split())
    if len(q_words_all) <= 5 and q_words_all & pronoun_followups:
        if history_words:
            print(f" [Pronoun follow-up detected — enriching]")
            return True
    if not q_words:
        return False
    # Check meaningful word overlap
    overlap = len(q_words & history_words)
    print(f" [Topic overlap: {overlap} word(s)]")
    return overlap > 0
def enrich_query_with_history(question):
    """
    Prepend recent history questions to a short follow-up query to improve
    retrieval. Enrichment is skipped when the question is already long, the
    topic has shifted, or the combined query would exceed 30 words.
    """
    if not conversation_history:
        return question
    # Long questions are self-contained; only enrich under 8 words
    if len(question.split()) >= 8:
        return question
    # Bail out when the conversation has moved to a new topic
    if not topics_are_related(question, conversation_history):
        print(f" [Topic shift detected — no enrichment]")
        return question
    # Fold in up to the last 3 questions as retrieval context
    recent_questions = [ex["question"] for ex in conversation_history[-3:]]
    enriched = f"{' '.join(recent_questions)} {question}"
    if len(enriched.split()) > 30:
        print(f" [Enriched query too long — using original]")
        return question
    print(f" [Enriched query: {enriched}]")
    return enriched
# --------------------------------------------
# Handles type extraction from chunk metadata
# --------------------------------------------
def extract_type(chunk_text):
    """
    Extract the TYPE metadata value from a chunk header.
    Falls back to 'reference' when no recognised TYPE: line is present.
    """
    found = re.search(r"TYPE:\s*(fact|rule|reference|pedagogical)",
                      chunk_text, re.IGNORECASE)
    return found.group(1).lower() if found else "reference"
def extract_metadata(chunk):
    """
    Extract TYPE / PRIORITY metadata from a chunk if present.
    Missing fields keep safe, neutral defaults.
    """
    meta = {"type": "reference", "priority": "medium"}
    # Both fields follow the same 'KEY: value' header convention
    for key, pattern in (("type", r"TYPE:\s*(\w+)"),
                         ("priority", r"PRIORITY:\s*(\w+)")):
        found = re.search(pattern, chunk, re.IGNORECASE)
        if found:
            meta[key] = found.group(1).lower().strip()
    return meta
# -------------------------
# Retrieve top relevant chunks
# -------------------------
def get_top_chunks(question, filter_term=None):
    """
    Retrieve the top-k chunks most relevant to `question`.

    Pipeline: enrich/expand the query per the current LEVEL, restrict scope
    by `filter_term`, score by averaged cosine similarity, min-max normalize,
    then nudge by TYPE/PRIORITY weights in log-space.

    Returns a (chunks, sources) pair of lists ordered best-first.
    """
    level_cfg = LEVELS[CURRENT_LEVEL]
    # -------------------------
    # Query preparation
    # -------------------------
    retrieval_question = enrich_query_with_history(question)
    if level_cfg["expand"]:
        queries = expand_query(retrieval_question)
    else:
        queries = [retrieval_question]
    # -------------------------
    # Filter scope
    # -------------------------
    search_indices = get_filtered_indices(filter_term)
    if not search_indices:
        print(f" [Warning: no books matched filter '{filter_term}' — searching all]")
        search_indices = list(range(len(all_chunks)))
    sub_embeddings = chunk_embeddings[search_indices]
    sub_chunks = [all_chunks[i] for i in search_indices]
    sub_sources = [all_sources[i] for i in search_indices]
    if filter_term:
        matched_books = set(os.path.basename(s) for s in sub_sources)
        print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]")
    # -------------------------
    # Semantic scoring (pure signal): average similarity over all queries
    # -------------------------
    semantic_scores = np.zeros(len(sub_chunks))
    for q in queries:
        query_emb = embed_model.encode([q])
        scores = cosine_similarity(query_emb, sub_embeddings)[0]
        semantic_scores += scores
    semantic_scores /= len(queries)
    # -------------------------
    # SAFE MIN-MAX NORMALIZATION
    # -------------------------
    min_s = semantic_scores.min()
    max_s = semantic_scores.max()
    range_s = max_s - min_s
    if range_s < 1e-6:
        # All scores basically identical → neutral signal
        semantic_scores = np.ones_like(semantic_scores)
    else:
        semantic_scores = (semantic_scores - min_s) / (range_s + 1e-9)
    # -------------------------
    # TYPE + PRIORITY WEIGHTING
    # -------------------------
    type_weights = np.zeros(len(sub_chunks))
    priority_weights = np.zeros(len(sub_chunks))
    for i, chunk in enumerate(sub_chunks):
        chunk_type = extract_type(chunk)
        type_weights[i] = TYPE_WEIGHTS.get(chunk_type, 1.0)
        meta = extract_metadata(chunk)
        priority_weights[i] = PRIORITY_WEIGHTS.get(meta["priority"], 1.0)
    # -------------------------
    # FINAL SCORE (composed signal): log() turns the multiplicative weights
    # into small additive nudges around zero
    # -------------------------
    final_scores = (semantic_scores + 1.5 * np.log(type_weights) + 0.3 * np.log(priority_weights)
    )
    # -------------------------
    # DEBUG VIEW (optional but very useful)
    # -------------------------
    if DEBUG:
        debug_ranking = list(zip(
            [os.path.basename(s) for s in sub_sources],
            semantic_scores,
            type_weights,
            final_scores
        ))
        debug_ranking.sort(key=lambda x: x[3], reverse=True)
        print("\n--- TYPE-AWARE RANKING ---")
        for name, sem, tw, fs in debug_ranking[:15]:
            print(f"{name} | semantic similarity={sem:.4f} | type={tw:.2f} | final={fs:.4f}")
        print("--- END ---\n")
    # -------------------------
    # Top-k selection (argsort ascending; take the tail, reversed)
    # -------------------------
    top_k = level_cfg["top_k"]
    top_indices = final_scores.argsort()[-top_k:][::-1]
    return (
        [sub_chunks[i] for i in top_indices],
        [sub_sources[i] for i in top_indices]
    )
# -------------------------
# Parse search filter from input
# -------------------------
def parse_input(user_input):
    """
    Detect 'search keyword: question' syntax.
    Returns a (question, filter_term) tuple; without that syntax the
    module-level SEARCH_FILTER default applies.
    """
    matched = re.match(r'^search\s+(.+?):\s*(.+)$', user_input, re.IGNORECASE)
    if matched is None:
        return user_input, SEARCH_FILTER
    return matched.group(2).strip(), matched.group(1).strip()
# --------------------------
# Truncate context at a sentence boundary to avoid feeding the LLM incomplete fragments
# -----------------------------
def truncate_at_sentence(text, max_chars):
    """
    Truncate `text` to at most `max_chars` characters, cutting back to the
    last sentence terminator (., !, ?). Falls back to a hard cut when no
    terminator appears in the truncated span.
    """
    if len(text) <= max_chars:
        return text
    clipped = text[:max_chars]
    cut = max(clipped.rfind(ch) for ch in '.!?')
    return clipped[:cut + 1] if cut > 0 else clipped
# -------------------------
# Ask question
# -------------------------
def ask_question(question, show_sources=False, filter_term=None):
    """
    Answer `question` with retrieval-augmented generation.

    Retrieves chunks, truncates them to the level's context budget (unless a
    CHUNK: SINGLE chunk is present), prepends conversation history, queries
    the LLM under the current mode's system prompt, strips runaway stop
    markers, and records a condensed exchange in conversation_history.

    Returns the answer string; optionally prints sources and status lines.
    """
    global conversation_history
    level_cfg = LEVELS[CURRENT_LEVEL]
    top_chunks, sources = get_top_chunks(question, filter_term=filter_term)
    if DEBUG:
        print("\n--- Retrieved chunks ---")
        for i, chunk in enumerate(top_chunks):
            print(f"\nChunk {i+1}:")
            print(chunk[:300])
        print("--- End chunks ---\n")
    joined_chunks = " ".join(top_chunks)
    # If SINGLE chunk present, do NOT truncate
    if "CHUNK: SINGLE" in joined_chunks:
        if DEBUG:
            print(" [SINGLE chunk detected — skipping context truncation]")
        context = joined_chunks
    else:
        context = truncate_at_sentence(
            joined_chunks,
            level_cfg["context_len"]
        )
    # Fold recent Q/A pairs into the prompt so follow-ups have context
    history_text = ""
    if conversation_history:
        history_text = "Previous conversation:\n"
        for exchange in conversation_history[-MAX_HISTORY:]:
            history_text += f"Q: {exchange['question']}\n"
            history_text += f"A: {exchange['answer']}\n"
        # Blank line separates the history block from the context
        # NOTE(review): original indentation was ambiguous here — placed
        # after the loop (one separator), confirm intent.
        history_text += "\n"
    # Grab instruction and print status based on the manual mode
    mode_cfg = MODES[CURRENT_MODE]
    print(mode_cfg["print_msg"])
    prompt_instruction = mode_cfg["prompt_instruction"]
    with lm_model.chat_session(system_prompt=prompt_instruction):
        user_message = (
            f"{history_text}"
            f"CONTEXT:\n{context}\n\n"
            f"QUESTION: {question}\n\n"
            f"ANSWER:"
        )
        response = lm_model.generate(
            user_message,
            max_tokens=level_cfg["max_tokens"]
        )
    answer = response.strip()
    # Strip any runaway stop markers and everything after them
    stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP", "]]>"]
    for marker in stop_markers:
        if marker in answer:
            answer = answer[:answer.index(marker)].strip()
    # WARNING: corrupted or truncated answers stored in conversation_history
    # will poison subsequent responses. Always store condensed_answer, not full response.
    # When storing to conversation_history, store condensed version
    condensed_answer = answer.split('\n')[0]  # just the first line
    conversation_history.append({
        "question": question,
        "answer": condensed_answer
    })
    if len(conversation_history) > MAX_HISTORY:
        conversation_history = conversation_history[-MAX_HISTORY:]
    if show_sources:
        unique_sources = list(set(sources))
        short_sources = [os.path.basename(s) for s in unique_sources]
        print(f" [Sources: {', '.join(short_sources)}]")
        print(f" [Level: {CURRENT_LEVEL} | "
              f"expand={'on' if level_cfg['expand'] else 'off'} | "
              f"top_k={level_cfg['top_k']} | "
              f"max_tokens={level_cfg['max_tokens']}]")
        print(f" [Memory: {len(conversation_history)} exchanges]")
        if filter_term:
            print(f" [Filter: '{filter_term}']")
    return answer
# -------------------------
# Interactive loop
# -------------------------
# REPL: command keywords are handled first; anything else is treated as a
# question (optionally prefixed with 'search <keyword>:').
print("\nReady! Ask questions about your books")
print("Commands: 'exit', 'sources on/off', 'level 1-10',")
print(" 'memory clear', 'memory show', 'debug on/off'")
print(" 'books' — list available books")
print(" 'search <keyword>: question' — filter by book\n")
show_sources = False
# Bot loop
while True:
    user_input = input(f"[L{CURRENT_LEVEL}][{CURRENT_MODE}] You: ")
    if user_input.lower() in ["exit", "quit"]:
        break
    elif user_input.startswith("mode "):
        try:
            # Splits "mode advanced" and takes "advanced"
            new_mode = user_input.split(maxsplit=1)[1]
            if new_mode in MODES:
                CURRENT_MODE = new_mode
                print(MODES[CURRENT_MODE]["print_msg"])
            else:
                available = ", ".join(MODES.keys())
                print(f"Invalid mode. Available: {available}")
        except IndexError:
            # BUGFIX: usage line now lists every defined mode (was missing 'music')
            print("Usage: mode [creative|research|advanced|music]")
        continue
    elif user_input.lower() == "memory clear":
        conversation_history.clear()
        print("Conversation memory cleared.")
        continue
    elif user_input.lower() == "memory show":
        if not conversation_history:
            print("No conversation history.")
        else:
            print(f"\n--- Last {len(conversation_history)} exchanges ---")
            for i, exchange in enumerate(conversation_history):
                print(f"\nQ{i+1}: {exchange['question']}")
                print(f"A{i+1}: {exchange['answer'][:100]}...")
            print("---\n")
        continue
    elif user_input.lower() == "debug on":
        DEBUG = True
        print("Debug mode enabled.")
        continue
    elif user_input.lower() == "debug off":
        DEBUG = False
        print("Debug mode disabled.")
        continue
    elif user_input.lower() == "sources on":
        show_sources = True
        print("Source display enabled.")
        continue
    elif user_input.lower() == "sources off":
        show_sources = False
        print("Source display disabled.")
        continue
    elif user_input.lower() == "books":
        show_available_books()
        continue
    elif user_input.lower().startswith("level "):
        try:
            lvl = int(user_input.split()[1])
            if 1 <= lvl <= 10:
                CURRENT_LEVEL = lvl
                cfg = LEVELS[CURRENT_LEVEL]
                # BUGFIX: added ': ' separator (was "Level set to 10expand=on, ...")
                print(f"Level set to {CURRENT_LEVEL}: "
                      f"expand={'on' if cfg['expand'] else 'off'}, "
                      f"top_k={cfg['top_k']}, "
                      f"max_tokens={cfg['max_tokens']}")
            else:
                print("Level must be between 1 and 10.")
        except (ValueError, IndexError):
            # BUGFIX: bare 'except:' replaced with the two errors a malformed
            # 'level' command can actually raise (no longer swallows Ctrl-C)
            print("Usage: level 1 through level 10")
        continue
    # Parse for search filter
    question, filter_term = parse_input(user_input)
    response = ask_question(question, show_sources=show_sources, filter_term=filter_term)
    print("Bot:", response)