Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
622 changes: 0 additions & 622 deletions bindata_test.go

This file was deleted.

70 changes: 40 additions & 30 deletions envelope_stream_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,33 @@ import (
"google.golang.org/protobuf/proto"

"code.cloudfoundry.org/go-loggregator/v10"
"code.cloudfoundry.org/go-loggregator/v10/internal/testhelper"
"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"code.cloudfoundry.org/tlsconfig"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Connector", func() {

var (
certs *testhelper.TestCerts
)

BeforeEach(func() {
certs = testhelper.GenerateCerts("loggregatorCA")
})

It("initiates a connection to receive envelopes", func() {
producer, err := newFakeEventProducer()
producer, err := newFakeEventProducer(certs)
Expect(err).NotTo(HaveOccurred())
producer.start()
defer producer.stop()
tlsConf, err := loggregator.NewIngressTLSConfig(
fixture("CA.crt"),
fixture("server.crt"),
fixture("server.key"),

tlsConf, err := testhelper.NewIngressTLSConfig(
certs.CA(),
certs.Cert("reverselogproxy"),
certs.Key("reverselogproxy"),
)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -49,17 +60,17 @@ var _ = Describe("Connector", func() {
})

It("reconnects if the stream fails", func() {
producer, err := newFakeEventProducer()
producer, err := newFakeEventProducer(certs)
Expect(err).NotTo(HaveOccurred())

// Producer will grab a port on start. When the producer is restarted,
// it will grab the same port.
producer.start()

tlsConf, err := loggregator.NewIngressTLSConfig(
fixture("CA.crt"),
fixture("server.crt"),
fixture("server.key"),
tlsConf, err := testhelper.NewIngressTLSConfig(
certs.CA(),
certs.Cert("reverselogproxy"),
certs.Key("reverselogproxy"),
)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -94,18 +105,18 @@ var _ = Describe("Connector", func() {
})

It("enables buffering", func() {
producer, err := newFakeEventProducer()
producer, err := newFakeEventProducer(certs)
Expect(err).NotTo(HaveOccurred())

// Producer will grab a port on start. When the producer is restarted,
// it will grab the same port.
producer.start()
defer producer.stop()

tlsConf, err := loggregator.NewIngressTLSConfig(
fixture("CA.crt"),
fixture("server.crt"),
fixture("server.key"),
tlsConf, err := testhelper.NewIngressTLSConfig(
certs.CA(),
certs.Cert("reverselogproxy"),
certs.Key("reverselogproxy"),
)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -157,15 +168,15 @@ var _ = Describe("Connector", func() {
})

It("won't panic when context canceled", func() {
producer, err := newFakeEventProducer()
producer, err := newFakeEventProducer(certs)
Expect(err).NotTo(HaveOccurred())
producer.start()
defer producer.stop()

tlsConf, err := loggregator.NewIngressTLSConfig(
fixture("CA.crt"),
fixture("server.crt"),
fixture("server.key"),
tlsConf, err := testhelper.NewIngressTLSConfig(
certs.CA(),
certs.Cert("reverselogproxy"),
certs.Key("reverselogproxy"),
)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -189,14 +200,17 @@ type fakeEventProducer struct {

server *grpc.Server
addr string
certs *testhelper.TestCerts

mu sync.Mutex
connectionAttempts_ int
actualReq_ *loggregator_v2.EgressBatchRequest
}

func newFakeEventProducer() (*fakeEventProducer, error) {
f := &fakeEventProducer{}
func newFakeEventProducer(certs *testhelper.TestCerts) (*fakeEventProducer, error) {
f := &fakeEventProducer{
certs: certs,
}

return f, nil
}
Expand Down Expand Up @@ -261,7 +275,7 @@ func (f *fakeEventProducer) start() {
break
}
f.addr = lis.Addr().String()
c, err := newServerMutualTLSConfig()
c, err := newServerMutualTLSConfig(f.certs)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -298,15 +312,11 @@ func (f *fakeEventProducer) connectionAttempts() int {
return f.connectionAttempts_
}

func newServerMutualTLSConfig() (*tls.Config, error) {
certFile := fixture("server.crt")
keyFile := fixture("server.key")
caCertFile := fixture("CA.crt")

func newServerMutualTLSConfig(certs *testhelper.TestCerts) (*tls.Config, error) {
return tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(certFile, keyFile),
tlsconfig.WithIdentityFromFile(certs.Cert("reverselogproxy"), certs.Key("reverselogproxy")),
).Server(
tlsconfig.WithClientAuthenticationFromFile(caCertFile),
tlsconfig.WithClientAuthenticationFromFile(certs.CA()),
)
}
29 changes: 0 additions & 29 deletions fixtures_test.go

This file was deleted.

5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ require (
)

require (
filippo.io/edwards25519 v1.2.0 // indirect
github.com/Masterminds/semver/v3 v3.4.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/pprof v0.0.0-20260202012954-cb029daf43ef // indirect
github.com/kr/text v0.2.0 // indirect
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/square/certstrap v1.3.0 // indirect
go.step.sm/crypto v0.76.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ code.cloudfoundry.org/go-diodes v0.0.0-20260209061029-a81ffbc46978 h1:uZ6UIz7zl3
code.cloudfoundry.org/go-diodes v0.0.0-20260209061029-a81ffbc46978/go.mod h1:ZZMgJNANhsfqeXF//d5qDK0dNnQ4jTBsib4WR0xbWJQ=
code.cloudfoundry.org/tlsconfig v0.45.0 h1:axem2ESLqMm645WyTkw1RZ9zjfZzUiojImNqxGXEtlI=
code.cloudfoundry.org/tlsconfig v0.45.0/go.mod h1:62TfSYz4O4pgggNLF0wEZOTr2ZtrsnKPX9c2LSL1Cl0=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo=
filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 h1:afT88tB6u9JCKQZVAAaa9ICz/uGn5Uw9ekn6P22mYKM=
Expand Down Expand Up @@ -68,8 +68,8 @@ github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0t
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/square/certstrap v1.3.0 h1:N9P0ZRA+DjT8pq5fGDj0z3FjafRKnBDypP0QHpMlaAk=
github.com/square/certstrap v1.3.0/go.mod h1:wGZo9eE1B7WX2GKBn0htJ+B3OuRl2UsdCFySNooy9hU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
Expand All @@ -90,8 +90,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
go.step.sm/crypto v0.76.0 h1:K23BSaeoiY7Y5dvvijTeYC9EduDBetNwQYMBwMhi1aA=
go.step.sm/crypto v0.76.0/go.mod h1:PXYJdKkK8s+GHLwLguFaLxHNAFsFL3tL1vSBrYfey5k=
go.step.sm/crypto v0.76.2 h1:JJ/yMcs/rmcCAwlo+afrHjq74XBFRTJw5B2y4Q4Z4c4=
go.step.sm/crypto v0.76.2/go.mod h1:m6KlB/HzIuGFep0UWI5e0SYi38UxpoKeCg6qUaHV6/Q=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
Expand All @@ -104,6 +104,8 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg=
golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
Expand Down
41 changes: 30 additions & 11 deletions ingress_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"code.cloudfoundry.org/go-loggregator/v10"
"code.cloudfoundry.org/go-loggregator/v10/internal/testhelper"
"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"code.cloudfoundry.org/go-loggregator/v10/runtimeemitter"

Expand All @@ -22,21 +23,35 @@ import (
// instead of just buffering them. It seems to buffer up until 2000.
const logCount = 3000

var certs *testhelper.TestCerts

var _ = BeforeSuite(func() {
})

// TestMain acts as the log emitter for gRPC SendRecv() test.
func TestMain(m *testing.M) {

if os.Getenv("INGRESS_CLIENT_TEST_PROCESS") != "" {
client, _ := buildIngressClient(os.Getenv("INGRESS_CLIENT_TEST_PROCESS"), time.Hour, false)
certs = testhelper.LoadCertsFromEnv()
client, _ := buildIngressClient(os.Getenv("INGRESS_CLIENT_TEST_PROCESS"), time.Hour, false, certs)
for i := 0; i < logCount; i++ {
client.EmitLog(fmt.Sprint("message", i))
}
_ = client.CloseSend()
return
}

// Parent process: generate and pass paths to child
certs = testhelper.GenerateCerts("loggregatorCA")
os.Setenv("TEST_CA_FILE", certs.CA())
os.Setenv("TEST_CERT_FILE", certs.Cert("metron"))
os.Setenv("TEST_KEY_FILE", certs.Key("metron"))

os.Exit(m.Run())
}

var _ = Describe("IngressClient", func() {

var (
client *loggregator.IngressClient
server *testIngressServer
Expand All @@ -45,17 +60,18 @@ var _ = Describe("IngressClient", func() {

BeforeEach(func() {
var err error

server, err = newTestIngressServer(
fixture("server.crt"),
fixture("server.key"),
fixture("CA.crt"),
certs.Cert("reverselogproxy"),
certs.Key("reverselogproxy"),
certs.CA(),
)
Expect(err).NotTo(HaveOccurred())

err = server.start()
Expect(err).NotTo(HaveOccurred())

client, cancel = buildIngressClient(server.addr, 50*time.Millisecond, false)
client, cancel = buildIngressClient(server.addr, 50*time.Millisecond, false, certs)
})

AfterEach(func() {
Expand Down Expand Up @@ -87,7 +103,7 @@ var _ = Describe("IngressClient", func() {
})

It("returns an error after context has been cancelled", func() {
client, cancel := buildIngressClient(server.addr, time.Hour, false)
client, cancel := buildIngressClient(server.addr, time.Hour, false, certs)
cancel()

t := time.NewTicker(1 * time.Millisecond)
Expand Down Expand Up @@ -416,6 +432,9 @@ var _ = Describe("IngressClient", func() {
cmd := exec.Command(path)
cmd.Env = []string{
"INGRESS_CLIENT_TEST_PROCESS=" + server.addr,
"TEST_CA_FILE=" + os.Getenv("TEST_CA_FILE"),
"TEST_CERT_FILE=" + os.Getenv("TEST_CERT_FILE"),
"TEST_KEY_FILE=" + os.Getenv("TEST_KEY_FILE"),
}
Expect(cmd.Start()).To(Succeed())
err = cmd.Wait()
Expand Down Expand Up @@ -464,11 +483,11 @@ func getEnvelopeAt(receivers chan loggregator_v2.Ingress_BatchSenderServer, idx
return envBatch.Batch[idx], nil
}

func buildIngressClient(serverAddr string, flushInterval time.Duration, addContext bool) (*loggregator.IngressClient, func()) {
tlsConfig, err := loggregator.NewIngressTLSConfig(
fixture("CA.crt"),
fixture("client.crt"),
fixture("client.key"),
func buildIngressClient(serverAddr string, flushInterval time.Duration, addContext bool, certs *testhelper.TestCerts) (*loggregator.IngressClient, func()) {
tlsConfig, err := testhelper.NewIngressTLSConfig(
certs.CA(),
certs.Cert("metron"),
certs.Key("metron"),
)
if err != nil {
panic(err)
Expand Down
Loading