Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
xavirg authored Jan 22, 2025
2 parents 5680e22 + 96e7cd8 commit eab1701
Show file tree
Hide file tree
Showing 16 changed files with 850 additions and 93 deletions.
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the implementation of the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The receiver now supports receiving NetFlow v5, NetFow v9, IPFIX, and sFlow v5 logs.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
81 changes: 48 additions & 33 deletions receiver/netflowreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ receivers:
port: 2055
sockets: 16
workers: 32
netflow/sflow:
- scheme: sflow
port: 6343
sockets: 16
workers: 32

processors:
batch:
Expand All @@ -45,7 +50,7 @@ exporters:
service:
pipelines:
logs:
receivers: [netflow]
receivers: [netflow, netflow/sflow]
processors: [batch]
exporters: [debug]
telemetry:
Expand All @@ -61,42 +66,52 @@ You would then configure your network devices to send netflow, sflow, or ipfix d
| Field | Description | Examples | Default |
|-------|-------------|--------| ------- |
| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` |
| scheme | The type of flow data that to receive | `sflow`, `netflow` | `netflow` |
| hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` |
| port | The port to bind to | `2055` or `6343` | `2055` |
| sockets | The number of sockets to use | 1 | 1 |
| workers | The number of workers used to decode incoming flow messages | 2 | 2 |
| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 |
| queue_size | The size of the incoming netflow packets queue, it will always be at least 1000. | 5000 | 1000 |

## Data format

The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The output will adhere the format:

```json
{
"destination": {
"address": "192.168.0.1",
"port": 22
},
"flow": {
"end": 1731073104662487000,
"sampler_address": "192.168.0.2",
"sequence_num": 49,
"start": 1731073077662487000,
"time_received": 1731073138662487000,
"type": "NETFLOW_V5"
},
"io": {
"bytes": 529,
"packets": 378
},
"source": {
"address": "192.168.0.3",
"port": 40
},
"transport": "TCP",
"type": "IPv4"
}
```
The netflow data is standardized for the different schemas and is converted to OpenTelemetry log records following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The log record will have the following attributes (with examples):

* **source.address**: Str(132.189.238.100)
* **source.port**: Int(1255)
* **destination.address**: Str(241.171.33.110)
* **destination.port**: Int(64744)
* **network.transport**: Str(tcp)
* **network.type**: Str(ipv4)
* **flow.io.bytes**: Int(853)
* **flow.io.packets**: Int(83)
* **flow.type**: Str(netflow_v5)
* **flow.sequence_num**: Int(191)
* **flow.time_received**: Int(1736309689918929427)
* **flow.start**: Int(1736309689830846400)
* **flow.end**: Int(1736309689871846400)
* **flow.sampling_rate**: Int(0)
* **flow.sampler_address**: Str(172.28.176.1)

The log record timestamps will be:

* **Observed timestamp**: The time the flow was received.
* **Timestamp**: The flow `start` field.

### Schema support

#### netflow

* Process [Template Records](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html) if present
* Process Netflow V5, V9, and IPFIX messages
* Extract the attributes documented above
* Mapping of custom fields is not yet supported

#### sflow

* Process [sFlow version 5](https://sflow.org/sflow_version_5.txt) datagrams
* `flow_sample` and `flow_sample_expanded` are supported.
* `counter_sample` and `counter_sample_expanded` are NOT yet supported.
* Mapping of custom fields is not yet supported
4 changes: 2 additions & 2 deletions receiver/netflowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
validSchemes := [3]string{"sflow", "netflow", "flow"}
validSchemes := [2]string{"sflow", "netflow"}

validScheme := false
for _, scheme := range validSchemes {
Expand All @@ -42,7 +42,7 @@ func (cfg *Config) Validate() error {
}
}
if !validScheme {
return fmt.Errorf("scheme must be one of sflow, netflow, or flow")
return fmt.Errorf("scheme must be netflow or sflow")
}

if cfg.Sockets <= 0 {
Expand Down
32 changes: 30 additions & 2 deletions receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,27 @@ func TestLoadConfig(t *testing.T) {
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "sflow"),
expected: &Config{
Scheme: "sflow",
Port: 6343,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
}
Expand Down Expand Up @@ -68,12 +88,20 @@ func TestInvalidConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(metadata.Type, "invalid_schema"),
err: "scheme must be one of sflow, netflow, or flow",
err: "scheme must be netflow or sflow",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_sockets"),
err: "sockets must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_workers"),
err: "workers must be greater than 0",
},
}

for _, tt := range tests {
Expand Down
24 changes: 15 additions & 9 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
)

const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
defaultSockets = 1
defaultWorkers = 2
// The default UDP packet buffer size in GoFlow2 is 9000 bytes, which means
// that for a full queue of 1000 messages, the size in memory will be 9MB.
// Source: /~https://github.com/netsampler/goflow2/blob/v2.2.1/README.md#security-notes-and-assumptions
defaultQueueSize = 1_000
)

// NewFactory creates a factory for netflow receiver.
Expand All @@ -27,6 +30,8 @@ func NewFactory() receiver.Factory {
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

// Config defines configuration for netflow receiver.
// By default we listen for netflow traffic on port 2055
func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Expand All @@ -37,14 +42,15 @@ func createDefaultConfig() component.Config {
}
}

// createLogsReceiver creates a netflow receiver.
// We also create the UDP receiver, which is the piece of software that actually listens
// for incoming netflow traffic on an UDP port.
func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)
conf := *(cfg.(*Config))

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
nr, err := newNetflowLogsReceiver(params, conf, consumer)
if err != nil {
return nil, err
}

return nr, nil
Expand Down
5 changes: 4 additions & 1 deletion receiver/netflowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo
go 1.22.0

require (
github.com/netsampler/goflow2/v2 v2.2.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/confmap v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/consumer v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/receiver v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/semconv v0.118.1-0.20250121185328-fbefb22cc2b3
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand All @@ -26,6 +29,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -35,7 +39,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions receiver/netflowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions receiver/netflowreceiver/listener.go

This file was deleted.

27 changes: 0 additions & 27 deletions receiver/netflowreceiver/listener_test.go

This file was deleted.

Loading

0 comments on commit eab1701

Please sign in to comment.