From 3446a0f9fbb493b62c6fa6274f89c14f1cefcec2 Mon Sep 17 00:00:00 2001 From: Sean Date: Fri, 4 Apr 2025 12:19:15 -0400 Subject: [PATCH] Fix threading in InsiderTransactionMarketDaatHelper --- .../InsiderTransactionMarketDataHelper.cs | 234 +++++++++--------- 1 file changed, 115 insertions(+), 119 deletions(-) diff --git a/MarketData/MarketDataLib/Helper/InsiderTransactionMarketDataHelper.cs b/MarketData/MarketDataLib/Helper/InsiderTransactionMarketDataHelper.cs index ab41b4c..7e6c63e 100755 --- a/MarketData/MarketDataLib/Helper/InsiderTransactionMarketDataHelper.cs +++ b/MarketData/MarketDataLib/Helper/InsiderTransactionMarketDataHelper.cs @@ -1,150 +1,146 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using MarketData.MarketDataModel; +using MarketData.MarketDataModel; using MarketData.DataAccess; using MarketData.Utils; namespace MarketData.Helper { - public class InsiderTransactionThreadHelper : ThreadHelper { public InsiderTransactionThreadHelper(String symbol, ManualResetEvent resetEvent) : base(symbol, resetEvent) { } - public int YearGreaterEqual { get; set; } + public int YearGreaterEqual { get; set; } = -1; } - public class InsiderTransactionMarketDataHelper + public class InsiderTransactionMarketDataHelper : MarketDataHelperBase { private static int MaxThreads = 10; // 10 threads avoids receiving HTTP Response 429 (Too many requests) private static int WAIT_TIME_MS=500; // wait between request - private List symbols; - private int currentIndex = 0; private UpdateManager UpdateManager = new UpdateManager(); public InsiderTransactionMarketDataHelper() { } + public bool LoadInsiderTransactions(String symbol) { - Profiler profiler=new Profiler(); - try - { - symbols = new List(); - symbols.Add(symbol); - currentIndex = 0; - while (true) - { - List queueSymbols = GetQueueSymbols(); - if (null == queueSymbols || 0 == queueSymbols.Count) break; - ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count]; - for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) - { - resetEvents[eventIndex] = new ManualResetEvent(false); - } - for (int index = 0; index < queueSymbols.Count; index++) - { - ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index], resetEvents[index]); - ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper); - } - MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete."); - WaitHandle.WaitAll(resetEvents); - } - return true; - } - finally - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End())); - } + return LoadInsiderTransactions(new String[]{symbol}.ToList()); } + public bool LoadInsiderTransactions(List symbols) { - Profiler profiler=new Profiler(); - try - { - this.symbols = symbols; - currentIndex = 0; - while (true) - { - List queueSymbols = GetQueueSymbols(); - if (null == queueSymbols || 0 == queueSymbols.Count) break; - ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count]; - for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) - { - resetEvents[eventIndex] = new ManualResetEvent(false); - } - for (int index = 0; index < queueSymbols.Count; index++) - { - ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index], resetEvents[index]); - ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper); - try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait - } - MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete."); - WaitHandle.WaitAll(resetEvents); - } - return true; - } - finally - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End())); - } - } + Profiler profiler=new Profiler(); + try + { + if(null==symbols || 0==symbols.Count)return false; + Queue=symbols; + Index=-1; + ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; + for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true); + MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] Queuing LoadInsiderTransactions Load {symbols.Count} symbols MaxThreads:{MaxThreads}..."); + while (true) + { + ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents); + ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents); + if (null == PeekQueueItem() && 0==busyEvents.Length) + { + MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] queue contains {0} items, busy events {busyEvents.Length}, all done."); + break; + } + for (int index = 0; index < availableEvents.Length; index++) + { + String symbol=GetQueueItem(); + if (null != symbol) + { + availableEvents[index].Reset(); + ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]); + ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper); + try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait + } + else + { + busyEvents=GetBusyEvents(resetEvents); + if(busyEvents.Length!=availableEvents.Length) + { + ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length]; + Array.Copy(busyEvents, resizedEvents,busyEvents.Length); + resetEvents = resizedEvents; + } + break; + } + } // for + MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] waiting for free slots..."); + if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents); + if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents); + } // while + MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] completed."); + return true; + } + finally + { + MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions] End, total took {0}(ms)",profiler.End())); + } + } + public bool LoadInsiderTransactionsYearGreaterEqual(List symbols,int yearGreaterEqual) { - Profiler profiler=new Profiler(); - try - { - this.symbols = symbols; - currentIndex = 0; - - UpdateManager.Prepare("load_insider_transactions_year.txt", 7); // use max age 7 days - this.symbols=this.symbols.Except(new List(UpdateManager.Entries)).ToList(); - - while (true) - { - List queueSymbols = GetQueueSymbols(); - if (null == queueSymbols || 0 == queueSymbols.Count) break; - ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count]; - for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) - { - resetEvents[eventIndex] = new ManualResetEvent(false); - } - for (int index = 0; index < queueSymbols.Count; index++) - { - InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(queueSymbols[index], resetEvents[index]); - threadHelper.UpdateManager=UpdateManager; - threadHelper.YearGreaterEqual=yearGreaterEqual; - ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper); - try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait - } - MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete."); - WaitHandle.WaitAll(resetEvents); - } - return true; - } - finally - { - MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End())); - } - } - - private List GetQueueSymbols() - { - List queueSymbols = new List(); - int index = currentIndex; - for (; index < currentIndex + MaxThreads && index < symbols.Count; index++) + Profiler profiler=new Profiler(); + try { - queueSymbols.Add(symbols[index]); + if(null==symbols || 0==symbols.Count)return false; + Queue=symbols; + Index=-1; + ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; + for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true); + MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] Queuing LoadInsiderTransactions Load {symbols} Year>={yearGreaterEqual} MaxThreads:{MaxThreads}..."); + while (true) + { + ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents); + ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents); + if (null == PeekQueueItem() && 0==busyEvents.Length) + { + MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] queue contains {0} items, busy events {busyEvents.Length}, all done."); + break; + } + for (int index = 0; index < availableEvents.Length; index++) + { + String symbol=GetQueueItem(); + if (null != symbol) + { + availableEvents[index].Reset(); + InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(symbol, availableEvents[index]); + threadHelper.UpdateManager=UpdateManager; + threadHelper.YearGreaterEqual=yearGreaterEqual; + ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper); + try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait + } + else + { + busyEvents=GetBusyEvents(resetEvents); + if(busyEvents.Length!=availableEvents.Length) + { + ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length]; + Array.Copy(busyEvents, resizedEvents,busyEvents.Length); + resetEvents = resizedEvents; + } + break; + } + } // for + MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] waiting for free slots..."); + if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents); + if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents); + } // while + MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] completed."); + return true; } - currentIndex = index; - return queueSymbols; - } + finally + { + MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactionsYearGreaterEqual] End, total took {0}(ms)",profiler.End())); + } + } - public void ThreadPoolCallback(Object threadHelperContext) + private void ThreadPoolCallback(Object threadHelperContext) { ThreadHelper threadHelper = (ThreadHelper)threadHelperContext; MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); @@ -153,7 +149,7 @@ namespace MarketData.Helper threadHelper.ResetEvent.Set(); } - public static void LoadInsiderTransactionsSymbolEx(String symbol) + private static void LoadInsiderTransactionsSymbolEx(String symbol) { symbol = symbol.ToUpper(); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0}", symbol)); @@ -168,7 +164,7 @@ namespace MarketData.Helper MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions - Done."); } - public void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext) + private void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext) { InsiderTransactionThreadHelper threadHelper = (InsiderTransactionThreadHelper)threadHelperContext; MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); @@ -178,7 +174,7 @@ namespace MarketData.Helper threadHelper.ResetEvent.Set(); } - public static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual) + private static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual) { symbol = symbol.ToUpper(); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0} years>={1}", symbol,yearGreaterEqual));