# Files
# Chartwell/Chartwell.py
#
# 753 lines
# 26 KiB
# Python
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from gpt4all import GPT4All
import os
import re
import numpy as np
import json
from pathlib import Path
import torch
# Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity
# Augmented — add that retrieved context to the prompt
# Generation — use the language model to generate an answer based on that context
# -------------------
# Embedding Cleaning
# -------------------
# del embeddings_cache.npz
# del embeddings_cache_meta.json
# -------------------
# TO-DO
# -----------------
# Better table handling
# Update requirements.txt with torch installation notes
# Domain-specific clean profiles
# ---------------
# Running
# --------------
# python Chartwell.py
# --------------------------
# GIT Configuration
# ---------------------------
# git config --global credential.helper wincred
# git config credential.helper store
# git config --global user.name "Sean"
# git config --global user.email "skessler1964@gmail.com"
# Chartwell.py now has both models on GPU:
#
# GPT4All (Llama 3) — GPU for inference
# SentenceTransformer — GPU for embeddings
# IMPORTANT SETUP STEPS FOR RE-CREATING THIS ENVIRONMENT
# 1) Install python
# 3.10.11
# 2) Create venv
# python -m venv .venv
# .venv/Scripts/activate
# 3) Install Dependencies
# pip install -r requirements.txt
# 4) Meta-Llama-3-8B-Instruct.Q4_0.gguf
# \Users\skess\.cache\gpt4all\Meta-Llama-3-8B-Instruct.Q4_0.gguf
# The model will auto-download on the first run and then switch to allow_download=False (see below)
# The model is about 4.5G. The download is quick.
# lm_model = GPT4All("Meta-Llama-3-8B-Instruct.Q4_0.gguf",model_path=r"C:\Users\skess\.cache\gpt4all",device="gpu",allow_download=False)
# 5) huggingface This is for the sentence transformer (sentence-transformers/all-MiniLM-L6-v2)
# \Users\skess\.cache\huggingface  There is a folder structure under here.
# embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") This will automatically load the model if it is not already loaded
# so an internet connection would be required if running this from scratch
# IMPORTANT PYTHON NOTES - KEEP
# Python
# .venv/Scripts/Activate
# pip freeze > requirements.txt
# pip install -r requirements.txt
# Torch GPU version
# pip uninstall torch -y
# pip install torch --index-url https://download.pytorch.org/whl/cu124 --force-reinstall
# python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())"
# witness : 2.x.x+cu124 True for CUDA
# Still on the to-do list:
# Fix the enrichment length cap
# Semantic chunking
# Better table handling
# -------------------------
# Knowledge base selection
# -------------------------
# -------------------------
# Knowledge base selection
# -------------------------
BOOK_DIR = 'Books/Accounting'  # root folder holding the corpus

# Collect every readable file under BOOK_DIR, recursively.
book_files = []
for candidate in Path(BOOK_DIR).rglob('*'):
    if not candidate.is_file():
        continue
    try:
        # Probe readability up front so locked files are skipped now
        # instead of failing later during embedding.
        with open(candidate, 'rb'):
            pass
    except PermissionError:
        continue
    book_files.append(str(candidate))
print(f"Found {len(book_files)} files")
# Overlap should be 10-20% of chunk size
CHUNK_SIZE = 700
CHUNK_OVERLAP = 100
DEBUG = False
CACHE_FILE = "embeddings_cache.npz"
CACHE_META = "embeddings_cache_meta.json"
MAX_HISTORY = 5
CURRENT_LEVEL = 10
SEARCH_FILTER = None  # None = search all books
# -------------------------
# CONVERSATIONAL HISTORY
# -------------------------
# Rolling list of {"question": ..., "answer": ...} dicts; appended to and
# trimmed to the last MAX_HISTORY entries in ask_question().
conversation_history = []
# -------------------------
# LEVEL CONFIG
# -------------------------
# Per-level retrieval/generation settings, selected via CURRENT_LEVEL:
#   expand      - whether expand_query() generates alternative phrasings
#   top_k       - number of chunks retrieved per question
#   max_tokens  - generation budget passed to the language model
#   context_len - character cap applied to the retrieved context
LEVELS = {
    1: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 500},
    2: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 600},
    3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700},
    4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800},
    5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000},
    6: {"expand": False, "top_k": 3, "max_tokens": 150*3, "context_len": 1200},
    7: {"expand": True, "top_k": 3, "max_tokens": 150*3, "context_len": 1400},
    8: {"expand": True, "top_k": 4, "max_tokens": 175*3, "context_len": 1600},
    9: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1800},
    10: {"expand": True, "top_k": 5, "max_tokens": 200*3, "context_len": 2000},
}
# -------------------------
# Load models
# -------------------------
# -----------------------------------
# Load the sentence transformer model
# -----------------------------------
print("Loading embedding model...")
# Prefer the GPU for embeddings when CUDA is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Embedding model using: {device}")
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=device)
# -----------------------------------
# Load the language model - if it does not exist in the download area
# then download it, otherwise use the cached copy.
# -----------------------------------
print("Loading language model...")
model_file = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
model_path = r"C:\Users\skess\.cache\gpt4all"
full_path = os.path.join(model_path, model_file)
# Only allow a network download when the .gguf is missing locally.
if not os.path.exists(full_path):
    print("Model not found locally. Downloading...")
    allow_download = True
else:
    allow_download = False
lm_model = GPT4All(
    model_file,
    model_path=model_path,
    device="gpu",
    allow_download=allow_download
)
# ----------------
# Table Narration
# ----------------
# Detects pipe-delimited tables and rewrites each data row as a prose
# sentence so embedding/retrieval can match on cell values.
# Example table form:
# | Year | Squadrons | Aircraft |
# |------|-----------|----------|
# | 1939 | 21        | 252      |
# When adding data to a corpus, prefer this standard form, e.g.:
# | Metric | Value | Context |
# |--------|-------|---------|
# | Standard deduction single 2025 | $15,750 | Under age 65 |
def narrate_table(text):
    """
    Detect and convert pipe-delimited tables
    to narrative prose before chunking.
    """
    src = text.split('\n')
    out = []
    tables_seen = 0
    rows_converted = 0
    pos = 0
    total = len(src)
    while pos < total:
        stripped = src[pos].strip()
        if '|' not in stripped or stripped.count('|') < 2:
            # Ordinary text line — pass through untouched.
            out.append(src[pos])
            pos += 1
            continue
        # Absorb every consecutive line containing a pipe.
        block = []
        while pos < total and '|' in src[pos]:
            block.append(src[pos].strip())
            pos += 1
        # Drop separator rows like |----|----|.
        rows = [ln for ln in block if not re.match(r'^[\|\-\s:]+$', ln)]
        if len(rows) < 2:
            # Header-only (or bogus) table — keep the raw lines.
            out.extend(block)
            continue
        tables_seen += 1
        headers = [cell.strip() for cell in rows[0].split('|') if cell.strip()]
        sentences = []
        for row in rows[1:]:
            cells = [cell.strip() for cell in row.split('|') if cell.strip()]
            if len(cells) == len(headers):
                pieces = [f"{headers[k]} was {cells[k]}"
                          for k in range(len(headers))]
                sentences.append("In this record, " + ", ".join(pieces) + ".")
                rows_converted += 1
        out.append(" ".join(sentences))
    if tables_seen > 0:
        print(f" [Table narration: {tables_seen} table(s) detected, "
              f"{rows_converted} row(s) converted]")
    return '\n'.join(out)
# -------------------------
# Clean text
# -------------------------
def clean_text(text):
    """
    Normalize raw book text before chunking.

    Narrates pipe tables, rejoins words hyphenated across line breaks,
    collapses whitespace while PRESERVING blank-line paragraph breaks
    (chunk_text splits paragraphs on them), and strips inline page
    numbers, bullet glyphs, and citation markers.
    """
    # Narrate tables before any other cleaning
    text = narrate_table(text)
    # Re-join words hyphenated across a line break: "exam-\nple" -> "example"
    text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text)
    # BUG FIX: the old r'\n+' -> ' ' pass erased every newline, so the
    # paragraph split in chunk_text (on blank lines) could never fire.
    # Normalize runs of blank lines to one paragraph break, then fold
    # single newlines into spaces.
    text = re.sub(r'\n{2,}', '\n\n', text)
    text = re.sub(r'(?<!\n)\n(?!\n)', ' ', text)
    # Strip page numbers glued between lowercase text and a capitalized word
    text = re.sub(r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', '', text)
    # Remove list-bullet glyphs
    text = re.sub(r'[■•◆▪→]', '', text)
    text = re.sub(r' +', ' ', text)
    # Remove citation markers like [12] and [citation needed]
    text = re.sub(r'\[\d+\]', '', text)
    text = re.sub(r'\[citation needed\]', '', text)
    return text.strip()
# -------------------------
# Chunk text with overlap
# -------------------------
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """
    Split cleaned text into chunks of at most roughly chunk_size chars.

    Paragraphs are the base unit; paragraphs longer than twice the chunk
    size are first broken at sentence boundaries. Consecutive chunks
    overlap by re-including the previous unit when it fits.
    """
    # Step 1 — paragraphs (blank-line separated)
    paras = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
    # Step 2 — break oversized paragraphs into sentence runs
    units = []
    for para in paras:
        if len(para) <= chunk_size * 2:
            units.append(para)
            continue
        buf = ""
        for sent in re.split(r'(?<=[.!?])\s+', para):
            if len(buf) + len(sent) <= chunk_size:
                buf += " " + sent
            else:
                if buf:
                    units.append(buf.strip())
                buf = sent
        if buf:
            units.append(buf.strip())
    # Step 3 — pack units into chunks, seeding each new chunk with the
    # previous unit for context overlap when it fits
    chunks = []
    acc = ""
    last_unit = ""
    for unit in units:
        if len(acc) + len(unit) + 1 <= chunk_size:
            acc += " " + unit
        else:
            if acc:
                chunks.append(acc.strip())
            if last_unit and len(last_unit) + len(unit) + 1 <= chunk_size:
                acc = last_unit + " " + unit
            else:
                acc = unit
        last_unit = unit
    if acc:
        chunks.append(acc.strip())
    return chunks
# -------------------------
# Check if cache is valid
# -------------------------
def cache_is_valid():
    """
    Return True when the on-disk embedding cache matches the current corpus.

    The cache is stale when either cache file is missing, the metadata
    cannot be read or parsed, the list of book files has changed, or any
    book's size differs from the size recorded at cache time.
    """
    if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META):
        return False
    # ROBUSTNESS FIX: a corrupt or truncated meta file used to raise and
    # crash startup; treat it as an invalid cache so embeddings rebuild.
    try:
        with open(CACHE_META, "r") as f:
            meta = json.load(f)
    except (OSError, json.JSONDecodeError):
        return False
    if meta.get("book_files") != book_files:
        return False
    for book_name in book_files:
        if not os.path.exists(book_name):
            # File vanished since the scan; the size check is meaningless.
            continue
        stored_size = meta.get("file_sizes", {}).get(book_name)
        actual_size = os.path.getsize(book_name)
        if stored_size != actual_size:
            return False
    return True
# -------------------------
# Load or build embeddings
# -------------------------
# Parallel lists: all_chunks[i] is the chunk text, all_sources[i] the
# file it came from. chunk_embeddings rows line up with these indices.
all_chunks = []
all_sources = []
if cache_is_valid():
    print("Loading embeddings from cache...")
    data = np.load(CACHE_FILE, allow_pickle=True)
    chunk_embeddings = data["embeddings"]
    all_chunks = list(data["chunks"])
    all_sources = list(data["sources"])
    print(f"Total chunks loaded from cache: {len(all_chunks)}")
else:
    print("Building embeddings from scratch...")
    for book_name in book_files:
        if not os.path.exists(book_name):
            print(f"Warning: {book_name} not found, skipping...")
            continue
        print(f"Loading {book_name}...")
        # Read bytes and decode manually so we can fall back on cp1252.
        with open(book_name, "rb") as f:
            raw = f.read()
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            print(f"[Encoding fallback] {book_name}")
            text = raw.decode("cp1252")  # fallback for Windows-encoded text
        book_text = clean_text(text)
        book_chunks = chunk_text(book_text)
        all_chunks.extend(book_chunks)
        all_sources.extend([book_name] * len(book_chunks))
        print(f" -> {len(book_chunks)} chunks")
    print(f"Total chunks: {len(all_chunks)}")
    print("Embedding chunks (this may take a minute)...")
    chunk_embeddings = embed_model.encode(all_chunks, convert_to_tensor=False)
    print("Saving embeddings cache...")
    np.savez(
        CACHE_FILE,
        embeddings=chunk_embeddings,
        chunks=np.array(all_chunks, dtype=object),
        sources=np.array(all_sources, dtype=object)
    )
    # Record file sizes so cache_is_valid() can detect edited books later.
    file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)}
    with open(CACHE_META, "w") as f:
        json.dump({"book_files": book_files, "file_sizes": file_sizes}, f)
    print("Cache saved.")
# -------------------------
# Book filter helper
# -------------------------
def get_filtered_indices(filter_term):
    """Return indices of chunks whose source filename contains filter_term."""
    # No filter means the entire corpus is searchable.
    if not filter_term:
        return list(range(len(all_chunks)))
    needle = filter_term.lower()
    matches = []
    for idx, src in enumerate(all_sources):
        if needle in os.path.basename(src).lower():
            matches.append(idx)
    return matches
def show_available_books():
    """Print a short list of available books with keywords."""
    print("\n--- Available books ---")
    for path in book_files:
        print(f" {os.path.basename(path).replace('.txt', '')}")
    print("--- Use 'search <keyword>: your question' to filter ---\n")
# -------------------------
# Query expansion
# -------------------------
def expand_query(question):
    """
    Ask the language model for up to 3 rephrasings of the question using
    vocabulary likely to appear in the loaded documents, then return the
    original question plus the alternatives that pass sanity filtering.
    """
    titles = ', '.join([os.path.basename(b).replace('.txt', '') for b in book_files])
    prompt = (
        f"You are helping search a library containing these documents:\n"
        f"{titles}\n\n"
        f"Generate 3 alternative ways to ask the following question using "
        f"vocabulary, concepts, and terminology that would likely appear in "
        f"these specific documents. Do not reference authors or books not in this list. "
        f"The alternative questions must ask about the SAME specific fact as the original. "
        f"Do not broaden or change the subject of the question. "
        f"Return ONLY the 3 questions, one per line, no numbering, no explanation.\n\n"
        f"Question: {question}"
    )
    with lm_model.chat_session():
        raw = lm_model.generate(prompt, max_tokens=150)
    candidates = [ln.strip() for ln in raw.strip().split('\n') if ln.strip()]

    # Keep only plausible question lines: sane length, carries a question
    # mark, differs from the original, and no "Label:" style prefix.
    def _plausible(ln):
        return (15 < len(ln) < 200
                and '?' in ln
                and ln != question
                and ':' not in ln[:20])

    alternatives = [ln for ln in candidates if _plausible(ln)][:3]
    all_queries = [question] + alternatives
    print(f" [Expanded queries: {len(all_queries)}]")
    for q in all_queries:
        print(f" - {q}")
    return all_queries
# ----------------------
# Topic Detection
# ----------------------
# Stopwords for topic detection
# -------------------------
# Words too common to signal a shared topic between questions.
# FIX: the original set literal listed "his" twice; the duplicate entry
# was redundant (sets dedupe) and has been removed.
STOPWORDS = {
    "the","is","a","an","and","or","of","to","in","on","for","with",
    "what","which","who","how","when","where","can","i","you","it",
    "did","do","does","was","were","he","she","they","his","her",
    "him","them","its","be","been","have","has","had","will",
    "would","could","should","may","might","me","my","we","our"
}
def topics_are_related(question, history, lookback=3):
    """
    Returns True if the question shares meaningful words
    with recent conversation history.
    Also returns True for very short pronoun-heavy questions
    since they are almost certainly follow-ups.

    `history` is a list of {"question": ..., "answer": ...} dicts; only
    the last `lookback` questions are considered.
    """
    if not history:
        return False
    q_lower = question.lower()
    # Get meaningful words from current question
    q_words = set(q_lower.replace('?', '').replace('.', '').split()) - STOPWORDS
    # Get words from recent history questions
    recent = history[-lookback:]
    history_words = set()
    for exchange in recent:
        history_words.update(
            exchange["question"].lower().replace('?', '').replace('.', '').split()
        )
    history_words -= STOPWORDS
    # Pronoun follow-up check — only if history has meaningful content
    pronoun_followups = {
        "he","she","they","him","her","them","his","it",
        "this","that","these","those"
    }
    q_words_all = set(q_lower.replace('?', '').replace('.', '').split())
    if len(q_words_all) <= 5 and q_words_all & pronoun_followups:
        if history_words:
            print(f" [Pronoun follow-up detected — enriching]")
            return True
    if not q_words:
        return False
    # Check meaningful word overlap
    overlap = len(q_words & history_words)
    print(f" [Topic overlap: {overlap} word(s)]")
    return overlap > 0
def enrich_query_with_history(question):
    """
    Prepend recent history questions to short follow-up questions so the
    retrieval step has enough context.

    Returns the original question unchanged when there is no history, the
    question is already long (8+ words), the topic has shifted, or the
    enriched form would exceed 30 words.
    """
    if not conversation_history:
        return question
    # Long questions already carry their own context
    if len(question.split()) >= 8:
        return question
    # Bail out if the subject has changed
    if not topics_are_related(question, conversation_history):
        print(f" [Topic shift detected — no enrichment]")
        return question
    # Fold the last few questions in front of the current one
    prior = [ex["question"] for ex in conversation_history[-3:]]
    enriched = f"{' '.join(prior)} {question}"
    if len(enriched.split()) > 30:
        print(f" [Enriched query too long — using original]")
        return question
    print(f" [Enriched query: {enriched}]")
    return enriched
# -------------------------
# Retrieve top relevant chunks
# -------------------------
def get_top_chunks(question, filter_term=None):
    """
    Score the question (optionally enriched with history and expanded
    into alternative phrasings) against the chunk embeddings — limited to
    books matching filter_term when given — and return a
    (chunks, sources) pair for the top-k scoring chunks.
    """
    cfg = LEVELS[CURRENT_LEVEL]
    # Short follow-ups get recent-history context folded in first
    retrieval_question = enrich_query_with_history(question)
    queries = expand_query(retrieval_question) if cfg["expand"] else [retrieval_question]
    # Restrict the search space; fall back to everything on a bad filter
    search_indices = get_filtered_indices(filter_term)
    if not search_indices:
        print(f" [Warning: no books matched filter '{filter_term}' — searching all]")
        search_indices = list(range(len(all_chunks)))
    sub_embeddings = chunk_embeddings[search_indices]
    sub_chunks = [all_chunks[i] for i in search_indices]
    sub_sources = [all_sources[i] for i in search_indices]
    if filter_term:
        matched_books = set(os.path.basename(s) for s in sub_sources)
        print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]")
    # Average cosine similarity across every query variant
    sub_scores = np.zeros(len(sub_chunks))
    for q in queries:
        q_emb = embed_model.encode([q])
        sub_scores += cosine_similarity(q_emb, sub_embeddings)[0]
    sub_scores /= len(queries)
    # Highest-scoring chunks first
    winners = sub_scores.argsort()[-cfg["top_k"]:][::-1]
    return [sub_chunks[i] for i in winners], [sub_sources[i] for i in winners]
# -------------------------
# Parse search filter from input
# -------------------------
def parse_input(user_input):
    """
    Detects 'search keyword: question' syntax (case-insensitive).
    Returns a (question, filter_term) tuple; input without the syntax is
    returned unchanged with the global SEARCH_FILTER.
    """
    m = re.match(r'^search\s+(.+?):\s*(.+)$', user_input, re.IGNORECASE)
    if m is None:
        return user_input, SEARCH_FILTER
    return m.group(2).strip(), m.group(1).strip()
# --------------------------
# Truncate context at a sentence boundary to avoid feeding the LLM incomplete fragments
# -----------------------------
def truncate_at_sentence(text, max_chars):
    """
    Cut `text` to at most `max_chars` characters, preferring to end at
    the last sentence terminator (. ! ?) within the cut. Falls back to a
    hard character cut when no terminator appears past position 0.
    """
    if len(text) <= max_chars:
        return text
    clipped = text[:max_chars]
    boundary = max(clipped.rfind(ch) for ch in '.!?')
    if boundary > 0:
        return clipped[:boundary + 1]
    return clipped
# -------------------------
# Determine if the question is asking for a creative or factual response
# -------------------------
def is_creative_request(question):
    """
    Heuristic: True when the question contains any creative-writing
    trigger phrase (case-insensitive substring match).
    """
    triggers = (
        "suggest", "write", "complete", "finish", "rhyme", "next line",
        "come up with", "give me", "idea for", "open", "start", "begin",
        "chorus", "verse", "bridge", "hook", "lyric", "lyrics",
        "continue", "follow", "what comes", "how might", "how would",
    )
    lowered = question.lower()
    for trigger in triggers:
        if trigger in lowered:
            return True
    return False
# -------------------------
# Ask question
# -------------------------
def ask_question(question, show_sources=False, filter_term=None):
    """
    Answer a question with retrieval-augmented generation.

    Retrieves the top chunks for `question` (optionally restricted by
    `filter_term`), builds a prompt from recent conversation history plus
    the retrieved context, generates an answer with the language model,
    records the exchange in conversation_history, and returns the answer
    string. When `show_sources` is True, diagnostic lines (sources,
    level, memory, filter) are printed after generation.
    """
    global conversation_history
    level_cfg = LEVELS[CURRENT_LEVEL]
    top_chunks, sources = get_top_chunks(question, filter_term=filter_term)
    if DEBUG:
        print("\n--- Retrieved chunks ---")
        for i, chunk in enumerate(top_chunks):
            print(f"\nChunk {i+1}:")
            print(chunk[:300])
        print("--- End chunks ---\n")
    # Cap the context at a sentence boundary so the model never sees a
    # truncated fragment.
    context = truncate_at_sentence(" ".join(top_chunks), level_cfg["context_len"])
    # Fold the last MAX_HISTORY exchanges into the prompt for continuity.
    history_text = ""
    if conversation_history:
        history_text = "Previous conversation:\n"
        for exchange in conversation_history[-MAX_HISTORY:]:
            history_text += f"Q: {exchange['question']}\n"
            history_text += f"A: {exchange['answer']}\n"
            # NOTE(review): blank line after each exchange — confirm this
            # matches the original indentation intent (vs. once after loop).
            history_text += "\n"
    # Creative requests get a songwriting persona; everything else gets a
    # strict context-only research persona.
    if is_creative_request(question):
        prompt_instruction = (
            "You are a creative songwriting assistant. "
            "Use the provided context as inspiration and technique guidance. "
            "Generate original creative suggestions. "
            "Be concise. Do not reproduce the context. "
            "End your response with a single period."
        )
    else:
        prompt_instruction = (
            "You are a helpful research assistant. "
            "Answer using ONLY the provided context. "
            "Be direct and concise. Never repeat the context or instructions. "
            "Never echo the question. End your answer with a single period."
        )
    with lm_model.chat_session(system_prompt=prompt_instruction):
        user_message = (
            f"{history_text}"
            f"CONTEXT:\n{context}\n\n"
            f"QUESTION: {question}\n\n"
            f"ANSWER:"
        )
        response = lm_model.generate(
            user_message,
            max_tokens=level_cfg["max_tokens"]
        )
    answer = response.strip()
    # Strip any runaway stop markers and everything after them
    stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP"]
    for marker in stop_markers:
        if marker in answer:
            answer = answer[:answer.index(marker)].strip()
    conversation_history.append({
        "question": question,
        "answer": answer
    })
    # Keep only the most recent MAX_HISTORY exchanges.
    if len(conversation_history) > MAX_HISTORY:
        conversation_history = conversation_history[-MAX_HISTORY:]
    if show_sources:
        unique_sources = list(set(sources))
        short_sources = [os.path.basename(s) for s in unique_sources]
        print(f" [Sources: {', '.join(short_sources)}]")
        print(f" [Level: {CURRENT_LEVEL} | "
              f"expand={'on' if level_cfg['expand'] else 'off'} | "
              f"top_k={level_cfg['top_k']} | "
              f"max_tokens={level_cfg['max_tokens']}]")
        print(f" [Memory: {len(conversation_history)} exchanges]")
        if filter_term:
            print(f" [Filter: '{filter_term}']")
    return answer
# -------------------------
# Interactive loop
# -------------------------
# REPL: dispatches command strings (exit, memory, debug, sources, books,
# level) and treats everything else as a question for ask_question().
print("\nReady! Ask questions about your books")
print("Commands: 'exit', 'sources on/off', 'level 1-10',")
print(" 'memory clear', 'memory show', 'debug on/off'")
print(" 'books' — list available books")
print(" 'search <keyword>: question' — filter by book\n")
show_sources = False
while True:
    user_input = input(f"[L{CURRENT_LEVEL}] You: ")
    if user_input.lower() in ["exit", "quit"]:
        break
    elif user_input.lower() == "memory clear":
        conversation_history.clear()
        print("Conversation memory cleared.")
        continue
    elif user_input.lower() == "memory show":
        if not conversation_history:
            print("No conversation history.")
        else:
            print(f"\n--- Last {len(conversation_history)} exchanges ---")
            for i, exchange in enumerate(conversation_history):
                print(f"\nQ{i+1}: {exchange['question']}")
                print(f"A{i+1}: {exchange['answer'][:100]}...")
            print("---\n")
        continue
    elif user_input.lower() == "debug on":
        DEBUG = True
        print("Debug mode enabled.")
        continue
    elif user_input.lower() == "debug off":
        DEBUG = False
        print("Debug mode disabled.")
        continue
    elif user_input.lower() == "sources on":
        show_sources = True
        print("Source display enabled.")
        continue
    elif user_input.lower() == "sources off":
        show_sources = False
        print("Source display disabled.")
        continue
    elif user_input.lower() == "books":
        show_available_books()
        continue
    elif user_input.lower().startswith("level "):
        try:
            lvl = int(user_input.split()[1])
            if 1 <= lvl <= 10:
                CURRENT_LEVEL = lvl
                cfg = LEVELS[CURRENT_LEVEL]
                # BUG FIX: the first f-string lacked a separator, printing
                # e.g. "Level set to 10expand=on". Added ": ".
                print(f"Level set to {CURRENT_LEVEL}: "
                      f"expand={'on' if cfg['expand'] else 'off'}, "
                      f"top_k={cfg['top_k']}, "
                      f"max_tokens={cfg['max_tokens']}")
            else:
                print("Level must be between 1 and 10.")
        except (IndexError, ValueError):
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; only malformed input should
            # fall through to the usage hint.
            print("Usage: level 1 through level 10")
        continue
    # Parse for search filter
    question, filter_term = parse_input(user_input)
    response = ask_question(question, show_sources=show_sources, filter_term=filter_term)
    print("Bot:", response)