Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/99designs/keyring v1.1.6
github.com/apache/pulsar-client-go v0.4.0
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20210220083636-af91e9ca0ee2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/docker/go-connections v0.4.0
Expand All @@ -18,13 +19,13 @@ require (
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.1
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v0.0.5
github.com/spf13/cobra v0.0.7
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
github.com/testcontainers/testcontainers-go v0.0.10
github.com/thediveo/enumflag v0.10.1
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d // indirect
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.3.0
)
97 changes: 81 additions & 16 deletions go.sum

Large diffs are not rendered by default.

194 changes: 194 additions & 0 deletions pkg/cat/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package cat

import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"io"
"time"
"unicode/utf8"
)

type ReaderFormat int

const (
PayloadOnlyAsString ReaderFormat = iota
PayloadOnlyAsHex
PayloadOnlyAsBase64
MetadataAsJson
)

var ReaderFormatIds = map[ReaderFormat][]string{
PayloadOnlyAsString: {"payload-string"},
PayloadOnlyAsHex: {"payload-hex"},
PayloadOnlyAsBase64: {"payload-base64"},
MetadataAsJson: {"full-message"},
}

type Reader struct {
client pulsar.Client
reader pulsar.Reader
opts ReaderOpts
readMsgs int
partialMsg []byte
finished bool
closed bool
}

type ReaderMessage struct {
Topic string `json:"topic"`
ProducerName string `json:"producer_name"`
Properties map[string]string `json:"properties,omitempty"`
PayloadString string `json:"payload,omitempty"`
PayloadBytes []byte `json:"payloadRaw,omitempty"`
ID string `json:"id"`
PublishTime time.Time `json:"publish_time"`
EventTime time.Time `json:"event_time,omitempty"`
Key string `json:"key,omitempty"`
OrderingKey string `json:"ordering_key,omitempty"`
RedeliveryCount uint32 `json:"redelivery_count,omitempty"`
IsReplicated bool `json:"is_replicated,omitempty"`
ReplicatedFrom string `json:"replicated_from,omitempty"`
}

func FromPulsarMessage(msg pulsar.Message) *ReaderMessage {
payloadString := ""
payloadBytes := make([]byte, 0)
if utf8.Valid(msg.Payload()) {
payloadString = string(msg.Payload())
} else {
payloadBytes = msg.Payload()
}
return &ReaderMessage{
Topic: msg.Topic(),
ProducerName: msg.ProducerName(),
Properties: msg.Properties(),
PayloadString: payloadString,
PayloadBytes: payloadBytes,
ID: msg.ID().String(),
PublishTime: msg.PublishTime(),
EventTime: msg.EventTime(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
RedeliveryCount: msg.RedeliveryCount(),
IsReplicated: msg.IsReplicated(),
ReplicatedFrom: msg.GetReplicatedFrom(),
}
}

type ReaderOpts struct {
Topic string
StartAt pulsar.MessageID
Format ReaderFormat
Compacted bool
MsgCount int
Tailing bool
}

func NewReader(client pulsar.Client, opts ReaderOpts) (*Reader, error) {
pulsarReader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: opts.Topic,
StartMessageID: opts.StartAt,
ReadCompacted: opts.Compacted,
StartMessageIDInclusive: true,
ReceiverQueueSize: 10000,
})
if err != nil {
return nil, err
}
reader := &Reader{client: client, reader: pulsarReader, opts: opts, readMsgs: 0, partialMsg: make([]byte, 0), finished: false, closed: false}
return reader, nil
}

func (r *Reader) Read(p []byte) (n int, err error) {
if r.closed {
return 0, io.EOF
}
if r.finished && !r.opts.Tailing {
return 0, io.EOF
}
toRead := len(p)
readSoFar := 0
if len(r.partialMsg) > 0 {
copied := copy(p, r.partialMsg)
if copied < len(r.partialMsg) {
r.partialMsg = r.partialMsg[copied:]
} else {
r.partialMsg = make([]byte, 0)
}
toRead -= copied
readSoFar += copied
}
reachedMsgCount := false
if r.opts.MsgCount != -1 && r.readMsgs >= r.opts.MsgCount {
reachedMsgCount = true
}
if !reachedMsgCount {
for toRead > 0 && !r.closed {
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
msg, err := r.reader.Next(ctx)
if msg == nil {
continue
}
if err != nil {
return len(p) - toRead, err
}
r.readMsgs += 1
buff, err := r.formatMsg(msg)
if err != nil {
return len(p) - toRead, err
}
var copied int
if toRead < len(buff) {
copied = copy(p[readSoFar:], buff[:toRead])
r.partialMsg = buff[toRead:]
} else {
copied = copy(p[readSoFar:], buff)
}
readSoFar += copied
toRead -= copied
}
} else {
r.finished = true
}
return readSoFar, nil
}

func (r *Reader) formatMsg(msg pulsar.Message) ([]byte, error) {
switch r.opts.Format {
case PayloadOnlyAsString:
if utf8.Valid(msg.Payload()) {
return append(msg.Payload(), []byte("\n")...), nil
} else {
return []byte{}, errors.New("invalid utf8 string")
}
case PayloadOnlyAsBase64:
return []byte(base64.StdEncoding.EncodeToString(msg.Payload())), nil
case PayloadOnlyAsHex:
return []byte(hex.EncodeToString(msg.Payload())), nil
case MetadataAsJson:
meta := FromPulsarMessage(msg)
msg, err := json.Marshal(meta)
if err != nil {
return []byte{}, err
}
return append(msg, []byte("\n")...), err
default:
return nil, errors.New("unrecognized format")
}
}

func (r *Reader) ReadStats() string {
// TODO improve the stats
return fmt.Sprintf("Topic: %v, messagesRead %v", r.opts.Topic, r.readMsgs)
}

func (r *Reader) Close() error {
r.closed = true
r.reader.Close()
return nil
}
5 changes: 5 additions & 0 deletions pkg/cmdutils/cmdutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

brokerPulsar "github.com/apache/pulsar-client-go/pulsar"
"github.com/streamnative/pulsarctl/pkg/bookkeeper"
"github.com/streamnative/pulsarctl/pkg/cli"
"github.com/streamnative/pulsarctl/pkg/pulsar"
Expand Down Expand Up @@ -95,6 +96,10 @@ func NewBookieClient() bookkeeper.Client {
return PulsarCtlConfig.BookieClient()
}

func NewBrokerClient() (brokerPulsar.Client, error) {
return PulsarCtlConfig.BrokerClient()
}

func PrintJSON(w io.Writer, obj interface{}) {
b, err := json.MarshalIndent(obj, "", " ")
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions pkg/cmdutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmdutils

import (
"fmt"
auth2 "github.com/streamnative/pulsarctl/pkg/auth"
"io/ioutil"
"log"
"os"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"gopkg.in/yaml.v2"

brokerPulsar "github.com/apache/pulsar-client-go/pulsar"
"github.com/streamnative/pulsarctl/pkg/bookkeeper"
"github.com/streamnative/pulsarctl/pkg/pulsar"
"github.com/streamnative/pulsarctl/pkg/pulsar/common"
Expand All @@ -52,6 +54,12 @@ func (c *ClusterConfig) FlagSet() *pflag.FlagSet {
c.WebServiceURL,
"The admin web service url that pulsarctl connects to.")

flags.StringVar(
&c.BrokerServiceURL,
"broker-service-url",
c.BrokerServiceURL,
"The broker service url that pulsarctl connects to.")

flags.StringVar(
&c.AuthPlugin,
"auth-plugin",
Expand Down Expand Up @@ -208,6 +216,10 @@ func (c *ClusterConfig) Client(version common.APIVersion) pulsar.Client {
c.WebServiceURL = pulsar.DefaultWebServiceURL
}

if len(c.BrokerServiceURL) == 0 {
c.BrokerServiceURL = pulsar.DefaultBrokerServiceURL
}

if len(c.Token) > 0 && len(c.TokenFile) > 0 {
logger.Critical("the token and token file can not be specified at the same time")
os.Exit(1)
Expand All @@ -230,6 +242,40 @@ func (c *ClusterConfig) Client(version common.APIVersion) pulsar.Client {
return client
}


func (c *ClusterConfig) BrokerClientOpts() (brokerPulsar.ClientOptions, error) {
if len(c.BrokerServiceURL) == 0 {
c.BrokerServiceURL = pulsar.DefaultBrokerServiceURL
}

auth, err := auth2.GetAuthProvider((*common.Config)(c))
if err != nil {
return brokerPulsar.ClientOptions{}, err
}

opts := brokerPulsar.ClientOptions{
URL: c.BrokerServiceURL,
TLSTrustCertsFilePath: c.TLSTrustCertsFilePath,
TLSAllowInsecureConnection: c.TLSAllowInsecureConnection,
TLSValidateHostname: c.TLSEnableHostnameVerification,
}
if !utils.IsNilFixed(auth) {
opts.Authentication = *auth
} else {
fmt.Printf("No Auth Provider found\n")
}
return opts, nil

}

func (c *ClusterConfig) BrokerClient() (brokerPulsar.Client, error) {
opts, err := c.BrokerClientOpts()
if err != nil {
return nil, err
}
return brokerPulsar.NewClient(opts)
}

func (c *ClusterConfig) BookieClient() bookkeeper.Client {
config := bookkeeper.DefaultConfig()
ctxConf := c.DecodeContext()
Expand Down Expand Up @@ -258,6 +304,7 @@ func loadFromEnv() *ClusterConfig {
if envConf, ok := os.LookupEnv("PULSAR_CLIENT_CONF"); ok {
if props, err := properties.LoadFile(envConf, properties.UTF8); err == nil && props != nil {
config.WebServiceURL = props.GetString("webServiceUrl", pulsar.DefaultWebServiceURL)
config.BrokerServiceURL = props.GetString("brokerServiceUrl", pulsar.DefaultBrokerServiceURL)
config.TLSAllowInsecureConnection = props.GetBool("tlsAllowInsecureConnection", false)
config.TLSTrustCertsFilePath = props.GetString("tlsTrustCertsFilePath", "")
config.BKWebServiceURL = props.GetString("brokerServiceUrl", bookkeeper.DefaultWebServiceURL)
Expand Down
Loading