from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from gpt4all import GPT4All import os import re import numpy as np import json from pathlib import Path import torch import time import sys import gpt4all os.environ["PATH"] = r"C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.8\bin" + ";" + os.environ["PATH"] # ------------------------- # Knowledge base selection # ------------------------- BOOK_DIR = 'Books/Music' # Explore adding TYPE to headers. 4 types fact, rule, reference, pedagogical, the ordering ranks #TYPE: fact | rule | reference | pedagogical #DOMAIN: music_theory #PRIORITY: high | medium | low # cleaning text documents # https://www.text-utils.com/remove-special-characters/ # https://cloudconvert.com/docx-to-txt # Ask ChatGPT to descrive narratives around tablature examples. # I would like you to chunk this for my RAG system. # Where you identify guitar tablature you are to replace it with a narrative # describing the notes that are played in fine detail. # Please describe the notes exactly including any bends, hammer-on, pull-off, legatto, etc # Please do not omit any of the original descriptive text except insofar as it may be confusing for a RAG system. # You may use the existing text inform yourself and help narrate the notation. # {paste the text with tablature} # Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity # Augmented — add that retrieved context to the prompt # Generation — use the language model to generate an answer based on that contextfinger # ------------------- # Embedding Cleaning # ------------------- # del embeddings_cache.npz # del embeddings_cache_meta.json # ------------------- # TO-DO # ----------------- # Better table handling # Update requirements.txt with torch installation notes # Domain-specific clean profiles # --------------- # Running # -------------- # python Chartwell.py # -------------------------- # GIT Configuration # --------------------------- # git config --global credential.helper wincred # git config credential.helper store # git config --global user.name "Sean" # git config --global user.email "skessler1964@gmail.com" # Chartwell.py now has both models on GPU: # # GPT4All (Llama 3) — GPU for inference # SentenceTransformer — GPU for embeddings # IMPORTANT SETUP STEPS FOR RE-CREATING THIS ENVIORNMENT # 1) Install python # 3.10.11 # 2) Create venv # python -m venv .venv # .venv/Scripts/activate # 3) Install Dependencies # pip install -r requirements.txt # 4) Meta-Llama-3.1-8B-Instruct.Q4_0.gguf # \Users\skess\.cache\gpt4all\Meta-Llama-3-8B-Instruct.Q4_0.gguf # The model will auto-download on the first run and then switch to allow_download=False (see below) # The model is about 4.5G. The download is quick. # lm_model = GPT4All("Meta-Llama-3-8B-Instruct.Q4_0.gguf",model_path=r"C:\Users\skess\.cache\gpt4all",device="gpu",allow_download=False) # 5) huggingface This is for the sentence transformer (sentence-transformers/all-MiniLM-L6-v2) # \Users\skess\.cache\huggingface There is a fodler structure under here. # embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") This will automatically load the model if it is not already loaded # so an internet connection would be required if running this from scratch # IMPORTANT PYTHON NOTES - KEEP # Python # .venv/Scripts/Activate # pip freeze > requirements.txt # pip install -r requirements.txt # Torch GPU version # pip uninstall torch -y # pip install torch --index-url https://download.pytorch.org/whl/cu124 --force-reinstall # python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())" # witness : 2.x.x+cu124 True for CUDA # Still on the to-do list: # Fix the enrichment length cap # Semantic chunking # Better table handling # ---------------------------------- # Weights for chunk weighting system # ----------------------------------- TYPE_WEIGHTS = { "fact": 1.10, # The "Oak" gets a small boost "rule": 1.05, "reference": 1.00, # The baseline "pedagogical": 0.95 # The "Undergrowth" is only slightly demoted } PRIORITY_WEIGHTS = { "high": 1.10, "medium": 1.00, "low": 0.90 } # ---------------------------------- # Operating modes # ----------------------------------- CURRENT_MODE = "research" MODES = { "creative": { "print_msg": "Creative mode.", "prompt_instruction": ( "You are a creative assistant. " "Use the provided context as inspiration. " "Be concise and original. " "End your response with a single period." ) }, "research": { "print_msg": "Research mode.", "prompt_instruction": ( "You are a helpful research assistant. " "Restrict your response strictly to the provided context. " "If the source material is exhausted, stop writing. " "If a relationship or entity is not explicitly documented in the context, do not include it. " "Do not repeat the same information in different wording. " "If multiple context passages express the same idea, summarize it once. " "If the context contains repetitive legal or procedural text, merge it into a single concise statement. " "Do not list multiple similar verses. " "Prefer one coherent explanation over multiple extracted quotations. " "Do not infer, guess, or use external knowledge under any circumstances. " "Never repeat the context or instructions. " "Never echo the question. " "End your answer with a single period. " ) }, "advanced": { "print_msg": "Advanced mode.", "prompt_instruction": ( "You are a highly capable analytical assistant. " "Base your response primarily on the provided context. " "OUTPUT FORMAT (strict):\n" "Step 1: ANALYSIS\n" "- Write sentences, each prefixed with:\n" " [C] = directly supported by the context\n" " [I] = inferred from the context\n" " [E] = not explicitly supported\n\n" "Step 2: FINAL ANSWER\n" "- Write ONE paragraph summary only\n" "- Must be fully supported by statements in ANALYSIS\n" "- Do NOT introduce new information\n\n" "RULES:\n" "- Do not repeat sentences\n" "- Do not create multiple sections beyond ANALYSIS and FINAL ANSWER\n" "- Minimize [E] usage\n" "- If context is insufficient, say so in FINAL ANSWER\n" ) }, "music": { "print_msg": "Music mode.", "prompt_instruction": ( "You are a music theory assistant.\n" "\n" "You may use general music theory knowledge when the context does not explicitly define a rule.\n" "However, if the context provides a rule, table, or mapping, you MUST prioritize it over general knowledge.\n" "\n" "Do not invent programming code, functions, or data structures.\n" "Do not fabricate musical tables or mappings not present in the context.\n" "\n" "Reasoning rules:\n" "- Prefer context over general knowledge.\n" "- If context is missing critical information, fall back to standard Western music theory.\n" "- If the question is ambiguous, choose the most common theoretical interpretation.\n" "\n" "Output rules:\n" "- Return only the final answer.\n" "- No explanations unless explicitly requested.\n" "- End with a single period.\n" ) } } CACHE_FILES = ['embeddings_cache.npz', 'embeddings_cache_meta.json'] # This ensures the cache is always saved INSIDE the folder you are pointing to CACHE_FILE = os.path.join(BOOK_DIR, CACHE_FILES[0]) # 'embeddings_cache.npz' CACHE_META = os.path.join(BOOK_DIR, CACHE_FILES[1]) # 'embeddings_cache_meta.json' book_files = [] for f in Path(BOOK_DIR).rglob('*'): if not f.is_file() or f.name in CACHE_FILES: # Remove the embeddings files fro this list continue if not f.is_file(): continue try: with open(f, 'rb'): # just check file is readable pass book_files.append(str(f)) except PermissionError: continue print(f"Found {len(book_files)} files") # Overlap should be 10-20% of chunk size CHUNK_SIZE = 700 CHUNK_OVERLAP = 100 DEBUG = False MAX_HISTORY = 5 CURRENT_LEVEL = 10 SEARCH_FILTER = None # None = search all books # -------------------------------------------------------------------- # Toggle for whether we are using the model to enrich the corpus data # -------------------------------------------------------------------- USE_ENRICHMENT = True # ------------------------- # CONVERSATIONAL HISTORY # ------------------------- conversation_history = [] # ------------------------- # LEVEL CONFIG # ------------------------- LEVELS = { 1: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 500}, 2: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 600}, 3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700}, 4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800}, 5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000}, 6: {"expand": False, "top_k": 6, "max_tokens": 200*3, "context_len": 2000}, 7: {"expand": True, "top_k": 5, "max_tokens": 150*3, "context_len": 1400}, 8: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1600}, 9: {"expand": True, "top_k": 6, "max_tokens": 175*3, "context_len": 1800}, 10: {"expand": True, "top_k": 6, "max_tokens": 200*3, "context_len": 2000}, } # ------------------------- # Load models # ------------------------- # ----------------------------------- # Load the sentence tranformer model # ----------------------------------- print("Loading embedding model...") device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Embedding model using: {device}") embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2",device=device) # ----------------------------------- # Load the language model - If it does not exist in the download area then download it otherwise us it. # ----------------------------------- print("Loading language model...") model_file = "Meta-Llama-3.1-8B-Instruct-q4_0.gguf" model_path = r"C:\Users\skess\.cache\gpt4all" full_path = os.path.join(model_path, model_file) if not os.path.exists(full_path): print("Model not found locally. Downloading...") allow_download = True else: allow_download = False lm_model = GPT4All( model_file, model_path=model_path, device="cuda", allow_download=allow_download ) # ---------------- # Table Narration # ---------------- # This will detect and create narrations for table data in pipe form # For example. #| Year | Squadrons | Aircraft | #|------|-----------|----------| #| 1939 | 21 | 252 | #| 1940 | 35 | 420 | # If adding data to a corpus try to use this standard form for instance #| Metric | Value | Context | #|--------|-------|---------| #| Standard deduction single 2025 | $15,750 | Under age 65 | #| Standard deduction single 2025 | $17,750 | Age 65 or older | #| Standard deduction MFJ 2025 | $31,500 | Both under 65 | def narrate_table(text): """ Detect and convert pipe-delimited tables to narrative prose before chunking. """ lines = text.split('\n') result = [] i = 0 narrative_count = 0 table_count = 0 while i < len(lines): line = lines[i].strip() if '|' in line and line.count('|') >= 2: table_lines = [] while i < len(lines) and '|' in lines[i]: table_lines.append(lines[i].strip()) i += 1 data_lines = [l for l in table_lines if not re.match(r'^[\|\-\s:]+$', l)] if len(data_lines) >= 2: table_count += 1 headers = [h.strip() for h in data_lines[0].split('|') if h.strip()] narratives = [] for row_line in data_lines[1:]: values = [v.strip() for v in row_line.split('|') if v.strip()] if len(values) == len(headers): parts = [f"{headers[j]} was {values[j]}" for j in range(len(headers))] sentence = "In this record, " + ", ".join(parts) + "." narratives.append(sentence) narrative_count += 1 result.append(" ".join(narratives)) else: result.extend(table_lines) else: result.append(lines[i]) i += 1 if table_count > 0: print(f" [Table narration: {table_count} table(s) detected, " f"{narrative_count} row(s) converted]") return '\n'.join(result) # ------------------------- # Clean text # ------------------------- def clean_text(text): # Narrate tables before any other cleaning text = narrate_table(text) # Fix hyphenated line breaks in prose (word-\nword -> wordword) text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text) # Collapse 3+ newlines to double newline (preserve paragraph breaks) text = re.sub(r'\n{3,}', '\n\n', text) # Clean up other artifacts text = re.sub(r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', '', text) text = re.sub(r'[■•◆▪→]', '', text) text = re.sub(r' +', ' ', text) text = re.sub(r'\[\d+\]', '', text) text = re.sub(r'\[citation needed\]', '', text) return text.strip() # -------------------------------- # This is for the enrichment pipeline if it is enabled. It Uses the local LLM to extract key metadata from a chunk. # -------------------------------- PROMPT_TEMPLATE = ( "<|start_header_id|>user<|end_header_id|>\n" "TAGGING OPERATION. NOT A CONVERSATION. NO EXPLANATIONS.\n" "OUTPUT FORMAT IS FIXED. DO NOT DEVIATE.\n" "\n" "RULES:\n" "1. Output EXACTLY ONE LINE in this format: [Time: | Loc: | Entity: | Topic:]\n" "2. Fill every field. Use 'Unknown' if uncertain. Never leave a field empty.\n" "3. Entity: list up to 5 items, comma separated.\n" "4. No sentences. No explanation. No apology. No meta-commentary.\n" "5. Do not repeat these instructions. Do not acknowledge this prompt.\n" "6. Your entire response is the tag line and nothing else.\n" "\n" "Text: {text}\n" "<|eot_id|>\n" "<|start_header_id|>assistant<|end_header_id|>\n" "Tags: [" ) def extract_context_tags(text_chunk): start_time = time.perf_counter() response = lm_model.generate( PROMPT_TEMPLATE.format(text=text_chunk), max_tokens=60, temp=0.01, n_batch=512, ) # If the model didn't provide the bracket because we 'pushed' it, add it back tag = response.split(']')[0] + "]" if "]" in response else response if not tag.startswith("["): tag = "[" + tag print(f"TAG:{tag}") print(f"Took : {time.perf_counter() - start_time:.4f} seconds") return tag def is_empty_tag(tag): values = [part.split(":")[-1].strip() for part in tag.strip("[]").split("|")] return not any(values) # ------------------------- # Extract the CHUNK directive from the header # ------------------------- def get_chunk_directive(text, header_lines=20): """ Extract CHUNK directive from top of file only. """ top = "\n".join(text.splitlines()[:header_lines]) match = re.search(r"^CHUNK:\s*(\w+)", top, re.IGNORECASE | re.MULTILINE) if match: return match.group(1).strip().upper() return None # ------------------------- # Chunk text with overlap # ------------------------- def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP): # Try to get the chunk directive if it isd present directive = get_chunk_directive(text) if directive == "SINGLE": if DEBUG: print(" [CHUNK: SINGLE detected — bypassing chunking]") return [text.strip()] # 1. EXTRACT HEADERS (The "Metadata Inheritance" logic) header_patterns = [ r"TYPE:.*", r"PRIORITY:.*", r"DOMAIN:.*", r"TITLE:.*", r"CONCEPTS:.*", r"SOURCE:.*", r"CHUNK:.*", # special pattern currently supports SINGLE so that the entire file will be chunked and not split across chunks ] header_lines = [] top_of_file = text[:500] for pattern in header_patterns: match = re.search(pattern, top_of_file, re.IGNORECASE) if match: header_lines.append(match.group(0)) header_prefix = "\n".join(header_lines) + "\n\n" if header_lines else "" # 2. SEMANTIC SPLITTING (Your original Step 1 & 2) paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()] split_units = [] for para in paragraphs: if len(para) <= chunk_size: split_units.append(para) else: sentences = re.split(r'(?<=[.!?])\s+', para) current = "" for sentence in sentences: if len(current) + len(sentence) <= chunk_size: current += " " + sentence else: if current: split_units.append(current.strip()) current = sentence if current: split_units.append(current.strip()) # 3. COMBINE & INJECT HEADERS (Step 3 with metadata injection) chunks = [] current_chunk = "" prev_unit = "" for unit in split_units: # Check if adding this unit exceeds chunk_size if len(current_chunk) + len(unit) + 1 <= chunk_size: current_chunk += " " + unit else: if current_chunk: final_output = current_chunk.strip() # --- CONDITIONAL ENRICHMENT LOGIC --- if USE_ENRICHMENT: print(f" [Enriching chunk {len(chunks)+1}...]", end="\r") tags = extract_context_tags(final_output[:600]) if not is_empty_tag(tags): final_output = f"{tags} {final_output}" # ---------------------------- # Add headers to all chunks except the first one (which already has them) if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]): final_output = header_prefix + final_output chunks.append(final_output) # Overlap logic if prev_unit and len(prev_unit) + len(unit) + 1 <= chunk_size: current_chunk = prev_unit + " " + unit else: current_chunk = unit prev_unit = unit if current_chunk: final_output = current_chunk.strip() if USE_ENRICHMENT: tags = extract_context_tags(final_output[:600]) if not is_empty_tag(tags): final_output = f"{tags} {final_output}" if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]): final_output = header_prefix + final_output chunks.append(final_output) return chunks # ------------------------- # Check if cache is valid # ------------------------- def cache_is_valid(): print(f"\nChecking for existing enriched cache in {BOOK_DIR}...") status = { "valid": True, "added": [], "modified": [], "missing_embeddings": [] } # --- HARD FAIL: missing cache files --- if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META): print("X Missing cache or metadata → rebuild required") status["valid"] = False return status with open(CACHE_META, "r") as f: meta = json.load(f) cached_files = set(meta.get("book_files", [])) current_files = set(book_files) # --- Detect NEW files --- status["added"] = list(current_files - cached_files) # --- Check EXISTING files --- for book_name in current_files: if not os.path.exists(book_name): continue # Skip new files (handled separately) if book_name not in cached_files: continue stored_size = meta.get("file_sizes", {}).get(book_name) actual_size = os.path.getsize(book_name) # Missing metadata entry → bad if stored_size is None: status["missing_embeddings"].append(book_name) continue # File changed → needs reprocessing if stored_size != actual_size: status["modified"].append(book_name) # --- HARD FAIL CONDITIONS --- if status["missing_embeddings"]: print(f"\nX Missing embeddings for {len(status['missing_embeddings'])} file(s):") for f in sorted(status["missing_embeddings"]): print(f" * {f}") status["valid"] = False if status["modified"]: print(f"\nX {len(status['modified'])} file(s) changed:") for f in sorted(status["modified"]): print(f" * {f}") # --- SOFT WARNING --- if status["added"]: print(f"\n+ {len(status['added'])} new file(s) detected:") for f in sorted(status["added"]): print(f" + {f}") if status["valid"]: print("\n✓ Cache usable (incremental updates possible)") else: print("\n→ Full rebuild required") return status # -------------------------------------------------- # Save updated cache file # -------------------------------------------------- def save_updated_cache(): np.savez( CACHE_FILE, embeddings=chunk_embeddings, chunks=np.array(all_chunks, dtype=object), sources=np.array(all_sources, dtype=object) ) file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)} with open(CACHE_META, "w") as f: json.dump({ "book_files": book_files, "file_sizes": file_sizes }, f) print("Cache updated.") # -------------------------------------------------- # Remove chunks from embeddings for specified files # -------------------------------------------------- def remove_chunks_for_files(files_to_remove): global all_chunks, all_sources, chunk_embeddings if not files_to_remove: return keep_indices = [ i for i, src in enumerate(all_sources) if src not in files_to_remove ] all_chunks = [all_chunks[i] for i in keep_indices] all_sources = [all_sources[i] for i in keep_indices] chunk_embeddings = chunk_embeddings[keep_indices] print(f"Removed old chunks for {len(files_to_remove)} modified file(s)") # ------------------------- # Process new and modified files # ------------------------- def process_incremental_updates(status): global all_chunks, all_sources, chunk_embeddings files_to_process = status["added"] + status["modified"] # Step 1 — remove outdated chunks (ONLY modified files) remove_chunks_for_files(status["modified"]) new_chunks = [] new_sources = [] # Step 2 — process new + modified files for book_name in files_to_process: print(f"[Updating] {book_name}") with open(book_name, "rb") as f: raw = f.read() try: text = raw.decode("utf-8") except: text = raw.decode("cp1252") # Skip IGNORE files first_line = text.lstrip().splitlines()[0] if text.strip() else "" if first_line.strip().upper().startswith("# IGNORE"): print(f"Skipping {book_name} (marked IGNORE)") continue book_text = clean_text(text) chunks = chunk_text(book_text) new_chunks.extend(chunks) new_sources.extend([book_name] * len(chunks)) # Step 3 — nothing to add if not new_chunks: print("No new chunks to add.") return # Step 4 — embed print(f"Embedding {len(new_chunks)} new chunks...") new_embeddings = embed_model.encode(new_chunks, convert_to_tensor=False) # Step 5 — append if len(all_chunks) == 0: # edge case: empty cache all_chunks = new_chunks all_sources = new_sources chunk_embeddings = np.array(new_embeddings) else: all_chunks.extend(new_chunks) all_sources.extend(new_sources) chunk_embeddings = np.vstack([chunk_embeddings, new_embeddings]) # Step 6 — save save_updated_cache() # ------------------------- # Load or build embeddings # ------------------------- all_chunks = [] all_sources = [] status = cache_is_valid() # if cache_is_valid(): if status["valid"]: print("Loading embeddings from cache...") data = np.load(CACHE_FILE, allow_pickle=True) chunk_embeddings = data["embeddings"] all_chunks = list(data["chunks"]) all_sources = list(data["sources"]) print(f"Total chunks loaded from cache: {len(all_chunks)}") # check if we have additions or modifications if status["added"] or status["modified"]: print("\n[Incremental update triggered]") process_incremental_updates(status) else: print("Building embeddings from scratch...") for book_name in book_files: if not os.path.exists(book_name): print(f"Warning: {book_name} not found, skipping...") continue print(f"Loading {book_name}...") with open(book_name, "rb") as f: raw = f.read() try: text = raw.decode("utf-8") except UnicodeDecodeError: print(f"[Encoding fallback] {book_name}") text = raw.decode("cp1252") # fallback for Windows-encoded text # Skip files marked with "# IGNORE" on first line first_line = text.lstrip().splitlines()[0] if text.strip() else "" if first_line.strip().upper().startswith("# IGNORE"): print(f"Skipping {book_name} (marked IGNORE)") continue book_text = clean_text(text) book_chunks = chunk_text(book_text) all_chunks.extend(book_chunks) all_sources.extend([book_name] * len(book_chunks)) print(f" -> {len(book_chunks)} chunks") print(f"Total chunks: {len(all_chunks)}") print("Embedding chunks (this may take a minute)...") chunk_embeddings = embed_model.encode(all_chunks, convert_to_tensor=False) print("Saving embeddings cache...") np.savez( CACHE_FILE, embeddings=chunk_embeddings, chunks=np.array(all_chunks, dtype=object), sources=np.array(all_sources, dtype=object) ) file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)} with open(CACHE_META, "w") as f: json.dump({"book_files": book_files, "file_sizes": file_sizes}, f) print("Cache saved.") # ------------------------- # Book filter helper # ------------------------- def get_filtered_indices(filter_term): """Return indices of chunks whose source filename contains filter_term.""" if not filter_term: return list(range(len(all_chunks))) filter_lower = filter_term.lower() return [i for i, src in enumerate(all_sources) if filter_lower in os.path.basename(src).lower()] def show_available_books(): """Print a short list of available books with keywords.""" print("\n--- Available books ---") for f in book_files: base = os.path.basename(f).replace('.txt', '') print(f" {base}") print("--- Use 'search : your question' to filter ---\n") # ------------------------- # Query expansion # ------------------------- def expand_query(question): book_titles = ', '.join([os.path.basename(b).replace('.txt', '') for b in book_files]) prompt = ( f"You are helping search a library containing these documents:\n" f"{book_titles}\n\n" f"Generate 3 alternative ways to ask the following question using " f"vocabulary, concepts, and terminology that would likely appear in " f"these specific documents. Do not reference authors or books not in this list. " f"The alternative questions must ask about the SAME specific fact as the original. " f"Do not broaden or change the subject of the question. " f"Return ONLY the 3 questions, one per line, no numbering, no explanation.\n\n" f"Question: {question}" ) with lm_model.chat_session(): response = lm_model.generate(prompt, max_tokens=150) lines = [line.strip() for line in response.strip().split('\n') if line.strip()] alternatives = [ l for l in lines if len(l) > 15 and len(l) < 200 and '?' in l and l != question and ':' not in l[:20] ][:3] all_queries = [question] + alternatives print(f" [Expanded queries: {len(all_queries)}]") for q in all_queries: print(f" - {q}") return all_queries # ---------------------- # Topic Detection # ---------------------- # Stopwords for topic detection # ------------------------- STOPWORDS = { "the","is","a","an","and","or","of","to","in","on","for","with", "what","which","who","how","when","where","can","i","you","it", "did","do","does","was","were","he","she","they","his","her", "him","them","his","its","be","been","have","has","had","will", "would","could","should","may","might","me","my","we","our" } def topics_are_related(question, history, lookback=3): """ Returns True if the question shares meaningful words with recent conversation history. Also returns True for very short pronoun-heavy questions since they are almost certainly follow-ups. """ if not history: return False q_lower = question.lower() # Get meaningful words from current question q_words = set(q_lower.replace('?','').replace('.','').split()) - STOPWORDS # Get words from recent history questions recent = history[-lookback:] history_words = set() for exchange in recent: history_words.update( exchange["question"].lower().replace('?','').replace('.','').split() ) history_words -= STOPWORDS # Pronoun follow-up check — only if history has meaningful content pronoun_followups = { "he","she","they","him","her","them","his","it", "this","that","these","those" } q_words_all = set(q_lower.replace('?','').replace('.','').split()) if len(q_words_all) <= 5 and q_words_all & pronoun_followups: if history_words: print(f" [Pronoun follow-up detected — enriching]") return True if not q_words: return False # Check meaningful word overlap overlap = len(q_words & history_words) print(f" [Topic overlap: {overlap} word(s)]") return overlap > 0 def enrich_query_with_history(question): """ Add context from recent history to improve retrieval for short follow-up questions. Skips enrichment if topic has shifted or enriched query is too long. """ if not conversation_history: return question # Only enrich questions under 8 words if len(question.split()) >= 8: return question # Check if topic has shifted if not topics_are_related(question, conversation_history): print(f" [Topic shift detected — no enrichment]") return question # Look back up to 3 exchanges for context recent = conversation_history[-3:] context_words = " ".join([ex["question"] for ex in recent]) enriched = f"{context_words} {question}" # Don't enrich if result is too long if len(enriched.split()) > 30: print(f" [Enriched query too long — using original]") return question print(f" [Enriched query: {enriched}]") return enriched # -------------------------------------------- # Handles type extraction from chunk metadata # -------------------------------------------- def extract_type(chunk_text): """ Extract TYPE metadata from chunk header. Defaults to 'reference' if missing. """ match = re.search(r"TYPE:\s*(fact|rule|reference|pedagogical)", chunk_text, re.IGNORECASE) if match: return match.group(1).lower() return "reference" def extract_metadata(chunk): """ Extracts TYPE / PRIORITY metadata from a chunk if present. Defaults are safe and neutral. """ meta = { "type": "reference", "priority": "medium" } # Look for TYPE: xxx type_match = re.search(r"TYPE:\s*(\w+)", chunk, re.IGNORECASE) if type_match: meta["type"] = type_match.group(1).lower().strip() # Look for PRIORITY: xxx priority_match = re.search(r"PRIORITY:\s*(\w+)", chunk, re.IGNORECASE) if priority_match: meta["priority"] = priority_match.group(1).lower().strip() return meta # ------------------------- # Retrieve top relevant chunks # ------------------------- def get_top_chunks(question, filter_term=None): level_cfg = LEVELS[CURRENT_LEVEL] # ------------------------- # Query preparation # ------------------------- retrieval_question = enrich_query_with_history(question) if level_cfg["expand"]: queries = expand_query(retrieval_question) else: queries = [retrieval_question] # ------------------------- # Filter scope # ------------------------- search_indices = get_filtered_indices(filter_term) if not search_indices: print(f" [Warning: no books matched filter '{filter_term}' — searching all]") search_indices = list(range(len(all_chunks))) sub_embeddings = chunk_embeddings[search_indices] sub_chunks = [all_chunks[i] for i in search_indices] sub_sources = [all_sources[i] for i in search_indices] if filter_term: matched_books = set(os.path.basename(s) for s in sub_sources) print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]") # ------------------------- # Semantic scoring (pure signal) # ------------------------- semantic_scores = np.zeros(len(sub_chunks)) for q in queries: query_emb = embed_model.encode([q]) scores = cosine_similarity(query_emb, sub_embeddings)[0] semantic_scores += scores semantic_scores /= len(queries) # ------------------------- # SAFE MIN-MAX NORMALIZATION # ------------------------- min_s = semantic_scores.min() max_s = semantic_scores.max() range_s = max_s - min_s if range_s < 1e-6: # All scores basically identical → neutral signal semantic_scores = np.ones_like(semantic_scores) else: semantic_scores = (semantic_scores - min_s) / (range_s + 1e-9) # ------------------------- # TYPE + PRIORITY WEIGHTING # ------------------------- type_weights = np.zeros(len(sub_chunks)) priority_weights = np.zeros(len(sub_chunks)) for i, chunk in enumerate(sub_chunks): chunk_type = extract_type(chunk) type_weights[i] = TYPE_WEIGHTS.get(chunk_type, 1.0) meta = extract_metadata(chunk) priority_weights[i] = PRIORITY_WEIGHTS.get(meta["priority"], 1.0) # ------------------------- # FINAL SCORE (composed signal) # ------------------------- final_scores = (semantic_scores + 1.5 * np.log(type_weights) + 0.3 * np.log(priority_weights) ) # ------------------------- # DEBUG VIEW (optional but very useful) # ------------------------- if DEBUG: debug_ranking = list(zip( [os.path.basename(s) for s in sub_sources], semantic_scores, type_weights, final_scores )) debug_ranking.sort(key=lambda x: x[3], reverse=True) print("\n--- TYPE-AWARE RANKING ---") for name, sem, tw, fs in debug_ranking[:15]: print(f"{name} | semantic similarity={sem:.4f} | type={tw:.2f} | final={fs:.4f}") print("--- END ---\n") # ------------------------- # Top-k selection # ------------------------- top_k = level_cfg["top_k"] top_indices = final_scores.argsort()[-top_k:][::-1] return ( [sub_chunks[i] for i in top_indices], [sub_sources[i] for i in top_indices] ) # ------------------------- # Parse search filter from input # ------------------------- def parse_input(user_input): """ Detects 'search keyword: question' syntax. Returns (question, filter_term) tuple. """ pattern = re.match(r'^search\s+(.+?):\s*(.+)$', user_input, re.IGNORECASE) if pattern: filter_term = pattern.group(1).strip() question = pattern.group(2).strip() return question, filter_term return user_input, SEARCH_FILTER # -------------------------- # Truncate context at a sentence boundary to avoid feeding the LLM incomplete fragments # ----------------------------- def truncate_at_sentence(text, max_chars): if len(text) <= max_chars: return text truncated = text[:max_chars] last_period = max( truncated.rfind('.'), truncated.rfind('!'), truncated.rfind('?') ) return truncated[:last_period + 1] if last_period > 0 else truncated # ------------------------- # Ask question # ------------------------- def ask_question(question, show_sources=False, filter_term=None): global conversation_history level_cfg = LEVELS[CURRENT_LEVEL] top_chunks, sources = get_top_chunks(question, filter_term=filter_term) if DEBUG: print("\n--- Retrieved chunks ---") for i, chunk in enumerate(top_chunks): print(f"\nChunk {i+1}:") print(chunk[:300]) print("--- End chunks ---\n") joined_chunks = " ".join(top_chunks) # If SINGLE chunk present, do NOT truncate if "CHUNK: SINGLE" in joined_chunks: if DEBUG: print(" [SINGLE chunk detected — skipping context truncation]") context = joined_chunks else: context = truncate_at_sentence( joined_chunks, level_cfg["context_len"] ) history_text = "" if conversation_history: history_text = "Previous conversation:\n" for exchange in conversation_history[-MAX_HISTORY:]: history_text += f"Q: {exchange['question']}\n" history_text += f"A: {exchange['answer']}\n" history_text += "\n" # Grab instruction and print status based on the manual mode mode_cfg = MODES[CURRENT_MODE] print(mode_cfg["print_msg"]) prompt_instruction = mode_cfg["prompt_instruction"] with lm_model.chat_session(system_prompt=prompt_instruction): user_message = ( f"{history_text}" f"CONTEXT:\n{context}\n\n" f"QUESTION: {question}\n\n" f"ANSWER:" ) response = lm_model.generate( user_message, max_tokens=level_cfg["max_tokens"] ) answer = response.strip() # Strip any runaway stop markers and everything after them stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP", "]]>"] for marker in stop_markers: if marker in answer: answer = answer[:answer.index(marker)].strip() # WARNING: corrupted or truncated answers stored in conversation_history # will poison subsequent responses. Always store condensed_answer, not full response. # When storing to conversation_history, store condensed version condensed_answer = answer.split('\n')[0] # just the first line conversation_history.append({ "question": question, "answer": condensed_answer }) if len(conversation_history) > MAX_HISTORY: conversation_history = conversation_history[-MAX_HISTORY:] if show_sources: unique_sources = list(set(sources)) short_sources = [os.path.basename(s) for s in unique_sources] print(f" [Sources: {', '.join(short_sources)}]") print(f" [Level: {CURRENT_LEVEL} | " f"expand={'on' if level_cfg['expand'] else 'off'} | " f"top_k={level_cfg['top_k']} | " f"max_tokens={level_cfg['max_tokens']}]") print(f" [Memory: {len(conversation_history)} exchanges]") if filter_term: print(f" [Filter: '{filter_term}']") return answer # ------------------------- # Interactive loop # ------------------------- print("\nReady! Ask questions about your books") print("Commands: 'exit', 'sources on/off', 'level 1-10',") print(" 'memory clear', 'memory show', 'debug on/off'") print(" 'books' — list available books") print(" 'search : question' — filter by book\n") show_sources = False # Bot loop while True: # user_input = input(f"[L{CURRENT_LEVEL}] You: ") user_input = input(f"[L{CURRENT_LEVEL}][{CURRENT_MODE}] You: ") if user_input.lower() in ["exit", "quit"]: break elif user_input.startswith("mode "): try: # Splits "mode advanced" and takes "advanced" new_mode = user_input.split(maxsplit=1)[1] if new_mode in MODES: CURRENT_MODE = new_mode print(MODES[CURRENT_MODE]["print_msg"]) else: available = ", ".join(MODES.keys()) print(f"Invalid mode. Available: {available}") except IndexError: print("Usage: mode [creative|research|advanced]") continue elif user_input.lower() == "memory clear": conversation_history.clear() print("Conversation memory cleared.") continue elif user_input.lower() == "memory show": if not conversation_history: print("No conversation history.") else: print(f"\n--- Last {len(conversation_history)} exchanges ---") for i, exchange in enumerate(conversation_history): print(f"\nQ{i+1}: {exchange['question']}") print(f"A{i+1}: {exchange['answer'][:100]}...") print("---\n") continue elif user_input.lower() == "debug on": DEBUG = True print("Debug mode enabled.") continue elif user_input.lower() == "debug off": DEBUG = False print("Debug mode disabled.") continue elif user_input.lower() == "sources on": show_sources = True print("Source display enabled.") continue elif user_input.lower() == "sources off": show_sources = False print("Source display disabled.") continue elif user_input.lower() == "books": show_available_books() continue elif user_input.lower().startswith("level "): try: lvl = int(user_input.split()[1]) if 1 <= lvl <= 10: CURRENT_LEVEL = lvl cfg = LEVELS[CURRENT_LEVEL] print(f"Level set to {CURRENT_LEVEL} — " f"expand={'on' if cfg['expand'] else 'off'}, " f"top_k={cfg['top_k']}, " f"max_tokens={cfg['max_tokens']}") else: print("Level must be between 1 and 10.") except: print("Usage: level 1 through level 10") continue # Parse for search filter question, filter_term = parse_input(user_input) response = ask_question(question, show_sources=show_sources, filter_term=filter_term) print("Bot:", response)