Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test/#139 unit test #142

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,7 @@ func runClient(ctx *cli.Context) error {
paramDumpBatch := ctx.String("dumpbatch")

// Create client
c, err := datastreamer.NewClient(server, StSequencer)
if err != nil {
return err
}
c := datastreamer.NewClient(server, StSequencer)

// Set process entry callback function
if !sanityCheck {
Expand All @@ -478,10 +475,7 @@ func runClient(ctx *cli.Context) error {
}

// Start client (connect to the server)
err = c.Start()
if err != nil {
return err
}
c.Start()

// Query file header information
if queryHeader {
Expand Down
24 changes: 14 additions & 10 deletions datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ var (
},
WriteTimeout: 3 * time.Second,
}
leveldb = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db"
streamServer *datastreamer.StreamServer
streamType = datastreamer.StreamType(1)
entryType1 = datastreamer.EntryType(1)
Expand Down Expand Up @@ -180,15 +179,15 @@ var (
}
)

func deleteFiles() error {
func deleteFiles(fileName string) error {
// Delete test file from filesystem
err := os.Remove(config.Filename)
err := os.Remove(fileName)
if err != nil && !os.IsNotExist(err) {
return err
}

// Delete leveldb folder from filesystem
err = os.RemoveAll(leveldb)
err = os.RemoveAll(fileName[0:strings.IndexRune(fileName, '.')] + ".db")
if err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -197,7 +196,7 @@ func deleteFiles() error {
}

func TestServer(t *testing.T) {
err := deleteFiles()
err := deleteFiles(config.Filename)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -448,6 +447,8 @@ func TestServer(t *testing.T) {

// Log final file header
datastreamer.PrintHeaderEntry(streamServer.GetHeader(), "final tests")

streamServer.BookmarkPrintDump()
}

func TestClient(t *testing.T) {
Expand All @@ -456,15 +457,13 @@ func TestClient(t *testing.T) {
var entry datastreamer.FileEntry
var header datastreamer.HeaderEntry

client, err := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType)
require.NoError(t, err)
client := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType)

err = client.Start()
require.NoError(t, err)
client.Start()

// Case: Query data from not existing bookmark -> FAIL
fromBookmark = nonAddedBookmark.Encode()
_, err = client.ExecCommandGetBookmark(fromBookmark)
_, err := client.ExecCommandGetBookmark(fromBookmark)
require.EqualError(t, datastreamer.ErrBookmarkNotFound, err.Error())

// Case: Query data from existing bookmark -> OK
Expand Down Expand Up @@ -534,4 +533,9 @@ func TestClient(t *testing.T) {
entry, err = client.ExecCommandGetEntry(fromEntry)
require.NoError(t, err)
require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data))
require.Equal(t, uint64(0), client.GetFromStream())
require.Equal(t, uint64(1304), client.GetTotalEntries())
require.True(t, client.IsStarted())
log.Debug("closing connection from the test")
client.CloseConnection()
}
32 changes: 15 additions & 17 deletions datastreamer/streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type StreamClient struct {
}

// NewClient creates a new data stream client
func NewClient(server string, streamType StreamType) (*StreamClient, error) {
func NewClient(server string, streamType StreamType) *StreamClient {
// Create the client data stream
c := StreamClient{
server: server,
Expand All @@ -69,17 +69,17 @@ func NewClient(server string, streamType StreamType) (*StreamClient, error) {
// Set default callback function to process entry
c.setProcessEntryFunc(PrintReceivedEntry, c.relayServer)

return &c, nil
return &c
}

// NewClientWithLogsConfig creates a new data stream client with logs configuration
func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) (*StreamClient, error) {
func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) *StreamClient {
log.Init(logsConfig)
return NewClient(server, streamType)
}

// Start connects to the data stream server and starts getting data from the server
func (c *StreamClient) Start() error {
func (c *StreamClient) Start() {
// Connect to server
c.connectServer()

Expand All @@ -91,8 +91,6 @@ func (c *StreamClient) Start() error {

// Flag stared
c.started = true

return nil
}

// connectServer waits until the server connection is established and returns if a command result is pending
Expand All @@ -116,7 +114,7 @@ func (c *StreamClient) connectServer() bool {
if c.streaming {
_, _, err = c.execCommand(CmdStart, true, c.nextEntry, nil)
if err != nil {
c.closeConnection()
c.CloseConnection()
time.Sleep(defaultTimeout)
continue
}
Expand All @@ -129,8 +127,8 @@ func (c *StreamClient) connectServer() bool {
return false
}

// closeConnection closes connection to the server
func (c *StreamClient) closeConnection() {
// CloseConnection closes connection to the server
func (c *StreamClient) CloseConnection() {
if c.conn != nil {
log.Infof("%s Close connection", c.ID)
c.conn.Close()
Expand Down Expand Up @@ -432,7 +430,7 @@ func (c *StreamClient) readResultEntry() (ResultEntry, error) {
if err != nil {
return e, err
}
// PrintResultEntry(e)
// e.PrintResultEntry()
return e, nil
}

Expand All @@ -453,7 +451,7 @@ func (c *StreamClient) readContent(buffer []byte) error {

// readEntries reads from the server all type of packets
func (c *StreamClient) readEntries() {
defer c.closeConnection()
defer c.CloseConnection()

for {
// Wait for connection
Expand All @@ -463,7 +461,7 @@ func (c *StreamClient) readEntries() {
packet := make([]byte, 1)
err := c.readContent(packet)
if err != nil {
c.closeConnection()
c.CloseConnection()
continue
}

Expand All @@ -473,7 +471,7 @@ func (c *StreamClient) readEntries() {
// Read result entry data
r, err := c.readResultEntry()
if err != nil {
c.closeConnection()
c.CloseConnection()
continue
}
// Send data to results channel
Expand All @@ -482,7 +480,7 @@ func (c *StreamClient) readEntries() {
if deferredResult {
r := c.getResult(CmdStart)
if r.errorNum != uint32(CmdErrOK) {
c.closeConnection()
c.CloseConnection()
time.Sleep(defaultTimeout)
continue
}
Expand All @@ -492,7 +490,7 @@ func (c *StreamClient) readEntries() {
// Read result entry data
r, err := c.readDataEntry()
if err != nil {
c.closeConnection()
c.CloseConnection()
continue
}
c.entryRsp <- r
Expand All @@ -501,7 +499,7 @@ func (c *StreamClient) readEntries() {
// Read header entry data
h, err := c.readHeaderEntry()
if err != nil {
c.closeConnection()
c.CloseConnection()
continue
}
// Send data to headers channel
Expand All @@ -511,7 +509,7 @@ func (c *StreamClient) readEntries() {
// Read file/stream entry data
e, err := c.readDataEntry()
if err != nil {
c.closeConnection()
c.CloseConnection()
continue
}
// Send data to stream entries channel
Expand Down
22 changes: 22 additions & 0 deletions datastreamer/streamfile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package datastreamer_test

import (
"testing"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/stretchr/testify/require"
)

func TestOpenFile(t *testing.T) {
fileName := "/tmp/datastreamer_test_3.bin"
err := deleteFiles(fileName)
require.NoError(t, err)
_, err = datastreamer.NewStreamFile(fileName, 1, 137, streamType)
require.NoError(t, err)

_, err = datastreamer.NewStreamFile(fileName, 1, 137, streamType)
require.NoError(t, err)

err = deleteFiles(fileName)
require.NoError(t, err)
}
12 changes: 2 additions & 10 deletions datastreamer/streamrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,
var err error

// Create client side
r.client, err = NewClient(server, streamType)
if err != nil {
log.Errorf("Error creating relay client side: %v", err)
return nil, err
}
r.client = NewClient(server, streamType)

// Create server side
r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout,
Expand All @@ -43,11 +39,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,
// Start connects and syncs with master server then opens access to relay clients
func (r *StreamRelay) Start() error {
// Start client side
err := r.client.Start()
if err != nil {
log.Errorf("Error starting relay client: %v", err)
return err
}
r.client.Start()

// Get total entries from the master server
header, err := r.client.ExecCommandGetHeader()
Expand Down
58 changes: 58 additions & 0 deletions datastreamer/streamrelay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package datastreamer_test

import (
"fmt"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/stretchr/testify/require"
)

func TestRelay(t *testing.T) {
fileName1 := "/tmp/datastreamer_test_0.bin"
fileName2 := "/tmp/datastreamer_test_1.bin"

err := deleteFiles(fileName1)
require.NoError(t, err)
err = deleteFiles(fileName2)
require.NoError(t, err)

streamServer, err := datastreamer.NewServer(6901, 1, 137, streamType,
fileName1, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log)
require.NoError(t, err)

err = streamServer.Start()
require.NoError(t, err)
err = streamServer.StartAtomicOp()
require.NoError(t, err)

entryNumber, err := streamServer.AddStreamBookmark(testBookmark.Encode())
require.NoError(t, err)
require.Equal(t, uint64(0), entryNumber)

entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[1].Encode())
require.NoError(t, err)
require.Equal(t, uint64(1), entryNumber)

entryNumber, err = streamServer.AddStreamBookmark(testBookmark2.Encode())
require.NoError(t, err)
require.Equal(t, uint64(2), entryNumber)

entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
require.NoError(t, err)
require.Equal(t, uint64(3), entryNumber)

err = streamServer.CommitAtomicOp()
require.NoError(t, err)

var relayPort uint16 = 6902
sr, err := datastreamer.NewRelay(fmt.Sprintf("localhost:%d", 6901), relayPort, 1, 137, datastreamer.StreamType(1),
fileName2, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, nil)
require.NoError(t, err)
err = sr.Start()
require.NoError(t, err)

client := datastreamer.NewClient(fmt.Sprintf("localhost:%d", relayPort), streamType)
client.Start()
}
2 changes: 1 addition & 1 deletion datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error) {
}

// PrintResultEntry prints result entry type
func PrintResultEntry(e ResultEntry) {
func (e ResultEntry) PrintResultEntry() {
log.Debug("--- RESULT ENTRY -------------------------")
log.Debugf("packetType: [%d]", e.packetType)
log.Debugf("length: [%d]", e.length)
Expand Down
Loading