397 lines
10 KiB
C#
397 lines
10 KiB
C#
using MarketData.MarketDataModel;
|
|
using MarketData.Utils;
|
|
using MarketData.DataAccess;
|
|
using System.Collections.Concurrent;
|
|
using System.Threading;
|
|
using System.Collections.Generic;
|
|
using System;
|
|
using System.Threading.Tasks;
|
|
using System.Linq;
|
|
|
|
namespace MarketData.Cache
|
|
{
|
|
public class LocalPriceCache
|
|
{
|
|
private Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
|
|
private static LocalPriceCache instance = null;
|
|
private DateTime latestDate = Utility.Epoch;
|
|
private Thread cacheMonitorThread = null;
|
|
private volatile bool threadRun = true;
|
|
private int cacheCycle = 300000;
|
|
private object thisLock = new object();
|
|
private object fetchLock = new object();
|
|
|
|
private LocalPriceCache()
|
|
{
|
|
cacheMonitorThread = new Thread(new ThreadStart(ThreadProc));
|
|
cacheMonitorThread.Start();
|
|
}
|
|
|
|
public void Clear()
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
priceCache = new Dictionary<string, PricesByDate>();
|
|
RefreshLatestDate();
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
Thread threadToJoin = null;
|
|
|
|
lock (thisLock)
|
|
{
|
|
if (instance == null || !threadRun) return;
|
|
threadRun = false;
|
|
threadToJoin = cacheMonitorThread;
|
|
cacheMonitorThread = null;
|
|
instance = null;
|
|
}
|
|
|
|
if (threadToJoin != null)
|
|
{
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"[LocalPriceCache:Dispose] Thread state is '{Utility.ThreadStateToString(threadToJoin)}'. Joining...");
|
|
threadToJoin.Join(5000);
|
|
}
|
|
|
|
MDTrace.WriteLine(LogLevel.DEBUG, "[LocalPriceCache:Dispose] End");
|
|
}
|
|
|
|
public static LocalPriceCache GetInstance()
|
|
{
|
|
lock (typeof(LocalPriceCache))
|
|
{
|
|
if (instance == null)
|
|
{
|
|
instance = new LocalPriceCache();
|
|
}
|
|
return instance;
|
|
}
|
|
}
|
|
|
|
public void RefreshLatestDate()
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
latestDate = PricingDA.GetLatestDate();
|
|
}
|
|
}
|
|
|
|
public DateTime GetLatestDate()
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (Utility.IsEpoch(latestDate))
|
|
{
|
|
RefreshLatestDate();
|
|
}
|
|
return latestDate;
|
|
}
|
|
}
|
|
|
|
public void Refresh()
|
|
{
|
|
List<string> symbols;
|
|
Dictionary<string, DateTime> currentMaxDates;
|
|
|
|
lock (thisLock)
|
|
{
|
|
symbols = priceCache.Keys.ToList();
|
|
currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
|
|
}
|
|
|
|
if (symbols.Count == 0) return;
|
|
|
|
ConcurrentDictionary<string, PricesByDate> fullReloads = new ConcurrentDictionary<string, PricesByDate>();
|
|
ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
|
|
DateTime latestDateFromDb;
|
|
|
|
lock (fetchLock)
|
|
{
|
|
Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
|
|
latestDateFromDb = PricingDA.GetLatestDate();
|
|
|
|
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
|
{
|
|
if (!currentMaxDates.TryGetValue(symbol, out var cachedMax)) return;
|
|
|
|
if (maxDbDates.TryGetValue(symbol, out var dbMax) && dbMax.Date != cachedMax.Date)
|
|
{
|
|
Prices prices = PricingDA.GetPrices(symbol, cachedMax);
|
|
if (prices != null) fullReloads[symbol] = prices.GetPricesByDate();
|
|
}
|
|
else
|
|
{
|
|
Price price = PricingDA.GetPrice(symbol, cachedMax);
|
|
if (price != null) singleUpdates[symbol] = price;
|
|
}
|
|
});
|
|
}
|
|
|
|
lock (thisLock)
|
|
{
|
|
latestDate = latestDateFromDb;
|
|
|
|
foreach (var kvp in fullReloads)
|
|
{
|
|
if (priceCache.TryGetValue(kvp.Key, out PricesByDate existing) && existing.MaxDate == currentMaxDates[kvp.Key])
|
|
{
|
|
priceCache[kvp.Key] = kvp.Value;
|
|
}
|
|
}
|
|
|
|
foreach (var kvp in singleUpdates)
|
|
{
|
|
if (priceCache.TryGetValue(kvp.Key, out PricesByDate pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
|
|
{
|
|
// Remove the old price (if any) and add the new price properly
|
|
if (pricesByDate.ContainsKey(kvp.Value.Date))
|
|
pricesByDate.Remove(kvp.Value.Date);
|
|
pricesByDate.Add(kvp.Value.Date, kvp.Value);
|
|
}
|
|
}
|
|
}
|
|
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
|
|
}
|
|
|
|
public void Add(PortfolioTrades portfolioTrades)
|
|
{
|
|
List<string> symbols = portfolioTrades.Symbols;
|
|
Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(sym => sym, sym => portfolioTrades.GetMinTradeDate(sym));
|
|
|
|
Dictionary<string, DateTime> minCacheDates;
|
|
lock (thisLock)
|
|
{
|
|
minCacheDates = symbols.ToDictionary(sym => sym, sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue);
|
|
}
|
|
|
|
ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
|
|
|
|
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
|
{
|
|
DateTime minTradeDate = minTradeDates[symbol];
|
|
DateTime minCacheDate = minCacheDates[symbol];
|
|
Prices prices = null;
|
|
|
|
try
|
|
{
|
|
if (minCacheDate == DateTime.MaxValue)
|
|
{
|
|
prices = PricingDA.GetPrices(symbol, minTradeDate);
|
|
}
|
|
else if (minTradeDate < minCacheDate)
|
|
{
|
|
prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
|
|
}
|
|
|
|
if (prices != null && prices.Count > 0)
|
|
{
|
|
fetchedPrices[symbol] = prices;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching prices for {symbol}: {ex.Message}");
|
|
}
|
|
});
|
|
|
|
lock (thisLock)
|
|
{
|
|
foreach (var kvp in fetchedPrices)
|
|
{
|
|
foreach (var price in kvp.Value)
|
|
{
|
|
Add(price);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public void Add(Prices prices)
|
|
{
|
|
foreach (Price price in prices)
|
|
{
|
|
Add(price);
|
|
}
|
|
}
|
|
|
|
public void Add(List<string> symbols, DateTime pricingDate)
|
|
{
|
|
if (symbols == null || symbols.Count == 0) return;
|
|
|
|
ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
|
|
|
|
Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = 8 }, symbol =>
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (ContainsPrice(symbol, pricingDate)) return;
|
|
}
|
|
|
|
try
|
|
{
|
|
Price price = PricingDA.GetPrice(symbol, pricingDate);
|
|
if (price != null) fetchedPrices[symbol] = price;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
|
|
}
|
|
});
|
|
|
|
lock (thisLock)
|
|
{
|
|
foreach (var kvp in fetchedPrices)
|
|
{
|
|
Add(kvp.Value);
|
|
}
|
|
}
|
|
}
|
|
|
|
public void Add(Price price)
|
|
{
|
|
if (price == null) return;
|
|
|
|
lock (thisLock)
|
|
{
|
|
if (!priceCache.TryGetValue(price.Symbol, out var pricesByDate))
|
|
{
|
|
pricesByDate = new PricesByDate();
|
|
priceCache[price.Symbol] = pricesByDate;
|
|
}
|
|
if (!pricesByDate.ContainsKey(price.Date))
|
|
{
|
|
pricesByDate.Add(price.Date, price); // must use Add() to update MinDate/MaxDate
|
|
}
|
|
}
|
|
}
|
|
|
|
public DateTime GetMinCacheDate(string symbol)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (!priceCache.TryGetValue(symbol, out var symbolPrices) || symbolPrices.Count == 0)
|
|
{
|
|
return Utility.Epoch;
|
|
}
|
|
return symbolPrices.MinDate;
|
|
}
|
|
}
|
|
|
|
public void RemoveDate(DateTime date)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
foreach (var kvp in priceCache)
|
|
{
|
|
kvp.Value.Remove(date);
|
|
}
|
|
}
|
|
}
|
|
|
|
public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return new Prices();
|
|
|
|
DateGenerator dateGenerator = new DateGenerator();
|
|
List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
|
|
|
|
Prices result = new Prices();
|
|
foreach (DateTime date in historicalDates)
|
|
{
|
|
if (pricesByDate.ContainsKey(date))
|
|
{
|
|
result.Add(pricesByDate[date]);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
}
|
|
|
|
public Price GetPrice(string symbol, DateTime date)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return null;
|
|
return pricesByDate.TryGetValue(date, out var price) ? price : null;
|
|
}
|
|
}
|
|
|
|
public bool ContainsPrice(string symbol, DateTime date)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
if (!priceCache.TryGetValue(symbol, out var pricesByDate)) return false;
|
|
return pricesByDate.ContainsKey(date);
|
|
}
|
|
}
|
|
|
|
public bool ContainsPrice(List<string> symbols, DateTime date)
|
|
{
|
|
if (symbols == null || symbols.Count == 0) return false;
|
|
|
|
lock (thisLock)
|
|
{
|
|
foreach (string symbol in symbols)
|
|
{
|
|
if (!priceCache.TryGetValue(symbol, out var pricesByDate) || !pricesByDate.ContainsKey(date))
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
|
|
public bool ContainsSymbol(string symbol)
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
return priceCache.ContainsKey(symbol);
|
|
}
|
|
}
|
|
|
|
private long Count()
|
|
{
|
|
lock (thisLock)
|
|
{
|
|
long count = 0;
|
|
foreach (var pricesByDate in priceCache.Values)
|
|
{
|
|
count += pricesByDate.Count;
|
|
}
|
|
return count;
|
|
}
|
|
}
|
|
|
|
private void ThreadProc()
|
|
{
|
|
int quantums = 0;
|
|
int quantumInterval = 1000;
|
|
long lastCount = 0;
|
|
|
|
while (threadRun)
|
|
{
|
|
Thread.Sleep(quantumInterval);
|
|
quantums += quantumInterval;
|
|
if (quantums > cacheCycle)
|
|
{
|
|
quantums = 0;
|
|
lock (thisLock)
|
|
{
|
|
lastCount = Count();
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"[LocalPriceCache:ThreadProc] Symbols: {priceCache.Keys.Count}. Items in cache: {Utility.FormatNumber(lastCount,0,true)}.");
|
|
}
|
|
}
|
|
}
|
|
|
|
MDTrace.WriteLine(LogLevel.DEBUG, $"[LocalPriceCache:ThreadProc] Thread ended. Items in cache:{Utility.FormatNumber(lastCount,0,true)}");
|
|
}
|
|
}
|
|
}
|