Repository URL to install this package:
Version:
1.3.1 ▾
|
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using JetBrains.Annotations;
namespace Fluctio.FluctioSim.EditorUtils.Http
{
public class HttpServer
{
[CanBeNull] private HttpListener _listener;
private readonly UriBuilder _prefix;
private readonly Func<HttpListenerContext, Task> _requestProcesser;
private readonly Action<Exception> _logger;
private readonly Func<int?> _portFinder;
private readonly int _maxConcurrentRequests;
public bool IsRunning => _listener is { IsListening: true };
public int Port
{
get
{
if (!IsRunning)
{
throw new InvalidOperationException("Server must be running to get its port");
}
return _prefix.Port;
}
}
public Uri Prefix
{
get
{
if (!IsRunning)
{
throw new InvalidOperationException("Server must be running to get its prefix");
}
return _prefix.Uri;
}
}
public HttpServer(UriBuilder prefix, Func<HttpListenerContext, Task> requestProcesser, Action<Exception> logger = null, Func<int?> portFinder = null, int maxConcurrentRequests = 5)
{
if (prefix.Scheme != Uri.UriSchemeHttp && prefix.Scheme != Uri.UriSchemeHttps)
{
throw new ArgumentException("Scheme should be either 'http' or 'https'", nameof(prefix));
}
_prefix = prefix;
_requestProcesser = requestProcesser;
_logger = logger ?? Console.WriteLine;
_portFinder = portFinder ?? PortFinders.Constant(prefix.Uri.Port);
_maxConcurrentRequests = maxConcurrentRequests;
}
public void Start()
{
if (IsRunning)
{
throw new InvalidOperationException("Server already started");
}
FindPort();
_listener = new HttpListener();
_listener.Prefixes.Add(_prefix.Uri.AbsoluteUri);
try
{
_listener.Start();
}
catch
{
Stop();
throw;
}
ProcessRequestsAsync();
}
public void Stop()
{
if (!IsRunning)
{
return;
}
_listener?.Close();
_listener = null;
}
private void FindPort()
{
var port = _portFinder();
if (port == null)
{
throw new InvalidOperationException("Could not find find free port with specified constraints");
}
_prefix.Port = port.Value;
}
private async void ProcessRequestsAsync()
{
var tasks = new HashSet<Task>();
for (var i = 0; i < _maxConcurrentRequests; i++)
{
tasks.Add(_listener!.GetContextAsync());
}
while (IsRunning)
{
var justCompletedTask = await Task.WhenAny(tasks);
if (!IsRunning)
{
break;
}
tasks.Remove(justCompletedTask);
if (justCompletedTask is not Task<HttpListenerContext> getContextTask)
{
// request processed, wait for a new one
tasks.Add(_listener!.GetContextAsync());
continue;
}
// received request, process it
var processTask = _requestProcesser(getContextTask.Result);
_ = processTask.ContinueWith(_ => getContextTask.Result.Finish());
_ = processTask.ContinueWith(t => _logger(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
tasks.Add(processTask);
}
}
}
}