using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Work-item context passed to the thread pool callback that loads the
    /// dividend history for a single symbol.
    /// </summary>
    /// <remarks>
    /// The constructors assign <c>Symbol</c>, <c>Modified</c> and <c>ResetEvent</c>;
    /// these members are presumably declared on <c>ThreadHelper</c>, which is not
    /// visible in this file.
    /// </remarks>
    public class DividendHistoryThreadHelper : ThreadHelper
    {
        /// <summary>Creates an empty context.</summary>
        public DividendHistoryThreadHelper()
        {
        }

        /// <summary>Creates a context for a symbol and its completion event.</summary>
        /// <param name="symbol">Symbol whose dividend history is loaded.</param>
        /// <param name="resetEvent">Event the callback sets when it has finished.</param>
        public DividendHistoryThreadHelper(String symbol, ManualResetEvent resetEvent)
        {
            Symbol = symbol;
            ResetEvent = resetEvent;
        }

        /// <summary>Creates a context for a symbol, a modification date and its completion event.</summary>
        /// <param name="symbol">Symbol whose dividend history is loaded.</param>
        /// <param name="modified">Value stored in <c>Modified</c>.</param>
        /// <param name="resetEvent">Event the callback sets when it has finished.</param>
        public DividendHistoryThreadHelper(String symbol, DateTime modified, ManualResetEvent resetEvent)
        {
            Symbol = symbol;
            Modified = modified;
            ResetEvent = resetEvent;
        }
    }

    /// <summary>
    /// Loads dividend history for one symbol or for every symbol returned by
    /// <c>PricingDA.GetSymbols()</c>, processing the symbols in batches on the thread pool.
    /// </summary>
    /// <remarks>
    /// The instance fields <c>symbols</c> and <c>currentIndex</c> hold the state of the
    /// run in progress, so a single instance must not run two updates at the same time.
    /// </remarks>
    public class DividendHistoryMarketDataHelper
    {
        // Batch size. WaitHandle.WaitAll accepts at most 64 handles, so this must not exceed 64.
        private static int MaxThreads = 10; //2;//(int)ThreadHelperEnum.MaxThreads;
        private static int WAIT_TIME_MS = 1000; // wait between request
        private static Random generator = new Random();
        private List<String> symbols;
        private int currentIndex = 0;
        // Incremented from thread pool threads; only ever touch it through Interlocked.
        private static int successHits = 0;

        public DividendHistoryMarketDataHelper()
        {
        }

        /// <summary>Returns a random integer in the range [min, max).</summary>
        /// <param name="min">Inclusive lower bound.</param>
        /// <param name="max">Exclusive upper bound.</param>
        public static int NextRandom(int min, int max)
        {
            return generator.Next(min, max);
        }

        /// <summary>
        /// Loads the dividend history for a single symbol. No delay is inserted
        /// between requests.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        /// <returns>Always <c>true</c> when the method returns normally.</returns>
        public bool UpdateDividendHistory(String symbol)
        {
            Profiler profiler = new Profiler();
            try
            {
                symbols = new List<String>();
                symbols.Add(symbol);
                ProcessSymbolsInBatches(0);
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateDividendHistory]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Loads the dividend history for every symbol returned by
        /// <c>PricingDA.GetSymbols()</c>, waiting <c>WAIT_TIME_MS</c> after each
        /// work item is queued.
        /// </summary>
        /// <returns>Always <c>true</c> when the method returns normally.</returns>
        public bool UpdateDividendHistory()
        {
            Profiler profiler = new Profiler();
            try
            {
                symbols = PricingDA.GetSymbols();
                ProcessSymbolsInBatches(WAIT_TIME_MS);
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateDividendHistory]End, total took {0}(ms)", profiler.End()));
            }
        }

        // Walks 'symbols' from the start in batches of up to MaxThreads, queues one
        // thread pool work item per symbol and blocks until each batch has completed.
        private void ProcessSymbolsInBatches(int delayBetweenRequestsMs)
        {
            currentIndex = 0;
            while (true)
            {
                List<String> queueSymbols = GetQueueSymbols();
                if (null == queueSymbols || 0 == queueSymbols.Count)
                {
                    break;
                }

                ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(false);
                }

                for (int index = 0; index < queueSymbols.Count; index++)
                {
                    ThreadHelper threadHelper = new DividendHistoryThreadHelper(queueSymbols[index], resetEvents[index]);
                    ThreadPool.QueueUserWorkItem(ThreadPoolCallbackLoadDividendHistory, threadHelper);
                    if (delayBetweenRequestsMs > 0)
                    {
                        PauseBetweenRequests(delayBetweenRequestsMs);
                    }
                }

                MDTrace.WriteLine(LogLevel.DEBUG, "Load dividend history, waiting for queued items to complete.");
                WaitHandle.WaitAll(resetEvents);

                // Every callback has signalled and no longer touches its event, so the
                // handles can be released. This is deliberately not in a finally block:
                // if WaitAll throws, callbacks may still be running and would call Set()
                // on a closed handle.
                foreach (ManualResetEvent resetEvent in resetEvents)
                {
                    resetEvent.Close();
                }
            }
        }

        // Best-effort throttle between queued requests; a failed sleep is logged, never rethrown.
        private static void PauseBetweenRequests(int milliseconds)
        {
            try
            {
                Thread.Sleep(milliseconds);
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load dividend history, pause between requests failed: {0}", exception.Message));
            }
        }

        // Returns the next batch (at most MaxThreads symbols) starting at currentIndex
        // and advances currentIndex past it. Returns an empty list when no symbols remain.
        private List<String> GetQueueSymbols()
        {
            List<String> queueSymbols = new List<String>();
            int index = currentIndex;
            for (; index < currentIndex + MaxThreads && index < symbols.Count; index++)
            {
                queueSymbols.Add(symbols[index]);
            }
            currentIndex = index;
            return queueSymbols;
        }

        /// <summary>
        /// Thread pool callback: loads the dividend history for the symbol carried by
        /// the context and always signals the context's reset event when done.
        /// </summary>
        /// <param name="threadHelperContext">A <see cref="DividendHistoryThreadHelper"/> instance.</param>
        public void ThreadPoolCallbackLoadDividendHistory(Object threadHelperContext)
        {
            DividendHistoryThreadHelper threadHelper = (DividendHistoryThreadHelper)threadHelperContext;
            try
            {
                // Nasdaq website is very bot sensitive; this is why all the sleeps and delays are here in this block.
                // NOTE(review): an exception thrown by UpdateDividendHistoryEx is not caught here
                // and will surface on the thread pool thread - confirm that is intended.
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load dividend history, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                if (UpdateDividendHistoryEx(threadHelper.Symbol))
                {
                    // Callbacks run concurrently; a plain ++ on the shared counter can lose updates.
                    Interlocked.Increment(ref successHits);
                }
                //if(successHits>100)
                //{
                //  successHits=0;
                //  MDTrace.WriteLine(LogLevel.DEBUG,"Pausing for 60 seconds.");
                //  Thread.Sleep(60000);
                //}
                //else if(1==DividendHistoryMarketDataHelper.NextRandom(1,20))
                //{
                //  MDTrace.WriteLine(LogLevel.DEBUG,"Pausing for 10 seconds.");
                //  Thread.Sleep(10000);
                //}
                //else if(1==DividendHistoryMarketDataHelper.NextRandom(1,100))
                //{
                //  MDTrace.WriteLine(LogLevel.DEBUG,"Pausing for 30 seconds.");
                //  Thread.Sleep(30000);
                //}
                //else Thread.Sleep(50);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load dividend history, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            finally
            {
                threadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Retrieves the dividend history for a symbol via
        /// <c>MarketDataHelper.GetDividendHistory</c> and stores it with
        /// <c>DividendHistoryDA.InsertOrUpdate</c>.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        /// <returns>
        /// <c>false</c> when no records could be retrieved; <c>true</c> when records were
        /// retrieved, even if storing them failed (that failure is only logged).
        /// </returns>
        public static bool UpdateDividendHistoryEx(String symbol)
        {
            MDTrace.WriteLine(LogLevel.DEBUG, "Retrieving dividend history for " + symbol);
            DividendHistory dividendHistory = MarketDataHelper.GetDividendHistory(symbol);
            if (null == dividendHistory || 0 == dividendHistory.Count)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Unable to retrieve dividend history for " + symbol);
                return false;
            }

            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Got {0} dividend history records for {1}", dividendHistory.Count, symbol));
            MDTrace.WriteLine(LogLevel.DEBUG, "Adding...");
            if (DividendHistoryDA.InsertOrUpdate(dividendHistory))
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Added {0} dividend history records for {1}", dividendHistory.Count, symbol));
            }
            else
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Failed to update dividend history for {0}", symbol));
            }
            return true;
        }
    }
}