Experimental Changes

2026-04-13 14:20:04 -04:00
parent 95b4610927
commit 268928e9c5
42 changed files with 10431 additions and 13441 deletions


@@ -8,9 +8,24 @@ import json
from pathlib import Path
import torch
# Explore adding TYPE to headers. Four types: fact, rule, reference, pedagogical; the order reflects their weighting rank.
#TYPE: fact | rule | reference | pedagogical
#DOMAIN: music_theory
#PRIORITY: high | medium | low
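# Example header as it might appear at the top of a tagged source file
# (values here are illustrative, not from an actual book):
#   TYPE: rule
#   DOMAIN: music_theory
#   PRIORITY: high
#   TITLE: Interval construction rules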
# Ask ChatGPT to describe narratives around tablature examples.
# I would like you to chunk this for my RAG system.
# Where you identify guitar tablature you are to replace it with a narrative
# describing the notes that are played in fine detail.
# Please describe the notes exactly, including any bends, hammer-ons, pull-offs, legato, etc.
# Please do not omit any of the original descriptive text except insofar as it may be confusing for a RAG system.
# You may use the existing text to inform yourself and help narrate the notation.
# {paste the text with tablature}
# Retrieval — find the most relevant chunks from your documents using embeddings and cosine similarity
# Augmented — add that retrieved context to the prompt
# Generation — use the language model to generate an answer based on that context
# -------------------
# Embedding Cleaning
@@ -80,10 +95,26 @@ import torch
# Semantic chunking
# Better table handling
# ----------------------------------
# Weights for the chunk type/priority weighting system
# -----------------------------------
TYPE_WEIGHTS = {
"fact": 1.30,
"rule": 1.20,
"reference": 1.00,
"pedagogical": 0.85
}
PRIORITY_WEIGHTS = {
"high": 1.10,
"medium": 1.00,
"low": 0.90
}
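# Illustrative effect on the final score (weights enter via np.log below):
#   "fact" + "high" priority:     log(1.30) + log(1.10) ≈ +0.36
#   "pedagogical" + "low":        log(0.85) + log(0.90) ≈ -0.27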
# -------------------------
# Knowledge base selection
# -------------------------
BOOK_DIR = 'Books/Accounting' # plain string; wrapped in Path() below
BOOK_DIR = 'Books/Music' # plain string; wrapped in Path() below
book_files = []
for f in Path(BOOK_DIR).rglob('*'):
@@ -97,10 +128,14 @@ for f in Path(BOOK_DIR).rglob('*'):
continue
print(f"Found {len(book_files)} files")
# Overlap should be 10-20% of chunk size
CHUNK_SIZE = 700
CHUNK_OVERLAP = 100
# CHUNK_SIZE = 700
# CHUNK_OVERLAP = 100
CHUNK_SIZE = 1500
CHUNK_OVERLAP = 300
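# 300/1500 = 20% overlap, the top of the recommended range above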
DEBUG = False
CACHE_FILE = "embeddings_cache.npz"
CACHE_META = "embeddings_cache_meta.json"
@@ -122,11 +157,11 @@ LEVELS = {
3: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 700},
4: {"expand": False, "top_k": 2, "max_tokens": 100*3, "context_len": 800},
5: {"expand": False, "top_k": 3, "max_tokens": 125*3, "context_len": 1000},
6: {"expand": False, "top_k": 3, "max_tokens": 150*3, "context_len": 1200},
7: {"expand": True, "top_k": 3, "max_tokens": 150*3, "context_len": 1400},
8: {"expand": True, "top_k": 4, "max_tokens": 175*3, "context_len": 1600},
9: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1800},
10: {"expand": True, "top_k": 5, "max_tokens": 200*3, "context_len": 2000},
6: {"expand": False, "top_k": 5, "max_tokens": 150*3, "context_len": 1200},
7: {"expand": True, "top_k": 5, "max_tokens": 150*3, "context_len": 1400},
8: {"expand": True, "top_k": 5, "max_tokens": 175*3, "context_len": 1600},
9: {"expand": True, "top_k": 6, "max_tokens": 175*3, "context_len": 1800},
10: {"expand": True, "top_k": 6, "max_tokens": 200*3, "context_len": 2000},
}
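# Example: at CURRENT_LEVEL = 7 the retriever now expands the query,
# takes the top 5 chunks (up from 3), allows ~450-token answers,
# and truncates context at 1400 characters.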
# -------------------------
@@ -143,8 +178,14 @@ embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2",devic
# -----------------------------------
# Load the language model. If it does not exist in the download area, download it; otherwise use it.
# -----------------------------------
# model_file = "Meta-Llama-3.1-8B-Instruct.Q4_0.gguf"
print("Loading language model...")
model_file = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
#model_file = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
# upgrading to 3.1
model_file = "Meta-Llama-3.1-8B-Instruct-q4_0.gguf"
model_path = r"C:\Users\skess\.cache\gpt4all"
full_path = os.path.join(model_path, model_file)
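# A minimal sketch of the load-or-download step described above, assuming
# the gpt4all Python bindings (allow_download fetches the file if absent):
#   from gpt4all import GPT4All
#   model = GPT4All(model_file, model_path=model_path, allow_download=True)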
@@ -238,9 +279,13 @@ def clean_text(text):
# Narrate tables before any other cleaning
text = narrate_table(text)
# existing cleaning...
# Fix hyphenated line breaks in prose (word-\nword -> wordword)
text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text)
text = re.sub(r'\n+', ' ', text)
# Collapse 3+ newlines to double newline (preserve paragraph breaks)
text = re.sub(r'\n{3,}', '\n\n', text)
# Clean up other artifacts
text = re.sub(r'(?<=[a-z])(\d{1,3})(?=\s[A-Z])', '', text)  # strip footnote-style digits between words
text = re.sub(r'[■•◆▪→]', '', text)  # drop bullet/arrow glyphs left by PDF extraction
text = re.sub(r' +', ' ', text)  # collapse runs of spaces
@@ -251,16 +296,24 @@ def clean_text(text):
# Chunk text with overlap
# -------------------------
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
# Step 1 — Split into paragraphs first
paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
# 1. EXTRACT HEADERS (The "Metadata Inheritance" logic)
header_patterns = [r"TYPE:.*", r"PRIORITY:.*", r"DOMAIN:.*", r"TITLE:.*"]
header_lines = []
top_of_file = text[:500]
for pattern in header_patterns:
match = re.search(pattern, top_of_file, re.IGNORECASE)
if match:
header_lines.append(match.group(0))
header_prefix = "\n".join(header_lines) + "\n\n" if header_lines else ""
# Step 2 — Split any overly long paragraphs into sentences
# 2. SEMANTIC SPLITTING (original Steps 1 & 2)
paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
split_units = []
for para in paragraphs:
if len(para) <= chunk_size * 2:
if len(para) <= chunk_size:
split_units.append(para)
else:
# Break long paragraph into sentences
sentences = re.split(r'(?<=[.!?])\s+', para)
current = ""
for sentence in sentences:
@@ -273,19 +326,24 @@ def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
if current:
split_units.append(current.strip())
# Step 3 — Combine units into chunks up to chunk_size
# with overlap by re-including the previous unit
# 3. COMBINE & INJECT HEADERS (Step 3 with metadata injection)
chunks = []
current_chunk = ""
prev_unit = ""
for unit in split_units:
# Check if adding this unit exceeds chunk_size
if len(current_chunk) + len(unit) + 1 <= chunk_size:
current_chunk += " " + unit
else:
if current_chunk:
chunks.append(current_chunk.strip())
# Overlap — start new chunk with previous unit for context
# Add headers to all chunks except the first one (which already has them)
final_output = current_chunk.strip()
if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
final_output = header_prefix + final_output
chunks.append(final_output)
# Overlap logic
if prev_unit and len(prev_unit) + len(unit) + 1 <= chunk_size:
current_chunk = prev_unit + " " + unit
else:
@@ -293,7 +351,10 @@ def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
prev_unit = unit
if current_chunk:
chunks.append(current_chunk.strip())
final_output = current_chunk.strip()
if not any(pat in final_output[:100] for pat in ["TYPE:", "TITLE:"]):
final_output = header_prefix + final_output
chunks.append(final_output)
return chunks
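# Illustration of metadata inheritance (hypothetical input):
#   chunk_text("TYPE: rule\nDOMAIN: music_theory\n\n<long body...>")
# Any chunk that does not already start with a TYPE:/TITLE: header
# gets header_prefix ("TYPE: rule\nDOMAIN: music_theory") prepended.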
@@ -344,7 +405,14 @@ else:
except UnicodeDecodeError:
print(f"[Encoding fallback] {book_name}")
text = raw.decode("cp1252") # fallback for Windows-encoded text
book_text = clean_text(text)
# Skip files marked with "# IGNORE" on first line
first_line = text.lstrip().splitlines()[0] if text.strip() else ""
if first_line.strip().upper().startswith("# IGNORE"):
print(f"Skipping {book_name} (marked IGNORE)")
continue
book_text = clean_text(text)
book_chunks = chunk_text(book_text)
all_chunks.extend(book_chunks)
all_sources.extend([book_name] * len(book_chunks))
@@ -510,28 +578,77 @@ def enrich_query_with_history(question):
print(f" [Enriched query: {enriched}]")
return enriched
# --------------------------------------------
# Handles type extraction from chunk metadata
# --------------------------------------------
def extract_type(chunk_text):
"""
Extract TYPE metadata from chunk header.
Defaults to 'reference' if missing.
"""
match = re.search(r"TYPE:\s*(fact|rule|reference|pedagogical)", chunk_text, re.IGNORECASE)
if match:
return match.group(1).lower()
return "reference"
def extract_metadata(chunk):
"""
Extracts TYPE / PRIORITY metadata from a chunk if present.
Defaults are safe and neutral.
"""
meta = {
"type": "reference",  # neutral default (weight 1.0), matching extract_type()
"priority": "medium"
}
# Look for TYPE: xxx
type_match = re.search(r"TYPE:\s*(\w+)", chunk, re.IGNORECASE)
if type_match:
meta["type"] = type_match.group(1).lower().strip()
# Look for PRIORITY: xxx
priority_match = re.search(r"PRIORITY:\s*(\w+)", chunk, re.IGNORECASE)
if priority_match:
meta["priority"] = priority_match.group(1).lower().strip()
return meta
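# e.g. extract_metadata("TYPE: rule\nPRIORITY: high\n\nFifths are ...")
#      -> {"type": "rule", "priority": "high"}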
# --------------------------
# Determine whether we are being asked to derive a response
# --------------------------
def is_derivation_request(question):
keywords = {
"derive", "construct", "starting on", "root note",
"apply the formula", "apply formula", "step by step"
}
q = question.lower()
return any(k in q for k in keywords)
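# e.g. is_derivation_request("Derive the A minor scale step by step") -> True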
# -------------------------
# Retrieve top relevant chunks
# -------------------------
def get_top_chunks(question, filter_term=None):
level_cfg = LEVELS[CURRENT_LEVEL]
# Enrich short follow-up questions with history context
# -------------------------
# Query preparation
# -------------------------
retrieval_question = enrich_query_with_history(question)
if level_cfg["expand"]:
queries = expand_query(retrieval_question)
else:
queries = [retrieval_question]
# Get filtered indices
# -------------------------
# Filter scope
# -------------------------
search_indices = get_filtered_indices(filter_term)
if not search_indices:
print(f" [Warning: no books matched filter '{filter_term}' — searching all]")
search_indices = list(range(len(all_chunks)))
# Subset embeddings and metadata
sub_embeddings = chunk_embeddings[search_indices]
sub_chunks = [all_chunks[i] for i in search_indices]
sub_sources = [all_sources[i] for i in search_indices]
@@ -540,19 +657,78 @@ def get_top_chunks(question, filter_term=None):
matched_books = set(os.path.basename(s) for s in sub_sources)
print(f" [Filter '{filter_term}' matched: {', '.join(matched_books)}]")
# Score within filtered subset
sub_scores = np.zeros(len(sub_chunks))
# -------------------------
# Semantic scoring (pure signal)
# -------------------------
semantic_scores = np.zeros(len(sub_chunks))
for q in queries:
query_emb = embed_model.encode([q])
scores = cosine_similarity(query_emb, sub_embeddings)[0]
sub_scores += scores
semantic_scores += scores
sub_scores /= len(queries)
semantic_scores /= len(queries)
# -------------------------
# SAFE MIN-MAX NORMALIZATION
# -------------------------
min_s = semantic_scores.min()
max_s = semantic_scores.max()
range_s = max_s - min_s
if range_s < 1e-6:
# All scores basically identical → neutral signal
semantic_scores = np.ones_like(semantic_scores)
else:
semantic_scores = (semantic_scores - min_s) / (range_s + 1e-9)
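# Normalizing to [0, 1] keeps the additive log-weight nudges below
# comparable in size to the semantic signal, whatever the raw score spread.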
# -------------------------
# TYPE + PRIORITY WEIGHTING
# -------------------------
type_weights = np.zeros(len(sub_chunks))
priority_weights = np.zeros(len(sub_chunks))
for i, chunk in enumerate(sub_chunks):
chunk_type = extract_type(chunk)
type_weights[i] = TYPE_WEIGHTS.get(chunk_type, 1.0)
meta = extract_metadata(chunk)
priority_weights[i] = PRIORITY_WEIGHTS.get(meta["priority"], 1.0)
# -------------------------
# FINAL SCORE (composed signal)
# -------------------------
final_scores = semantic_scores + np.log(type_weights) + np.log(priority_weights)
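# log() makes the multiplicative weights additive: a neutral weight of 1.0
# contributes exactly 0, so untagged chunks keep their semantic score.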
# -------------------------
# DEBUG VIEW (optional but very useful)
# -------------------------
if DEBUG:
debug_ranking = list(zip(
[os.path.basename(s) for s in sub_sources],
semantic_scores,
type_weights,
final_scores
))
debug_ranking.sort(key=lambda x: x[3], reverse=True)
print("\n--- TYPE-AWARE RANKING ---")
for name, sem, tw, fs in debug_ranking[:15]:
print(f"{name} | sem={sem:.4f} | type={tw:.2f} | final={fs:.4f}")
print("--- END ---\n")
# -------------------------
# Top-k selection
# -------------------------
top_k = level_cfg["top_k"]
top_indices = sub_scores.argsort()[-top_k:][::-1]
top_indices = final_scores.argsort()[-top_k:][::-1]
return (
[sub_chunks[i] for i in top_indices],
[sub_sources[i] for i in top_indices]
)
return [sub_chunks[i] for i in top_indices], [sub_sources[i] for i in top_indices]
# -------------------------
# Parse search filter from input
@@ -613,7 +789,10 @@ def ask_question(question, show_sources=False, filter_term=None):
print(chunk[:300])
print("--- End chunks ---\n")
context = truncate_at_sentence(" ".join(top_chunks), level_cfg["context_len"])
context = truncate_at_sentence(
" ".join(top_chunks),
level_cfg["context_len"]
)
history_text = ""
if conversation_history:
@@ -622,19 +801,30 @@ def ask_question(question, show_sources=False, filter_term=None):
history_text += f"Q: {exchange['question']}\n"
history_text += f"A: {exchange['answer']}\n"
history_text += "\n"
if is_creative_request(question):
if is_derivation_request(question):
prompt_instruction = (
"You are a creative songwriting assistant. "
"Use the provided context as inspiration and technique guidance. "
"Generate original creative suggestions. "
"Be concise. Do not reproduce the context. "
"You are a deterministic logic engine. "
"1. Prioritize 'TYPE: rule' and 'GLOBAL CONSTRAINT' entries in the CONTEXT over all other data. "
"2. If the CONTEXT defines a multi-step procedure (e.g., Sequence Integrity or Alphabetical Anchors), execute those steps exactly. "
"3. Resolve all naming conflicts using the provided 'CORRECTION LOGIC'. "
"4. Output ONLY the final resolved string of elements. "
"5. Do not show your work, intermediate calculations, or code. "
"6. Do not use LaTeX, boxes, or mathematical notation. Output the result as plain text only. "
"End your answer with a single period."
)
elif is_creative_request(question):
prompt_instruction = (
"You are a creative assistant. "
"Use the provided context as inspiration. "
"Be concise and original. "
"End your response with a single period."
)
else:
prompt_instruction = (
"You are a helpful research assistant. "
"Answer using ONLY the provided context. "
"Answer ONLY using the provided context. "
"Be direct and concise. Never repeat the context or instructions. "
"Never echo the question. End your answer with a single period."
)
@@ -654,7 +844,7 @@ def ask_question(question, show_sources=False, filter_term=None):
answer = response.strip()
# Strip any runaway stop markers and everything after them
stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP"]
stop_markers = ["###", "####", "END OF ANSWER", "Final Answer", "STOP", "]]>"]
for marker in stop_markers:
if marker in answer:
answer = answer[:answer.index(marker)].strip()
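# e.g. "The fifth of C is G. ### END OF ANSWER" -> "The fifth of C is G."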