Enahnce GLPirceCache. Eviction logic and better synchronizartion.
This commit is contained in:
@@ -2,479 +2,507 @@
|
||||
using MarketData.Utils;
|
||||
using MarketData.DataAccess;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading;
|
||||
using System.Collections.Generic;
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace MarketData.Cache
|
||||
{
|
||||
public class GLPriceCache
|
||||
{
|
||||
private Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
|
||||
private static GLPriceCache instance = null;
|
||||
private DateTime latestDate = Utility.Epoch;
|
||||
private Thread cacheMonitorThread = null;
|
||||
private volatile bool threadRun = true;
|
||||
private int cacheCycle = 300000;
|
||||
private object thisLock = new object();
|
||||
private object fetchLock = new object();
|
||||
|
||||
private GLPriceCache()
|
||||
public class GLPriceCache : IDisposable
|
||||
{
|
||||
cacheMonitorThread = new Thread(new ThreadStart(ThreadProc));
|
||||
cacheMonitorThread.Start();
|
||||
}
|
||||
// ── Singleton ────────────────────────────────────────────────────────────
|
||||
private static readonly object instanceLock = new object();
|
||||
private static GLPriceCache instance = null;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Thread threadToJoin = null;
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
if (instance == null || !threadRun) return;
|
||||
threadRun = false;
|
||||
threadToJoin = cacheMonitorThread;
|
||||
cacheMonitorThread = null;
|
||||
instance = null;
|
||||
}
|
||||
|
||||
if (threadToJoin != null)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Dispose] Thread state is '{Utility.ThreadStateToString(threadToJoin)}'. Joining...");
|
||||
threadToJoin.Join(5000);
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] End");
|
||||
}
|
||||
|
||||
public static GLPriceCache GetInstance()
|
||||
{
|
||||
lock (typeof(GLPriceCache))
|
||||
{
|
||||
if (instance == null)
|
||||
public static GLPriceCache GetInstance()
|
||||
{
|
||||
instance = new GLPriceCache();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
|
||||
public void Add(PortfolioTrades portfolioTrades)
|
||||
{
|
||||
List<string> symbols = portfolioTrades.Symbols;
|
||||
DateTime today = DateTime.Today;
|
||||
|
||||
// Default: 3x logical cores, capped at 32 to avoid saturating the connection pool.
|
||||
// I/O-bound DB calls benefit from more threads than CPU cores, but diminishing
|
||||
// returns set in beyond ~32 concurrent connections for most DB workloads.
|
||||
int defaultParallelism = Math.Min(Environment.ProcessorCount * 3, 32);
|
||||
|
||||
Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(symbol => symbol, symbol => portfolioTrades.GetMinTradeDate(symbol));
|
||||
|
||||
// Symbols that need an intraday refresh:
|
||||
// - open positions (no close date), or
|
||||
// - closed today (close price may still be settling)
|
||||
HashSet<string> mutableSymbols = new HashSet<string>(symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
|
||||
//|| portfolioTrades.GetMaxTradeDate(sym).Date == today));
|
||||
|
||||
Dictionary<string, DateTime> minCacheDates;
|
||||
lock (thisLock)
|
||||
{
|
||||
minCacheDates = symbols.ToDictionary(symbol => symbol,symbol => priceCache.ContainsKey(symbol) ? priceCache[symbol].MinDate : DateTime.MaxValue);
|
||||
lock (instanceLock)
|
||||
{
|
||||
if (instance == null)
|
||||
instance = new GLPriceCache();
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
|
||||
ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
||||
ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
|
||||
// ── State ────────────────────────────────────────────────────────────────
|
||||
private readonly Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
|
||||
private readonly ConcurrentDictionary<string, object> symbolFetchLocks = new ConcurrentDictionary<string, object>();
|
||||
private readonly object cacheLock = new object();
|
||||
private DateTime latestDate = Utility.Epoch;
|
||||
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = defaultParallelism }, symbol =>
|
||||
// ── Background refresh ───────────────────────────────────────────────────
|
||||
private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5);
|
||||
private Timer refreshTimer = null;
|
||||
private int tickCount = 0;
|
||||
private const int evictionTickInterval = 12; // every 12 ticks x 5 min = 1 hour
|
||||
|
||||
// ── Parallelism ──────────────────────────────────────────────────────────
|
||||
private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls();
|
||||
|
||||
private static int ResolveMaxParallelDbCalls()
|
||||
{
|
||||
DateTime minTradeDate = minTradeDates[symbol];
|
||||
DateTime minCacheDate = minCacheDates[symbol];
|
||||
int @default = Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
|
||||
string configured = Environment.GetEnvironmentVariable("GL_PRICE_CACHE_PARALLEL_DB_CALLS");
|
||||
return int.TryParse(configured, out int parsed) && parsed > 0 ? parsed : @default;
|
||||
}
|
||||
|
||||
// ── Disposal ─────────────────────────────────────────────────────────────
|
||||
private volatile bool disposed = false;
|
||||
private int refreshInProgress = 0;
|
||||
|
||||
// ── Constructor ──────────────────────────────────────────────────────────
|
||||
private GLPriceCache()
|
||||
{
|
||||
refreshTimer = new Timer(OnCacheRefreshTick, null, cacheCycle, cacheCycle);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (disposed) return;
|
||||
disposed = true;
|
||||
|
||||
if (disposing)
|
||||
{
|
||||
Timer timerToDispose;
|
||||
lock (instanceLock)
|
||||
{
|
||||
timerToDispose = refreshTimer;
|
||||
refreshTimer = null;
|
||||
instance = null;
|
||||
}
|
||||
|
||||
// Block until any in-flight tick completes before disposing
|
||||
using (var waited = new ManualResetEventSlim(false))
|
||||
{
|
||||
if (timerToDispose != null)
|
||||
timerToDispose.Dispose(waited.WaitHandle);
|
||||
waited.Wait(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
|
||||
}
|
||||
}
|
||||
|
||||
// ── Background tick ──────────────────────────────────────────────────────
|
||||
private void OnCacheRefreshTick(object state)
|
||||
{
|
||||
if (disposed) return;
|
||||
try
|
||||
{
|
||||
// Historical fetch — only when cache is missing or incomplete
|
||||
Prices prices = null;
|
||||
if (minCacheDate == DateTime.MaxValue)
|
||||
lock (cacheLock)
|
||||
{
|
||||
prices = PricingDA.GetPrices(symbol, minTradeDate);
|
||||
}
|
||||
else if (minTradeDate < minCacheDate)
|
||||
{
|
||||
prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
||||
long count = CountInternal();
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Tick] Symbols: {priceCache.Keys.Count}. " +
|
||||
$"Items in cache: {Utility.FormatNumber(count, 0, true)}.");
|
||||
}
|
||||
|
||||
if (prices != null && prices.Count > 0)
|
||||
{
|
||||
fetchedPrices[symbol] = prices;
|
||||
}
|
||||
|
||||
// Intraday refresh — open positions and positions closed today only
|
||||
if (mutableSymbols.Contains(symbol))
|
||||
{
|
||||
Price latestPrice = PricingDA.GetPrice(symbol);
|
||||
if (latestPrice != null)
|
||||
latestPrices[symbol] = latestPrice;
|
||||
}
|
||||
if (++tickCount % evictionTickInterval == 0)
|
||||
EvictStaleSymbols();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching prices for {symbol}: {ex.Message}");
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
lock (thisLock)
|
||||
// ── Eviction ─────────────────────────────────────────────────────────────
|
||||
private void EvictStaleSymbols()
|
||||
{
|
||||
// Historical prices — idempotent, will not overwrite existing entries
|
||||
foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
|
||||
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
||||
{
|
||||
foreach (var price in kvp.Value)
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped — refresh in progress.");
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
int count;
|
||||
lock (cacheLock)
|
||||
{
|
||||
Add(price);
|
||||
count = priceCache.Count;
|
||||
priceCache.Clear();
|
||||
symbolFetchLocks.Clear();
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Evict] Cache cleared. {count} symbols evicted.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
Interlocked.Exchange(ref refreshInProgress, 0);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Public API ───────────────────────────────────────────────────────────
|
||||
|
||||
public void Add(PortfolioTrades portfolioTrades)
|
||||
{
|
||||
List<string> symbols = portfolioTrades.Symbols;
|
||||
|
||||
Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(
|
||||
sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
|
||||
|
||||
// Only open positions need an intraday price refresh.
|
||||
// Closed positions, regardless of close date, have immutable prices — skip the DB call.
|
||||
HashSet<string> mutableSymbols = new HashSet<string>(
|
||||
symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
|
||||
|
||||
Dictionary<string, DateTime> minCacheDates;
|
||||
lock (cacheLock)
|
||||
{
|
||||
minCacheDates = symbols.ToDictionary(
|
||||
sym => sym,
|
||||
sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
|
||||
}
|
||||
|
||||
ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
||||
ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
|
||||
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||
{
|
||||
if (disposed) return;
|
||||
|
||||
DateTime minTradeDate = minTradeDates[symbol];
|
||||
DateTime minCacheDate = minCacheDates[symbol];
|
||||
|
||||
// Acquire a per-symbol lock to prevent concurrent clients from issuing
|
||||
// duplicate DB fetches for the same symbol. First thread fetches;
|
||||
// subsequent threads for the same symbol block here, then re-check
|
||||
// the cache on entry and skip the fetch if already populated.
|
||||
object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
|
||||
lock (symbolLock)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Re-check cache state after acquiring symbol lock — another
|
||||
// client thread may have already fetched this symbol while we waited
|
||||
lock (cacheLock)
|
||||
{
|
||||
if (priceCache.ContainsKey(symbol))
|
||||
minCacheDate = priceCache[symbol].MinDate;
|
||||
}
|
||||
|
||||
// Historical fetch — only when cache is missing or incomplete
|
||||
Prices prices = null;
|
||||
if (minCacheDate == DateTime.MaxValue)
|
||||
prices = PricingDA.GetPrices(symbol, minTradeDate);
|
||||
else if (minTradeDate < minCacheDate)
|
||||
prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
||||
|
||||
if (prices != null && prices.Count > 0)
|
||||
fetchedPrices[symbol] = prices;
|
||||
|
||||
// Intraday refresh — mutable symbols only
|
||||
if (mutableSymbols.Contains(symbol))
|
||||
{
|
||||
Price latestPrice = PricingDA.GetPrice(symbol);
|
||||
if (latestPrice != null)
|
||||
latestPrices[symbol] = latestPrice;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
lock (cacheLock)
|
||||
{
|
||||
// Historical prices — idempotent, will not overwrite existing entries
|
||||
foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
|
||||
foreach (Price price in kvp.Value)
|
||||
AddInternal(price);
|
||||
|
||||
// Latest prices — unconditional overwrite to capture intraday updates
|
||||
foreach (KeyValuePair<string, Price> kvp in latestPrices)
|
||||
{
|
||||
PricesByDate pricesByDate;
|
||||
if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
|
||||
{
|
||||
pricesByDate = new PricesByDate();
|
||||
priceCache[kvp.Key] = pricesByDate;
|
||||
}
|
||||
|
||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||
pricesByDate.Remove(kvp.Value.Date);
|
||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||
}
|
||||
}
|
||||
|
||||
// Latest prices — unconditional overwrite to capture any intraday updates
|
||||
foreach (KeyValuePair<string, Price> kvp in latestPrices)
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " +
|
||||
$"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
|
||||
}
|
||||
|
||||
public void Add(Prices prices)
|
||||
{
|
||||
foreach (Price price in prices)
|
||||
Add(price);
|
||||
}
|
||||
|
||||
public void Add(List<string> symbols, DateTime pricingDate)
|
||||
{
|
||||
if (symbols == null || symbols.Count == 0) return;
|
||||
|
||||
ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
|
||||
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||
{
|
||||
if (!priceCache.TryGetValue(kvp.Key, out var pricesByDate))
|
||||
if (disposed) return;
|
||||
|
||||
lock (cacheLock)
|
||||
{
|
||||
pricesByDate = new PricesByDate();
|
||||
priceCache[kvp.Key] = pricesByDate;
|
||||
if (ContainsPriceInternal(symbol, pricingDate)) return;
|
||||
}
|
||||
|
||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||
pricesByDate.Remove(kvp.Value.Date);
|
||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||
try
|
||||
{
|
||||
Price price = PricingDA.GetPrice(symbol, pricingDate);
|
||||
if (price != null) fetchedPrices[symbol] = price;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
|
||||
}
|
||||
});
|
||||
|
||||
lock (cacheLock)
|
||||
{
|
||||
foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
|
||||
AddInternal(kvp.Value);
|
||||
}
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " +
|
||||
$"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
|
||||
}
|
||||
|
||||
//public void Add(PortfolioTrades portfolioTrades)
|
||||
//{
|
||||
// List<string> symbols = portfolioTrades.Symbols;
|
||||
// Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
|
||||
|
||||
// Dictionary<string, DateTime> minCacheDates;
|
||||
// lock (thisLock)
|
||||
// {
|
||||
// minCacheDates = symbols.ToDictionary(sym => sym, sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
|
||||
// }
|
||||
|
||||
// ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
||||
|
||||
// Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
||||
// {
|
||||
// DateTime minTradeDate = minTradeDates[symbol];
|
||||
// DateTime minCacheDate = minCacheDates[symbol];
|
||||
// Prices prices = null;
|
||||
|
||||
// try
|
||||
// {
|
||||
// if (minCacheDate == DateTime.MaxValue)
|
||||
// {
|
||||
// prices = PricingDA.GetPrices(symbol, minTradeDate);
|
||||
// }
|
||||
// else if (minTradeDate < minCacheDate)
|
||||
// {
|
||||
// prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
||||
// }
|
||||
|
||||
// if (prices != null && prices.Count > 0)
|
||||
// {
|
||||
// fetchedPrices[symbol] = prices;
|
||||
// }
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching prices for {symbol}: {ex.Message}");
|
||||
// }
|
||||
// });
|
||||
|
||||
// lock (thisLock)
|
||||
// {
|
||||
// foreach (var kvp in fetchedPrices)
|
||||
// {
|
||||
// foreach (var price in kvp.Value)
|
||||
// {
|
||||
// Add(price);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
public DateTime GetLatestDate()
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (Utility.IsEpoch(latestDate))
|
||||
public void Add(Price price)
|
||||
{
|
||||
RefreshLatestDate();
|
||||
}
|
||||
return latestDate;
|
||||
}
|
||||
}
|
||||
|
||||
public void RefreshLatestDate()
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
latestDate = PricingDA.GetLatestDate();
|
||||
}
|
||||
}
|
||||
|
||||
public void Refresh()
|
||||
{
|
||||
List<string> symbols;
|
||||
Dictionary<string, DateTime> currentMaxDates;
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
symbols = priceCache.Keys.ToList();
|
||||
currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
|
||||
}
|
||||
if (symbols.Count == 0) return;
|
||||
ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
|
||||
ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
|
||||
DateTime latestDateFromDb;
|
||||
lock (fetchLock)
|
||||
{
|
||||
Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
|
||||
latestDateFromDb = PricingDA.GetLatestDate();
|
||||
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
||||
{
|
||||
if (!currentMaxDates.TryGetValue(symbol, out var cachedMax)) return;
|
||||
|
||||
if (maxDbDates.TryGetValue(symbol, out var dbMax) && dbMax.Date != cachedMax.Date)
|
||||
{
|
||||
Prices prices = PricingDA.GetPrices(symbol, cachedMax);
|
||||
if (prices != null) fullReloads[symbol] = prices.GetPricesByDate();
|
||||
}
|
||||
else
|
||||
{
|
||||
Price price = PricingDA.GetPrice(symbol, cachedMax);
|
||||
if (price != null) singleUpdates[symbol] = price;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
latestDate = latestDateFromDb;
|
||||
|
||||
foreach (var kvp in fullReloads)
|
||||
{
|
||||
if (priceCache.TryGetValue(kvp.Key, out PricesByDate existing) && existing.MaxDate == currentMaxDates[kvp.Key])
|
||||
{
|
||||
priceCache[kvp.Key] = kvp.Value;
|
||||
}
|
||||
lock (cacheLock)
|
||||
{
|
||||
AddInternal(price);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var kvp in singleUpdates)
|
||||
public void Refresh()
|
||||
{
|
||||
if (priceCache.TryGetValue(kvp.Key, out PricesByDate pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
|
||||
{
|
||||
// Remove the old price (if any) and add the new price properly
|
||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||
pricesByDate.Remove(kvp.Value.Date);
|
||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped — refresh already in progress.");
|
||||
return;
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
|
||||
}
|
||||
try
|
||||
{
|
||||
List<string> symbols;
|
||||
Dictionary<string, DateTime> currentMaxDates;
|
||||
|
||||
lock (cacheLock)
|
||||
{
|
||||
symbols = priceCache.Keys.ToList();
|
||||
currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
|
||||
}
|
||||
|
||||
public void Add(Prices prices)
|
||||
{
|
||||
foreach (Price price in prices)
|
||||
{
|
||||
Add(price);
|
||||
}
|
||||
}
|
||||
if (symbols.Count == 0) return;
|
||||
|
||||
public void Add(List<string> symbols, DateTime pricingDate)
|
||||
{
|
||||
if (symbols == null || symbols.Count == 0) return;
|
||||
ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
|
||||
ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
|
||||
|
||||
ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
|
||||
// Fetch outside the cache lock — no timeout risk holding cacheLock over I/O
|
||||
Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
|
||||
DateTime latestDateFromDb = PricingDA.GetLatestDate();
|
||||
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (ContainsPrice(symbol, pricingDate)) return;
|
||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||
{
|
||||
if (disposed) return;
|
||||
|
||||
DateTime cachedMax;
|
||||
if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return;
|
||||
|
||||
try
|
||||
{
|
||||
DateTime dbMax;
|
||||
if (maxDbDates.TryGetValue(symbol, out dbMax) && dbMax.Date != cachedMax.Date)
|
||||
{
|
||||
Prices prices = PricingDA.GetPrices(symbol, cachedMax);
|
||||
if (prices != null) fullReloads[symbol] = prices.GetPricesByDate();
|
||||
}
|
||||
else
|
||||
{
|
||||
Price price = PricingDA.GetPrice(symbol, cachedMax);
|
||||
if (price != null) singleUpdates[symbol] = price;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}");
|
||||
}
|
||||
});
|
||||
|
||||
lock (cacheLock)
|
||||
{
|
||||
latestDate = latestDateFromDb;
|
||||
|
||||
foreach (KeyValuePair<string, PricesByDate> kvp in fullReloads)
|
||||
{
|
||||
PricesByDate existing;
|
||||
if (priceCache.TryGetValue(kvp.Key, out existing) &&
|
||||
existing.MaxDate == currentMaxDates[kvp.Key])
|
||||
{
|
||||
priceCache[kvp.Key] = kvp.Value;
|
||||
}
|
||||
}
|
||||
|
||||
foreach (KeyValuePair<string, Price> kvp in singleUpdates)
|
||||
{
|
||||
PricesByDate pricesByDate;
|
||||
if (priceCache.TryGetValue(kvp.Key, out pricesByDate) &&
|
||||
pricesByDate.MaxDate == currentMaxDates[kvp.Key])
|
||||
{
|
||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||
pricesByDate.Remove(kvp.Value.Date);
|
||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
||||
$"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
Interlocked.Exchange(ref refreshInProgress, 0);
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
public DateTime GetLatestDate()
|
||||
{
|
||||
Price price = PricingDA.GetPrice(symbol, pricingDate);
|
||||
if (price != null) fetchedPrices[symbol] = price;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
|
||||
}
|
||||
});
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
foreach (var kvp in fetchedPrices)
|
||||
{
|
||||
Add(kvp.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Add(Price price)
|
||||
{
|
||||
if (price == null) return;
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
if (!priceCache.TryGetValue(price.Symbol, out var pricesByDate))
|
||||
{
|
||||
pricesByDate = new PricesByDate();
|
||||
priceCache[price.Symbol] = pricesByDate;
|
||||
}
|
||||
if (!pricesByDate.ContainsKey(price.Date))
|
||||
{
|
||||
pricesByDate.Add(price.Date, price); // must use Add() to update MinDate/MaxDate
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public DateTime GetMinCacheDate(string symbol)
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (!priceCache.TryGetValue(symbol, out var symbolPrices) || symbolPrices.Count == 0)
|
||||
{
|
||||
return Utility.Epoch;
|
||||
}
|
||||
return symbolPrices.MinDate;
|
||||
}
|
||||
}
|
||||
|
||||
//public void RemoveDate(DateTime date)
|
||||
//{
|
||||
// lock (thisLock)
|
||||
// {
|
||||
// foreach (var kvp in priceCache)
|
||||
// {
|
||||
// kvp.Value.Remove(date);
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return new Prices();
|
||||
|
||||
DateGenerator dateGenerator = new DateGenerator();
|
||||
List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
|
||||
|
||||
Prices result = new Prices();
|
||||
foreach (DateTime date in historicalDates)
|
||||
{
|
||||
if (pricesByDate.ContainsKey(date))
|
||||
{
|
||||
result.Add(pricesByDate[date]);
|
||||
}
|
||||
lock (cacheLock)
|
||||
{
|
||||
if (Utility.IsEpoch(latestDate))
|
||||
latestDate = PricingDA.GetLatestDate();
|
||||
return latestDate;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public Price GetPrice(string symbol, DateTime date)
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return null;
|
||||
return pricesByDate.TryGetValue(date, out var price) ? price : null;
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsPrice(string symbol, DateTime date)
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return false;
|
||||
return pricesByDate.ContainsKey(date);
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsPrice(List<string> symbols, DateTime date)
|
||||
{
|
||||
if (symbols == null || symbols.Count == 0) return false;
|
||||
|
||||
lock (thisLock)
|
||||
{
|
||||
foreach (string symbol in symbols)
|
||||
public void RefreshLatestDate()
|
||||
{
|
||||
if (!priceCache.TryGetValue(symbol, out var pricesByDate) || !pricesByDate.ContainsKey(date))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
lock (cacheLock)
|
||||
{
|
||||
latestDate = PricingDA.GetLatestDate();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsSymbol(string symbol)
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
return priceCache.ContainsKey(symbol);
|
||||
}
|
||||
}
|
||||
|
||||
private long Count()
|
||||
{
|
||||
lock (thisLock)
|
||||
{
|
||||
long count = 0;
|
||||
foreach (var pricesByDate in priceCache.Values)
|
||||
public DateTime GetMinCacheDate(string symbol)
|
||||
{
|
||||
count += pricesByDate.Count;
|
||||
lock (cacheLock)
|
||||
{
|
||||
PricesByDate symbolPrices;
|
||||
if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0)
|
||||
return Utility.Epoch;
|
||||
return symbolPrices.MinDate;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
private void ThreadProc()
|
||||
{
|
||||
int quantums = 0;
|
||||
int quantumInterval = 1000;
|
||||
long lastCount = 0;
|
||||
|
||||
while (threadRun)
|
||||
{
|
||||
Thread.Sleep(quantumInterval);
|
||||
quantums += quantumInterval;
|
||||
if (quantums > cacheCycle)
|
||||
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
|
||||
{
|
||||
quantums = 0;
|
||||
lock (thisLock)
|
||||
{
|
||||
lastCount = Count();
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:ThreadProc] Symbols: {priceCache.Keys.Count}. Items in cache: {Utility.FormatNumber(lastCount,0,true)}.");
|
||||
}
|
||||
}
|
||||
}
|
||||
lock (cacheLock)
|
||||
{
|
||||
PricesByDate pricesByDate;
|
||||
if (!priceCache.TryGetValue(symbol, out pricesByDate)) return new Prices();
|
||||
|
||||
MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:ThreadProc] Thread ended. Items in cache:{Utility.FormatNumber(lastCount,0,true)}");
|
||||
DateGenerator dateGenerator = new DateGenerator();
|
||||
List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
|
||||
|
||||
Prices result = new Prices();
|
||||
foreach (DateTime date in historicalDates)
|
||||
if (pricesByDate.ContainsKey(date))
|
||||
result.Add(pricesByDate[date]);
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public Price GetPrice(string symbol, DateTime date)
|
||||
{
|
||||
lock (cacheLock)
|
||||
{
|
||||
PricesByDate pricesByDate;
|
||||
if (!priceCache.TryGetValue(symbol, out pricesByDate)) return null;
|
||||
Price price;
|
||||
return pricesByDate.TryGetValue(date, out price) ? price : null;
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsPrice(string symbol, DateTime date)
|
||||
{
|
||||
lock (cacheLock)
|
||||
{
|
||||
return ContainsPriceInternal(symbol, date);
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsPrice(List<string> symbols, DateTime date)
|
||||
{
|
||||
if (symbols == null || symbols.Count == 0) return false;
|
||||
lock (cacheLock)
|
||||
{
|
||||
foreach (string symbol in symbols)
|
||||
if (!ContainsPriceInternal(symbol, date)) return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public bool ContainsSymbol(string symbol)
|
||||
{
|
||||
lock (cacheLock)
|
||||
{
|
||||
return priceCache.ContainsKey(symbol);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Private helpers ──────────────────────────────────────────────────────
|
||||
|
||||
// Must be called under cacheLock
|
||||
private void AddInternal(Price price)
|
||||
{
|
||||
if (price == null) return;
|
||||
PricesByDate pricesByDate;
|
||||
if (!priceCache.TryGetValue(price.Symbol, out pricesByDate))
|
||||
{
|
||||
pricesByDate = new PricesByDate();
|
||||
priceCache[price.Symbol] = pricesByDate;
|
||||
}
|
||||
if (!pricesByDate.ContainsKey(price.Date))
|
||||
pricesByDate.Add(price.Date, price);
|
||||
}
|
||||
|
||||
// Must be called under cacheLock
|
||||
private bool ContainsPriceInternal(string symbol, DateTime date)
|
||||
{
|
||||
PricesByDate pricesByDate;
|
||||
if (!priceCache.TryGetValue(symbol, out pricesByDate)) return false;
|
||||
return pricesByDate.ContainsKey(date);
|
||||
}
|
||||
|
||||
// Must be called under cacheLock
|
||||
private long CountInternal()
|
||||
{
|
||||
long count = 0;
|
||||
foreach (PricesByDate pricesByDate in priceCache.Values)
|
||||
count += pricesByDate.Count;
|
||||
return count;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user