Skip to content

Commit

Permalink
[full persistence] stateful reduce (#7954)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergey <sergey@pathway.com>
GitOrigin-RevId: c771e0777582518194f3f448ffeb25258b2a4f05
  • Loading branch information
2 people authored and Manul from Pathway committed Jan 8, 2025
1 parent 5409db8 commit 009e885
Show file tree
Hide file tree
Showing 3 changed files with 447 additions and 254 deletions.
78 changes: 76 additions & 2 deletions python/pathway/tests/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,11 @@ def run_computation(inputs, expected):
persistence_mode=mode,
)
)
result = consolidate(pd.read_csv(output_path))
assert set(combine_columns(result)) == expected
try:
result = combine_columns(consolidate(pd.read_csv(output_path)))
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected

return run_computation, input_path

Expand Down Expand Up @@ -566,3 +569,74 @@ def logic(t_1: pw.Table) -> pw.Table:
os.remove(input_path / "2")
run(["a,b"], {"1,1,3,3,1", "1,2,4,3,-1"})
run(["a,b", "2,0"], {"2,2,9,5,-1", "2,3,9,5,1"})


@pytest.mark.parametrize(
"mode", [api.PersistenceMode.OPERATOR_PERSISTING]
) # can't use api.PersistenceMode.PERSISTING because it is not compatible with stateful_reduce
def test_groupby_2(tmp_path, mode):
class InputSchema(pw.Schema):
a: int
b: int = pw.column_definition(primary_key=True)

# ignore below because stateful_many doesn't allow better narrower types in annotation
@pw.reducers.stateful_many # type: ignore
def count_max(
state: tuple[int, int] | None, rows: list[tuple[list[int], int]]
) -> tuple[int, int] | None:
m = None
for row, cnt in rows:
value = row[0]
if m is None or value > m:
m = value
assert m is not None # rows are non-empty
if state is None:
state = (1, m)
elif state[1] < m:
state = (state[0] + 1, m)
return state

def logic(t_1: pw.Table) -> pw.Table:
return t_1.groupby(pw.this.a).reduce(
pw.this.a,
e=pw.reducers.earliest(pw.this.b),
l=pw.reducers.latest(pw.this.b),
s=count_max(pw.this.b)[0],
)

run, _ = get_one_table_runner(tmp_path, mode, logic, InputSchema)

run(["a,b", "1,3", "2,4"], {"1,3,3,1,1", "2,4,4,1,1"})
time.sleep(2)
run(["a,b", "1,5", "1,0"], {"1,3,3,1,-1", "1,3,5,2,1"})
time.sleep(2)
run(["a,b", "2,6"], {"2,4,4,1,-1", "2,4,6,2,1"})
time.sleep(2)
run(["a,b", "2,8"], {"2,4,6,2,-1", "2,4,8,3,1"})


@pytest.mark.parametrize(
"mode",
[api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING],
)
@pytest.mark.parametrize("persistent_id", [None, "ded"])
def test_deduplicate(tmp_path, mode, persistent_id):
class InputSchema(pw.Schema):
a: int

def acceptor(new_value, old_value) -> bool:
return new_value > old_value + 2

def logic(t_1: pw.Table) -> pw.Table:
return t_1.deduplicate(
value=pw.this.a, acceptor=acceptor, persistent_id=persistent_id
)

run, _ = get_one_table_runner(tmp_path, mode, logic, InputSchema)

run(["a", "1"], {"1,1"})
run(["a", "2"], set())
run(["a", "4"], {"1,-1", "4,1"})
run(["a", "6"], set())
run(["a", "3"], set())
run(["a", "7"], {"4,-1", "7,1"})
Loading

0 comments on commit 009e885

Please sign in to comment.