diff --git a/MarketData/MarketData/Services/MainService.cs b/MarketData/MarketData/Services/MainService.cs index 6361fef..45c718a 100755 --- a/MarketData/MarketData/Services/MainService.cs +++ b/MarketData/MarketData/Services/MainService.cs @@ -32,9 +32,11 @@ namespace MarketData.Services tasks.Add("UPDATELATESTPRICEWATCHLIST",TaskUpdateLatestPriceWatchList); tasks.Add("UPDATELATESTANALYSTRATINGS",TaskUpdateLatestAnalystRatings); tasks.Add("UPDATEANALYSTRATINGS",TaskUpdateAnalystRatings); + tasks.Add("UPDATESECFILINGSWATCHLIST",TaskUpdateSECFilingsWatchList); tasks.Add("ECHO",TaskEcho); GlobalConfig.Instance.Configuration = configuration; // This call sets up configuration stuff so it needs to be first. + if (args.Length < 1 || String.IsNullOrEmpty(args[0])) { DisplayUsage(); @@ -84,6 +86,17 @@ namespace MarketData.Services MDTrace.WriteLine(LogLevel.DEBUG,$"[RunService] Done, total took {profiler.End()}(ms)"); } +// ********************************************************************************************************************************************************************************* + public async Task TaskUpdateSECFilingsWatchList(CommandArgs commandArgs) + { + if(!commandArgs.Has("WATCHLIST")){Console.WriteLine("UPDATESECFILINGSWATCHLIST REQUIRES WATCHLIST");return;} + String watchListName = commandArgs.Coalesce("WATCHLIST"); + List symbols = WatchListDA.GetWatchList(watchListName); + SECFilingMarketDataHelper secFilingMarketDataHelper=new SECFilingMarketDataHelper(); + secFilingMarketDataHelper.UpdateSECFilings(symbols); + await Task.FromResult(true); + } + public async Task TaskLoadHeadlinesWatchList(CommandArgs commandArgs) { if(!commandArgs.Has("WATCHLIST")){Console.WriteLine("LOADHEADLINESWATCHLIST REQUIRES WATCHLIST");return;} @@ -177,6 +190,7 @@ namespace MarketData.Services MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTANALYSTRATINGS"); MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEWATCHLIST /WATCHLIST:"); MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEOPENPOSITIONS"); + MDTrace.WriteLine(LogLevel.DEBUG,"UPDATESECFILINGSWATCHLIST /WATCHLIST:"); } // ********************************************************************************************************************************************** @@ -199,6 +213,7 @@ namespace MarketData.Services MDTrace.WriteLine(LogLevel.DEBUG,$"Run date is not today: Current Date:{currentDate.ToShortDateString()} Run Date: {startDate.ToShortDateString()}"); return; } + if(!CheckRunCriteria()) { return; @@ -251,6 +266,7 @@ namespace MarketData.Services SMSClient.SendSMSEmail("UPDATEDAILY2 UPDATEPRICESBIGCHARTS/YAHOO done.", smsUserName, smsRecipients, smsSMTPAddress, smsUserName, smsPassword); }); resetEvents[STAGE_1].WaitOne(); // wait for pricing to finish + ThreadPool.QueueUserWorkItem(delegate { LoadConsumerPriceIndex(); // Load consumer price index data from Bureau of Labor Statistics @@ -320,7 +336,8 @@ namespace MarketData.Services ThreadPool.QueueUserWorkItem(delegate { resetEvents[STAGE_12].Set(); - }); + }); + WaitHandle.WaitAll(resetEvents); } diff --git a/MarketData/MarketDataLib/Helper/SECFilingMarketDataHelper.cs b/MarketData/MarketDataLib/Helper/SECFilingMarketDataHelper.cs index 2453bb5..05b5f6e 100755 --- a/MarketData/MarketDataLib/Helper/SECFilingMarketDataHelper.cs +++ b/MarketData/MarketDataLib/Helper/SECFilingMarketDataHelper.cs @@ -1,112 +1,71 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using MarketData.MarketDataModel; +using MarketData.MarketDataModel; using MarketData.DataAccess; using MarketData.Utils; namespace MarketData.Helper { - public class SECFilingMarketDataHelper + public class SECFilingMarketDataHelper : MarketDataHelperBase { private static int MaxThreads = 5; // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch. Note:each request may contain subrequests - private static int WAIT_TIME_MS=1000; // wait between requests - private List symbols; - private int currentIndex = 0; public SECFilingMarketDataHelper() { } - public bool UpdateSECFilings(List symbols) - { - Profiler profiler=new Profiler(); - try - { - this.symbols=symbols; - currentIndex=0; - while (true) - { - List queueSymbols = GetQueueSymbols(); - if (null == queueSymbols || 0 == queueSymbols.Count) break; - ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count]; - for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) - { - resetEvents[eventIndex] = new ManualResetEvent(false); - } - for (int index = 0; index < queueSymbols.Count; index++) - { - ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index],resetEvents[index]); - ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper); - try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // SEC has a traffic limit - } - MDTrace.WriteLine(LogLevel.DEBUG,"Load SEC Filings, waiting for queued items to complete."); - WaitHandle.WaitAll(resetEvents); - } - return true; - } - catch(Exception exception) - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]Exception {0}",exception.ToString())); - return false; - } - finally - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]End, total took {0}(ms)",profiler.End())); - } - } - public bool UpdateSECFilings() - { - Profiler profiler=new Profiler(); - try - { - List watchListSymbols = WatchListDA.GetWatchList("Valuations"); // get the current watchlist symbols - List secFilingSymbols=SECFilingDA.GetDistinctFilingSymbols(); // get the current SEC filing symbols - symbols=watchListSymbols.Union(secFilingSymbols).Distinct().ToList(); // use the union of the two - currentIndex = 0; - while (true) - { - List queueSymbols = GetQueueSymbols(); - if (null == queueSymbols || 0 == queueSymbols.Count) break; - ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count]; - for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) - { - resetEvents[eventIndex] = new ManualResetEvent(false); - } - for (int index = 0; index < queueSymbols.Count; index++) - { - ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index],resetEvents[index]); - ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper); - try { Thread.Sleep(WAIT_TIME_MS); }catch(Exception) { ;} // SEC has a traffic limit - } - MDTrace.WriteLine(LogLevel.DEBUG,"Load SEC Filings, waiting for queued items to complete."); - WaitHandle.WaitAll(resetEvents); - } - return true; - } - catch(Exception exception) - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]Exception {0}",exception.ToString())); - return false; - } - finally - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]End, total took {0}(ms)",profiler.End())); - } - } - private List GetQueueSymbols() + public bool UpdateSECFilings(List symbols) { - List queueSymbols = new List(); - int index = currentIndex; - for (; index < currentIndex + MaxThreads && index < symbols.Count; index++) + Profiler profiler=new Profiler(); + try { - queueSymbols.Add(symbols[index]); + if(null==symbols || 0==symbols.Count)return false; + Queue=symbols; + Index=-1; + ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; + for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true); + MDTrace.WriteLine(String.Format("Queuing SEC Filings Load ...")); + while (true) + { + ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents); + ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents); + if (null == PeekQueueItem() && 0==busyEvents.Length) + { + MDTrace.WriteLine(LogLevel.DEBUG,String.Format("SECFilings queue contains {0} items, busy events {1}, all done.",0,busyEvents.Length)); + break; + } + for (int index = 0; index < availableEvents.Length; index++) + { + String symbol=GetQueueItem(); + if (null != symbol) + { + availableEvents[index].Reset(); + ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]); + ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper); + } + else + { + busyEvents=GetBusyEvents(resetEvents); + if(busyEvents.Length!=availableEvents.Length) + { + ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length]; + Array.Copy(busyEvents, resizedEvents,busyEvents.Length); + resetEvents = resizedEvents; + } + break; + } + } // for + MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings waiting for free slots..."); + if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents); + if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents); + } // while + MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings completed."); + return true; + } + finally + { + MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings] End, total took {0}(ms)",profiler.End())); } - currentIndex = index; - return queueSymbols; } + public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext) { ThreadHelper threadHelper = (ThreadHelper)threadHelperContext; @@ -121,6 +80,7 @@ namespace MarketData.Helper threadHelper.ResetEvent.Set(); } } + public static void UpdateSECFiling(String symbol) { try @@ -136,16 +96,16 @@ namespace MarketData.Helper SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik); if (null != secFilings) { - for (int index = 0; index < secFilings.Count; index++) - { - StringBuilder sb = new StringBuilder(); - SECFiling secFiling = secFilings[index]; - sb.Append(secFiling.Symbol).Append(","); - sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(","); - sb.Append(secFiling.SECAccessionNumber).Append(","); - sb.Append(secFiling.Sequence); - MDTrace.WriteLine(LogLevel.DEBUG,sb.ToString()); - } + // for (int index = 0; index < secFilings.Count; index++) + // { + // StringBuilder sb = new StringBuilder(); + // SECFiling secFiling = secFilings[index]; + // sb.Append(secFiling.Symbol).Append(","); + // sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(","); + // sb.Append(secFiling.SECAccessionNumber).Append(","); + // sb.Append(secFiling.Sequence); + // MDTrace.WriteLine(LogLevel.DEBUG,sb.ToString()); + // } MDTrace.WriteLine(LogLevel.DEBUG,"Got "+secFilings.Count+" filings for symbol '"+symbol+"'"); SECFilingDA.InsertOrUpdateSECFilings(secFilings); }