Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Fluctio.FluctioSim.EditorUtils.Http
{
	/// <summary>
	/// Small HTTP server built on top of <see cref="HttpListener"/>.
	/// Accepts requests on a single prefix and hands each <see cref="HttpListenerContext"/>
	/// to a user-supplied asynchronous processor, keeping the number of pending accepts
	/// plus in-flight requests equal to the configured concurrency limit.
	/// </summary>
	/// <remarks>
	/// The <see cref="UriBuilder"/> passed to the constructor is stored by reference and its
	/// <see cref="UriBuilder.Port"/> is overwritten on every <see cref="Start"/>.
	/// The class performs no locking of its own.
	/// </remarks>
	public class HttpServer
	{
		[CanBeNull] private HttpListener _listener;
		private readonly UriBuilder _prefix;
		private readonly Func<HttpListenerContext, Task> _requestProcesser;
		private readonly Action<Exception> _logger;
		private readonly Func<int?> _portFinder;
		private readonly int _maxConcurrentRequests;

		/// <summary>True while a listener exists and is listening.</summary>
		public bool IsRunning => _listener is { IsListening: true };

		/// <summary>Port chosen by the port finder during the last <see cref="Start"/>.</summary>
		/// <exception cref="InvalidOperationException">The server is not running.</exception>
		public int Port
		{
			get
			{
				if (!IsRunning)
				{
					throw new InvalidOperationException("Server must be running to get its port");
				}

				return _prefix.Port;
			}
		}

		/// <summary>Prefix (including the chosen port) the server is listening on.</summary>
		/// <exception cref="InvalidOperationException">The server is not running.</exception>
		public Uri Prefix
		{
			get
			{
				if (!IsRunning)
				{
					throw new InvalidOperationException("Server must be running to get its prefix");
				}

				return _prefix.Uri;
			}
		}

		/// <summary>Creates a server; nothing is opened until <see cref="Start"/> is called.</summary>
		/// <param name="prefix">Base address with an 'http' or 'https' scheme. Its port is overwritten on start.</param>
		/// <param name="requestProcesser">Callback invoked once per received request.</param>
		/// <param name="logger">Receives exceptions raised while serving requests; defaults to <see cref="Console.WriteLine(object)"/>.</param>
		/// <param name="portFinder">Returns the port to listen on, or null when none is available; defaults to the port of <paramref name="prefix"/>.</param>
		/// <param name="maxConcurrentRequests">Number of requests that may be awaited or processed at the same time; must be at least 1.</param>
		/// <exception cref="ArgumentNullException"><paramref name="prefix"/> or <paramref name="requestProcesser"/> is null.</exception>
		/// <exception cref="ArgumentException">The scheme of <paramref name="prefix"/> is neither 'http' nor 'https'.</exception>
		/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrentRequests"/> is less than 1.</exception>
		public HttpServer(UriBuilder prefix, Func<HttpListenerContext, Task> requestProcesser, Action<Exception> logger = null, Func<int?> portFinder = null, int maxConcurrentRequests = 5)
		{
			if (prefix == null)
			{
				throw new ArgumentNullException(nameof(prefix));
			}
			if (requestProcesser == null)
			{
				throw new ArgumentNullException(nameof(requestProcesser));
			}
			if (prefix.Scheme != Uri.UriSchemeHttp && prefix.Scheme != Uri.UriSchemeHttps)
			{
				throw new ArgumentException("Scheme should be either 'http' or 'https'", nameof(prefix));
			}
			if (maxConcurrentRequests < 1)
			{
				// With no accept slots the request loop would have nothing to wait on.
				throw new ArgumentOutOfRangeException(nameof(maxConcurrentRequests), maxConcurrentRequests, "At least one concurrent request must be allowed");
			}

			_prefix = prefix;
			_requestProcesser = requestProcesser;
			_logger = logger ?? Console.WriteLine;
			_portFinder = portFinder ?? PortFinders.Constant(prefix.Uri.Port);
			_maxConcurrentRequests = maxConcurrentRequests;
		}

		/// <summary>Picks a port, starts listening on the prefix and begins accepting requests.</summary>
		/// <exception cref="InvalidOperationException">The server is already running, or no port could be found.</exception>
		/// <remarks>Any exception thrown while configuring or starting the listener is rethrown after the listener has been closed.</remarks>
		public void Start()
		{
			if (IsRunning)
			{
				throw new InvalidOperationException("Server already started");
			}

			FindPort();
			var listener = new HttpListener();
			_listener = listener;
			try
			{
				listener.Prefixes.Add(_prefix.Uri.AbsoluteUri);
				listener.Start();
			}
			catch
			{
				// Stop() closes the listener even though it never reached the listening state.
				Stop();
				throw;
			}

			// Fire and forget: the loop reports its own failures through _logger.
			_ = ProcessRequestsAsync(listener);
		}

		/// <summary>Closes the listener, if there is one. Safe to call when the server is not running.</summary>
		public void Stop()
		{
			// Do not gate on IsRunning: a listener whose Start() failed is not listening
			// but still has to be closed and forgotten.
			var listener = _listener;
			_listener = null;
			listener?.Close();
		}

		/// <summary>Asks the port finder for a port and stores it in the prefix.</summary>
		/// <exception cref="InvalidOperationException">The port finder returned null.</exception>
		private void FindPort()
		{
			var port = _portFinder();
			if (port == null)
			{
				throw new InvalidOperationException("Could not find free port with specified constraints");
			}
			_prefix.Port = port.Value;
		}

		/// <summary>
		/// Request loop for one specific listener. Runs until that listener stops listening.
		/// </summary>
		/// <param name="listener">
		/// The listener this loop serves. It is passed in rather than read from the field so that
		/// a quick Stop()/Start() cannot make an old loop operate on the new listener.
		/// </param>
		private async Task ProcessRequestsAsync(HttpListener listener)
		{
			// Holds both pending accepts (Task<HttpListenerContext>) and in-flight processing tasks.
			var tasks = new HashSet<Task>();
			try
			{
				for (var i = 0; i < _maxConcurrentRequests; i++)
				{
					tasks.Add(listener.GetContextAsync());
				}

				while (listener.IsListening)
				{
					var justCompletedTask = await Task.WhenAny(tasks);
					if (!listener.IsListening)
					{
						break;
					}
					tasks.Remove(justCompletedTask);

					if (justCompletedTask is Task<HttpListenerContext> getContextTask)
					{
						if (getContextTask.Status == TaskStatus.RanToCompletion)
						{
							// received request, process it; the slot is freed when processing completes
							tasks.Add(BeginProcessing(getContextTask.Result));
							continue;
						}

						// accept failed while the listener is still up: report it and re-arm the slot below
						if (getContextTask.Exception != null)
						{
							_logger(getContextTask.Exception);
						}
					}

					// request processed (or accept failed), wait for a new one
					tasks.Add(listener.GetContextAsync());
				}
			}
			catch (Exception e)
			{
				// Failures caused by the listener being closed underneath the loop are expected on shutdown.
				if (listener.IsListening)
				{
					_logger(e);
				}
			}
			finally
			{
				// Accepts still pending when the listener closes end up faulted; read their
				// exception so they are not reported as unobserved task exceptions later.
				foreach (var pending in tasks)
				{
					_ = pending.ContinueWith(t => { _ = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
				}
			}
		}

		/// <summary>
		/// Invokes the request processor for <paramref name="context"/> and attaches the
		/// completion and error-logging continuations.
		/// </summary>
		/// <returns>A task that completes when the processor has finished with the request.</returns>
		private Task BeginProcessing(HttpListenerContext context)
		{
			Task processTask;
			try
			{
				// A processor returning null is treated as "already done".
				processTask = _requestProcesser(context) ?? Task.CompletedTask;
			}
			catch (Exception e)
			{
				// Synchronous failure: route it through the same continuations as an asynchronous one.
				processTask = Task.FromException(e);
			}

			// Finish() is declared outside this class; presumably it completes the response — confirm.
			_ = processTask.ContinueWith(completed => context.Finish());
			_ = processTask.ContinueWith(t => _logger(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
			return processTask;
		}
	}
}