Files
ARM64/MarketData/MarketDataLib/Helper/SECFilingMarketDataHelper.cs
2025-04-02 11:52:11 -04:00

123 lines
4.8 KiB
C#
Executable File

using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;
namespace MarketData.Helper
{
public class SECFilingMarketDataHelper : MarketDataHelperBase<String>
{
private static int MaxThreads = 5; // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch. Note:each request may contain subrequests
public SECFilingMarketDataHelper()
{
}
public bool UpdateSECFilings(List<String> symbols)
{
Profiler profiler=new Profiler();
try
{
if(null==symbols || 0==symbols.Count)return false;
Queue=symbols;
Index=-1;
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
MDTrace.WriteLine(String.Format("Queuing SEC Filings Load ..."));
while (true)
{
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
if (null == PeekQueueItem() && 0==busyEvents.Length)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("SECFilings queue contains {0} items, busy events {1}, all done.",0,busyEvents.Length));
break;
}
for (int index = 0; index < availableEvents.Length; index++)
{
String symbol=GetQueueItem();
if (null != symbol)
{
availableEvents[index].Reset();
ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]);
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
}
else
{
busyEvents=GetBusyEvents(resetEvents);
if(busyEvents.Length!=availableEvents.Length)
{
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
resetEvents = resizedEvents;
}
break;
}
} // for
MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings waiting for free slots...");
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
} // while
MDTrace.WriteLine(LogLevel.DEBUG,"SECFilings completed.");
return true;
}
finally
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateSECFilings] End, total took {0}(ms)",profiler.End()));
}
}
public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext)
{
ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
try
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load SEC Filing, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
UpdateSECFiling(threadHelper.Symbol);
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load SEC Filing, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
}
finally
{
threadHelper.ResetEvent.Set();
}
}
public static void UpdateSECFiling(String symbol)
{
try
{
MDTrace.WriteLine(LogLevel.DEBUG,"UpdateSECFiling '"+symbol+"'");
String cik = PricingDA.GetCIKForSymbol(symbol);
if (null == cik)
{
MDTrace.WriteLine(LogLevel.DEBUG,"No CIK for symbol '" + symbol + "'");
return;
}
MDTrace.WriteLine(LogLevel.DEBUG,"UpdateSECFiling for symbol '" + symbol + "'");
SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik);
if (null != secFilings)
{
// for (int index = 0; index < secFilings.Count; index++)
// {
// StringBuilder sb = new StringBuilder();
// SECFiling secFiling = secFilings[index];
// sb.Append(secFiling.Symbol).Append(",");
// sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(",");
// sb.Append(secFiling.SECAccessionNumber).Append(",");
// sb.Append(secFiling.Sequence);
// MDTrace.WriteLine(LogLevel.DEBUG,sb.ToString());
// }
MDTrace.WriteLine(LogLevel.DEBUG,"Got "+secFilings.Count+" filings for symbol '"+symbol+"'");
SECFilingDA.InsertOrUpdateSECFilings(secFilings);
}
return;
}
catch (Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,exception);
return;
}
}
}
}