Files
ARM64/MarketData/MarketDataLib/Helper/AnalystRatingsMarketDataHelper.cs

147 lines
6.2 KiB
C#

using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;
namespace MarketData.Helper
{
/// <summary>
/// Context object for one analyst-ratings work item: the symbol to process, the two dates
/// the worker needs, and the event the worker sets when it is finished.
/// </summary>
/// <remarks>
/// <c>Symbol</c>, <c>Modified</c> and <c>ResetEvent</c> are not declared here; presumably they are
/// members of <c>ThreadHelper</c> - confirm against the base class.
/// </remarks>
public class AnalystRatingsThreadHelper : ThreadHelper
{
    /// <summary>The analyst-ratings date supplied at construction; only the constructor can set it.</summary>
    public DateTime MaxAnalystRatingsDate { get; private set; }

    /// <summary>Stores the supplied values on this instance.</summary>
    /// <param name="symbol">Symbol the work item is for.</param>
    /// <param name="modified">Value stored in <c>Modified</c>. Note that it comes BEFORE <paramref name="maxAnalystRatingsDate"/>; both are <see cref="DateTime"/>, so swapping them compiles silently.</param>
    /// <param name="maxAnalystRatingsDate">Value stored in <see cref="MaxAnalystRatingsDate"/>.</param>
    /// <param name="resetEvent">Event stored in <c>ResetEvent</c>.</param>
    public AnalystRatingsThreadHelper(String symbol, DateTime modified, DateTime maxAnalystRatingsDate, ManualResetEvent resetEvent)
    {
        // Own auto-property first, then the inherited members in their original relative order.
        MaxAnalystRatingsDate = maxAnalystRatingsDate;
        Symbol = symbol;
        Modified = modified;
        ResetEvent = resetEvent;
    }
}
// ****************************************************************************************************************************************
/// <summary>
/// Fetches analyst ratings symbol-by-symbol on thread-pool threads and inserts the ratings
/// that are not already stored.
/// </summary>
/// <remarks>
/// The queue and event helpers used below (<c>Queue</c>, <c>Index</c>, <c>GetQueueItem</c>,
/// <c>PeekQueueItem</c>, <c>GetAvailableEvents</c>, <c>GetBusyEvents</c>, <c>ResizeEvents</c>) are not
/// declared in this class; presumably they are inherited from <c>MarketDataHelperBase</c> - confirm there.
/// </remarks>
public class AnalystRatingsMarketDataHelper : MarketDataHelperBase<String>
{
    // Number of ManualResetEvent slots created per run, i.e. the cap on work items in flight.
    private static readonly int MaxThreads = 10;
    // Pause, in milliseconds, taken after each work item is queued.
    private static readonly int SLEEP_TIME_MS = 500;

    public AnalystRatingsMarketDataHelper()
    {
    }

    /// <summary>Stub: performs no work.</summary>
    /// <returns>Always <c>false</c>.</returns>
    public bool UpdateAnalystRatings()
    {
        return false;
    }

    /// <summary>
    /// Queues one thread-pool work item per symbol returned by <c>PricingDA.GetSymbols()</c>, keeping at
    /// most <c>MaxThreads</c> in flight, and blocks until the queue is drained and no slot is busy.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the scheduling loop finishes. There is no catch in this method, so an exception
    /// raised here propagates to the caller after the timing line in the finally block is logged.
    /// </returns>
    public bool UpdateMissingAnalystRatings()
    {
        Profiler profiler = new Profiler();
        try
        {
            List<String> symbols = PricingDA.GetSymbols();
            DateTime maxRatingsDate = AnalystRatingsDA.GetMaxDateNoZacks();
            Queue = symbols;
            // ShuffleQueue(); // shuffle up the symbols
            Index = -1;

            // One event per slot, created signaled; a slot is Reset while its work item is outstanding.
            ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
            for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
            {
                resetEvents[eventIndex] = new ManualResetEvent(true);
            }

            MDTrace.WriteLine(String.Format("Queuing UpdateMissingAnalystRatings fetches ..."));
            DateTime modified = DateTime.Now;

            while (true)
            {
                ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);
                if (null == PeekQueueItem() && 0 == busyEvents.Length)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, String.Format("UpdateMissingAnalystRatings queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                    break;
                }

                for (int index = 0; index < availableEvents.Length; index++)
                {
                    String symbol = GetQueueItem();
                    if (null != symbol)
                    {
                        availableEvents[index].Reset();
                        // The constructor order is (symbol, modified, maxAnalystRatingsDate, resetEvent).
                        // Both dates are DateTime, so passing them the other way round still compiles but
                        // hands the worker DateTime.Now as its ratings cut-off date.
                        AnalystRatingsThreadHelper analystRatingsThreadHelper = new AnalystRatingsThreadHelper(symbol, modified, maxRatingsDate, availableEvents[index]);
                        ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateMissingAnalystRating, analystRatingsThreadHelper);
                        // Best-effort pause between submissions; an exception from Sleep is deliberately ignored.
                        try { Thread.Sleep(SLEEP_TIME_MS); } catch (Exception) { }
                    }
                    else
                    {
                        // Queue is empty: from here on wait only on the events that are still busy.
                        busyEvents = GetBusyEvents(resetEvents);
                        if (busyEvents.Length != availableEvents.Length)
                        {
                            ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                            Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                            resetEvents = resizedEvents;
                        }
                        break;
                    }
                } // for

                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings waiting for free slots...");
                // WaitAny on an empty array is avoided by the length check.
                if (resetEvents.Length > 0) WaitHandle.WaitAny(resetEvents);
                if (null == PeekQueueItem()) resetEvents = ResizeEvents(resetEvents);
            } // while

            MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings completed.");
            return true;
        }
        finally
        {
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateMissingAnalystRatings]Done, total took {0}(ms)", profiler.End()));
        }
    }

    /// <summary>
    /// Thread-pool entry point for one symbol: runs <see cref="UpdateMissingAnalystRatingEx"/> and then
    /// sets the work item's event, whether or not the update threw.
    /// </summary>
    /// <param name="analystRatingThreadHelperContext">
    /// Must be an <see cref="AnalystRatingsThreadHelper"/>; it is cast directly, so any other type
    /// throws <see cref="InvalidCastException"/> before the try block is entered.
    /// </param>
    public void ThreadPoolCallbackUpdateMissingAnalystRating(Object analystRatingThreadHelperContext)
    {
        AnalystRatingsThreadHelper threadHelper = (AnalystRatingsThreadHelper)analystRatingThreadHelperContext;
        try
        {
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            UpdateMissingAnalystRatingEx(threadHelper.Symbol, threadHelper.MaxAnalystRatingsDate, threadHelper.Modified);
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
        }
        finally
        {
            // Always release the slot so the scheduling loop can make progress.
            threadHelper.ResetEvent.Set();
        }
    }

    /// <summary>
    /// Fetches the ratings for <paramref name="symbol"/> via <c>MarketDataHelper.GetAnalystRatingsMarketBeat</c>,
    /// keeps those dated on or after the date part of <paramref name="maxAnalystRatingsDate"/>, drops the
    /// ones <c>AnalystRatingsDA.ContainsAnalystRating</c> reports as present, and inserts the remainder.
    /// </summary>
    /// <param name="symbol">Symbol to fetch ratings for.</param>
    /// <param name="maxAnalystRatingsDate">Cut-off; only its <c>Date</c> component is compared.</param>
    /// <param name="modified">Not used by this method.</param>
    /// <remarks>Exceptions are caught and logged at DEBUG level; none propagate to the caller.</remarks>
    public void UpdateMissingAnalystRatingEx(String symbol, DateTime maxAnalystRatingsDate, DateTime modified)
    {
        try
        {
            AnalystRatings analystRatings = MarketDataHelper.GetAnalystRatingsMarketBeat(symbol);
            if (null == analystRatings || 0 == analystRatings.Count)
            {
                return;
            }

            DateTime cutoffDate = maxAnalystRatingsDate.Date;
            analystRatings = new AnalystRatings((from AnalystRating analystRating in analystRatings
                                                 where analystRating.Date >= cutoffDate
                                                 select analystRating).ToList());

            AnalystRatings duplicateList = new AnalystRatings();
            foreach (AnalystRating analystRating in analystRatings)
            {
                if (AnalystRatingsDA.ContainsAnalystRating(analystRating))
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Already have analyst rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm));
                    // Record the duplicate and keep scanning: the remaining ratings for this symbol
                    // may be new and must still reach the insert below.
                    duplicateList.Add(analystRating);
                }
            }

            analystRatings = new AnalystRatings(analystRatings.Except(duplicateList).ToList());
            foreach (AnalystRating analystRating in analystRatings)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Inserting Analyst Rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm));
            }
            AnalystRatingsDA.InsertAnalystRatings(analystRatings);
        }
        catch (Exception exception)
        {
            MDTrace.WriteLine(LogLevel.DEBUG, $"UpdateMissingAnalystRatingEx : Exception {exception.ToString()}");
        }
    }
}
}