Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ docs/site/
# committed for packages, but should be committed for applications that require a static
# environment.
Manifest.toml

benchmark/Manifest.toml
benchmark/random_data.arrow
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "StreamSampling"
uuid = "ff63dad9-3335-55d8-95ec-f8139d39e468"
version = "0.7.6"
version = "0.8.0"

[deps]
Accessors = "7d9f7c33-5ae7-4f3b-8dc6-eff91059b697"
Expand Down
27 changes: 15 additions & 12 deletions benchmark/Project.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@

[deps]
StreamSampling = "ff63dad9-3335-55d8-95ec-f8139d39e468"
CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0"
ChunkSplitter = "ae650224-84b6-46f8-82ea-d812ca08434e"
PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0"
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
StreamSampling = "ff63dad9-3335-55d8-95ec-f8139d39e468"
CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0"
ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"

[compat]
StreamSampling = "0.4"
Arrow = "2"
BenchmarkTools = "1"
ChunkSplitters = "3"
Distributions = "0.25"
PyCall = "1"
Random = "1"
StatsBase = "0.34"
CairoMakie = "0.12"
PyCall = "1.96"
BenchmarkTools = "1.6"
ChunkSplitter = "3"
StreamSampling = "0.8"
CairoMakie = "0.15"
ThreadPinning = "1"
26 changes: 14 additions & 12 deletions benchmark/benchmark_comparison_stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ end
function strsamplesum(rng, stream, wf, n, alg, W=nothing)
W == nothing && (W = sum(wf(x) for x in stream))
st = if alg in (AlgD(), AlgORDSWR())
StreamSampler{Int}(rng, stream, n, W, alg)
SequentialSampler{Int}(rng, stream, n, W, alg)
else
StreamSampler{Int}(rng, stream, w, n, W, alg)
SequentialSampler{Int}(rng, stream, w, n, W, alg)
end
return sum(st)
end
Expand Down Expand Up @@ -96,11 +96,11 @@ end

using CairoMakie

f = Figure(fontsize = 9,);
f = Figure(fontsize = 9, size = (600, 600));
axs = [Axis(f[i, j], yscale = log10, xscale = log10, xgridstyle = :dot,
ygridstyle = :dot) for i in 1:4 for j in 1:2];
ygridstyle = :dot, titlesize=13, xlabelsize=10, ylabelsize=10) for i in 1:4 for j in 1:2];

labels = ("population", "reservoir", "stream", "stream - one pass" )
labels = ("population", "reservoir", "sequential", "sequential - one pass" )

markers = (:circle, :rect, :utriangle, :xcross)
a, b = 0, 0
Expand All @@ -126,8 +126,14 @@ for j in 1:8
j in (1, 2, 5, 6) && hidexdecorations!(axs[j], grid = false)
end

for i in 1:8
axs[i].yticks = LogTicks(WilkinsonTicks(4, k_min=4, k_max=6))
for i in (1, 2, 5, 6)
axs[i].yticks = ([1e1, 1e2, 1e3, 1e4, 1e5], ["10¹", "10²", "10³", "10⁴", "10⁵"])
ylims!(axs[i], 1e1, 1e5)
end

for i in (3, 4, 7, 8)
axs[i].yticks = ([1e-1, 1e0, 1e1, 1e2, 1e3, 1e4], ["10⁻¹", "10⁰", "10¹", "10²", "10³", "10⁴"])
ylims!(axs[i], 1e-1, 1e4)
end

linkyaxes!((axs[i] for i in [1,2,5,6])...)
Expand All @@ -140,12 +146,8 @@ for i in [3,4]
axs[i].xticklabelsvisible = false
end


f[5, 1] = Legend(f, axs[1], framevisible = false, orientation = :horizontal,
halign = :center, padding=(248,0,0,0))

Label(f[0, :], "Performance of Sampling Algorithms on Iterators", fontsize = 13,
font=:bold)
halign = :center, padding=(248,0,0,0), labelsize=10)

f

Expand Down
38 changes: 28 additions & 10 deletions benchmark/benchmark_ondisk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ const totaltpl = 10^11÷32 #100GB!
const chunktpl = totaltpl ÷ 100
const numchunks = ceil(Int, totaltpl / chunktpl)

function drop_page_cache()
run(`sync`)
run(`sudo sh -c "echo 3 > /proc/sys/vm/drop_caches"`)
end

function generate_file(filename)
for i in 1:numchunks
starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
Expand Down Expand Up @@ -83,7 +88,7 @@ wf(d) = d[end]
function sample_file_st(data, rng, n, alg)
W = sum(x[end] for x in data)
s = Vector{dtype}(undef, n)
@inbounds for (i, d) in enumerate(StreamSampler{dtype}(rng, data, wf, n, W, alg))
@inbounds for (i, d) in enumerate(SequentialSampler{dtype}(rng, data, wf, n, W, alg))
s[i] = d
end
return s
Expand All @@ -93,8 +98,7 @@ function psample_file_st(data, rngs, n, alg)
weights = Vector{Float64}(undef, Threads.nthreads())
Threads.@threads for (i,c) in enumerate(chunks(1:length(data), n=Threads.nthreads()))
W = sum((@inbounds data[j][end]) for j in c)
st_data = ((@inbounds data[j]) for j in c)
samples[i] = collect(StreamSampler{dtype}(rngs[i], @view(data[c]), wf, n, W, alg))
samples[i] = collect(SequentialSampler{dtype}(rngs[i], @view(data[c]), wf, n, W, alg))
weights[i] = W
end
return combine(rngs, samples, weights)
Expand All @@ -117,22 +121,35 @@ precompile(sample_file_st, typeof.((data, rng, n, AlgORDWSWR())))
precompile(psample_file_st, typeof.((data, rngs, n, AlgORDWSWR())))

times = []
drop_page_cache()
for n in (totaltpl ÷ 100000, totaltpl ÷ 10000, totaltpl ÷ 1000, totaltpl ÷ 100)


if n != totaltpl ÷ 100
t1 = @elapsed sample_file_pop(data, rng, n);
drop_page_cache()

t2 = @elapsed psample_file_pop(data, rngs, n);
drop_page_cache()
else
t1 = nothing
t2 = nothing
end

t3 = @elapsed sample_file_st(data, rng, n, AlgORDWSWR());
drop_page_cache()

t4 = @elapsed psample_file_st(data, rngs, n, AlgORDWSWR());
drop_page_cache()

t5 = @elapsed sample_file_rs(data, rng, n, AlgWRSWRSKIP());
drop_page_cache()

t6 = @elapsed psample_file_rs(data, rngs, n, AlgWRSWRSKIP());
drop_page_cache()

push!(times, [t1, t2, t3, t4, t5, t6])
println(times)
end
times = hcat(times...)

Expand All @@ -142,17 +159,18 @@ x = 1:4
xtick_positions = [1,2,3,4]
xtick_labels = ["0.001%","0.01%","0.1%","1%"]

algonames = ["chunks", "chunks (4 threads)", "stream", "stream (4 threads)",
algonames = ["chunks", "chunks (4 threads)", "sequential", "sequential (4 threads)",
"reservoir", "reservoir (4 threads)",]
markers = [:circle, :rect, :utriangle, :hexagon, :diamond, :xcross]

fig = Figure();
ax = Axis(fig[1, 1]; xlabel = "sample size", ylabel = "time (s)",
title = "Sampling Performance on Persistent Data",
xticks = (xtick_positions, xtick_labels),
title = "Sampling Performance from On-Disk Data", titlesize = 16,
xticks = (xtick_positions, xtick_labels),
yticks = 0:50:300,
xgridstyle = :dot, ygridstyle = :dot,
xticklabelsize = 10, yticklabelsize = 10,
xlabelsize = 12, ylabelsize = 12
xticklabelsize = 11, yticklabelsize = 11,
xlabelsize = 14, ylabelsize = 14
)

for i in 1:size(times, 1)
Expand All @@ -164,9 +182,9 @@ for i in 1:size(times, 1)
linewidth = 2,)
end

ylims!(low=0, high = 250)
ylims!(low=0, high = 300)
fig[2, 1] = Legend(fig, ax, framevisible = false, orientation = :horizontal,
halign = :center, nbanks=2, fontsize=10)

fig
save("comparison_ondisk_algs.svg", fig)
save("comparison_ondisk_algs.pdf", fig)
Binary file added benchmark/comparison_ondisk_algs.pdf
Binary file not shown.
Binary file added benchmark/comparison_stream_algs.pdf
Binary file not shown.
1 change: 0 additions & 1 deletion docs/src/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

```@docs
ReservoirSampler
StreamSampler
SequentialSampler
```

Expand Down
19 changes: 10 additions & 9 deletions docs/src/basics.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

# Overview of the functionalities

The `itsample` function allows to consume all the stream at once and return the sample collected:
The `itsample` function allows to consume all the stream at once and return the sample
collected:

```@example 1
using StreamSampling
Expand All @@ -11,8 +12,8 @@ st = 1:100;
itsample(st, 5)
```

In some cases, one needs to control the updates the `ReservoirSampler` will be subject to. In this case
you can simply use the `fit!` function to update the reservoir:
In some cases, one needs to control the updates the `ReservoirSampler` will be subject to.
In this case, you can simply use the `fit!` function to update the reservoir:

```@example 1
st = 1:100;
Expand All @@ -26,13 +27,13 @@ end
value(rs)
```

If the total number of elements in the stream is known beforehand and the sampling is unweighted, it is
also possible to iterate over a `StreamSampler` like so
If the total number of elements in the stream is known beforehand and the sampling
is unweighted, it is also possible to iterate over a `SequentialSampler` like so

```@example 1
st = 1:100;

ss = StreamSampler{Int}(st, 5, 100);
ss = SequentialSampler{Int}(st, 5, 100);

r = Int[];

Expand All @@ -43,6 +44,6 @@ end
r
```

The advantage of `StreamSampler` iterators in respect to `ReservoirSampler` is that they require `O(1)`
memory if not collected, while reservoir techniques require `O(k)` memory where `k` is the number
of elements in the sample.
The advantage of `SequentialSampler` iterators in respect to `ReservoirSampler`
is that they require `O(1)` memory if not collected, while reservoir techniques
require `O(k)` memory where `k` is the number of elements in the sample.
2 changes: 1 addition & 1 deletion docs/src/benchmark.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The iterator used is a filtered generator which creates an integer range between
benchmark more accurately mimic a somewhat realistic iterator, on which the methods could be actually used in practice.

The “population” methods use `StatsBase.sample` and consider collecting the iterator in memory as part of the benchmark. The reservoir and stream
methods use instead `ReservoirSampler` and `StreamSampler` of this package.
methods use instead `ReservoirSampler` and `SequentialSampler` of this package.

The code to reproduce the results is at [StreamSampling.jl/benchmark/benchmark_comparison_stream.jl](https://github.com/JuliaDynamics/StreamSampling.jl/blob/main/benchmark/benchmark_comparison_stream.jl).

Expand Down
11 changes: 5 additions & 6 deletions docs/src/example.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,11 @@ rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts)
```

As you can see, the speed-up is not linear in the number of threads for an hdf5 file. This is
mainly due to the fact that accessing the chunks is single-threaded, so one would need to use
`MPI.jl` as explained at [HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/) to
improve the multi-threading performance. Though, we are already sampling at 500MB/s, which is not bad!
Using `Arrow.jl` gives an even better performance, and a scalability which is better than
linear somehow, reaching a 2GB/s sampling speed!
As you can see, the speed-up is not linear in the number of threads for an hdf5 file.
This is mainly due to the fact that accessing the chunks is single-threaded, so one would
need to use `MPI.jl` as explained at [HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/) to improve the multi-threading performance. Though, we are already sampling at 500MB/s,
which is not bad! Using `Arrow.jl` gives an even better performance, and a scalability which
is better than linear somehow, reaching a 2GB/s sampling speed!

## Monitoring

Expand Down
Loading
Loading