using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Work item handed to a thread-pool worker: the company profile to refresh and the
    /// event the worker sets once it has finished with that profile.
    /// </summary>
    public class CompanyProfileThreadHelper : ThreadHelper
    {
        /// <summary>The profile this work item refreshes.</summary>
        public CompanyProfile CompanyProfile { get; set; }

        /// <summary>Creates an empty work item; the properties must be assigned before use.</summary>
        public CompanyProfileThreadHelper()
        {
        }

        /// <summary>Creates a work item for <paramref name="companyProfile"/>.</summary>
        /// <param name="companyProfile">The profile to refresh.</param>
        /// <param name="resetEvent">Event the worker sets when it is done with the profile.</param>
        public CompanyProfileThreadHelper(CompanyProfile companyProfile, ManualResetEvent resetEvent)
        {
            CompanyProfile = companyProfile;
            ResetEvent = resetEvent;
        }
    }

    /// <summary>
    /// Refreshes stored company profiles (sector, industry, description) from the market
    /// data source, fanning the work out over thread-pool workers.
    /// </summary>
    public class CompanyProfileMarketDataHelper : MarketDataHelperBase
    {
        // Number of reset events (worker slots) created per run.
        private static readonly int MaxThreads = (int)ThreadHelperEnum.MaxThreads;

        // Pause, in milliseconds, after each work item is queued.
        private const int SLEEP_TIME_MS = 500;

        public CompanyProfileMarketDataHelper()
        {
        }

        /// <summary>Refreshes every company profile returned by the data access layer.</summary>
        /// <returns>The result of <see cref="UpdateCompanyProfile"/> called without a symbol.</returns>
        public bool UpdateCompanyProfiles()
        {
            return UpdateCompanyProfile();
        }

        /// <summary>
        /// Refreshes the profile for <paramref name="symbol"/>, or all profiles when
        /// <paramref name="symbol"/> is null.
        /// </summary>
        /// <param name="symbol">Symbol to refresh; null refreshes every stored profile.</param>
        /// <returns>
        /// False when a symbol was given but no stored profile exists for it; otherwise true
        /// once the queue has been drained and no slot is reported busy.
        /// </returns>
        public bool UpdateCompanyProfile(String symbol = null)
        {
            Profiler profiler = new Profiler();
            try
            {
                if (null == symbol)
                {
                    Queue = CompanyProfileDA.GetCompanyProfiles();
                }
                else
                {
                    CompanyProfile companyProfile = CompanyProfileDA.GetCompanyProfile(symbol);
                    if (null == companyProfile)
                    {
                        return false;
                    }
                    Queue = new CompanyProfile[] { companyProfile }.ToList();
                }
                Index = -1;

                // One event per worker slot; created signaled so every slot starts out free.
                // NOTE(review): these events are never disposed. Disposing them safely depends on
                // how the base-class helpers (GetBusyEvents/ResizeEvents) treat dropped events - confirm.
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }
                MDTrace.WriteLine("Queuing company profile fetches ...");

                while (true)
                {
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);
                    MDTrace.WriteLine(LogLevel.DEBUG, $"Available:{availableEvents.Length} Busy:{busyEvents.Length}");

                    // Finished only when nothing is left to hand out and no slot is busy.
                    if (null == PeekQueueItem() && 0 == busyEvents.Length)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }

                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        CompanyProfile queueItem = GetQueueItem();
                        if (null != queueItem)
                        {
                            // Mark the slot busy before the worker starts; the worker sets it again when done.
                            availableEvents[index].Reset();
                            CompanyProfileThreadHelper companyProfileThreadHelper = new CompanyProfileThreadHelper(queueItem, availableEvents[index]);
                            ThreadPool.QueueUserWorkItem(ThreadPoolCallback, companyProfileThreadHelper);
                            try
                            {
                                // Pause between submissions (presumably to pace requests - confirm).
                                Thread.Sleep(SLEEP_TIME_MS);
                            }
                            catch (ThreadInterruptedException)
                            {
                                // The pause is best-effort; an interrupt only shortens it.
                            }
                        }
                        else
                        {
                            // Queue exhausted: from here on only the still-busy slots need to be waited on.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    } // for

                    MDTrace.WriteLine(LogLevel.DEBUG, "[UpdateCompanyProfile] waiting for free slots...");
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (null == PeekQueueItem())
                    {
                        resetEvents = ResizeEvents(resetEvents);
                    }
                } // while

                MDTrace.WriteLine(LogLevel.DEBUG, "[UpdateCompanyProfile] completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateCompanyProfile] End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Thread-pool entry point: refreshes the profile carried by the work item and then
        /// sets the item's reset event so the scheduling loop can reuse the slot.
        /// </summary>
        /// <param name="companyProfileThreadHelperContext">
        /// The <see cref="CompanyProfileThreadHelper"/> queued by <see cref="UpdateCompanyProfile"/>.
        /// </param>
        public void ThreadPoolCallback(Object companyProfileThreadHelperContext)
        {
            CompanyProfileThreadHelper companyProfileThreadHelper = (CompanyProfileThreadHelper)companyProfileThreadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateCompanyProfile] Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
                LoadCompanyProfileEx(companyProfileThreadHelper.CompanyProfile);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateCompanyProfile] Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
            }
            catch (Exception exception)
            {
                // An exception escaping a thread-pool work item is unhandled and terminates the
                // process, so a failure for one symbol is logged here and the batch carries on.
                MDTrace.WriteLine(String.Format("[UpdateCompanyProfile] Thread {0} failed for {1}: {2}", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile?.Symbol, exception));
            }
            finally
            {
                // Always release the slot, even on failure, so the scheduling loop cannot wait forever.
                companyProfileThreadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Fetches the market profile for the given profile's symbol, copies over each usable
        /// field (sector, industry, description) and persists the result. Logs and returns
        /// without updating when the market data source has no profile for the symbol.
        /// </summary>
        /// <param name="companyProfile">The stored profile to refresh in place and save.</param>
        private static void LoadCompanyProfileEx(CompanyProfile companyProfile)
        {
            CompanyProfile marketCompanyProfile = MarketDataHelper.GetCompanyProfile(companyProfile.Symbol);
            if (null == marketCompanyProfile)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "[[UpdateCompanyProfile]] No company profile for symbol '" + companyProfile.Symbol + "'");
                return;
            }

            // Only overwrite stored values with usable market values; existing data is kept otherwise.
            if (IsUsable(marketCompanyProfile.Sector))
            {
                companyProfile.Sector = marketCompanyProfile.Sector;
            }
            if (IsUsable(marketCompanyProfile.Industry))
            {
                companyProfile.Industry = marketCompanyProfile.Industry;
            }
            if (IsUsable(marketCompanyProfile.Description))
            {
                companyProfile.Description = marketCompanyProfile.Description;
            }

            MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateCompanyProfile] Updating company profile.....Symbol:{0} Sector:{1} Industry:{2}", companyProfile.Symbol, companyProfile.Sector, companyProfile.Industry));
            CompanyProfileDA.UpdateCompanyProfile(companyProfile);
        }

        /// <summary>True when <paramref name="value"/> is neither null/empty nor the literal "N/A".</summary>
        private static bool IsUsable(String value)
        {
            return !String.IsNullOrEmpty(value) && !value.Equals("N/A");
        }
    }
}