Refactor the TorWebClient

This commit is contained in:
2024-03-12 12:20:53 -04:00
parent 076ab7b887
commit bcc56a1e73
34 changed files with 351 additions and 139 deletions

View File

@@ -19,6 +19,10 @@ namespace Tor.Proxy
[DebuggerStepThrough]
public sealed class Proxy : MarshalByRefObject, IDisposable
{
private Thread monitorThread=null;
private volatile bool threadRun=true;
private int refreshAfter=120000;
private readonly Client client;
private readonly object synchronize;
private List<Connection> connections;
@@ -35,6 +39,8 @@ namespace Tor.Proxy
/// <param name="client">The client for which this object instance belongs.</param>
internal Proxy(Client client)
{
monitorThread=new Thread(new ThreadStart(ThreadProc));
monitorThread.Start();
this.client = client;
this.connections = new List<Connection>();
this.webProxy = null;
@@ -126,13 +132,24 @@ namespace Tor.Proxy
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
private void Dispose(bool disposing)
{
if (disposed)
return;
if (disposed)return;
if (disposing)
{
lock (synchronize)
{
if(threadRun)
{
threadRun=false;
if(null!=monitorThread)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:Dispose]Thread state is '{0}'. Joining main thread...",Utility.ThreadStateToString(monitorThread)));
monitorThread.Join(5000);
monitorThread=null;
}
}
suppressDispose = true;
foreach (ConnectionProcessor processor in processors)
@@ -154,53 +171,6 @@ namespace Tor.Proxy
#region Tor.Net.ForwardSocket
/// <summary>
/// Called when the internal listener socket accepts a TCP connection.
/// </summary>
/// <param name="ar">The asynchronous result object for this callback.</param>
//private void OnSocketAccept(IAsyncResult ar)
//{
// try
// {
// if (client != null)
// {
// MDTrace.WriteLine(LogLevel.DEBUG,String.Format("INFO Accepting incoming..."));
// Socket accepted = socket.EndAccept(ar);
// MDTrace.WriteLine(LogLevel.DEBUG,String.Format("INFO Accepted connection:{0}->{1}",accepted.LocalEndPoint,accepted.RemoteEndPoint));
// Connection connection = new Connection(client, accepted, OnConnectionDisposed);
// lock (synchronize)
// {
// connections.Add(connection);
// MDTrace.WriteLine(LogLevel.DEBUG,String.Format("There are {0} connections.",connections.Count));
// }
// ConnectionProcessor processor = new ConnectionProcessor(client, connection, OnConnectionProcessorDisposed);
// lock (synchronize)
// {
// processors.Add(processor);
// }
// processor.Start();
// }
// }
// catch(Exception exception)
// {
// MDTrace.WriteLine(LogLevel.DEBUG,exception.ToString());
// }
// try
// {
// if (socket != null)
// socket.BeginAccept(OnSocketAccept, socket);
// }
// catch
// {
// }
//}
private void OnSocketAccept(IAsyncResult ar)
{
try
@@ -210,10 +180,10 @@ namespace Tor.Proxy
{
Task workerTask= Task.Factory.StartNew( () =>
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("INFO Accepting incoming..."));
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:OnSocketAccept] Accepting incoming..."));
accepted = socket.EndAccept(ar);
if (socket != null)socket.BeginAccept(OnSocketAccept, socket);
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("INFO Accepted connection:{0}->{1}",accepted.LocalEndPoint,accepted.RemoteEndPoint));
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:OnSocketAccept] Accepted connection:{0}->{1}",accepted.LocalEndPoint,accepted.RemoteEndPoint));
});
workerTask.ContinueWith((continuation) =>
{
@@ -222,25 +192,16 @@ namespace Tor.Proxy
lock (synchronize)
{
connections.Add(connection);
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("There are {0} connections.",connections.Count));
}
ConnectionProcessor processor = new ConnectionProcessor(client, connection, OnConnectionProcessorDisposed);
lock (synchronize)
{
processors.Add(processor);
processors.Add(processor);
}
processor.Start();
//try
//{
// if (socket != null)
// socket.BeginAccept(OnSocketAccept, socket);
//}
//catch
//{
//}
});
}
}
@@ -269,8 +230,8 @@ namespace Tor.Proxy
lock (synchronize)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Removing 1 connection {0}:{1}. There are {2} active connections.",connection.Host,connection.Port,connections.Count-1));
connections.Remove(connection);
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:OnConnectionDisposed] Removed Connection for {0}:{1}. There are {2} connections.",connection.Host,connection.Port,connections.Count));
}
}
@@ -289,8 +250,8 @@ namespace Tor.Proxy
lock (synchronize)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Disposing connection processor. There are {0} connection processors remaining.",processors.Count));
processors.Remove(processor);
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:OnConnectionProcessorDisposed] Removed Connection Processor. There are {0} connection processors remaining.",processors.Count));
}
}
@@ -320,7 +281,7 @@ namespace Tor.Proxy
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Proxy: Exception:{0}",exception.ToString()));
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:Start] Exception:{0}",exception.ToString()));
if (socket != null)
{
socket.Dispose();
@@ -352,5 +313,177 @@ namespace Tor.Proxy
webProxy = null;
}
}
private void ThreadProc()
{
int quantums=0;
int quantumInterval=1000;
while(threadRun)
{
Thread.Sleep(quantumInterval);
if(!threadRun) break;
quantums+=quantumInterval;
if(quantums>refreshAfter)
{
quantums=0;
lock(synchronize)
{
MonitorDisposedConnections();
MonitorDisposedConnectionProcessors();
MonitorIdleConnections();
}
}
}
MDTrace.WriteLine(LogLevel.DEBUG,"[Proxy:ThreadProc]Thread ended.");
}
private void MonitorDisposedConnections()
{
List<Connection> connectionsToRemove=new List<Connection>();
lock(synchronize)
{
try
{
foreach(Connection connection in connections)
{
if(connection.IsDisposed())
{
connectionsToRemove.Add(connection);
}
}
if(0!=connectionsToRemove.Count)MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnections] Removing {0} disposed connections.",connectionsToRemove.Count));
foreach(Connection connectionToRemove in connectionsToRemove)
{
connections.Remove(connectionToRemove);
}
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnections] There are {0} remaining connections.",connections.Count));
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnections] Exception:{0}",exception.ToString()));
}
}
}
private void MonitorDisposedConnectionProcessors()
{
List<ConnectionProcessor> connectionsProcessorsToRemove=new List<ConnectionProcessor>();
lock(synchronize)
{
try
{
foreach(ConnectionProcessor connectionProcessor in processors)
{
if(connectionProcessor.IsDisposed())
{
connectionsProcessorsToRemove.Add(connectionProcessor);
}
}
if(0!=connectionsProcessorsToRemove.Count)MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnectionProcessors] Removing {0} disposed connection processors.",connectionsProcessorsToRemove.Count));
foreach(ConnectionProcessor connectionProcessorToRemove in connectionsProcessorsToRemove)
{
processors.Remove(connectionProcessorToRemove);
}
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnectionProcessors] There are {0} remaining connection processors.",processors.Count));
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorDisposedConnectionProcessors] Exception:{0}",exception.ToString()));
}
}
}
// Idle connections where no activity for 5 minutes
// 1000*60*5
private void MonitorIdleConnections()
{
List<Connection> connectionsToRemove=new List<Connection>();
lock(synchronize)
{
try
{
connectionsToRemove = connections.Where(x => x.Profiler.ElapsedTime()>300000 && x.IsSocketClosed()).ToList();
if(null!=connectionsToRemove)
{
if(0!=connectionsToRemove.Count)MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorIdleConnections] Disposing {0} idle connections",connectionsToRemove.Count));
foreach(Connection connection in connectionsToRemove)
{
RemoveConnection(connection);
}
if(0==processors.Count && 0!=connections.Count)
{
RemoveOrphanedConnections(connections);
}
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorIdleConnections] There are {0} remaining connections.",connections.Count));
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorIdleConnections] There are {0} remaining connection processors.",processors.Count));
}
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:MonitorIdleConnections] Exception:{0}",exception.ToString()));
}
}
}
// Remove connections that are considered to be inactive and the socket is considered to be closed.
private void RemoveOrphanedConnections(List<Connection> connections)
{
lock(synchronize)
{
try
{
List<Connection> connectionsToRemove=new List<Connection>();
foreach(Connection connection in connections)
{
if(connection.IsSocketClosed())
{
connectionsToRemove.Add(connection);
}
else
{
UInt64 lingerTimeMinutes = connection.Profiler.ElapsedTime()/1000/60;
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:RemoveOrphanedConnections] Orphaned Connection {0}:{1}. Socket reports active data but there has been no buffer exchange for {2} minutes.",connection.Host,connection.Port,lingerTimeMinutes));
if(lingerTimeMinutes>5)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:RemoveOrphanedConnections] Disposing Orphaned Connection {0}:{1}.",connection.Host,connection.Port));
connectionsToRemove.Add(connection);
}
}
}
foreach(Connection connection in connectionsToRemove)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:RemoveOrphanedConnections] Removing orphaned connection {0}:{1} .",connection.Host,connection.Port));
connection.Dispose();
connections.Remove(connection);
}
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:RemoveOrphanedConnections] Exception:{0}",exception.ToString()));
}
}
}
// Remove the connection and associated connection processor
private void RemoveConnection(Connection connection)
{
lock(synchronize)
{
try
{
ConnectionProcessor connectionProcessor = processors.Where(x => x.Connection == connection).FirstOrDefault();
if(null!=connectionProcessor)
{
connectionProcessor.Dispose();
processors.Remove(connectionProcessor);
}
connection.Dispose();
connections.Remove(connection);
}
catch(Exception exception)
{
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[Proxy:RemoveConnection] Exception:{0}",exception.ToString()));
}
}
}
}
}