using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads options chains for a list of symbols, running up to <c>MaxThreads</c>
    /// symbols at a time on the thread pool. Each worker slot is represented by a
    /// <see cref="ManualResetEvent"/>: signaled means the slot is free, non-signaled
    /// means a worker currently owns it.
    /// </summary>
    /// <remarks>
    /// Queue, Index, PeekQueueItem, GetQueueItem, GetAvailableEvents, GetBusyEvents and
    /// ResizeEvents are not defined in this file; presumably they are inherited from
    /// MarketDataHelperBase (confirm there before changing the scheduling loop).
    /// </remarks>
    public class OptionsChainsMarketDataHelper : MarketDataHelperBase
    {
        // Number of worker slots (and therefore wait handles) created per load.
        private static readonly int MaxThreads = 10;

        public OptionsChainsMarketDataHelper()
        {
        }

        // ******************************************************************************************
        // **************** O P T I M I Z E D   E V E N T   H A N D L E R   S T A R T ****************
        // ******************************************************************************************

        /// <summary>
        /// Convenience overload: wraps a single symbol in a list and delegates to
        /// <see cref="LoadOptionsChains(List{String})"/>.
        /// </summary>
        /// <param name="symbol">The symbol whose options chain should be loaded.</param>
        /// <returns>The result of the list overload.</returns>
        public bool LoadOptionsChains(String symbol)
        {
            List<String> symbols = new List<String>();
            symbols.Add(symbol);
            return LoadOptionsChains(symbols);
        }

        /// <summary>
        /// Dispatches every queued symbol to a thread-pool worker, never running more than
        /// <c>MaxThreads</c> workers at once, and blocks until the queue is drained and no
        /// slot is busy.
        /// </summary>
        /// <param name="symbols">Symbols to process; assigned to the shared work queue.</param>
        /// <returns>Always <c>true</c> when the loop completes without throwing.</returns>
        public bool LoadOptionsChains(List<String> symbols)
        {
            Profiler profiler = new Profiler();
            try
            {
                Queue = symbols;
                Index = -1;

                // Every slot starts signaled, i.e. free.
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }

                MDTrace.WriteLine("Queuing options chains ...");
                while (true)
                {
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);

                    // Finished: nothing left to hand out and nothing still running.
                    if (null == PeekQueueItem() && 0 == busyEvents.Length)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }

                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        String symbol = GetQueueItem();
                        if (null != symbol)
                        {
                            // Mark the slot busy before the worker is queued; the worker signals it when done.
                            availableEvents[index].Reset();
                            ThreadHelper optionsChainsThreadHelper = new ThreadHelper(symbol, availableEvents[index]);
                            ThreadPool.QueueUserWorkItem(ThreadPoolCallback, optionsChainsThreadHelper);
                        }
                        else
                        {
                            // Queue exhausted: narrow the wait set to the slots that are still busy so the
                            // WaitAny below is not satisfied immediately by an idle (signaled) slot.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    } // for

                    MDTrace.WriteLine(LogLevel.DEBUG, "Waiting for free slots...");
                    // WaitAny must not be called with an empty handle array.
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (null == PeekQueueItem())
                    {
                        resetEvents = ResizeEvents(resetEvents);
                    }
                } // while

                MDTrace.WriteLine(LogLevel.DEBUG, "Options chains handler completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[LoadOptionsChains]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Thread-pool entry point: loads the options chain for the symbol carried by the
        /// supplied <c>ThreadHelper</c> and then signals the helper's event to free the slot.
        /// </summary>
        /// <param name="optionsChainsThreadHelperContext">
        /// A <c>ThreadHelper</c> holding the symbol and the slot's reset event.
        /// </param>
        public void ThreadPoolCallback(Object optionsChainsThreadHelperContext)
        {
            ThreadHelper optionsChainsThreadHelper = (ThreadHelper)optionsChainsThreadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, optionsChainsThreadHelper.Symbol));
                LoadOptionsChainsSymbolEx(optionsChainsThreadHelper.Symbol);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, optionsChainsThreadHelper.Symbol));
            }
            catch (Exception exception)
            {
                // An exception escaping a thread-pool work item terminates the process, so a
                // failure for one symbol is logged here instead of being allowed to propagate.
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, Thread {0} failed for {1}: {2}", Thread.CurrentThread.ManagedThreadId, optionsChainsThreadHelper.Symbol, exception));
            }
            finally
            {
                // Always release the slot; otherwise the dispatcher waits on it forever.
                optionsChainsThreadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Retrieves the options chain for one symbol via <c>MarketDataHelper.GetOptions</c>
        /// and stores it via <c>OptionsDA.AddOptions</c>, logging the outcome.
        /// </summary>
        /// <param name="symbol">Symbol to load; a null symbol is ignored.</param>
        public static void LoadOptionsChainsSymbolEx(String symbol)
        {
            if (null == symbol)
            {
                return;
            }

            // Symbols are identifiers, so upper-case them independently of the current culture.
            symbol = symbol.ToUpperInvariant();
            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load Options Chains for {0}", symbol));

            Options options = MarketDataHelper.GetOptions(symbol);
            if (null == options || 0 == options.Count)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Unable to retrieve option chain for " + symbol);
                return;
            }

            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, saving {0} records for {1}", options.Count, symbol));
            if (OptionsDA.AddOptions(options))
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, Added {0} for {1}", options.Count, symbol));
            }
            else
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Options Chains, Failed to add options chain for {0}", symbol));
            }
        }
    }
}