stream.ex
defmodule KafkaClient.Consumer.Stream do
  @moduledoc """
  Kafka consumer as Elixir stream.
  """

  alias KafkaClient.Consumer.Poller

  @doc """
  Creates a stream of Kafka poller notifications.

  This function takes the same options as `Poller.start_link/1` (except the `:processor` option).
  It starts the poller as the direct child of the current process and creates a stream of poller
  notifications of the type `t:Poller.notification/0`, such as `{:assigned, partitions}`,
  `{:record, record}`, ...

  This is an infinite stream that never stops on its own. If the client wants to stop the stream,
  it can use the standard `Stream` and `Enum` functions, such as `take`, `take_while`, or
  `reduce_while`. After the stream is closed, the poller process is stopped.

  The client code is responsible for acknowledging when a record is processed, i.e. for every
  `{:record, record}` element the client must invoke `Poller.ack/1`. Failing to do so will cause
  the stream to hang once all the buffers are full.

  Refer to the `Poller` documentation for an explanation of the common behaviour, such as load
  control or telemetry.

  Example:

      # Anonymous consuming (no consumer group)
      KafkaClient.Consumer.Stream.new([group_id: nil, ...])

      # Stop once all the records have been processed. For this to work, it is important to
      # ack each record.
      |> Stream.take_while(&(&1 != :caught_up))

      # Take only the record notifications (i.e. ignore assigned/unassigned).
      |> Stream.filter(&match?({:record, record}, &1))

      # Process each record
      |> Enum.each(fn {:record, record} ->
        do_something_with(record)

        # don't forget to ack after the record is processed
        KafkaClient.Consumer.Poller.ack(record)
      end)

  If you're fine with sending the ack as soon as the record is received, you can wrap this in a
  helper function:

      def acked_records(opts) do
        opts
        |> KafkaClient.Consumer.Stream.new()
        |> Stream.take_while(&(&1 != :caught_up))
        |> Stream.filter(&match?({:record, record}, &1))
        |> Stream.map(fn {:record, record} -> record end)
        |> Stream.each(&KafkaClient.Consumer.Poller.ack/1)
      end
  """
  @spec new([Poller.option()]) :: Enumerable.t()
  def new(opts) do
    Stream.resource(
      fn -> start_poller(opts) end,
      &next_notification/1,
      &stop_poller/1
    )
  end
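
  # Start function for `Stream.resource/3`: starts the poller linked to the stream owner,
  # registers the owner as the `:processor` so notifications are delivered to it, and
  # monitors the poller so its termination can be detected while waiting for messages.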
  defp start_poller(opts) do
    opts = Keyword.put(opts, :processor, self())
    {:ok, poller} = Poller.start_link(opts)
    mref = Process.monitor(poller)
    {poller, mref}
  end
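
  # Next function for `Stream.resource/3`: blocks until the poller sends a notification and
  # emits it as the next stream element. For `{:record, record}` notifications, the poller is
  # informed via `Poller.started_processing/1` before the element is emitted. If the poller
  # terminates, the stream owner exits with the same reason.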
  defp next_notification({poller, mref}) do
    receive do
      {^poller, notification} ->
        with {:record, record} <- notification, do: Poller.started_processing(record)
        {[notification], {poller, mref}}

      {:DOWN, _mref, :process, ^poller, reason} ->
        exit(reason)
    end
  end
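
  # After function for `Stream.resource/3`: invoked when the stream is halted; stops the
  # poller and discards any of its notifications still sitting in the owner's mailbox.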
  defp stop_poller({poller, mref}) do
    Process.demonitor(mref, [:flush])
    Poller.stop(poller)
    flush_messages(poller)
  end
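
  # Drops all pending notifications from the given poller out of the process mailbox.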
  defp flush_messages(poller) do
    receive do
      {^poller, _notification} -> flush_messages(poller)
    after
      0 -> :ok
    end
  end
end