Refactor the SECFilings pull

This commit is contained in:
2025-04-02 11:52:11 -04:00
parent fba7825fab
commit 0315575dda
2 changed files with 81 additions and 104 deletions

View File

@@ -32,9 +32,11 @@ namespace MarketData.Services
tasks.Add("UPDATELATESTPRICEWATCHLIST",TaskUpdateLatestPriceWatchList); tasks.Add("UPDATELATESTPRICEWATCHLIST",TaskUpdateLatestPriceWatchList);
tasks.Add("UPDATELATESTANALYSTRATINGS",TaskUpdateLatestAnalystRatings); tasks.Add("UPDATELATESTANALYSTRATINGS",TaskUpdateLatestAnalystRatings);
tasks.Add("UPDATEANALYSTRATINGS",TaskUpdateAnalystRatings); tasks.Add("UPDATEANALYSTRATINGS",TaskUpdateAnalystRatings);
tasks.Add("UPDATESECFILINGSWATCHLIST",TaskUpdateSECFilingsWatchList);
tasks.Add("ECHO",TaskEcho); tasks.Add("ECHO",TaskEcho);
GlobalConfig.Instance.Configuration = configuration; // This call sets up configuration stuff so it needs to be first. GlobalConfig.Instance.Configuration = configuration; // This call sets up configuration stuff so it needs to be first.
if (args.Length < 1 || String.IsNullOrEmpty(args[0])) if (args.Length < 1 || String.IsNullOrEmpty(args[0]))
{ {
DisplayUsage(); DisplayUsage();
@@ -84,6 +86,17 @@ namespace MarketData.Services
MDTrace.WriteLine(LogLevel.DEBUG,$"[RunService] Done, total took {profiler.End()}(ms)"); MDTrace.WriteLine(LogLevel.DEBUG,$"[RunService] Done, total took {profiler.End()}(ms)");
} }
// *********************************************************************************************************************************************************************************
public async Task TaskUpdateSECFilingsWatchList(CommandArgs commandArgs)
{
if(!commandArgs.Has("WATCHLIST")){Console.WriteLine("UPDATESECFILINGSWATCHLIST REQUIRES WATCHLIST");return;}
String watchListName = commandArgs.Coalesce<String>("WATCHLIST");
List<String> symbols = WatchListDA.GetWatchList(watchListName);
SECFilingMarketDataHelper secFilingMarketDataHelper=new SECFilingMarketDataHelper();
secFilingMarketDataHelper.UpdateSECFilings(symbols);
await Task.FromResult(true);
}
public async Task TaskLoadHeadlinesWatchList(CommandArgs commandArgs) public async Task TaskLoadHeadlinesWatchList(CommandArgs commandArgs)
{ {
if(!commandArgs.Has("WATCHLIST")){Console.WriteLine("LOADHEADLINESWATCHLIST REQUIRES WATCHLIST");return;} if(!commandArgs.Has("WATCHLIST")){Console.WriteLine("LOADHEADLINESWATCHLIST REQUIRES WATCHLIST");return;}
@@ -177,6 +190,7 @@ namespace MarketData.Services
MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTANALYSTRATINGS"); MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTANALYSTRATINGS");
MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEWATCHLIST /WATCHLIST:"); MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEWATCHLIST /WATCHLIST:");
MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEOPENPOSITIONS"); MDTrace.WriteLine(LogLevel.DEBUG,"UPDATELATESTPRICEOPENPOSITIONS");
MDTrace.WriteLine(LogLevel.DEBUG,"UPDATESECFILINGSWATCHLIST /WATCHLIST:");
} }
// ********************************************************************************************************************************************** // **********************************************************************************************************************************************
@@ -199,6 +213,7 @@ namespace MarketData.Services
MDTrace.WriteLine(LogLevel.DEBUG,$"Run date is not today: Current Date:{currentDate.ToShortDateString()} Run Date: {startDate.ToShortDateString()}"); MDTrace.WriteLine(LogLevel.DEBUG,$"Run date is not today: Current Date:{currentDate.ToShortDateString()} Run Date: {startDate.ToShortDateString()}");
return; return;
} }
if(!CheckRunCriteria()) if(!CheckRunCriteria())
{ {
return; return;
@@ -251,6 +266,7 @@ namespace MarketData.Services
SMSClient.SendSMSEmail("UPDATEDAILY2 UPDATEPRICESBIGCHARTS/YAHOO done.", smsUserName, smsRecipients, smsSMTPAddress, smsUserName, smsPassword); SMSClient.SendSMSEmail("UPDATEDAILY2 UPDATEPRICESBIGCHARTS/YAHOO done.", smsUserName, smsRecipients, smsSMTPAddress, smsUserName, smsPassword);
}); });
resetEvents[STAGE_1].WaitOne(); // wait for pricing to finish resetEvents[STAGE_1].WaitOne(); // wait for pricing to finish
ThreadPool.QueueUserWorkItem(delegate ThreadPool.QueueUserWorkItem(delegate
{ {
LoadConsumerPriceIndex(); // Load consumer price index data from Bureau of Labor Statistics LoadConsumerPriceIndex(); // Load consumer price index data from Bureau of Labor Statistics
@@ -320,7 +336,8 @@ namespace MarketData.Services
ThreadPool.QueueUserWorkItem(delegate ThreadPool.QueueUserWorkItem(delegate
{ {
resetEvents[STAGE_12].Set(); resetEvents[STAGE_12].Set();
}); });
WaitHandle.WaitAll(resetEvents); WaitHandle.WaitAll(resetEvents);
} }

View File

@@ -1,112 +1,71 @@
using System; using MarketData.MarketDataModel;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess; using MarketData.DataAccess;
using MarketData.Utils; using MarketData.Utils;
namespace MarketData.Helper namespace MarketData.Helper
{ {
public class SECFilingMarketDataHelper public class SECFilingMarketDataHelper : MarketDataHelperBase<String>
{ {
private static int MaxThreads = 5; // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch. Note:each request may contain subrequests private static int MaxThreads = 5; // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch. Note:each request may contain subrequests
private static int WAIT_TIME_MS=1000; // wait between requests
private List<String> symbols;
private int currentIndex = 0;
public SECFilingMarketDataHelper() public SECFilingMarketDataHelper()
{ {
} }
public bool UpdateSECFilings(List<String> symbols)
{
Profiler profiler=new Profiler();
try public bool UpdateSECFilings(List<String> symbols)
{
this.symbols=symbols;
currentIndex=0;
while (true)
{
List<String> queueSymbols = GetQueueSymbols();
if (null == queueSymbols || 0 == queueSymbols.Count) break;
ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
{
resetEvents[eventIndex] = new ManualResetEvent(false);
}
for (int index = 0; index < queueSymbols.Count; index++)
{
ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index],resetEvents[index]);
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // SEC has a traffic limit
}
MDTrace.WriteLine(LogLevel.DEBUG,"Load SEC Filings, waiting for queued items to complete.");
WaitHandle.WaitAll(resetEvents);
}
return true;
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]Exception {0}",exception.ToString()));
return false;
}
finally
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]End, total took {0}(ms)",profiler.End()));
}
}
public bool UpdateSECFilings()
{
Profiler profiler=new Profiler();
try
{
List<String> watchListSymbols = WatchListDA.GetWatchList("Valuations"); // get the current watchlist symbols
List<String> secFilingSymbols=SECFilingDA.GetDistinctFilingSymbols(); // get the current SEC filing symbols
symbols=watchListSymbols.Union(secFilingSymbols).Distinct().ToList(); // use the union of the two
currentIndex = 0;
while (true)
{
List<String> queueSymbols = GetQueueSymbols();
if (null == queueSymbols || 0 == queueSymbols.Count) break;
ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
{
resetEvents[eventIndex] = new ManualResetEvent(false);
}
for (int index = 0; index < queueSymbols.Count; index++)
{
ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index],resetEvents[index]);
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
try { Thread.Sleep(WAIT_TIME_MS); }catch(Exception) { ;} // SEC has a traffic limit
}
MDTrace.WriteLine(LogLevel.DEBUG,"Load SEC Filings, waiting for queued items to complete.");
WaitHandle.WaitAll(resetEvents);
}
return true;
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]Exception {0}",exception.ToString()));
return false;
}
finally
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings]End, total took {0}(ms)",profiler.End()));
}
}
private List<String> GetQueueSymbols()
{ {
List<String> queueSymbols = new List<String>(); Profiler profiler=new Profiler();
int index = currentIndex; try
for (; index < currentIndex + MaxThreads && index < symbols.Count; index++)
{ {
queueSymbols.Add(symbols[index]); if(null==symbols || 0==symbols.Count)return false;
Queue=symbols;
Index=-1;
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
MDTrace.WriteLine(String.Format("Queuing SEC Filings Load ..."));
while (true)
{
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
if (null == PeekQueueItem() && 0==busyEvents.Length)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("SECFilings queue contains {0} items, busy events {1}, all done.",0,busyEvents.Length));
break;
}
for (int index = 0; index < availableEvents.Length; index++)
{
String symbol=GetQueueItem();
if (null != symbol)
{
availableEvents[index].Reset();
ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]);
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
}
else
{
busyEvents=GetBusyEvents(resetEvents);
if(busyEvents.Length!=availableEvents.Length)
{
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
resetEvents = resizedEvents;
}
break;
}
} // for
MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings waiting for free slots...");
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
} // while
MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings completed.");
return true;
}
finally
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings] End, total took {0}(ms)",profiler.End()));
} }
currentIndex = index;
return queueSymbols;
} }
public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext) public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext)
{ {
ThreadHelper threadHelper = (ThreadHelper)threadHelperContext; ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
@@ -121,6 +80,7 @@ namespace MarketData.Helper
threadHelper.ResetEvent.Set(); threadHelper.ResetEvent.Set();
} }
} }
public static void UpdateSECFiling(String symbol) public static void UpdateSECFiling(String symbol)
{ {
try try
@@ -136,16 +96,16 @@ namespace MarketData.Helper
SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik); SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik);
if (null != secFilings) if (null != secFilings)
{ {
for (int index = 0; index < secFilings.Count; index++) // for (int index = 0; index < secFilings.Count; index++)
{ // {
StringBuilder sb = new StringBuilder(); // StringBuilder sb = new StringBuilder();
SECFiling secFiling = secFilings[index]; // SECFiling secFiling = secFilings[index];
sb.Append(secFiling.Symbol).Append(","); // sb.Append(secFiling.Symbol).Append(",");
sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(","); // sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(",");
sb.Append(secFiling.SECAccessionNumber).Append(","); // sb.Append(secFiling.SECAccessionNumber).Append(",");
sb.Append(secFiling.Sequence); // sb.Append(secFiling.Sequence);
MDTrace.WriteLine(LogLevel.DEBUG,sb.ToString()); // MDTrace.WriteLine(LogLevel.DEBUG,sb.ToString());
} // }
MDTrace.WriteLine(LogLevel.DEBUG,"Got "+secFilings.Count+" filings for symbol '"+symbol+"'"); MDTrace.WriteLine(LogLevel.DEBUG,"Got "+secFilings.Count+" filings for symbol '"+symbol+"'");
SECFilingDA.InsertOrUpdateSECFilings(secFilings); SECFilingDA.InsertOrUpdateSECFilings(secFilings);
} }