Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
gop / usr / lib / gop / x / fakenet / conn.go
Size: Mime:
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package fakenet

import (
	"io"
	"net"
	"sync"
	"time"
)

// NewConn returns a net.Conn built on top of the supplied reader and writer.
// It decouples the read and write on the conn from the underlying stream
// to enable Close to abort ones that are in progress.
// It's primary use is to fake a network connection from stdin and stdout.
func NewConn(name string, in io.ReadCloser, out io.WriteCloser) net.Conn {
	c := &fakeConn{
		name:   name,
		reader: newFeeder(in.Read),
		writer: newFeeder(out.Write),
		in:     in,
		out:    out,
	}
	go c.reader.run()
	go c.writer.run()
	return c
}

type fakeConn struct {
	name   string
	reader *connFeeder
	writer *connFeeder
	in     io.ReadCloser
	out    io.WriteCloser
}

type fakeAddr string

// connFeeder serializes calls to the source function (io.Reader.Read or
// io.Writer.Write) by delegating them to a channel. This also allows calls to
// be intercepted when the connection is closed, and cancelled early if the
// connection is closed while the calls are still outstanding.
type connFeeder struct {
	source func([]byte) (int, error)
	input  chan []byte
	result chan feedResult
	mu     sync.Mutex
	closed bool
	done   chan struct{}
}

type feedResult struct {
	n   int
	err error
}

func (c *fakeConn) Close() error {
	c.reader.close()
	c.writer.close()
	c.in.Close()
	c.out.Close()
	return nil
}

func (c *fakeConn) Read(b []byte) (n int, err error)   { return c.reader.do(b) }
func (c *fakeConn) Write(b []byte) (n int, err error)  { return c.writer.do(b) }
func (c *fakeConn) LocalAddr() net.Addr                { return fakeAddr(c.name) }
func (c *fakeConn) RemoteAddr() net.Addr               { return fakeAddr(c.name) }
func (c *fakeConn) SetDeadline(t time.Time) error      { return nil }
func (c *fakeConn) SetReadDeadline(t time.Time) error  { return nil }
func (c *fakeConn) SetWriteDeadline(t time.Time) error { return nil }
func (a fakeAddr) Network() string                     { return "fake" }
func (a fakeAddr) String() string                      { return string(a) }

func newFeeder(source func([]byte) (int, error)) *connFeeder {
	return &connFeeder{
		source: source,
		input:  make(chan []byte),
		result: make(chan feedResult),
		done:   make(chan struct{}),
	}
}

func (f *connFeeder) close() {
	f.mu.Lock()
	if !f.closed {
		f.closed = true
		close(f.done)
	}
	f.mu.Unlock()
}

func (f *connFeeder) do(b []byte) (n int, err error) {
	// send the request to the worker
	select {
	case f.input <- b:
	case <-f.done:
		return 0, io.EOF
	}
	// get the result from the worker
	select {
	case r := <-f.result:
		return r.n, r.err
	case <-f.done:
		return 0, io.EOF
	}
}

func (f *connFeeder) run() {
	var b []byte
	for {
		// wait for an input request
		select {
		case b = <-f.input:
		case <-f.done:
			return
		}
		// invoke the underlying method
		n, err := f.source(b)
		// send the result back to the requester
		select {
		case f.result <- feedResult{n: n, err: err}:
		case <-f.done:
			return
		}
	}
}