from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from gpt4all import GPT4All import os import re import numpy as np import json from pathlib import Path import torch # Explore adding TYPE to headers. 4 types fact, rule, reference, pedagogical, the ordering ranks #TYPE: fact | rule | reference | pedagogical #DOMAIN: music_theory #PRIORITY: high | medium | low # cleaning text documents # https://www.text-utils.com/remove-special-characters/ # Ask ChatGPT to descrive narratives around tablature examples. # I would like you to chunk this for my RAG system. # Where you identify guitar tablature you are to replace it with a narrative # describing the notes that are played in fine detail. # Please describe the notes exactly including any bends, hammer-on, pull-off, legatto, etc # Please do not omit any of the original descriptive text except insofar as it may be confusing for a RAG system. # You may use the existing text inform yourself and help narrate the notation. # {paste the text with tablature} # Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity # Augmented — add that retrieved context to the prompt # Generation — use the language model to generate an answer based on that contextfinger # ------------------- # Embedding Cleaning # ------------------- # del embeddings_cache.npz # del embeddings_cache_meta.json # ------------------- # TO-DO # ----------------- # Better table handling # Update requirements.txt with torch installation notes # Domain-specific clean profiles # --------------- # Running # -------------- # python Chartwell.py # -------------------------- # GIT Configuration # --------------------------- # git config --global credential.helper wincred # git config credential.helper store # git config --global user.name "Sean" # git config --global user.email "skessler1964@gmail.com" # Chartwell.py now has both models on GPU: # # GPT4All (Llama 3) — GPU for inference # SentenceTransformer — GPU for embeddings # IMPORTANT SETUP STEPS FOR RE-CREATING THIS ENVIORNMENT # 1) Install python # 3.10.11 # 2) Create venv # python -m venv .venv # .venv/Scripts/activate # 3) Install Dependencies # pip install -r requirements.txt # 4) Meta-Llama-3.1-8B-Instruct.Q4_0.gguf # \Users\skess\.cache\gpt4all\Meta-Llama-3-8B-Instruct.Q4_0.gguf # The model will auto-download on the first run and then switch to allow_download=False (see below) # The model is about 4.5G. The download is quick. # lm_model = GPT4All("Meta-Llama-3-8B-Instruct.Q4_0.gguf",model_path=r"C:\Users\skess\.cache\gpt4all",device="gpu",allow_download=False) # 5) huggingface This is for the sentence transformer (sentence-transformers/all-MiniLM-L6-v2) # \Users\skess\.cache\huggingface There is a fodler structure under here. # embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") This will automatically load the model if it is not already loaded # so an internet connection would be required if running this from scratch # IMPORTANT PYTHON NOTES - KEEP # Python # .venv/Scripts/Activate # pip freeze > requirements.txt # pip install -r requirements.txt # Torch GPU version # pip uninstall torch -y # pip install torch --index-url https://download.pytorch.org/whl/cu124 --force-reinstall # python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())" # witness : 2.x.x+cu124 True for CUDA # Still on the to-do list: # Fix the enrichment length cap # Semantic chunking # Better table handling # ---------------------------------- # Weights for chunk weighting system # ----------------------------------- # TYPE_WEIGHTS = { # "fact": 1.50, # "rule": 1.20, # "reference": 1.00, # "pedagogical": 0.85 # } TYPE_WEIGHTS = { "fact": 1.10, # The "Oak" gets a small boost "rule": 1.05, "reference": 1.00, # The baseline "pedagogical": 0.95 # The "Undergrowth" is only slightly demoted } PRIORITY_WEIGHTS = { "high": 1.10, "medium": 1.00, "low": 0.90 } # ------------------------- # Knowledge base selection # ------------------------- BOOK_DIR = 'Books/History' # just a string book_files = [] for f in Path(BOOK_DIR).rglob('*'): if not f.is_file(): continue try: with open(f, 'rb'): # just check file is readable pass book_files.append(str(f)) except PermissionError: continue print(f"Found {len(book_files)} files") # Overlap should be 10-20% of chunk size CHUNK_SIZE = 700 CHUNK_OVERLAP = 100 DEBUG = False CACHE_FILE = "embeddings_cache.npz" CACHE_META = "embeddings_cache_meta.json" MAX_HISTORY = 5 CURRENT_LEVEL = 10 SEARCH_FILTER = None # None = search all books # ------------------------- # CONVERSATIONAL HISTORY # ------------------------- conversation_history = [] # ------------------------- # LEVEL CONFIG # ------------------------- LEVELS = { 1: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 500}, 2: {"expand": False, "top_k": 1, "max_tokens": 75, "context_len": 600}, 3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700}, 4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800}, 5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000}, 6: {"expand": False, "top_k": 5, "max_tokens": 150*3, "context_len": 1200}, 7: {"expand": True, "top_k": 5, "max_tokens": 150*3, "context_len": 1400}, 8: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1600}, 9: {"expand": True, "top_k": 6, "max_tokens": 175*3, "context_len": 1800}, 10: {"expand": True, "top_k": 6, "max_tokens": 200*3, "context_len": 2000}, } # ------------------------- # Load models # ------------------------- # ----------------------------------- # Load the sentence tranformer model # ----------------------------------- print("Loading embedding model...") device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Embedding model using: {device}") embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2",device=device) # ----------------------------------- # Load the language model - If it does not exist in the download area then download it otherwise us it. # ----------------------------------- # model_file = "Meta-Llama-3.1-8B-Instruct.Q4_0.gguf" print("Loading language model...") #model_file = "Meta-Llama-3-8B-Instruct.Q4_0.gguf" # upgrading to 3.1 model_file = "Meta-Llama-3.1-8B-Instruct-q4_0.gguf" model_path = r"C:\Users\skess\.cache\gpt4all" full_path = os.path.join(model_path, model_file) if not os.path.exists(full_path): print("Model not found locally. Downloading...") allow_download = True else: allow_download = False lm_model = GPT4All( model_file, model_path=model_path, device="gpu", allow_download=allow_download ) # ---------------- # Table Narration # ---------------- # This will detect and create narrations for table data in pipe form # For example. #| Year | Squadrons | Aircraft | #|------|-----------|----------| #| 1939 | 21 | 252 | #| 1940 | 35 | 420 | # If adding data to a corpus try to use this standard form for instance #| Metric | Value | Context | #|--------|-------|---------| #| Standard deduction single 2025 | $15,750 | Under age 65 | #| Standard deduction single 2025 | $17,750 | Age 65 or older | #| Standard deduction MFJ 2025 | $31,500 | Both under 65 | def narrate_table(text): """ Detect and convert pipe-delimited tables to narrative prose before chunking. """ lines = text.split('\n') result = [] i = 0 narrative_count = 0 table_count = 0 while i < len(lines): line = lines[i].strip() if '|' in line and line.count('|') >= 2: table_lines = [] while i < len(lines) and '|' in lines[i]: table_lines.append(lines[i].strip()) i += 1 data_lines = [l for l in table_lines if not re.match(r'^[\|\-\s:]+$', l)] if len(data_lines) >= 2: table_count += 1 headers = [h.strip() for h in data_lines[0].split('|') if h.strip()] narratives = [] for row_line in data_lines[1:]: values = [v.strip() for v in row_line.split('|') if v.strip()] if len(values) == len(headers): parts = [f"{headers[j]} was {values[j]}" for j in range(len(headers))] sentence = "In this record, " + ", ".join(parts) + "." narratives.append(sentence) narrative_count += 1 result.append(" ".join(narratives)) else: result.extend(table_lines) else: result.append(lines[i]) i += 1 if table_count > 0: print(f" [Table narration: {table_count} table(s) detected, " f"{narrative_count} row(s) converted]") return '\n'.join(result) # ------------------------- # Clean text # ------------------------- def clean_text(text): # Narrate tables before any other cleaning text = narrate_table(text) # Fix hyphenated line breaks in prose (word-\nword -> wordword) text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text) # Collapse 3+ newlines to double newline (preserve paragraph breaks) text = re.sub(r'\n{3,}', '\n\n', text) # Clean up other artifacts text = re.sub(r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', '', text) text = re.sub(r'[■•◆▪→]', '', text) text = re.sub(r' +', ' ', text) text = re.sub(r'\[\d+\]', '', text) text = re.sub(r'\[citation needed\]', '', text) return text.strip() # ------------------------- # Chunk text with overlap # ------------------------- def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP): # 1. EXTRACT HEADERS (The "Metadata Inheritance" logic) header_patterns = [r"TYPE:.*", r"PRIORITY:.*", r"DOMAIN:.*", r"TITLE:.*",r"CONCEPTS:.*",r"SOURCE:.*"] header_lines = [] top_of_file = text[:500] for pattern in header_patterns: match = re.search(pattern, top_of_file, re.IGNORECASE) if match: header_lines.append(match.group(0)) header_prefix = "\n".join(header_lines) + "\n\n" if header_lines else "" # 2. SEMANTIC SPLITTING (Your original Step 1 & 2) paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()] split_units = [] for para in paragraphs: if len(para) <= chunk_size: split_units.append(para) else: sentences = re.split(r'(?<=[.!?])\s+', para) current = "" for sentence in sentences: if len(current) + len(sentence) <= chunk_size: current += " " + sentence else: if current: split_units.append(current.strip()) current = sentence if current: split_units.append(current.strip()) # 3. COMBINE & INJECT HEADERS (Step 3 with metadata injection) chunks = [] current_chunk = "" prev_unit = "" for unit in split_units: # Check if adding this unit exceeds chunk_size if len(current_chunk) + len(unit) + 1 <= chunk_size: current_chunk += " " + unit else: if current_chunk: # Add headers to all chunks except the first one (which already has them) final_output = current_chunk.strip() if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]): final_output = header_prefix + final_output chunks.append(final_output) # Overlap logic if prev_unit and len(prev_unit) + len(unit) + 1 <= chunk_size: current_chunk = prev_unit + " " + unit else: current_chunk = unit prev_unit = unit if current_chunk: final_output = current_chunk.strip() if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]): final_output = header_prefix + final_output chunks.append(final_output) return chunks # ------------------------- # Check if cache is valid # ------------------------- def cache_is_valid(): if not os.path.exists(CACHE_FILE) or not os.path.exists(CACHE_META): return False with open(CACHE_META, "r") as f: meta = json.load(f) if meta.get("book_files") != book_files: return False for book_name in book_files: if not os.path.exists(book_name): continue stored_size = meta.get("file_sizes", {}).get(book_name) actual_size = os.path.getsize(book_name) if stored_size != actual_size: return False return True # ------------------------- # Load or build embeddings # ------------------------- all_chunks = [] all_sources = [] if cache_is_valid(): print("Loading embeddings from cache...") data = np.load(CACHE_FILE, allow_pickle=True) chunk_embeddings = data["embeddings"] all_chunks = list(data["chunks"]) all_sources = list(data["sources"]) print(f"Total chunks loaded from cache: {len(all_chunks)}") else: print("Building embeddings from scratch...") for book_name in book_files: if not os.path.exists(book_name): print(f"Warning: {book_name} not found, skipping...") continue print(f"Loading {book_name}...") with open(book_name, "rb") as f: raw = f.read() try: text = raw.decode("utf-8") except UnicodeDecodeError: print(f"[Encoding fallback] {book_name}") text = raw.decode("cp1252") # fallback for Windows-encoded text # Skip files marked with "# IGNORE" on first line first_line = text.lstrip().splitlines()[0] if text.strip() else "" if first_line.strip().upper().startswith("# IGNORE"): print(f"Skipping {book_name} (marked IGNORE)") continue book_text = clean_text(text) book_chunks = chunk_text(book_text) all_chunks.extend(book_chunks) all_sources.extend([book_name] * len(book_chunks)) print(f" -> {len(book_chunks)} chunks") print(f"Total chunks: {len(all_chunks)}") print("Embedding chunks (this may take a minute)...") chunk_embeddings = embed_model.encode(all_chunks, convert_to_tensor=False) print("Saving embeddings cache...") np.savez( CACHE_FILE, embeddings=chunk_embeddings, chunks=np.array(all_chunks, dtype=object), sources=np.array(all_sources, dtype=object) ) file_sizes = {b: os.path.getsize(b) for b in book_files if os.path.exists(b)} with open(CACHE_META, "w") as f: json.dump({"book_files": book_files, "file_sizes": file_sizes}, f) print("Cache saved.") # ------------------------- # Book filter helper # ------------------------- def get_filtered_indices(filter_term): """Return indices of chunks whose source filename contains filter_term.""" if not filter_term: return list(range(len(all_chunks))) filter_lower = filter_term.lower() return [i for i, src in enumerate(all_sources) if filter_lower in os.path.basename(src).lower()] def show_available_books(): """Print a short list of available books with keywords.""" print("\n--- Available books ---") for f in book_files: base = os.path.basename(f).replace('.txt', '') print(f" {base}") print("--- Use 'search : your question' to filter ---\n") # ------------------------- # Query expansion # ------------------------- def expand_query(question): book_titles = ', '.join([os.path.basename(b).replace('.txt', '') for b in book_files]) prompt = ( f"You are helping search a library containing these documents:\n" f"{book_titles}\n\n" f"Generate 3 alternative ways to ask the following question using " f"vocabulary, concepts, and terminology that would likely appear in " f"these specific documents. Do not reference authors or books not in this list. " f"The alternative questions must ask about the SAME specific fact as the original. " f"Do not broaden or change the subject of the question. " f"Return ONLY the 3 questions, one per line, no numbering, no explanation.\n\n" f"Question: {question}" ) with lm_model.chat_session(): response = lm_model.generate(prompt, max_tokens=150) lines = [line.strip() for line in response.strip().split('\n') if line.strip()] alternatives = [ l for l in lines if len(l) > 15 and len(l) < 200 and '?' in l and l != question and ':' not in l[:20] ][:3] all_queries = [question] + alternatives print(f" [Expanded queries: {len(all_queries)}]") for q in all_queries: print(f" - {q}") return all_queries # ---------------------- # Topic Detection # ---------------------- # Stopwords for topic detection # ------------------------- STOPWORDS = { "the","is","a","an","and","or","of","to","in","on","for","with", "what","which","who","how","when","where","can","i","you","it", "did","do","does","was","were","he","she","they","his","her", "him","them","his","its","be","been","have","has","had","will", "would","could","should","may","might","me","my","we","our" } def topics_are_related(question, history, lookback=3): """ Returns True if the question shares meaningful words with recent conversation history. Also returns True for very short pronoun-heavy questions since they are almost certainly follow-ups. """ if not history: return False q_lower = question.lower() # Get meaningful words from current question q_words = set(q_lower.replace('?','').replace('.','').split()) - STOPWORDS # Get words from recent history questions recent = history[-lookback:] history_words = set() for exchange in recent: history_words.update( exchange["question"].lower().replace('?','').replace('.','').split() ) history_words -= STOPWORDS # Pronoun follow-up check — only if history has meaningful content pronoun_followups = { "he","she","they","him","her","them","his","it", "this","that","these","those" } q_words_all = set(q_lower.replace('?','').replace('.','').split()) if len(q_words_all) <= 5 and q_words_all & pronoun_followups: if history_words: print(f" [Pronoun follow-up detected — enriching]") return True if not q_words: return False # Check meaningful word overlap overlap = len(q_words & history_words) print(f" [Topic overlap: {overlap} word(s)]") return overlap > 0 def enrich_query_with_history(question): """ Add context from recent history to improve retrieval for short follow-up questions. Skips enrichment if topic has shifted or enriched query is too long. """ if not conversation_history: return question # Only enrich questions under 8 words if len(question.split()) >= 8: return question # Check if topic has shifted if not topics_are_related(question, conversation_history): print(f" [Topic shift detected — no enrichment]") return question # Look back up to 3 exchanges for context recent = conversation_history[-3:] context_words = " ".join([ex["question"] for ex in recent]) enriched = f"{context_words} {question}" # Don't enrich if result is too long if len(enriched.split()) > 30: print(f" [Enriched query too long — using original]") return question print(f" [Enriched query: {enriched}]") return enriched # -------------------------------------------- # Handles type extraction from chunk metadata # -------------------------------------------- def extract_type(chunk_text): """ Extract TYPE metadata from chunk header. Defaults to 'reference' if missing. """ match = re.search(r"TYPE:\s*(fact|rule|reference|pedagogical)", chunk_text, re.IGNORECASE) if match: return match.group(1).lower() return "reference" def extract_metadata(chunk): """ Extracts TYPE / PRIORITY metadata from a chunk if present. Defaults are safe and neutral. """ meta = { "type": "reference", "priority": "medium" } # Look for TYPE: xxx type_match = re.search(r"TYPE:\s*(\w+)", chunk, re.IGNORECASE) if type_match: meta["type"] = type_match.group(1).lower().strip() # Look for PRIORITY: xxx priority_match = re.search(r"PRIORITY:\s*(\w+)", chunk, re.IGNORECASE) if priority_match: meta["priority"] = priority_match.group(1).lower().strip() return meta # ------------------------- # Retrieve top relevant chunks # ------------------------- def get_top_chunks(question, filter_term=None): level_cfg = LEVELS[CURRENT_LEVEL] # ------------------------- # Query preparation # ------------------------- retrieval_question = enrich_query_with_history(question) if level_cfg["expand"]: queries = expand_query(retrieval_question) else: queries = [retrieval_question] # ------------------------- # Filter scope # ------------------------- search_indices = get_filtered_indices(filter_term) if not search_indices: print(f" [Warning: no books matched filter '{filter_term}' — searching all]") search_indices = list(range(len(all_chunks))) sub_embeddings = chunk_embeddings[search_indices] sub_chunks = [all_chunks[i] for i in search_indices] sub_sources = [all_sources[i] for i in search_indices] if filter_term: matched_books = set(os.path.basename(s) for s in sub_sources) print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]") # ------------------------- # Semantic scoring (pure signal) # ------------------------- semantic_scores = np.zeros(len(sub_chunks)) for q in queries: query_emb = embed_model.encode([q]) scores = cosine_similarity(query_emb, sub_embeddings)[0] semantic_scores += scores semantic_scores /= len(queries) # ------------------------- # SAFE MIN-MAX NORMALIZATION # ------------------------- min_s = semantic_scores.min() max_s = semantic_scores.max() range_s = max_s - min_s if range_s < 1e-6: # All scores basically identical → neutral signal semantic_scores = np.ones_like(semantic_scores) else: semantic_scores = (semantic_scores - min_s) / (range_s + 1e-9) # ------------------------- # TYPE + PRIORITY WEIGHTING # ------------------------- type_weights = np.zeros(len(sub_chunks)) priority_weights = np.zeros(len(sub_chunks)) for i, chunk in enumerate(sub_chunks): chunk_type = extract_type(chunk) type_weights[i] = TYPE_WEIGHTS.get(chunk_type, 1.0) meta = extract_metadata(chunk) priority_weights[i] = PRIORITY_WEIGHTS.get(meta["priority"], 1.0) # ------------------------- # FINAL SCORE (composed signal) # ------------------------- final_scores = (semantic_scores + 1.5 * np.log(type_weights) + 0.3 * np.log(priority_weights) ) # ------------------------- # DEBUG VIEW (optional but very useful) # ------------------------- if DEBUG: debug_ranking = list(zip( [os.path.basename(s) for s in sub_sources], semantic_scores, type_weights, final_scores )) debug_ranking.sort(key=lambda x: x[3], reverse=True) print("\n--- TYPE-AWARE RANKING ---") for name, sem, tw, fs in debug_ranking[:15]: print(f"{name} | sem={sem:.4f} | type={tw:.2f} | final={fs:.4f}") print("--- END ---\n") # ------------------------- # Top-k selection # ------------------------- top_k = level_cfg["top_k"] top_indices = final_scores.argsort()[-top_k:][::-1] return ( [sub_chunks[i] for i in top_indices], [sub_sources[i] for i in top_indices] ) # ------------------------- # Parse search filter from input # ------------------------- def parse_input(user_input): """ Detects 'search keyword: question' syntax. Returns (question, filter_term) tuple. """ pattern = re.match(r'^search\s+(.+?):\s*(.+)$', user_input, re.IGNORECASE) if pattern: filter_term = pattern.group(1).strip() question = pattern.group(2).strip() return question, filter_term return user_input, SEARCH_FILTER # -------------------------- # Truncate context at a sentence boundary to avoid feeding the LLM incomplete fragments # ----------------------------- def truncate_at_sentence(text, max_chars): if len(text) <= max_chars: return text truncated = text[:max_chars] last_period = max( truncated.rfind('.'), truncated.rfind('!'), truncated.rfind('?') ) return truncated[:last_period + 1] if last_period > 0 else truncated # ------------------------- # Determimne if the question is asking for a creative or factual response # ------------------------- def is_creative_request(question): triggers = { "suggest", "write", "complete", "finish", "rhyme", "next line", "come up with", "give me", "idea for", "open", "start", "begin", "chorus", "verse", "bridge", "hook", "lyric", "lyrics", "continue", "follow", "what comes", "how might", "how would" } q_lower = question.lower() return any(t in q_lower for t in triggers) # ------------------------- # Ask question # ------------------------- def ask_question(question, show_sources=False, filter_term=None): global conversation_history level_cfg = LEVELS[CURRENT_LEVEL] top_chunks, sources = get_top_chunks(question, filter_term=filter_term) if DEBUG: print("\n--- Retrieved chunks ---") for i, chunk in enumerate(top_chunks): print(f"\nChunk {i+1}:") print(chunk[:300]) print("--- End chunks ---\n") context = truncate_at_sentence( " ".join(top_chunks), level_cfg["context_len"] ) history_text = "" if conversation_history: history_text = "Previous conversation:\n" for exchange in conversation_history[-MAX_HISTORY:]: history_text += f"Q: {exchange['question']}\n" history_text += f"A: {exchange['answer']}\n" history_text += "\n" if is_creative_request(question): prompt_instruction = ( "You are a creative assistant. " "Use the provided context as inspiration. " "Be concise and original. " "End your response with a single period." ) else: # prompt_instruction = ( # "You are a helpful research assistant. " # "Answer ONLY using the provided context. " # "Be direct and concise. Never repeat the context or instructions. " # "Never echo the question. End your answer with a single period." # ) prompt_instruction=( "You are a helpful research assistant. " "Restrict your response strictly to the provided context. " "If the source material is exhausted, stop writing. " "If a relationship or entity is not explicitly documented in the context, do not include it. " "Do not infer, supplement, or use external training knowledge. " "Be direct and concise. " "Never repeat the context or instructions. " "Never echo the question. " "End your answer with a single period." ) with lm_model.chat_session(system_prompt=prompt_instruction): user_message = ( f"{history_text}" f"CONTEXT:\n{context}\n\n" f"QUESTION: {question}\n\n" f"ANSWER:" ) response = lm_model.generate( user_message, max_tokens=level_cfg["max_tokens"] ) answer = response.strip() # Strip any runaway stop markers and everything after them stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP", "]]>"] for marker in stop_markers: if marker in answer: answer = answer[:answer.index(marker)].strip() conversation_history.append({ "question": question, "answer": answer }) if len(conversation_history) > MAX_HISTORY: conversation_history = conversation_history[-MAX_HISTORY:] if show_sources: unique_sources = list(set(sources)) short_sources = [os.path.basename(s) for s in unique_sources] print(f" [Sources: {', '.join(short_sources)}]") print(f" [Level: {CURRENT_LEVEL} | " f"expand={'on' if level_cfg['expand'] else 'off'} | " f"top_k={level_cfg['top_k']} | " f"max_tokens={level_cfg['max_tokens']}]") print(f" [Memory: {len(conversation_history)} exchanges]") if filter_term: print(f" [Filter: '{filter_term}']") return answer # ------------------------- # Interactive loop # ------------------------- print("\nReady! Ask questions about your books") print("Commands: 'exit', 'sources on/off', 'level 1-10',") print(" 'memory clear', 'memory show', 'debug on/off'") print(" 'books' — list available books") print(" 'search : question' — filter by book\n") show_sources = False while True: user_input = input(f"[L{CURRENT_LEVEL}] You: ") if user_input.lower() in ["exit", "quit"]: break elif user_input.lower() == "memory clear": conversation_history.clear() print("Conversation memory cleared.") continue elif user_input.lower() == "memory show": if not conversation_history: print("No conversation history.") else: print(f"\n--- Last {len(conversation_history)} exchanges ---") for i, exchange in enumerate(conversation_history): print(f"\nQ{i+1}: {exchange['question']}") print(f"A{i+1}: {exchange['answer'][:100]}...") print("---\n") continue elif user_input.lower() == "debug on": DEBUG = True print("Debug mode enabled.") continue elif user_input.lower() == "debug off": DEBUG = False print("Debug mode disabled.") continue elif user_input.lower() == "sources on": show_sources = True print("Source display enabled.") continue elif user_input.lower() == "sources off": show_sources = False print("Source display disabled.") continue elif user_input.lower() == "books": show_available_books() continue elif user_input.lower().startswith("level "): try: lvl = int(user_input.split()[1]) if 1 <= lvl <= 10: CURRENT_LEVEL = lvl cfg = LEVELS[CURRENT_LEVEL] print(f"Level set to {CURRENT_LEVEL} — " f"expand={'on' if cfg['expand'] else 'off'}, " f"top_k={cfg['top_k']}, " f"max_tokens={cfg['max_tokens']}") else: print("Level must be between 1 and 10.") except: print("Usage: level 1 through level 10") continue # Parse for search filter question, filter_term = parse_input(user_input) response = ask_question(question, show_sources=show_sources, filter_term=filter_term) print("Bot:", response)