147 lines
6.2 KiB
C#
147 lines
6.2 KiB
C#
using MarketData.MarketDataModel;
|
|
using MarketData.DataAccess;
|
|
using MarketData.Utils;
|
|
|
|
namespace MarketData.Helper
|
|
{
|
|
/// <summary>
/// Per-work-item context handed to a thread-pool callback: the symbol to process, a
/// modification timestamp, the completion event to signal, and the analyst-ratings cutoff date.
/// </summary>
/// <remarks>
/// Symbol, Modified and ResetEvent are not declared in this class; presumably they are
/// inherited from <c>ThreadHelper</c> (confirm against the base class).
/// </remarks>
public class AnalystRatingsThreadHelper : ThreadHelper
{
    /// <summary>Cutoff date supplied at construction; read-only to callers.</summary>
    public DateTime MaxAnalystRatingsDate { get; private set; }

    /// <summary>Stores the four supplied values on the helper.</summary>
    /// <param name="symbol">Symbol this work item refers to.</param>
    /// <param name="modified">Timestamp stored in Modified.</param>
    /// <param name="maxAnalystRatingsDate">Value stored in <see cref="MaxAnalystRatingsDate"/>.</param>
    /// <param name="resetEvent">Event stored in ResetEvent.</param>
    public AnalystRatingsThreadHelper(string symbol, DateTime modified, DateTime maxAnalystRatingsDate, ManualResetEvent resetEvent)
    {
        this.Symbol = symbol;
        this.Modified = modified;
        this.ResetEvent = resetEvent;
        this.MaxAnalystRatingsDate = maxAnalystRatingsDate;
    }
}
|
|
|
|
// ****************************************************************************************************************************************
|
|
|
|
/// <summary>
/// Fetches analyst ratings for every symbol on thread-pool workers and inserts the ratings
/// that are not already stored.
/// </summary>
/// <remarks>
/// Queue, Index, PeekQueueItem, GetQueueItem, GetAvailableEvents, GetBusyEvents and ResizeEvents
/// are not declared in this class; presumably they come from <c>MarketDataHelperBase</c>
/// (confirm against the base class).
/// </remarks>
public class AnalystRatingsMarketDataHelper : MarketDataHelperBase<String>
{
    // Number of ManualResetEvent slots, i.e. the upper bound on work items in flight at once.
    private static readonly int MaxThreads = 10;

    // Pause, in milliseconds, after each work item is queued.
    private static readonly int SLEEP_TIME_MS = 500;

    /// <summary>Creates the helper; no state is initialized here.</summary>
    public AnalystRatingsMarketDataHelper()
    {
    }

    /// <summary>Not implemented: performs no work.</summary>
    /// <returns>Always <c>false</c>.</returns>
    public bool UpdateAnalystRatings()
    {
        return false;
    }

    /// <summary>
    /// Queues one thread-pool work item per symbol from <c>PricingDA.GetSymbols()</c>, keeping at most
    /// <c>MaxThreads</c> in flight, and blocks until the queue is drained and no worker is busy.
    /// </summary>
    /// <returns><c>true</c> when the loop completes; exceptions propagate to the caller.</returns>
    public bool UpdateMissingAnalystRatings()
    {
        Profiler profiler = new Profiler();
        try
        {
            List<String> symbols = PricingDA.GetSymbols();
            DateTime maxRatingsDate = AnalystRatingsDA.GetMaxDateNoZacks();
            Queue = symbols;
            // ShuffleQueue(); // shuffle up the symbols
            Index = -1;

            // Every slot starts signaled (true) so it is treated as available on the first pass.
            ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
            for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
            {
                resetEvents[eventIndex] = new ManualResetEvent(true);
            }

            MDTrace.WriteLine("Queuing UpdateMissingAnalystRatings fetches ...");
            DateTime modified = DateTime.Now;

            while (true)
            {
                ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);

                // Finished: nothing left to hand out and no worker still running.
                if (null == PeekQueueItem() && 0 == busyEvents.Length)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, String.Format("UpdateMissingAnalystRatings queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                    break;
                }

                for (int index = 0; index < availableEvents.Length; index++)
                {
                    String symbol = GetQueueItem();
                    if (null != symbol)
                    {
                        // Mark the slot busy before the worker starts; the worker sets it again when done.
                        availableEvents[index].Reset();

                        // Argument order must match the constructor: (symbol, modified, maxAnalystRatingsDate, resetEvent).
                        // The two DateTime values were previously swapped, which made workers use the
                        // current time as the ratings cutoff instead of the stored maximum date.
                        AnalystRatingsThreadHelper analystRatingsThreadHelper = new AnalystRatingsThreadHelper(symbol, modified, maxRatingsDate, availableEvents[index]);
                        ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateMissingAnalystRating, analystRatingsThreadHelper);

                        // Best-effort pause between submissions; an interrupted sleep is deliberately ignored.
                        try { Thread.Sleep(SLEEP_TIME_MS); } catch (Exception) {; }
                    }
                    else
                    {
                        // The queue returned no item: narrow the wait set to the events that are still busy
                        // and stop handing out work for this pass.
                        // NOTE(review): the resize is skipped when the busy count happens to equal the
                        // available count — confirm that comparison is intended.
                        busyEvents = GetBusyEvents(resetEvents);
                        if (busyEvents.Length != availableEvents.Length)
                        {
                            ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                            Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                            resetEvents = resizedEvents;
                        }
                        break;
                    }
                } // for

                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings waiting for free slots...");
                if (resetEvents.Length > 0)
                {
                    WaitHandle.WaitAny(resetEvents);
                }
                if (null == PeekQueueItem())
                {
                    resetEvents = ResizeEvents(resetEvents);
                }
            } // while

            MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings completed.");
            return true;
        }
        finally
        {
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateMissingAnalystRatings]Done, total took {0}(ms)", profiler.End()));
        }
    }

    /// <summary>
    /// Thread-pool entry point: processes the symbol carried by the context and always signals
    /// the context's reset event afterwards, even if processing throws.
    /// </summary>
    /// <param name="analystRatingThreadHelperContext">
    /// Must be an <see cref="AnalystRatingsThreadHelper"/>; any other type fails the cast.
    /// </param>
    public void ThreadPoolCallbackUpdateMissingAnalystRating(Object analystRatingThreadHelperContext)
    {
        AnalystRatingsThreadHelper threadHelper = (AnalystRatingsThreadHelper)analystRatingThreadHelperContext;
        try
        {
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            UpdateMissingAnalystRatingEx(threadHelper.Symbol, threadHelper.MaxAnalystRatingsDate, threadHelper.Modified);
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
        }
        finally
        {
            // Release the slot so the scheduling loop can reuse it.
            threadHelper.ResetEvent.Set();
        }
    }

    /// <summary>
    /// Retrieves analyst ratings for one symbol, keeps those dated on or after the cutoff date,
    /// skips the ones already stored and inserts the remainder.
    /// </summary>
    /// <param name="symbol">Symbol to fetch ratings for.</param>
    /// <param name="maxAnalystRatingsDate">Cutoff; only its date component is compared.</param>
    /// <param name="modified">Not used by this method.</param>
    /// <remarks>Exceptions are caught and logged; none propagate to the caller.</remarks>
    public void UpdateMissingAnalystRatingEx(String symbol, DateTime maxAnalystRatingsDate, DateTime modified)
    {
        try
        {
            AnalystRatings analystRatings = MarketDataHelper.GetAnalystRatingsMarketBeat(symbol);
            if (null == analystRatings || 0 == analystRatings.Count)
            {
                return;
            }

            analystRatings = new AnalystRatings((from AnalystRating analystRating in analystRatings where analystRating.Date >= maxAnalystRatingsDate.Date select analystRating).ToList());

            // Collect every rating that is already stored. The loop previously returned on the first
            // duplicate, which discarded all remaining new ratings for the symbol and left the
            // exclusion step below unreachable with a non-empty list.
            AnalystRatings duplicateList = new AnalystRatings();
            foreach (AnalystRating analystRating in analystRatings)
            {
                if (AnalystRatingsDA.ContainsAnalystRating(analystRating))
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Already have analyst rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm));
                    duplicateList.Add(analystRating);
                }
            }

            analystRatings = new AnalystRatings(analystRatings.Except(duplicateList).ToList());
            if (0 == analystRatings.Count)
            {
                // Nothing new to store for this symbol.
                return;
            }

            foreach (AnalystRating analystRating in analystRatings)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Inserting Analyst Rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm));
            }
            AnalystRatingsDA.InsertAnalystRatings(analystRatings);
        }
        catch (Exception exception)
        {
            MDTrace.WriteLine(LogLevel.DEBUG, $"UpdateMissingAnalystRatingEx : Exception {exception.ToString()}");
        }
    }
}
|
|
}
|
|
|
|
|