GLPriceCache Dispose shoud exit quickly
This commit is contained in:
@@ -1,58 +1,67 @@
|
|||||||
using MarketData.MarketDataModel;
|
using MarketData.MarketDataModel;
|
||||||
using MarketData.Utils;
|
using MarketData.Utils;
|
||||||
using MarketData.DataAccess;
|
using MarketData.DataAccess;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System;
|
using System;
|
||||||
using System.Linq;
|
using System.Collections.Generic;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace MarketData.Cache
|
namespace MarketData.Cache
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// GLPriceCache - Used by Gain/Loss Generator which are usedby MarketDataSErver and the User Interface.
|
||||||
|
/// The entire cache evicts every hour so calling Add(PortfolioTrades portfolioTrades) will force price reload
|
||||||
|
/// Always ensures that open trade symbols get most recent price from the database.
|
||||||
|
/// </summary>
|
||||||
public class GLPriceCache : IDisposable
|
public class GLPriceCache : IDisposable
|
||||||
{
|
{
|
||||||
// ── Singleton ────────────────────────────────────────────────────────────
|
// -- Singleton ------------------------------------------------------------
|
||||||
private static readonly object instanceLock = new object();
|
private static readonly object instanceLock = new object();
|
||||||
private static GLPriceCache instance = null;
|
private static GLPriceCache instance = null;
|
||||||
|
|
||||||
|
// -- Disposal -------------------------------------------------------------
|
||||||
|
private volatile bool disposed = false;
|
||||||
|
private int refreshInProgress = 0;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
///
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public static GLPriceCache GetInstance()
|
public static GLPriceCache GetInstance()
|
||||||
{
|
{
|
||||||
lock (instanceLock)
|
lock (instanceLock)
|
||||||
{
|
{
|
||||||
if (instance == null)
|
if (instance == null)instance = new GLPriceCache();
|
||||||
instance = new GLPriceCache();
|
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── State ────────────────────────────────────────────────────────────────
|
// -- State ----------------------------------------------------------------
|
||||||
private readonly Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
|
private readonly Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
|
||||||
private readonly ConcurrentDictionary<string, object> symbolFetchLocks = new ConcurrentDictionary<string, object>();
|
private readonly ConcurrentDictionary<string, object> symbolFetchLocks = new ConcurrentDictionary<string, object>();
|
||||||
private readonly object cacheLock = new object();
|
private readonly object cacheLock = new object();
|
||||||
private DateTime latestDate = Utility.Epoch;
|
private DateTime latestDate = Utility.Epoch;
|
||||||
|
|
||||||
// ── Background refresh ───────────────────────────────────────────────────
|
// -- Background refresh ---------------------------------------------------
|
||||||
private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5);
|
private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5);
|
||||||
private Timer refreshTimer = null;
|
private Timer refreshTimer = null;
|
||||||
private int tickCount = 0;
|
private int tickCount = 0;
|
||||||
private const int evictionTickInterval = 12; // every 12 ticks x 5 min = 1 hour
|
private const int evictionTickInterval = 12; // every 12 ticks x 5 min = 1 hour
|
||||||
|
|
||||||
// ── Parallelism ──────────────────────────────────────────────────────────
|
// -- Parallelism ----------------------------------------------------------
|
||||||
private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls();
|
private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls();
|
||||||
|
|
||||||
private static int ResolveMaxParallelDbCalls()
|
private static int ResolveMaxParallelDbCalls()
|
||||||
{
|
{
|
||||||
int @default = Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
|
return Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
|
||||||
string configured = Environment.GetEnvironmentVariable("GL_PRICE_CACHE_PARALLEL_DB_CALLS");
|
// int @default = Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
|
||||||
return int.TryParse(configured, out int parsed) && parsed > 0 ? parsed : @default;
|
// string configured = Environment.GetEnvironmentVariable("GL_PRICE_CACHE_PARALLEL_DB_CALLS");
|
||||||
|
// return int.TryParse(configured, out int parsed) && parsed > 0 ? parsed : @default;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Disposal ─────────────────────────────────────────────────────────────
|
// -- Constructor ----------------------------------------------------------
|
||||||
private volatile bool disposed = false;
|
|
||||||
private int refreshInProgress = 0;
|
|
||||||
|
|
||||||
// ── Constructor ──────────────────────────────────────────────────────────
|
|
||||||
private GLPriceCache()
|
private GLPriceCache()
|
||||||
{
|
{
|
||||||
refreshTimer = new Timer(OnCacheRefreshTick, null, cacheCycle, cacheCycle);
|
refreshTimer = new Timer(OnCacheRefreshTick, null, cacheCycle, cacheCycle);
|
||||||
@@ -71,7 +80,7 @@ namespace MarketData.Cache
|
|||||||
|
|
||||||
if (disposing)
|
if (disposing)
|
||||||
{
|
{
|
||||||
Timer timerToDispose;
|
Timer timerToDispose = null;
|
||||||
lock (instanceLock)
|
lock (instanceLock)
|
||||||
{
|
{
|
||||||
timerToDispose = refreshTimer;
|
timerToDispose = refreshTimer;
|
||||||
@@ -79,19 +88,49 @@ namespace MarketData.Cache
|
|||||||
instance = null;
|
instance = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block until any in-flight tick completes before disposing
|
// Dispose timer immediately; no blocking wait
|
||||||
using (var waited = new ManualResetEventSlim(false))
|
timerToDispose?.Dispose();
|
||||||
{
|
|
||||||
if (timerToDispose != null)
|
|
||||||
timerToDispose.Dispose(waited.WaitHandle);
|
|
||||||
waited.Wait(TimeSpan.FromSeconds(10));
|
|
||||||
}
|
|
||||||
|
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
|
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Background tick ──────────────────────────────────────────────────────
|
/// <summary>
|
||||||
|
/// Dispoal
|
||||||
|
/// </summary>
|
||||||
|
//public void Dispose()
|
||||||
|
//{
|
||||||
|
// Dispose(true);
|
||||||
|
// GC.SuppressFinalize(this);
|
||||||
|
//}
|
||||||
|
|
||||||
|
//protected virtual void Dispose(bool disposing)
|
||||||
|
//{
|
||||||
|
// if (disposed) return;
|
||||||
|
// disposed = true;
|
||||||
|
|
||||||
|
// if (disposing)
|
||||||
|
// {
|
||||||
|
// Timer timerToDispose;
|
||||||
|
// lock (instanceLock)
|
||||||
|
// {
|
||||||
|
// timerToDispose = refreshTimer;
|
||||||
|
// refreshTimer = null;
|
||||||
|
// instance = null;
|
||||||
|
// }
|
||||||
|
// // Block until any in-flight tick completes before disposing
|
||||||
|
// using (ManualResetEventSlim waited = new ManualResetEventSlim(false))
|
||||||
|
// {
|
||||||
|
// if (timerToDispose != null)
|
||||||
|
// timerToDispose.Dispose(waited.WaitHandle);
|
||||||
|
// waited.Wait(TimeSpan.FromSeconds(10));
|
||||||
|
// }
|
||||||
|
|
||||||
|
// MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
// -- Background tick ------------------------------------------------------
|
||||||
private void OnCacheRefreshTick(object state)
|
private void OnCacheRefreshTick(object state)
|
||||||
{
|
{
|
||||||
if (disposed) return;
|
if (disposed) return;
|
||||||
@@ -100,30 +139,27 @@ namespace MarketData.Cache
|
|||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
long count = CountInternal();
|
long count = CountInternal();
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Tick] Symbols: {priceCache.Keys.Count}. " + $"Items in cache: {Utility.FormatNumber(count, 0, true)}.");
|
||||||
$"[GLPriceCache:Tick] Symbols: {priceCache.Keys.Count}. " +
|
|
||||||
$"Items in cache: {Utility.FormatNumber(count, 0, true)}.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (++tickCount % evictionTickInterval == 0)
|
if (++tickCount % evictionTickInterval == 0)
|
||||||
|
{
|
||||||
EvictStaleSymbols();
|
EvictStaleSymbols();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}");
|
||||||
$"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Eviction ─────────────────────────────────────────────────────────────
|
// -- Eviction -------------------------------------------------------------
|
||||||
private void EvictStaleSymbols()
|
private void EvictStaleSymbols()
|
||||||
{
|
{
|
||||||
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped — refresh in progress.");
|
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped refresh in progress.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
int count;
|
int count;
|
||||||
@@ -133,9 +169,7 @@ namespace MarketData.Cache
|
|||||||
priceCache.Clear();
|
priceCache.Clear();
|
||||||
symbolFetchLocks.Clear();
|
symbolFetchLocks.Clear();
|
||||||
}
|
}
|
||||||
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Evict] Cache cleared. {count} symbols evicted.");
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
|
||||||
$"[GLPriceCache:Evict] Cache cleared. {count} symbols evicted.");
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -143,89 +177,185 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Public API ───────────────────────────────────────────────────────────
|
// -- Public API -----------------------------------------------------------
|
||||||
|
|
||||||
|
// public void Add(PortfolioTrades portfolioTrades)
|
||||||
|
// {
|
||||||
|
// List<string> symbols = portfolioTrades.Symbols;
|
||||||
|
|
||||||
|
// Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(
|
||||||
|
// sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
|
||||||
|
|
||||||
|
// // Only open positions need an intraday price refresh.
|
||||||
|
// // Closed positions, regardless of close date, have immutable prices skip the DB call.
|
||||||
|
// HashSet<string> mutableSymbols = new HashSet<string>(
|
||||||
|
// symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
|
||||||
|
|
||||||
|
// Dictionary<string, DateTime> minCacheDates;
|
||||||
|
// lock (cacheLock)
|
||||||
|
// {
|
||||||
|
// minCacheDates = symbols.ToDictionary(
|
||||||
|
// sym => sym,
|
||||||
|
// sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
||||||
|
// ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
|
||||||
|
|
||||||
|
// Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||||
|
// {
|
||||||
|
// if (disposed) return;
|
||||||
|
|
||||||
|
// DateTime minTradeDate = minTradeDates[symbol];
|
||||||
|
// DateTime minCacheDate = minCacheDates[symbol];
|
||||||
|
|
||||||
|
// // Acquire a per-symbol lock to prevent concurrent clients from issuing
|
||||||
|
// // duplicate DB fetches for the same symbol. First thread fetches;
|
||||||
|
// // subsequent threads for the same symbol block here, then re-check
|
||||||
|
// // the cache on entry and skip the fetch if already populated.
|
||||||
|
// object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
|
||||||
|
// lock (symbolLock)
|
||||||
|
// {
|
||||||
|
// try
|
||||||
|
// {
|
||||||
|
// // Re-check cache state after acquiring symbol lock another
|
||||||
|
// // client thread may have already fetched this symbol while we waited
|
||||||
|
// lock (cacheLock)
|
||||||
|
// {
|
||||||
|
// if (priceCache.ContainsKey(symbol))
|
||||||
|
// minCacheDate = priceCache[symbol].MinDate;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Historical fetch only when cache is missing or incomplete
|
||||||
|
// Prices prices = null;
|
||||||
|
// if (minCacheDate == DateTime.MaxValue)
|
||||||
|
// prices = PricingDA.GetPrices(symbol, minTradeDate);
|
||||||
|
// else if (minTradeDate < minCacheDate)
|
||||||
|
// prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
||||||
|
|
||||||
|
// if (prices != null && prices.Count > 0)
|
||||||
|
// fetchedPrices[symbol] = prices;
|
||||||
|
|
||||||
|
// // Intraday refresh mutable symbols only
|
||||||
|
// if (mutableSymbols.Contains(symbol))
|
||||||
|
// {
|
||||||
|
// Price latestPrice = PricingDA.GetPrice(symbol);
|
||||||
|
// if (latestPrice != null)
|
||||||
|
// latestPrices[symbol] = latestPrice;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// catch (Exception ex)
|
||||||
|
// {
|
||||||
|
// MDTrace.WriteLine(LogLevel.DEBUG,
|
||||||
|
// $"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
|
||||||
|
// lock (cacheLock)
|
||||||
|
// {
|
||||||
|
// // Historical prices idempotent, will not overwrite existing entries
|
||||||
|
// foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
|
||||||
|
// foreach (Price price in kvp.Value)
|
||||||
|
// AddInternal(price);
|
||||||
|
|
||||||
|
// // Latest prices unconditional overwrite to capture intraday updates
|
||||||
|
// foreach (KeyValuePair<string, Price> kvp in latestPrices)
|
||||||
|
// {
|
||||||
|
// PricesByDate pricesByDate;
|
||||||
|
// if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
|
||||||
|
// {
|
||||||
|
// pricesByDate = new PricesByDate();
|
||||||
|
// priceCache[kvp.Key] = pricesByDate;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||||
|
// pricesByDate.Remove(kvp.Value.Date);
|
||||||
|
// pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// MDTrace.WriteLine(LogLevel.DEBUG,
|
||||||
|
// $"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " +
|
||||||
|
// $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
|
||||||
|
// }
|
||||||
|
|
||||||
public void Add(PortfolioTrades portfolioTrades)
|
public void Add(PortfolioTrades portfolioTrades)
|
||||||
{
|
{
|
||||||
List<string> symbols = portfolioTrades.Symbols;
|
List<string> symbols = portfolioTrades.Symbols;
|
||||||
|
|
||||||
Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(
|
// Map each symbol to its earliest trade date
|
||||||
sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
|
Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(symbol => symbol, symbol => portfolioTrades.GetMinTradeDate(symbol));
|
||||||
|
|
||||||
// Only open positions need an intraday price refresh.
|
// Only open positions need intraday price refresh
|
||||||
// Closed positions, regardless of close date, have immutable prices — skip the DB call.
|
HashSet<string> mutableSymbols = new HashSet<string>(symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
|
||||||
HashSet<string> mutableSymbols = new HashSet<string>(
|
// Pre-filter symbols to skip fully cached immutable symbols
|
||||||
symbols.Where(sym => portfolioTrades.HasOpenPositions(sym)));
|
List<string> symbolsToProcess = symbols
|
||||||
|
.Where(sym =>
|
||||||
Dictionary<string, DateTime> minCacheDates;
|
{
|
||||||
|
DateTime minCacheDate;
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
minCacheDates = symbols.ToDictionary(
|
minCacheDate = priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue;
|
||||||
sym => sym,
|
|
||||||
sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
|
|
||||||
}
|
}
|
||||||
|
DateTime minTradeDate = minTradeDates[sym];
|
||||||
|
// Skip if symbol is immutable AND cache already covers all trade dates
|
||||||
|
return mutableSymbols.Contains(sym) || minTradeDate < minCacheDate;
|
||||||
|
})
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
if (symbolsToProcess.Count == 0) return; // nothing to do
|
||||||
ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
||||||
ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
|
ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
|
||||||
|
Parallel.ForEach(symbolsToProcess, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
|
||||||
{
|
{
|
||||||
if (disposed) return;
|
if (disposed) return;
|
||||||
|
|
||||||
DateTime minTradeDate = minTradeDates[symbol];
|
DateTime minTradeDate = minTradeDates[symbol];
|
||||||
DateTime minCacheDate = minCacheDates[symbol];
|
DateTime minCacheDate;
|
||||||
|
lock (cacheLock)
|
||||||
// Acquire a per-symbol lock to prevent concurrent clients from issuing
|
{
|
||||||
// duplicate DB fetches for the same symbol. First thread fetches;
|
minCacheDate = priceCache.ContainsKey(symbol) ? priceCache[symbol].MinDate : DateTime.MaxValue;
|
||||||
// subsequent threads for the same symbol block here, then re-check
|
}
|
||||||
// the cache on entry and skip the fetch if already populated.
|
|
||||||
object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
|
object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
|
||||||
lock (symbolLock)
|
lock (symbolLock)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Re-check cache state after acquiring symbol lock — another
|
// Re-check cache after acquiring lock
|
||||||
// client thread may have already fetched this symbol while we waited
|
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
if (priceCache.ContainsKey(symbol))
|
if (priceCache.ContainsKey(symbol))minCacheDate = priceCache[symbol].MinDate;
|
||||||
minCacheDate = priceCache[symbol].MinDate;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Historical fetch — only when cache is missing or incomplete
|
|
||||||
Prices prices = null;
|
Prices prices = null;
|
||||||
if (minCacheDate == DateTime.MaxValue)
|
// Historical fetch only if cache is missing or incomplete
|
||||||
prices = PricingDA.GetPrices(symbol, minTradeDate);
|
if (minCacheDate == DateTime.MaxValue)prices = PricingDA.GetPrices(symbol, minTradeDate);
|
||||||
else if (minTradeDate < minCacheDate)
|
else if (minTradeDate < minCacheDate)prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
||||||
prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
if (prices != null && prices.Count > 0)fetchedPrices[symbol] = prices;
|
||||||
|
// Intraday refresh for mutable symbols only
|
||||||
if (prices != null && prices.Count > 0)
|
|
||||||
fetchedPrices[symbol] = prices;
|
|
||||||
|
|
||||||
// Intraday refresh — mutable symbols only
|
|
||||||
if (mutableSymbols.Contains(symbol))
|
if (mutableSymbols.Contains(symbol))
|
||||||
{
|
{
|
||||||
Price latestPrice = PricingDA.GetPrice(symbol);
|
Price latestPrice = PricingDA.GetPrice(symbol);
|
||||||
if (latestPrice != null)
|
if (latestPrice != null)latestPrices[symbol] = latestPrice;
|
||||||
latestPrices[symbol] = latestPrice;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
|
||||||
$"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
|
|
||||||
}
|
}
|
||||||
}
|
} // lcok(symLock)
|
||||||
});
|
}); // Parallel
|
||||||
|
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
// Historical prices — idempotent, will not overwrite existing entries
|
// Historical prices idempotent, do not overwrite existing entries
|
||||||
foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
|
foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
|
||||||
|
{
|
||||||
foreach (Price price in kvp.Value)
|
foreach (Price price in kvp.Value)
|
||||||
|
{
|
||||||
AddInternal(price);
|
AddInternal(price);
|
||||||
|
}
|
||||||
// Latest prices — unconditional overwrite to capture intraday updates
|
}
|
||||||
|
// Latest prices unconditional overwrite to capture intraday updates
|
||||||
foreach (KeyValuePair<string, Price> kvp in latestPrices)
|
foreach (KeyValuePair<string, Price> kvp in latestPrices)
|
||||||
{
|
{
|
||||||
PricesByDate pricesByDate;
|
PricesByDate pricesByDate;
|
||||||
@@ -234,39 +364,47 @@ namespace MarketData.Cache
|
|||||||
pricesByDate = new PricesByDate();
|
pricesByDate = new PricesByDate();
|
||||||
priceCache[kvp.Key] = pricesByDate;
|
priceCache[kvp.Key] = pricesByDate;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||||
|
{
|
||||||
pricesByDate.Remove(kvp.Value.Date);
|
pricesByDate.Remove(kvp.Value.Date);
|
||||||
|
}
|
||||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
// Only log if we actually fetched or updated something
|
||||||
$"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " +
|
if (fetchedPrices.Count > 0 || latestPrices.Count > 0)
|
||||||
|
{
|
||||||
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] Symbols processed: {symbolsToProcess.Count}, Mutable: {mutableSymbols.Count}, " +
|
||||||
$"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
|
$"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Add(Prices prices)
|
|
||||||
{
|
|
||||||
foreach (Price price in prices)
|
|
||||||
Add(price);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Add prices
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="prices"></param>
|
||||||
|
public void Add(Prices prices)
|
||||||
|
{
|
||||||
|
foreach (Price price in prices)Add(price);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Add(List<string> symbols, DateTime pricingDate)
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbols"></param>
|
||||||
|
/// <param name="pricingDate"></param>
|
||||||
public void Add(List<string> symbols, DateTime pricingDate)
|
public void Add(List<string> symbols, DateTime pricingDate)
|
||||||
{
|
{
|
||||||
if (symbols == null || symbols.Count == 0) return;
|
if (symbols == null || symbols.Count == 0) return;
|
||||||
|
|
||||||
ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
|
ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
|
||||||
|
|
||||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||||
{
|
{
|
||||||
if (disposed) return;
|
if (disposed) return;
|
||||||
|
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
if (ContainsPriceInternal(symbol, pricingDate)) return;
|
if (ContainsPriceInternal(symbol, pricingDate)) return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Price price = PricingDA.GetPrice(symbol, pricingDate);
|
Price price = PricingDA.GetPrice(symbol, pricingDate);
|
||||||
@@ -274,18 +412,23 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
|
||||||
$"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
|
foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
|
||||||
|
{
|
||||||
AddInternal(kvp.Value);
|
AddInternal(kvp.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Add(Price price)
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="price"></param>
|
||||||
public void Add(Price price)
|
public void Add(Price price)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
@@ -294,14 +437,16 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Refresh
|
||||||
|
/// </summary>
|
||||||
public void Refresh()
|
public void Refresh()
|
||||||
{
|
{
|
||||||
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped — refresh already in progress.");
|
MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped refresh already in progress.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
List<string> symbols;
|
List<string> symbols;
|
||||||
@@ -312,23 +457,19 @@ namespace MarketData.Cache
|
|||||||
symbols = priceCache.Keys.ToList();
|
symbols = priceCache.Keys.ToList();
|
||||||
currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
|
currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (symbols.Count == 0) return;
|
if (symbols.Count == 0) return;
|
||||||
|
|
||||||
ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
|
ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
|
||||||
ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
|
ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
|
||||||
|
|
||||||
// Fetch outside the cache lock — no timeout risk holding cacheLock over I/O
|
// Fetch outside the cache lock no timeout risk holding cacheLock over I/O
|
||||||
Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
|
Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
|
||||||
DateTime latestDateFromDb = PricingDA.GetLatestDate();
|
DateTime latestDateFromDb = PricingDA.GetLatestDate();
|
||||||
|
|
||||||
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
|
||||||
{
|
{
|
||||||
if (disposed) return;
|
if (disposed) return;
|
||||||
|
|
||||||
DateTime cachedMax;
|
DateTime cachedMax;
|
||||||
if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return;
|
if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
DateTime dbMax;
|
DateTime dbMax;
|
||||||
@@ -345,40 +486,35 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}");
|
||||||
$"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}");
|
|
||||||
}
|
}
|
||||||
});
|
}); // Parallel
|
||||||
|
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
latestDate = latestDateFromDb;
|
latestDate = latestDateFromDb;
|
||||||
|
|
||||||
foreach (KeyValuePair<string, PricesByDate> kvp in fullReloads)
|
foreach (KeyValuePair<string, PricesByDate> kvp in fullReloads)
|
||||||
{
|
{
|
||||||
PricesByDate existing;
|
PricesByDate existing;
|
||||||
if (priceCache.TryGetValue(kvp.Key, out existing) &&
|
if (priceCache.TryGetValue(kvp.Key, out existing) && existing.MaxDate == currentMaxDates[kvp.Key])
|
||||||
existing.MaxDate == currentMaxDates[kvp.Key])
|
|
||||||
{
|
{
|
||||||
priceCache[kvp.Key] = kvp.Value;
|
priceCache[kvp.Key] = kvp.Value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (KeyValuePair<string, Price> kvp in singleUpdates)
|
foreach (KeyValuePair<string, Price> kvp in singleUpdates)
|
||||||
{
|
{
|
||||||
PricesByDate pricesByDate;
|
PricesByDate pricesByDate;
|
||||||
if (priceCache.TryGetValue(kvp.Key, out pricesByDate) &&
|
if (priceCache.TryGetValue(kvp.Key, out pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
|
||||||
pricesByDate.MaxDate == currentMaxDates[kvp.Key])
|
|
||||||
{
|
{
|
||||||
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
||||||
|
{
|
||||||
pricesByDate.Remove(kvp.Value.Date);
|
pricesByDate.Remove(kvp.Value.Date);
|
||||||
|
}
|
||||||
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} // lock(cacheLock)
|
||||||
|
MDTrace.WriteLine(LogLevel.DEBUG,$"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,
|
|
||||||
$"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -386,16 +522,25 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// GetLatestDate
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public DateTime GetLatestDate()
|
public DateTime GetLatestDate()
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
if (Utility.IsEpoch(latestDate))
|
if (Utility.IsEpoch(latestDate))
|
||||||
|
{
|
||||||
latestDate = PricingDA.GetLatestDate();
|
latestDate = PricingDA.GetLatestDate();
|
||||||
|
}
|
||||||
return latestDate;
|
return latestDate;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RefreshLatestDate
|
||||||
|
/// </summary>
|
||||||
public void RefreshLatestDate()
|
public void RefreshLatestDate()
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
@@ -404,36 +549,58 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// GetMinCacheDate
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public DateTime GetMinCacheDate(string symbol)
|
public DateTime GetMinCacheDate(string symbol)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
PricesByDate symbolPrices;
|
PricesByDate symbolPrices;
|
||||||
if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0)
|
if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0)
|
||||||
|
{
|
||||||
return Utility.Epoch;
|
return Utility.Epoch;
|
||||||
|
}
|
||||||
return symbolPrices.MinDate;
|
return symbolPrices.MinDate;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// GetPrices
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <param name="endDate"></param>
|
||||||
|
/// <param name="dayCount"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
|
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
PricesByDate pricesByDate;
|
PricesByDate pricesByDate;
|
||||||
if (!priceCache.TryGetValue(symbol, out pricesByDate)) return new Prices();
|
if (!priceCache.TryGetValue(symbol, out pricesByDate)) return new Prices();
|
||||||
|
|
||||||
DateGenerator dateGenerator = new DateGenerator();
|
DateGenerator dateGenerator = new DateGenerator();
|
||||||
List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
|
List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
|
||||||
|
|
||||||
Prices result = new Prices();
|
Prices result = new Prices();
|
||||||
foreach (DateTime date in historicalDates)
|
foreach (DateTime date in historicalDates)
|
||||||
|
{
|
||||||
if (pricesByDate.ContainsKey(date))
|
if (pricesByDate.ContainsKey(date))
|
||||||
|
{
|
||||||
result.Add(pricesByDate[date]);
|
result.Add(pricesByDate[date]);
|
||||||
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
} // lock(cacheLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// GetPrice
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <param name="date"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public Price GetPrice(string symbol, DateTime date)
|
public Price GetPrice(string symbol, DateTime date)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
@@ -445,6 +612,12 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ContainsPrice
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <param name="date"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public bool ContainsPrice(string symbol, DateTime date)
|
public bool ContainsPrice(string symbol, DateTime date)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
@@ -453,17 +626,30 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ContainsPrice
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbols"></param>
|
||||||
|
/// <param name="date"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public bool ContainsPrice(List<string> symbols, DateTime date)
|
public bool ContainsPrice(List<string> symbols, DateTime date)
|
||||||
{
|
{
|
||||||
if (symbols == null || symbols.Count == 0) return false;
|
if (symbols == null || symbols.Count == 0) return false;
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
{
|
{
|
||||||
foreach (string symbol in symbols)
|
foreach (string symbol in symbols)
|
||||||
|
{
|
||||||
if (!ContainsPriceInternal(symbol, date)) return false;
|
if (!ContainsPriceInternal(symbol, date)) return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ContainsSymbol
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public bool ContainsSymbol(string symbol)
|
public bool ContainsSymbol(string symbol)
|
||||||
{
|
{
|
||||||
lock (cacheLock)
|
lock (cacheLock)
|
||||||
@@ -472,7 +658,7 @@ namespace MarketData.Cache
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Private helpers ──────────────────────────────────────────────────────
|
// -- Private helpers ------------------------------------------------------
|
||||||
|
|
||||||
// Must be called under cacheLock
|
// Must be called under cacheLock
|
||||||
private void AddInternal(Price price)
|
private void AddInternal(Price price)
|
||||||
@@ -485,10 +671,17 @@ namespace MarketData.Cache
|
|||||||
priceCache[price.Symbol] = pricesByDate;
|
priceCache[price.Symbol] = pricesByDate;
|
||||||
}
|
}
|
||||||
if (!pricesByDate.ContainsKey(price.Date))
|
if (!pricesByDate.ContainsKey(price.Date))
|
||||||
|
{
|
||||||
pricesByDate.Add(price.Date, price);
|
pricesByDate.Add(price.Date, price);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Must be called under cacheLock
|
/// <summary>
|
||||||
|
/// ContainsPriceInternal - mUST BE CALLED UNDER CACHELOCK
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="symbol"></param>
|
||||||
|
/// <param name="date"></param>
|
||||||
|
/// <returns></returns>
|
||||||
private bool ContainsPriceInternal(string symbol, DateTime date)
|
private bool ContainsPriceInternal(string symbol, DateTime date)
|
||||||
{
|
{
|
||||||
PricesByDate pricesByDate;
|
PricesByDate pricesByDate;
|
||||||
@@ -496,12 +689,17 @@ namespace MarketData.Cache
|
|||||||
return pricesByDate.ContainsKey(date);
|
return pricesByDate.ContainsKey(date);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Must be called under cacheLock
|
/// <summary>
|
||||||
|
/// CountInternal - MUST BE CALLED UNDER CACHELOCK
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
private long CountInternal()
|
private long CountInternal()
|
||||||
{
|
{
|
||||||
long count = 0;
|
long count = 0;
|
||||||
foreach (PricesByDate pricesByDate in priceCache.Values)
|
foreach (PricesByDate pricesByDate in priceCache.Values)
|
||||||
|
{
|
||||||
count += pricesByDate.Count;
|
count += pricesByDate.Count;
|
||||||
|
}
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user