using MarketData.MarketDataModel;
using MarketData.Utils;
using MarketData.DataAccess;
using System.Collections.Concurrent;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
using System.Threading.Tasks;
namespace MarketData.Cache
{
/// <summary>
/// GLPriceCache - Used by the Gain/Loss Generator, which is used by MarketDataServer and the User Interface.
/// The entire cache is evicted every hour, so the first call to Add(PortfolioTrades portfolioTrades) after an eviction reloads prices from the database.
/// Symbols with open trades always get their most recent price from the database.
/// </summary>
public class GLPriceCache : IDisposable
{
// -- Singleton ------------------------------------------------------------
// Guards creation of the singleton in GetInstance and its teardown in Dispose.
private static readonly object instanceLock = new object();
// The current singleton; created on demand by GetInstance and reset to null by Dispose.
private static GLPriceCache instance = null;
// -- Disposal -------------------------------------------------------------
// Set by Dispose; the timer tick and the parallel fetch loops check it and return early.
private volatile bool disposed = false;
// 0 = idle, 1 = busy. Claimed with Interlocked.CompareExchange by both EvictStaleSymbols
// and Refresh, so whichever starts second is skipped rather than run concurrently.
private int refreshInProgress = 0;
/// <summary>
/// Returns the shared cache instance, creating it if none currently exists.
/// </summary>
/// <returns>
/// The current singleton. Because Dispose resets the singleton slot, a call made
/// after disposal constructs and returns a fresh instance.
/// </returns>
public static GLPriceCache GetInstance()
{
    // Creation is serialised on instanceLock so only one instance is ever constructed per slot.
    lock (instanceLock)
    {
        return instance ?? (instance = new GLPriceCache());
    }
}
// -- State ----------------------------------------------------------------
// Symbol -> prices keyed by date. Every read and write must hold cacheLock.
private readonly Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
// Symbol -> lock object used to serialise database fetches for that symbol.
private readonly ConcurrentDictionary<string, object> symbolFetchLocks = new ConcurrentDictionary<string, object>();
// Protects priceCache and latestDate.
private readonly object cacheLock = new object();
// Latest pricing date known to the cache; Utility.Epoch means "not loaded yet".
private DateTime latestDate = Utility.Epoch;
// -- Background refresh ---------------------------------------------------
// Interval between timer ticks; passed as both the initial delay and the period of refreshTimer.
private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5);
// Drives OnCacheRefreshTick; created in the constructor and released in Dispose.
private Timer refreshTimer = null;
// Number of ticks processed so far; OnCacheRefreshTick increments it to decide when to evict.
private int tickCount = 0;
private const int evictionTickInterval = 12; // every 12 ticks x 5 min = 1 hour
// -- Parallelism ----------------------------------------------------------
// Upper bound on concurrent database calls issued by the Parallel.ForEach loops in this class.
private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls();

/// <summary>
/// Computes the degree of parallelism for database fetches: three workers per
/// logical processor (treating a reported count below one as one), capped at 32.
/// </summary>
/// <returns>A value between 3 and 32 inclusive.</returns>
private static int ResolveMaxParallelDbCalls()
{
    int processors = Environment.ProcessorCount;
    if (processors < 1)
    {
        processors = 1;
    }
    int desired = processors * 3;
    return desired > 32 ? 32 : desired;
    // Disabled environment-variable override, kept for reference:
    // int @default = Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
    // string configured = Environment.GetEnvironmentVariable("GL_PRICE_CACHE_PARALLEL_DB_CALLS");
    // return int.TryParse(configured, out int parsed) && parsed > 0 ? parsed : @default;
}
// -- Constructor ----------------------------------------------------------
// Private so that instances are only created through GetInstance.
// Starts the periodic timer; the first tick fires one cacheCycle after construction.
private GLPriceCache()
{
    refreshTimer = new Timer(OnCacheRefreshTick, state: null, dueTime: cacheCycle, period: cacheCycle);
}
/// <summary>
/// Stops the background timer and releases this instance. Safe to call more than once;
/// later calls return without doing anything.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    // No finalizer work remains once managed resources are released.
    GC.SuppressFinalize(this);
}
/// <summary>
/// Core disposal routine. Marks the cache as disposed and, when called from
/// <see cref="Dispose()"/>, releases the refresh timer and vacates the singleton slot.
/// </summary>
/// <param name="disposing">
/// True when called from Dispose(); managed resources (the timer) are only released in that case.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposed) return;
    disposed = true;
    if (!disposing) return;

    Timer timerToDispose;
    lock (instanceLock)
    {
        timerToDispose = refreshTimer;
        refreshTimer = null;
        // Only vacate the singleton slot if it still refers to this instance. The disposed
        // check above is not atomic, so a second concurrent Dispose on an old instance could
        // otherwise null out a newer instance that GetInstance created in the meantime,
        // orphaning it together with its running timer.
        if (ReferenceEquals(instance, this))
        {
            instance = null;
        }
    }
    // Dispose timer immediately; no blocking wait
    timerToDispose?.Dispose();
    MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
}
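// Disposal (superseded) - earlier implementation that waited up to 10 seconds for an
// in-flight timer tick to complete before disposing. Kept for reference only.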
//public void Dispose()
//{
// Dispose(true);
// GC.SuppressFinalize(this);
//}
//protected virtual void Dispose(bool disposing)
//{
// if (disposed) return;
// disposed = true;
// if (disposing)
// {
// Timer timerToDispose;
// lock (instanceLock)
// {
// timerToDispose = refreshTimer;
// refreshTimer = null;
// instance = null;
// }
// // Block until any in-flight tick completes before disposing
// using (ManualResetEventSlim waited = new ManualResetEventSlim(false))
// {
// if (timerToDispose != null)
// timerToDispose.Dispose(waited.WaitHandle);
// waited.Wait(TimeSpan.FromSeconds(10));
// }
// MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
// }
//}
// -- Background tick ------------------------------------------------------
/// <summary>
/// Timer callback. Logs the current cache size and, on every
/// evictionTickInterval-th tick, clears the cache via EvictStaleSymbols.
/// Any exception is logged and swallowed so it never escapes the timer callback.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnCacheRefreshTick(object state)
{
    if (disposed) return;
    try
    {
        // Snapshot the counts under the lock, then log outside it so that trace output
        // does not block cache readers and writers (same approach as EvictStaleSymbols).
        int symbolCount;
        long itemCount;
        lock (cacheLock)
        {
            symbolCount = priceCache.Keys.Count;
            itemCount = CountInternal();
        }
        MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Tick] Symbols: {symbolCount}. " + $"Items in cache: {Utility.FormatNumber(itemCount, 0, true)}.");
        if (++tickCount % evictionTickInterval == 0)
        {
            EvictStaleSymbols();
        }
    }
    catch (Exception ex)
    {
        MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}");
    }
}
// -- Eviction -------------------------------------------------------------
/// <summary>
/// Empties the whole price cache and the per-symbol fetch-lock table.
/// Skipped (with a log line) when the refreshInProgress flag is already held,
/// which is the case while Refresh or another eviction is running.
/// </summary>
private void EvictStaleSymbols()
{
    bool alreadyBusy = Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1;
    if (alreadyBusy)
    {
        MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped refresh in progress.");
        return;
    }

    try
    {
        int evicted;
        lock (cacheLock)
        {
            evicted = priceCache.Count;
            priceCache.Clear();
            symbolFetchLocks.Clear();
        }
        // Logged after the lock is released.
        MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Evict] Cache cleared. {evicted} symbols evicted.");
    }
    finally
    {
        // Always release the flag, even if clearing or logging throws.
        Interlocked.Exchange(ref refreshInProgress, 0);
    }
}
// -- Public API -----------------------------------------------------------
// public void Add(PortfolioTrades portfolioTrades)
// {
// List symbols = portfolioTrades.Symbols;
// Dictionary minTradeDates = symbols.ToDictionary(
// sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
// // Only open positions need an intraday price refresh.
// // Closed positions, regardless of close date, have immutable prices skip the DB call.
// HashSet mutableSymbols = new HashSet(
// symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
// Dictionary minCacheDates;
// lock (cacheLock)
// {
// minCacheDates = symbols.ToDictionary(
// sym => sym,
// sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
// }
// ConcurrentDictionary fetchedPrices = new ConcurrentDictionary();
// ConcurrentDictionary latestPrices = new ConcurrentDictionary();
// Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
// {
// if (disposed) return;
// DateTime minTradeDate = minTradeDates[symbol];
// DateTime minCacheDate = minCacheDates[symbol];
// // Acquire a per-symbol lock to prevent concurrent clients from issuing
// // duplicate DB fetches for the same symbol. First thread fetches;
// // subsequent threads for the same symbol block here, then re-check
// // the cache on entry and skip the fetch if already populated.
// object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
// lock (symbolLock)
// {
// try
// {
// // Re-check cache state after acquiring symbol lock another
// // client thread may have already fetched this symbol while we waited
// lock (cacheLock)
// {
// if (priceCache.ContainsKey(symbol))
// minCacheDate = priceCache[symbol].MinDate;
// }
// // Historical fetch only when cache is missing or incomplete
// Prices prices = null;
// if (minCacheDate == DateTime.MaxValue)
// prices = PricingDA.GetPrices(symbol, minTradeDate);
// else if (minTradeDate < minCacheDate)
// prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
// if (prices != null && prices.Count > 0)
// fetchedPrices[symbol] = prices;
// // Intraday refresh mutable symbols only
// if (mutableSymbols.Contains(symbol))
// {
// Price latestPrice = PricingDA.GetPrice(symbol);
// if (latestPrice != null)
// latestPrices[symbol] = latestPrice;
// }
// }
// catch (Exception ex)
// {
// MDTrace.WriteLine(LogLevel.DEBUG,
// $"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
// }
// }
// });
// lock (cacheLock)
// {
// // Historical prices idempotent, will not overwrite existing entries
// foreach (KeyValuePair kvp in fetchedPrices)
// foreach (Price price in kvp.Value)
// AddInternal(price);
// // Latest prices unconditional overwrite to capture intraday updates
// foreach (KeyValuePair kvp in latestPrices)
// {
// PricesByDate pricesByDate;
// if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
// {
// pricesByDate = new PricesByDate();
// priceCache[kvp.Key] = pricesByDate;
// }
// if (pricesByDate.ContainsKey(kvp.Value.Date))
// pricesByDate.Remove(kvp.Value.Date);
// pricesByDate.Add(kvp.Value.Date, kvp.Value);
// }
// }
// MDTrace.WriteLine(LogLevel.DEBUG,
// $"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " +
// $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
// }
/// <summary>
/// Ensures the cache holds the prices needed for the given trades.
/// For each symbol, historical prices are fetched from the database when the cache has no
/// entry for it or does not reach back to the symbol's earliest trade date. Symbols with
/// open positions additionally get their latest price re-read and overwritten.
/// Symbols without open positions whose cached history already covers the earliest trade
/// date are skipped entirely.
/// </summary>
/// <param name="portfolioTrades">
/// Trades to cache prices for. A null value, or one with no symbols, is a no-op.
/// </param>
public void Add(PortfolioTrades portfolioTrades)
{
    if (portfolioTrades == null) return;
    List<string> symbols = portfolioTrades.Symbols;
    if (symbols == null || symbols.Count == 0) return;

    // Map each symbol to its earliest trade date
    Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(symbol => symbol, symbol => portfolioTrades.GetMinTradeDate(symbol));
    // Only open positions need intraday price refresh
    HashSet<string> mutableSymbols = new HashSet<string>(symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));

    // Pre-filter under a single lock acquisition: keep a symbol if it is mutable or if the
    // cache does not reach back to its earliest trade date.
    List<string> symbolsToProcess;
    lock (cacheLock)
    {
        symbolsToProcess = symbols
            .Where(sym => mutableSymbols.Contains(sym) || minTradeDates[sym] < MinCachedDateOrMaxValueInternal(sym))
            .ToList();
    }
    if (symbolsToProcess.Count == 0) return; // nothing to do

    ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
    ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
    Parallel.ForEach(symbolsToProcess, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
    {
        if (disposed) return;
        DateTime minTradeDate = minTradeDates[symbol];
        // NOTE(review): this lock serialises fetches per symbol, but results are merged into
        // priceCache only after the whole parallel loop completes, so a concurrent caller that
        // waited here will typically still find the symbol uncached and fetch it again.
        object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
        lock (symbolLock)
        {
            try
            {
                // Read the cache state once, after acquiring the symbol lock. A symbol that has
                // been evicted in the meantime must read as "missing" (DateTime.MaxValue) so that
                // a full historical fetch is issued instead of a partial one based on a stale date.
                DateTime minCacheDate;
                lock (cacheLock)
                {
                    minCacheDate = MinCachedDateOrMaxValueInternal(symbol);
                }

                // Historical fetch only if cache is missing or incomplete
                Prices prices = null;
                if (minCacheDate == DateTime.MaxValue)
                {
                    prices = PricingDA.GetPrices(symbol, minTradeDate);
                }
                else if (minTradeDate < minCacheDate)
                {
                    prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
                }
                if (prices != null && prices.Count > 0)
                {
                    fetchedPrices[symbol] = prices;
                }

                // Intraday refresh for mutable symbols only
                if (mutableSymbols.Contains(symbol))
                {
                    Price latestPrice = PricingDA.GetPrice(symbol);
                    if (latestPrice != null)
                    {
                        latestPrices[symbol] = latestPrice;
                    }
                }
            }
            catch (Exception ex)
            {
                // Best effort: a failure for one symbol must not abort the others.
                MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
            }
        } // lock(symbolLock)
    }); // Parallel

    lock (cacheLock)
    {
        // Historical prices: idempotent, existing entries are not overwritten
        foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
        {
            foreach (Price price in kvp.Value)
            {
                AddInternal(price);
            }
        }
        // Latest prices: unconditional overwrite to capture intraday updates
        foreach (KeyValuePair<string, Price> kvp in latestPrices)
        {
            PricesByDate pricesByDate;
            if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
            {
                pricesByDate = new PricesByDate();
                priceCache[kvp.Key] = pricesByDate;
            }
            if (pricesByDate.ContainsKey(kvp.Value.Date))
            {
                pricesByDate.Remove(kvp.Value.Date);
            }
            pricesByDate.Add(kvp.Value.Date, kvp.Value);
        }
    }

    // Only log if we actually fetched or updated something
    if (fetchedPrices.Count > 0 || latestPrices.Count > 0)
    {
        MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] Symbols processed: {symbolsToProcess.Count}, Mutable: {mutableSymbols.Count}, " +
        $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
    }
}

// Returns the earliest cached date for the symbol, or DateTime.MaxValue when the symbol has
// no cache entry. Must be called under cacheLock.
private DateTime MinCachedDateOrMaxValueInternal(string symbol)
{
    PricesByDate cached;
    return priceCache.TryGetValue(symbol, out cached) ? cached.MinDate : DateTime.MaxValue;
}
/// <summary>
/// Adds a batch of prices to the cache. Prices already cached for the same symbol and
/// date are left untouched.
/// </summary>
/// <param name="prices">Prices to add; a null collection is a no-op.</param>
public void Add(Prices prices)
{
    if (prices == null) return;
    // Take the lock once for the whole batch rather than once per price, matching
    // how Add(symbols, pricingDate) merges its results.
    lock (cacheLock)
    {
        foreach (Price price in prices)
        {
            AddInternal(price);
        }
    }
}
/// <summary>
/// Ensures the cache holds a price for each of the given symbols on the given date.
/// Symbols already cached for that date are skipped; the rest are fetched from the
/// database in parallel and merged into the cache afterwards without overwriting
/// existing entries.
/// </summary>
/// <param name="symbols">Symbols to load; null or empty is a no-op.</param>
/// <param name="pricingDate">The date to fetch a price for.</param>
public void Add(List<string> symbols, DateTime pricingDate)
{
    if (symbols == null || symbols.Count == 0) return;
    ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
    Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
    {
        if (disposed) return;
        bool alreadyCached;
        lock (cacheLock)
        {
            alreadyCached = ContainsPriceInternal(symbol, pricingDate);
        }
        if (alreadyCached) return;
        try
        {
            // The database call is made outside cacheLock.
            Price price = PricingDA.GetPrice(symbol, pricingDate);
            if (price != null)
            {
                fetchedPrices[symbol] = price;
            }
        }
        catch (Exception ex)
        {
            // Best effort: a failure for one symbol must not abort the others.
            MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
        }
    });
    lock (cacheLock)
    {
        foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
        {
            AddInternal(kvp.Value);
        }
    }
}
/// <summary>
/// Adds a single price to the cache. A null price is ignored, and a price already
/// cached for the same symbol and date is not overwritten.
/// </summary>
/// <param name="price">The price to add.</param>
public void Add(Price price)
{
    // AddInternal ignores null as well; returning here just avoids taking the lock for nothing.
    if (price == null)
    {
        return;
    }
    lock (cacheLock)
    {
        AddInternal(price);
    }
}
/// <summary>
/// Refreshes every symbol currently in the cache from the database and updates the
/// cached latest pricing date.
/// For each symbol, if the database's latest date differs from the cached maximum date,
/// prices are re-fetched starting at the cached maximum date and the symbol's entry is
/// replaced; otherwise only the price at the cached maximum date is re-read and overwritten.
/// Skipped (with a log line) when the refreshInProgress flag is already held, which is
/// the case while an eviction or another refresh is running.
/// </summary>
public void Refresh()
{
    if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
    {
        MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped refresh already in progress.");
        return;
    }
    try
    {
        // Snapshot the symbols and their cached max dates so the database work below
        // can run without holding cacheLock.
        List<string> symbols;
        Dictionary<string, DateTime> currentMaxDates;
        lock (cacheLock)
        {
            symbols = priceCache.Keys.ToList();
            currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
        }
        if (symbols.Count == 0) return;
        ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
        ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
        // Fetch outside the cache lock: no timeout risk from holding cacheLock over I/O
        Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
        DateTime latestDateFromDb = PricingDA.GetLatestDate();
        Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
        {
            if (disposed) return;
            DateTime cachedMax;
            if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return;
            try
            {
                DateTime dbMax;
                if (maxDbDates.TryGetValue(symbol, out dbMax) && dbMax.Date != cachedMax.Date)
                {
                    // NOTE(review): the result of this fetch REPLACES the symbol's whole cache
                    // entry below. If GetPrices(symbol, date) returns only prices from that date
                    // onwards, cached history older than cachedMax is dropped - confirm against
                    // PricingDA.
                    Prices prices = PricingDA.GetPrices(symbol, cachedMax);
                    if (prices != null) fullReloads[symbol] = prices.GetPricesByDate();
                }
                else
                {
                    // Same latest date (or no database date for the symbol): re-read just that day's price.
                    Price price = PricingDA.GetPrice(symbol, cachedMax);
                    if (price != null) singleUpdates[symbol] = price;
                }
            }
            catch (Exception ex)
            {
                // Best effort: a failure for one symbol must not abort the others.
                MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}");
            }
        }); // Parallel
        lock (cacheLock)
        {
            latestDate = latestDateFromDb;
            // Apply a result only if the symbol is still cached and its max date is unchanged
            // since the snapshot, i.e. nothing else modified the entry while we were fetching.
            foreach (KeyValuePair<string, PricesByDate> kvp in fullReloads)
            {
                PricesByDate existing;
                if (priceCache.TryGetValue(kvp.Key, out existing) && existing.MaxDate == currentMaxDates[kvp.Key])
                {
                    priceCache[kvp.Key] = kvp.Value;
                }
            }
            foreach (KeyValuePair<string, Price> kvp in singleUpdates)
            {
                PricesByDate pricesByDate;
                if (priceCache.TryGetValue(kvp.Key, out pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
                {
                    if (pricesByDate.ContainsKey(kvp.Value.Date))
                    {
                        pricesByDate.Remove(kvp.Value.Date);
                    }
                    pricesByDate.Add(kvp.Value.Date, kvp.Value);
                }
            }
        } // lock(cacheLock)
        MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
    }
    finally
    {
        // Always release the flag, including on the early return above.
        Interlocked.Exchange(ref refreshInProgress, 0);
    }
}
/// <summary>
/// Returns the latest pricing date, loading it from the database the first time
/// (while the cached value is still the epoch sentinel).
/// </summary>
/// <returns>The cached latest pricing date.</returns>
public DateTime GetLatestDate()
{
    lock (cacheLock)
    {
        if (!Utility.IsEpoch(latestDate))
        {
            return latestDate;
        }
    }

    // Query the database outside cacheLock so cache readers are not blocked on I/O,
    // as Refresh does for its own database calls.
    DateTime fetched = PricingDA.GetLatestDate();

    lock (cacheLock)
    {
        // Another thread may have populated the value while we were fetching; keep theirs.
        if (Utility.IsEpoch(latestDate))
        {
            latestDate = fetched;
        }
        return latestDate;
    }
}
/// <summary>
/// Re-reads the latest pricing date from the database and stores it, replacing any
/// previously cached value.
/// </summary>
public void RefreshLatestDate()
{
    // Query the database outside cacheLock so cache readers are not blocked on I/O,
    // as Refresh does for its own database calls.
    DateTime fetched = PricingDA.GetLatestDate();
    lock (cacheLock)
    {
        latestDate = fetched;
    }
}
/// <summary>
/// Returns the earliest date for which the cache holds a price for the symbol.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <returns>
/// The symbol's minimum cached date, or Utility.Epoch when the symbol is not cached
/// or its entry is empty.
/// </returns>
public DateTime GetMinCacheDate(string symbol)
{
    lock (cacheLock)
    {
        PricesByDate cached;
        bool hasPrices = priceCache.TryGetValue(symbol, out cached) && cached.Count != 0;
        return hasPrices ? cached.MinDate : Utility.Epoch;
    }
}
/// <summary>
/// Returns the cached prices for a symbol on the historical dates produced by
/// DateGenerator.GenerateHistoricalDates(endDate, dayCount). Dates with no cached
/// price are skipped.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <param name="endDate">End date passed to the date generator.</param>
/// <param name="dayCount">Day count passed to the date generator.</param>
/// <returns>
/// The matching prices in the order the dates were generated; an empty collection
/// when the symbol is not cached.
/// </returns>
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
{
    lock (cacheLock)
    {
        Prices result = new Prices();
        PricesByDate pricesByDate;
        if (!priceCache.TryGetValue(symbol, out pricesByDate)) return result;
        DateGenerator dateGenerator = new DateGenerator();
        List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
        foreach (DateTime date in historicalDates)
        {
            // Single lookup instead of ContainsKey followed by the indexer.
            Price price;
            if (pricesByDate.TryGetValue(date, out price))
            {
                result.Add(price);
            }
        }
        return result;
    } // lock(cacheLock)
}
/// <summary>
/// Looks up the cached price for a symbol on a specific date.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <param name="date">The pricing date.</param>
/// <returns>The cached price, or null when the symbol or the date is not cached.</returns>
public Price GetPrice(string symbol, DateTime date)
{
    lock (cacheLock)
    {
        PricesByDate pricesByDate;
        Price price;
        if (priceCache.TryGetValue(symbol, out pricesByDate) && pricesByDate.TryGetValue(date, out price))
        {
            return price;
        }
        return null;
    }
}
/// <summary>
/// Reports whether the cache holds a price for the symbol on the given date.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <param name="date">The pricing date.</param>
/// <returns>True when a price is cached for that symbol and date.</returns>
public bool ContainsPrice(string symbol, DateTime date)
{
    bool found;
    lock (cacheLock)
    {
        found = ContainsPriceInternal(symbol, date);
    }
    return found;
}
/// <summary>
/// Reports whether the cache holds a price on the given date for every one of the symbols.
/// </summary>
/// <param name="symbols">Symbols to check.</param>
/// <param name="date">The pricing date.</param>
/// <returns>
/// True only when all symbols have a cached price for the date; false when any is
/// missing, or when the list is null or empty.
/// </returns>
public bool ContainsPrice(List<string> symbols, DateTime date)
{
    if (symbols == null || symbols.Count == 0) return false;
    lock (cacheLock)
    {
        // All() stops at the first symbol that is missing.
        return symbols.All(symbol => ContainsPriceInternal(symbol, date));
    }
}
/// <summary>
/// Reports whether the cache has an entry for the symbol.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <returns>True when the symbol has a cache entry.</returns>
public bool ContainsSymbol(string symbol)
{
    bool present;
    lock (cacheLock)
    {
        present = priceCache.ContainsKey(symbol);
    }
    return present;
}
// -- Private helpers ------------------------------------------------------
/// <summary>
/// Inserts a price into the cache, creating the symbol's entry if needed.
/// A null price is ignored, and an existing price for the same date is kept as is.
/// MUST BE CALLED UNDER CACHELOCK.
/// </summary>
/// <param name="price">The price to insert.</param>
private void AddInternal(Price price)
{
    if (price == null)
    {
        return;
    }

    PricesByDate bucket;
    bool symbolKnown = priceCache.TryGetValue(price.Symbol, out bucket);
    if (!symbolKnown)
    {
        bucket = new PricesByDate();
        priceCache[price.Symbol] = bucket;
    }

    // Never overwrite: the first price stored for a date wins.
    if (bucket.ContainsKey(price.Date))
    {
        return;
    }
    bucket.Add(price.Date, price);
}
/// <summary>
/// ContainsPriceInternal - reports whether a price is cached for the symbol on the date.
/// MUST BE CALLED UNDER CACHELOCK.
/// </summary>
/// <param name="symbol">The symbol to look up.</param>
/// <param name="date">The pricing date.</param>
/// <returns>True when the symbol has an entry that contains the date.</returns>
private bool ContainsPriceInternal(string symbol, DateTime date)
{
    PricesByDate pricesByDate;
    return priceCache.TryGetValue(symbol, out pricesByDate) && pricesByDate.ContainsKey(date);
}
/// <summary>
/// CountInternal - totals the number of cached prices across all symbols.
/// MUST BE CALLED UNDER CACHELOCK.
/// </summary>
/// <returns>The sum of the entry counts of every symbol in the cache.</returns>
private long CountInternal()
{
    return priceCache.Values.Aggregate(0L, (total, pricesByDate) => total + pricesByDate.Count);
}
}
}