From 88d1504d26836fe364752f30d7cd6f184549f7e1 Mon Sep 17 00:00:00 2001
From: Sean
Date: Sun, 22 Mar 2026 15:07:02 -0400
Subject: [PATCH] Implement SafeBatchInsert for Headlines Insert in the HeadlinesDA

---
 .../MarketDataLib/DataAccess/HeadlinesDA.cs   | 93 +++++++++++++++++--
 1 file changed, 88 insertions(+), 5 deletions(-)

diff --git a/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs b/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
index 8d064a0..95e7729 100755
--- a/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
+++ b/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
@@ -301,39 +301,122 @@ namespace MarketData.DataAccess
             }
         }
 
+        /// <summary>
+        /// InsertHeadlines - Inserts headlines by batching them up into groups.
+        /// The input is de-duplicated with HeadlinesEqualityComparer, then inserted in chunks of batchSize rows inside one transaction.
+        /// </summary>
+        /// <param name="headlines">The headlines to insert.</param>
+        /// <returns>true if the transaction committed; false if headlines is null or empty, or if an exception was caught.</returns>
         public static bool InsertHeadlines(Headlines headlines)
         {
-            MySqlCommand sqlCommand=null;
+            const int batchSize = 1000;
             MySqlConnection sqlConnection=null;
             MySqlTransaction sqlTransaction=null;
 
             try
             {
                 if (null == headlines || 0 == headlines.Count) return false;
-                StringBuilder sb = new StringBuilder();
                 sqlConnection = SqlUtils.CreateMySqlConnection(MainDataSource.Instance.LocateDataSource("market_data"));
                 sqlTransaction = sqlConnection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
                 headlines=new Headlines(headlines.Distinct(new HeadlinesEqualityComparer()).ToList());
-                foreach(Headline headline in headlines)
+                foreach (IEnumerable<Headline> batch in headlines.Chunk(batchSize))
                 {
-                    if(!HeadlineExists(headline,sqlConnection,sqlTransaction))InsertHeadline(headline,sqlConnection,sqlTransaction);
+                    SafeInsertBatch(batch.ToList(), sqlConnection, sqlTransaction);
                 }
                 sqlTransaction.Commit();
                 return true;
             }
             catch (Exception exception)
             {
                 MDTrace.WriteLine(LogLevel.DEBUG,exception);
+                try
+                {
+                    // The finally block disposes the transaction; roll back explicitly rather than rely on that.
+                    sqlTransaction?.Rollback();
+                }
+                catch (Exception rollbackException)
+                {
+                    // Rollback can itself throw; log it so the original failure stays logged and we still return false.
+                    MDTrace.WriteLine(LogLevel.DEBUG,rollbackException);
+                }
                 return false;
             }
             finally
             {
-                if(null!=sqlCommand)sqlCommand.Dispose();
                 if(null!=sqlTransaction)sqlTransaction.Dispose();
                 if(null!=sqlConnection) sqlConnection.Close();
             }
         }
 
+        /// <summary>
+        /// SafeInsertBatch - Tries to insert the whole batch with a single statement. If that throws, the batch is
+        /// split in half and each half is retried recursively, so a failure is narrowed down to the offending row.
+        /// A single row that still fails is logged and skipped; the remaining rows are still inserted.
+        /// </summary>
+        /// <param name="batch">The headlines to insert.</param>
+        /// <param name="sqlConnection">The connection the insert commands run on.</param>
+        /// <param name="sqlTransaction">The transaction the insert commands are enlisted in.</param>
+        private static void SafeInsertBatch(List<Headline> batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
+        {
+            try
+            {
+                InsertBatch(batch, sqlConnection, sqlTransaction);
+            }
+            catch (Exception ex)
+            {
+                // Splitting cannot help when the connection is no longer open: every retry would fail and
+                // every row would be logged as bad. Let the caller handle it instead.
+                if (sqlConnection?.State != System.Data.ConnectionState.Open) throw;
+                // If only one row, it's the offender
+                if (batch.Count == 1)
+                {
+                    MDTrace.WriteLine(LogLevel.DEBUG, ex);
+                    MDTrace.WriteLine(LogLevel.DEBUG, $"Bad row: Symbol={batch[0].Symbol}, Headline={batch[0].Entry}");
+                    return;
+                }
+                // Split batch and retry
+                int mid = batch.Count / 2;
+                List<Headline> firstHalf = batch.Take(mid).ToList();
+                List<Headline> secondHalf = batch.Skip(mid).ToList();
+                SafeInsertBatch(firstHalf, sqlConnection, sqlTransaction);
+                SafeInsertBatch(secondHalf, sqlConnection, sqlTransaction);
+            }
+        }
+
+        /// <summary>
+        /// Inserts a batch of headlines with one parameterized multi-row INSERT IGNORE statement.
+        /// Rows whose Modified value satisfies Utility.IsEpoch are stamped with the current local time instead.
+        /// </summary>
+        /// <param name="batch">The headlines to insert. An empty batch is a no-op.</param>
+        /// <param name="sqlConnection">The connection the command runs on.</param>
+        /// <param name="sqlTransaction">The transaction the command is enlisted in.</param>
+        private static void InsertBatch(IEnumerable<Headline> batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
+        {
+            StringBuilder sb = new StringBuilder();
+            using MySqlCommand sqlCommand = new MySqlCommand();
+            sqlCommand.Connection = sqlConnection;
+            sqlCommand.Transaction = sqlTransaction;
+            DateTime now = DateTime.Now;
+
+            sb.Append("INSERT IGNORE INTO Headlines (symbol, asof, headline, source, modified) VALUES ");
+            int index = 0;
+            foreach (Headline headline in batch)
+            {
+                if (index > 0) sb.Append(",");
+                sb.Append($"(@symbol{index}, @asof{index}, @headline{index}, @source{index}, @modified{index})");
+                sqlCommand.Parameters.AddWithValue($"@symbol{index}", headline.Symbol);
+                sqlCommand.Parameters.AddWithValue($"@asof{index}", headline.Date);
+                sqlCommand.Parameters.AddWithValue($"@headline{index}", headline.Entry);
+                sqlCommand.Parameters.AddWithValue($"@source{index}", headline.Source);
+                DateTime modified = Utility.IsEpoch(headline.Modified) ? now : headline.Modified;
+                sqlCommand.Parameters.AddWithValue($"@modified{index}", modified);
+                index++;
+            }
+            // An empty batch would leave the VALUES clause without any row, which is not valid SQL.
+            if (0 == index) return;
+            sqlCommand.CommandText = sb.ToString();
+            sqlCommand.ExecuteNonQuery();
+        }
+
         /// <summary>
         /// InsertHeadline - This is now parameterized. The MySql driver should handle all escaping etc.,
         ///