Files
ARM64/MarketData/MarketDataLib/Helper/InsiderTransactionMarketDataHelper.cs
2025-11-07 05:52:30 -05:00

208 lines
9.6 KiB
C#
Executable File

using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;
namespace MarketData.Helper
{
/// <summary>
/// Work-item context for the year-filtered insider transaction load. Adds a year
/// threshold to the symbol and reset event already carried by <see cref="ThreadHelper"/>.
/// </summary>
public class InsiderTransactionThreadHelper : ThreadHelper
{
    /// <summary>
    /// Year threshold handed to the year-filtered loader. Starts at -1 until a caller
    /// assigns it (presumably meaning "not set" - confirm against callers).
    /// </summary>
    public int YearGreaterEqual { get; set; } = -1;

    /// <summary>
    /// Passes <paramref name="symbol"/> and <paramref name="resetEvent"/> straight through
    /// to the <see cref="ThreadHelper"/> constructor.
    /// </summary>
    /// <param name="symbol">Symbol this work item is responsible for.</param>
    /// <param name="resetEvent">Event associated with this work item's slot.</param>
    public InsiderTransactionThreadHelper(String symbol, ManualResetEvent resetEvent)
        : base(symbol, resetEvent)
    {
    }
}
public class InsiderTransactionMarketDataHelper : MarketDataHelperBase<String>
{
// Number of worker slots (one reset event per slot); 10 threads avoids receiving HTTP Response 429 (Too many requests).
// Never reassigned in this class, so it is marked readonly.
private static readonly int MaxThreads = 10;
// Pause, in milliseconds, applied after each work item is queued (wait between requests).
// Never reassigned in this class, so it is marked readonly.
private static readonly int WAIT_TIME_MS = 500;
// Instance handed to every year-filtered work item; workers report symbols to it through Add(...).
private readonly UpdateManager UpdateManager = new UpdateManager();

/// <summary>
/// Creates the helper. No initialization beyond the field initializers above is performed.
/// </summary>
public InsiderTransactionMarketDataHelper()
{
}
/// <summary>
/// Convenience overload: wraps a single symbol in a one-element list and delegates
/// to the list-based <c>LoadInsiderTransactions</c> overload.
/// </summary>
/// <param name="symbol">The symbol to load.</param>
/// <returns>Whatever the list-based overload returns for the one-element list.</returns>
public bool LoadInsiderTransactions(String symbol)
{
    List<String> singleSymbol = new List<String> { symbol };
    return LoadInsiderTransactions(singleSymbol);
}
/// <summary>
/// Loads insider transactions for every entry in <paramref name="symbols"/> by queuing one
/// thread-pool work item per symbol. At most <c>MaxThreads</c> slots exist, and the
/// calling thread sleeps <c>WAIT_TIME_MS</c> after each submission.
/// </summary>
/// <param name="symbols">Symbols to process; assigned to <c>Queue</c> as the work queue.</param>
/// <returns>
/// <c>false</c> when <paramref name="symbols"/> is null or empty; otherwise <c>true</c>
/// once the queue reports no further item and no slot is busy.
/// </returns>
public bool LoadInsiderTransactions(List<String> symbols)
{
    Profiler stopwatch = new Profiler();
    try
    {
        // The guard sits inside the try so the "End" trace in finally is still written.
        if (null == symbols || 0 == symbols.Count)
        {
            return false;
        }

        // Queue / Index are not declared in this class (presumably inherited queue state);
        // -1 presumably places the cursor before the first item - confirm in the base class.
        Queue = symbols;
        Index = -1;

        // One event per worker slot, created signaled.
        ManualResetEvent[] slots = new ManualResetEvent[MaxThreads];
        for (int slot = 0; slot < slots.Length; slot++)
        {
            slots[slot] = new ManualResetEvent(true);
        }
        MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] Queuing LoadInsiderTransactions Load {symbols.Count} symbols MaxThreads:{MaxThreads}...");

        while (true)
        {
            ManualResetEvent[] freeSlots = GetAvailableEvents(slots);
            ManualResetEvent[] busySlots = GetBusyEvents(slots);
            bool queueDrained = null == PeekQueueItem();
            if (queueDrained && 0 == busySlots.Length)
            {
                MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] queue contains {0} items, busy events {busySlots.Length}, all done.");
                break;
            }

            foreach (ManualResetEvent freeSlot in freeSlots)
            {
                String symbol = GetQueueItem();
                if (null == symbol)
                {
                    // Nothing left to hand out: wait only on the slots still reported busy,
                    // so already-signaled slots do not release WaitAny immediately.
                    busySlots = GetBusyEvents(slots);
                    if (busySlots.Length != freeSlots.Length)
                    {
                        ManualResetEvent[] busyOnly = new ManualResetEvent[busySlots.Length];
                        Array.Copy(busySlots, busyOnly, busySlots.Length);
                        slots = busyOnly;
                    }
                    break;
                }

                // Mark the slot busy before the worker starts; the worker sets it again when done.
                freeSlot.Reset();
                ThreadHelper workItem = new ThreadHelper(symbol, freeSlot);
                ThreadPool.QueueUserWorkItem(ThreadPoolCallback, workItem);
                try
                {
                    Thread.Sleep(WAIT_TIME_MS); // wait
                }
                catch (Exception)
                {
                }
            }

            MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] waiting for free slots...");
            if (slots.Length > 0)
            {
                WaitHandle.WaitAny(slots);
            }
            if (null == PeekQueueItem())
            {
                slots = ResizeEvents(slots);
            }
        }

        MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] completed.");
        return true;
    }
    finally
    {
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions] End, total took {0}(ms)",stopwatch.End()));
    }
}
/// <summary>
/// Loads insider transactions for every entry in <paramref name="symbols"/>, restricted by
/// <paramref name="yearGreaterEqual"/>, by queuing one thread-pool work item per symbol.
/// At most <c>MaxThreads</c> slots exist, and the calling thread sleeps
/// <c>WAIT_TIME_MS</c> after each submission.
/// </summary>
/// <param name="symbols">Symbols to process; assigned to <c>Queue</c> as the work queue.</param>
/// <param name="yearGreaterEqual">Year threshold forwarded to each work item.</param>
/// <returns>
/// <c>false</c> when <paramref name="symbols"/> is null or empty; otherwise <c>true</c>
/// once the queue reports no further item and no slot is busy.
/// </returns>
public bool LoadInsiderTransactionsYearGreaterEqual(List<String> symbols,int yearGreaterEqual)
{
    Profiler profiler=new Profiler();
    try
    {
        // The guard sits inside the try so the "End" trace in finally is still written.
        if(null==symbols || 0==symbols.Count)return false;
        // Queue / Index are not declared in this class (presumably inherited queue state).
        Queue=symbols;
        Index=-1;
        // One event per worker slot, created signaled.
        ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
        for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
        // Log the number of symbols; interpolating the list itself would print its type name.
        MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] Queuing LoadInsiderTransactions Load {symbols.Count} symbols Year>={yearGreaterEqual} MaxThreads:{MaxThreads}...");
        while (true)
        {
            ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
            ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
            if (null == PeekQueueItem() && 0==busyEvents.Length)
            {
                MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] queue contains {0} items, busy events {busyEvents.Length}, all done.");
                break;
            }
            for (int index = 0; index < availableEvents.Length; index++)
            {
                String symbol=GetQueueItem();
                if (null != symbol)
                {
                    // Mark the slot busy before the worker starts; the worker sets it again when done.
                    availableEvents[index].Reset();
                    InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(symbol, availableEvents[index]);
                    threadHelper.UpdateManager=UpdateManager;
                    threadHelper.YearGreaterEqual=yearGreaterEqual;
                    ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper);
                    try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait
                }
                else
                {
                    // Nothing left to hand out: wait only on the slots still reported busy,
                    // so already-signaled slots do not release WaitAny immediately.
                    busyEvents=GetBusyEvents(resetEvents);
                    if(busyEvents.Length!=availableEvents.Length)
                    {
                        ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
                        Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
                        resetEvents = resizedEvents;
                    }
                    break;
                }
            } // for
            MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] waiting for free slots...");
            if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
            if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
        } // while
        MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] completed.");
        return true;
    }
    finally
    {
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactionsYearGreaterEqual] End, total took {0}(ms)",profiler.End()));
    }
}
/// <summary>
/// Thread-pool entry point for one symbol of the unfiltered load. Runs
/// <c>LoadInsiderTransactionsSymbolEx</c> for the symbol carried by the context and
/// always signals the context's reset event afterwards, even when the load throws.
/// </summary>
/// <param name="threadHelperContext">A <see cref="ThreadHelper"/> holding the symbol and the slot's reset event.</param>
private void ThreadPoolCallback(Object threadHelperContext)
{
    ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
    try
    {
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
        LoadInsiderTransactionsSymbolEx(threadHelper.Symbol);
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} ended for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
    }
    catch (Exception exception)
    {
        // An exception escaping a thread-pool work item would terminate the process, and one
        // failing symbol should not abort the rest of the batch. DEBUG is the only log level
        // this file is known to have, so the failure is traced there.
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} failed for {1}: {2}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol, exception));
    }
    finally
    {
        // Always release the slot; otherwise the scheduling loop would see it as busy forever.
        threadHelper.ResetEvent.Set();
    }
}
/// <summary>
/// Retrieves the insider transactions for one symbol through
/// <c>MarketDataHelper.GetInsiderTransactions</c> and, when any are returned, passes them to
/// <c>InsiderTransactionDA.InsertInsiderTransactions</c>. The elapsed time is always traced.
/// </summary>
/// <param name="symbol">Symbol to load; upper-cased before use. Must not be null.</param>
private static void LoadInsiderTransactionsSymbolEx(String symbol)
{
    Profiler profiler = new Profiler();
    try
    {
        // Symbols are identifiers, not linguistic text: use the invariant culture so the result
        // does not depend on the machine's locale (e.g. Turkish dotted/dotless i).
        symbol = symbol.ToUpperInvariant();
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0}", symbol));
        InsiderTransactions insiderTransactions = MarketDataHelper.GetInsiderTransactions(symbol);
        if (null == insiderTransactions || 0 == insiderTransactions.Count)
        {
            MDTrace.WriteLine(LogLevel.DEBUG,String.Format("No insider transactions for {0}", symbol));
            return;
        }
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions, Saving {0} records for {1}", insiderTransactions.Count, symbol));
        InsiderTransactionDA.InsertInsiderTransactions(insiderTransactions);
    }
    finally
    {
        MDTrace.WriteLine(LogLevel.DEBUG,$"{symbol} Done, took {profiler.End()}(ms)");
    }
}
/// <summary>
/// Thread-pool entry point for one symbol of the year-filtered load. Runs
/// <c>LoadInsiderTransactionsYearGreaterEqualEx</c> with the symbol and year carried by the
/// context, reports the symbol to the context's <c>UpdateManager</c> when the load
/// completes without throwing, and always signals the context's reset event afterwards.
/// </summary>
/// <param name="threadHelperContext">
/// An <see cref="InsiderTransactionThreadHelper"/> holding the symbol, year threshold,
/// update manager and the slot's reset event.
/// </param>
private void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext)
{
    InsiderTransactionThreadHelper threadHelper = (InsiderTransactionThreadHelper)threadHelperContext;
    try
    {
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
        LoadInsiderTransactionsYearGreaterEqualEx(threadHelper.Symbol,threadHelper.YearGreaterEqual);
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} ended for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
        // Reached only when the load did not throw, so a failed symbol is not reported.
        threadHelper.UpdateManager.Add(threadHelper.Symbol);
    }
    catch (Exception exception)
    {
        // An exception escaping a thread-pool work item would terminate the process, and one
        // failing symbol should not abort the rest of the batch. DEBUG is the only log level
        // this file is known to have, so the failure is traced there.
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} failed for {1}: {2}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol, exception));
    }
    finally
    {
        // Always release the slot; otherwise the scheduling loop would see it as busy forever.
        threadHelper.ResetEvent.Set();
    }
}
/// <summary>
/// Retrieves the insider transactions for one symbol and year threshold through
/// <c>MarketDataHelper.GetInsiderTransactionsYear</c>. When any are returned, calls
/// <c>InsiderTransactionDA.DeleteInsiderTransactionsYearsGreaterEqual</c> for the same
/// symbol and year and then <c>InsiderTransactionDA.InsertInsiderTransactions</c> with the
/// new records. The elapsed time is always traced.
/// </summary>
/// <param name="symbol">Symbol to load; upper-cased before use. Must not be null.</param>
/// <param name="yearGreaterEqual">Year threshold forwarded to the retrieval and delete calls.</param>
private static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual)
{
    Profiler profiler = new Profiler();
    try
    {
        // Symbols are identifiers, not linguistic text: use the invariant culture so the result
        // does not depend on the machine's locale (e.g. Turkish dotted/dotless i).
        symbol = symbol.ToUpperInvariant();
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0} years>={1}", symbol,yearGreaterEqual));
        InsiderTransactions insiderTransactions = MarketDataHelper.GetInsiderTransactionsYear(symbol, yearGreaterEqual);
        if (null == insiderTransactions || 0 == insiderTransactions.Count)
        {
            // Nothing retrieved: return before the delete so existing records are left untouched.
            MDTrace.WriteLine(LogLevel.DEBUG,String.Format("No insider transactions for {0} years>={1}", symbol,yearGreaterEqual));
            return;
        }
        MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions, Saving {0} records for {1} years>={2}", insiderTransactions.Count, symbol, yearGreaterEqual));
        // Delete first, then insert, so the stored range is replaced by the fresh records.
        InsiderTransactionDA.DeleteInsiderTransactionsYearsGreaterEqual(symbol, yearGreaterEqual);
        InsiderTransactionDA.InsertInsiderTransactions(insiderTransactions);
    }
    finally
    {
        MDTrace.WriteLine(LogLevel.DEBUG,$"{symbol} Done, took {profiler.End()}(ms)");
    }
}
}
}