diff --git a/.gitignore b/.gitignore index 29126e4..4beb294 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ docs/site/ # committed for packages, but should be committed for applications that require a static # environment. Manifest.toml + +benchmark/Manifest.toml +benchmark/random_data.arrow diff --git a/Project.toml b/Project.toml index 96840e2..e517c7c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "StreamSampling" uuid = "ff63dad9-3335-55d8-95ec-f8139d39e468" -version = "0.7.6" +version = "0.8.0" [deps] Accessors = "7d9f7c33-5ae7-4f3b-8dc6-eff91059b697" diff --git a/benchmark/Project.toml b/benchmark/Project.toml index 922d5aa..bf9d595 100644 --- a/benchmark/Project.toml +++ b/benchmark/Project.toml @@ -1,20 +1,23 @@ - [deps] -StreamSampling = "ff63dad9-3335-55d8-95ec-f8139d39e468" -CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0" -ChunkSplitter = "ae650224-84b6-46f8-82ea-d812ca08434e" -PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0" +Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" -StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" -Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" +PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0" +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" +StreamSampling = "ff63dad9-3335-55d8-95ec-f8139d39e468" +CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0" +ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" [compat] -StreamSampling = "0.4" +Arrow = "2" +BenchmarkTools = "1" +ChunkSplitters = "3" Distributions = "0.25" +PyCall = "1" Random = "1" StatsBase = "0.34" -CairoMakie = "0.12" -PyCall = "1.96" -BenchmarkTools = "1.6" -ChunkSplitter = "3" +StreamSampling = "0.8" +CairoMakie = "0.15" +ThreadPinning = "1" diff --git a/benchmark/benchmark_comparison_stream.jl b/benchmark/benchmark_comparison_stream.jl index 6a92d68..698358d 100644 --- a/benchmark/benchmark_comparison_stream.jl +++ b/benchmark/benchmark_comparison_stream.jl @@ -29,9 +29,9 @@ end function strsamplesum(rng, stream, wf, n, alg, W=nothing) W == nothing && (W = sum(wf(x) for x in stream)) st = if alg in (AlgD(), AlgORDSWR()) - StreamSampler{Int}(rng, stream, n, W, alg) + SequentialSampler{Int}(rng, stream, n, W, alg) else - StreamSampler{Int}(rng, stream, w, n, W, alg) + SequentialSampler{Int}(rng, stream, w, n, W, alg) end return sum(st) end @@ -96,11 +96,11 @@ end using CairoMakie -f = Figure(fontsize = 9,); +f = Figure(fontsize = 9, size = (600, 600)); axs = [Axis(f[i, j], yscale = log10, xscale = log10, xgridstyle = :dot, - ygridstyle = :dot) for i in 1:4 for j in 1:2]; + ygridstyle = :dot, titlesize=13, xlabelsize=10, ylabelsize=10) for i in 1:4 for j in 1:2]; -labels = ("population", "reservoir", "stream", "stream - one pass" ) +labels = ("population", "reservoir", "sequential", "sequential - one pass" ) markers = (:circle, :rect, :utriangle, :xcross) a, b = 0, 0 @@ -126,8 +126,14 @@ for j in 1:8 j in (1, 2, 5, 6) && hidexdecorations!(axs[j], grid = false) end -for i in 1:8 - axs[i].yticks = LogTicks(WilkinsonTicks(4, k_min=4, k_max=6)) +for i in (1, 2, 5, 6) + axs[i].yticks = ([1e1, 1e2, 1e3, 1e4, 1e5], ["10¹", "10²", "10³", "10⁴", "10⁵"]) + ylims!(axs[i], 1e1, 1e5) +end + +for i in (3, 4, 7, 8) + axs[i].yticks = ([1e-1, 1e0, 1e1, 1e2, 1e3, 1e4], ["10⁻¹", "10⁰", "10¹", "10²", "10³", "10⁴"]) + ylims!(axs[i], 1e-1, 1e4) end linkyaxes!((axs[i] for i in [1,2,5,6])...) @@ -140,12 +146,8 @@ for i in [3,4] axs[i].xticklabelsvisible = false end - f[5, 1] = Legend(f, axs[1], framevisible = false, orientation = :horizontal, - halign = :center, padding=(248,0,0,0)) - -Label(f[0, :], "Performance of Sampling Algorithms on Iterators", fontsize = 13, - font=:bold) + halign = :center, padding=(248,0,0,0), labelsize=10) f diff --git a/benchmark/benchmark_ondisk.jl b/benchmark/benchmark_ondisk.jl index cfde840..bb002ac 100644 --- a/benchmark/benchmark_ondisk.jl +++ b/benchmark/benchmark_ondisk.jl @@ -10,6 +10,11 @@ const totaltpl = 10^11÷32 #100GB! const chunktpl = totaltpl ÷ 100 const numchunks = ceil(Int, totaltpl / chunktpl) +function drop_page_cache() + run(`sync`) + run(`sudo sh -c "echo 3 > /proc/sys/vm/drop_caches"`) +end + function generate_file(filename) for i in 1:numchunks starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl) @@ -83,7 +88,7 @@ wf(d) = d[end] function sample_file_st(data, rng, n, alg) W = sum(x[end] for x in data) s = Vector{dtype}(undef, n) - @inbounds for (i, d) in enumerate(StreamSampler{dtype}(rng, data, wf, n, W, alg)) + @inbounds for (i, d) in enumerate(SequentialSampler{dtype}(rng, data, wf, n, W, alg)) s[i] = d end return s @@ -93,8 +98,7 @@ function psample_file_st(data, rngs, n, alg) weights = Vector{Float64}(undef, Threads.nthreads()) Threads.@threads for (i,c) in enumerate(chunks(1:length(data), n=Threads.nthreads())) W = sum((@inbounds data[j][end]) for j in c) - st_data = ((@inbounds data[j]) for j in c) - samples[i] = collect(StreamSampler{dtype}(rngs[i], @view(data[c]), wf, n, W, alg)) + samples[i] = collect(SequentialSampler{dtype}(rngs[i], @view(data[c]), wf, n, W, alg)) weights[i] = W end return combine(rngs, samples, weights) @@ -117,22 +121,35 @@ precompile(sample_file_st, typeof.((data, rng, n, AlgORDWSWR()))) precompile(psample_file_st, typeof.((data, rngs, n, AlgORDWSWR()))) times = [] +drop_page_cache() for n in (totaltpl ÷ 100000, totaltpl ÷ 10000, totaltpl ÷ 1000, totaltpl ÷ 100) + if n != totaltpl ÷ 100 t1 = @elapsed sample_file_pop(data, rng, n); + drop_page_cache() + t2 = @elapsed psample_file_pop(data, rngs, n); + drop_page_cache() else t1 = nothing t2 = nothing end + t3 = @elapsed sample_file_st(data, rng, n, AlgORDWSWR()); + drop_page_cache() + t4 = @elapsed psample_file_st(data, rngs, n, AlgORDWSWR()); + drop_page_cache() t5 = @elapsed sample_file_rs(data, rng, n, AlgWRSWRSKIP()); + drop_page_cache() + t6 = @elapsed psample_file_rs(data, rngs, n, AlgWRSWRSKIP()); + drop_page_cache() push!(times, [t1, t2, t3, t4, t5, t6]) + println(times) end times = hcat(times...) @@ -142,17 +159,18 @@ x = 1:4 xtick_positions = [1,2,3,4] xtick_labels = ["0.001%","0.01%","0.1%","1%"] -algonames = ["chunks", "chunks (4 threads)", "stream", "stream (4 threads)", +algonames = ["chunks", "chunks (4 threads)", "sequential", "sequential (4 threads)", "reservoir", "reservoir (4 threads)",] markers = [:circle, :rect, :utriangle, :hexagon, :diamond, :xcross] fig = Figure(); ax = Axis(fig[1, 1]; xlabel = "sample size", ylabel = "time (s)", - title = "Sampling Performance on Persistent Data", - xticks = (xtick_positions, xtick_labels), + title = "Sampling Performance from On-Disk Data", titlesize = 16, + xticks = (xtick_positions, xtick_labels), + yticks = 0:50:300, xgridstyle = :dot, ygridstyle = :dot, - xticklabelsize = 10, yticklabelsize = 10, - xlabelsize = 12, ylabelsize = 12 + xticklabelsize = 11, yticklabelsize = 11, + xlabelsize = 14, ylabelsize = 14 ) for i in 1:size(times, 1) @@ -164,9 +182,9 @@ for i in 1:size(times, 1) linewidth = 2,) end -ylims!(low=0, high = 250) +ylims!(low=0, high = 300) fig[2, 1] = Legend(fig, ax, framevisible = false, orientation = :horizontal, halign = :center, nbanks=2, fontsize=10) fig -save("comparison_ondisk_algs.svg", fig) +save("comparison_ondisk_algs.pdf", fig) diff --git a/benchmark/comparison_ondisk_algs.pdf b/benchmark/comparison_ondisk_algs.pdf new file mode 100644 index 0000000..bd37321 Binary files /dev/null and b/benchmark/comparison_ondisk_algs.pdf differ diff --git a/benchmark/comparison_stream_algs.pdf b/benchmark/comparison_stream_algs.pdf new file mode 100644 index 0000000..4f0dd19 Binary files /dev/null and b/benchmark/comparison_stream_algs.pdf differ diff --git a/docs/src/api.md b/docs/src/api.md index 7386b89..76fcaec 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -4,7 +4,6 @@ ```@docs ReservoirSampler -StreamSampler SequentialSampler ``` diff --git a/docs/src/basics.md b/docs/src/basics.md index b33edc3..ab687b7 100644 --- a/docs/src/basics.md +++ b/docs/src/basics.md @@ -1,7 +1,8 @@ # Overview of the functionalities -The `itsample` function allows to consume all the stream at once and return the sample collected: +The `itsample` function allows to consume all the stream at once and return the sample +collected: ```@example 1 using StreamSampling @@ -11,8 +12,8 @@ st = 1:100; itsample(st, 5) ``` -In some cases, one needs to control the updates the `ReservoirSampler` will be subject to. In this case -you can simply use the `fit!` function to update the reservoir: +In some cases, one needs to control the updates the `ReservoirSampler` will be subject to. +In this case, you can simply use the `fit!` function to update the reservoir: ```@example 1 st = 1:100; @@ -26,13 +27,13 @@ end value(rs) ``` -If the total number of elements in the stream is known beforehand and the sampling is unweighted, it is -also possible to iterate over a `StreamSampler` like so +If the total number of elements in the stream is known beforehand and the sampling +is unweighted, it is also possible to iterate over a `SequentialSampler` like so ```@example 1 st = 1:100; -ss = StreamSampler{Int}(st, 5, 100); +ss = SequentialSampler{Int}(st, 5, 100); r = Int[]; @@ -43,6 +44,6 @@ end r ``` -The advantage of `StreamSampler` iterators in respect to `ReservoirSampler` is that they require `O(1)` -memory if not collected, while reservoir techniques require `O(k)` memory where `k` is the number -of elements in the sample. +The advantage of `SequentialSampler` iterators in respect to `ReservoirSampler` +is that they require `O(1)` memory if not collected, while reservoir techniques +require `O(k)` memory where `k` is the number of elements in the sample. diff --git a/docs/src/benchmark.md b/docs/src/benchmark.md index 5bf6dd0..8e5c37a 100644 --- a/docs/src/benchmark.md +++ b/docs/src/benchmark.md @@ -11,7 +11,7 @@ The iterator used is a filtered generator which creates an integer range between benchmark more accurately mimic a somewhat realistic iterator, on which the methods could be actually used in practice. The “population” methods use `StatsBase.sample` and consider collecting the iterator in memory as part of the benchmark. The reservoir and stream -methods use instead `ReservoirSampler` and `StreamSampler` of this package. +methods use instead `ReservoirSampler` and `SequentialSampler` of this package. The code to reproduce the results is at [StreamSampling.jl/benchmark/benchmark_comparison_stream.jl](https://github.com/JuliaDynamics/StreamSampling.jl/blob/main/benchmark/benchmark_comparison_stream.jl). diff --git a/docs/src/example.md b/docs/src/example.md index 1fb9d0f..71a47f3 100644 --- a/docs/src/example.md +++ b/docs/src/example.md @@ -125,12 +125,11 @@ rngs = [Xoshiro(i) for i in 1:Threads.nthreads()] 5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts) ``` -As you can see, the speed-up is not linear in the number of threads for an hdf5 file. This is -mainly due to the fact that accessing the chunks is single-threaded, so one would need to use -`MPI.jl` as explained at [HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/) to -improve the multi-threading performance. Though, we are already sampling at 500MB/s, which is not bad! -Using `Arrow.jl` gives an even better performance, and a scalability which is better than -linear somehow, reaching a 2GB/s sampling speed! +As you can see, the speed-up is not linear in the number of threads for an hdf5 file. +This is mainly due to the fact that accessing the chunks is single-threaded, so one would +need to use `MPI.jl` as explained at [HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/) to improve the multi-threading performance. Though, we are already sampling at 500MB/s, +which is not bad! Using `Arrow.jl` gives an even better performance, and a scalability which +is better than linear somehow, reaching a 2GB/s sampling speed! ## Monitoring diff --git a/src/SamplingInterface.jl b/src/SamplingInterface.jl index 527f030..bebfc3c 100644 --- a/src/SamplingInterface.jl +++ b/src/SamplingInterface.jl @@ -115,9 +115,9 @@ end """ combine([rng], samples::AbstractArray, weights::AbstractArray) -Combines different stream samples in a single one. The number of +Combines different sequential samples in a single one. The number of elements in the new sampler will be the minimum number of elements -in the samples. `weights` should contain the weight of each stream, +in the samples. `weights` should contain the total weight of each stream, which in the unweighted case coincides with the length of the streams. """ combine(ss::AbstractArray, ns::AbstractArray) = combine(Random.default_rng(), ss, ns) @@ -131,76 +131,69 @@ Returns the maximum number of elements that are stored in the reservoir. Base.size(rs::AbstractReservoirSampler) = rs.n """ - StreamSampler{T}([rng], iter, n, [N], method = AlgD()) + SequentialSampler{T}([rng], iter, n, [N], method = AlgD()) -Initializes a stream sampler, which can then be iterated over +Initializes a sequential sampler, which can then be iterated over to return the sampling elements of the iterable `iter` which is assumed to have a `eltype` of `T`. The methods implemented in -[`StreamSampler`](@ref) require the knowledge of the total number +[`SequentialSampler`](@ref) require the knowledge of the total number of elements in the stream `N`, if not provided it is assumed to be available by calling `length(iter)`. ----- - StreamSampler{T}([rng], iter, wfunc, n, W, method = AlgORDWSWR()) + SequentialSampler{T}([rng], iter, wfunc, n, W, method = AlgORDWSWR()) -Initializes a weigthed stream sampler, which can then be iterated over +Initializes a weigthed sequential sampler, which can then be iterated over to return the sampling elements of the iterable `iter` which is assumed to have a `eltype` of `T`. The methods implemented in -[`StreamSampler`](@ref) for weighted streams require the knowledge +[`SequentialSampler`](@ref) for weighted streams require the knowledge of the total weight of the stream `W` and a weight function `wfunc` -specifying how to map an element to its weight. +specifying how to map an element to its weight. + +----- + + SequentialSampler([rng], n::Integer, N::Integer, method = AlgD()) + +Initializes a sequential sampler, which can then be iterated over +to return `n` ordered indices between 1 and `N`, respecting the sampling +scheme of the selected method, which can be `AlgD()`, `AlgHiddenShuffle()` +or `AlgORDSWR()`. + """ -struct StreamSampler{T} 1 === 1 end +struct SequentialSampler{T} 1 === 1 end -function StreamSampler{T}(iter, wfunc::Function, n, W, method::StreamAlgorithm = AlgORDWSWR()) where T - return StreamSampler{T}(Random.default_rng(), iter, wfunc, n, W, method) +function SequentialSampler{T}(iter, wfunc::Function, n, W, method::StreamAlgorithm = AlgORDWSWR()) where T + return SequentialSampler{T}(Random.default_rng(), iter, wfunc, n, W, method) end -function StreamSampler{T}(rng::AbstractRNG, iter, wfunc::Function, n, W, method::StreamAlgorithm = AlgORDWSWR()) where T - return StreamSampler{T}(rng, iter, wfunc, n, W, method) +function SequentialSampler{T}(rng::AbstractRNG, iter, wfunc::Function, n, W, method::StreamAlgorithm = AlgORDWSWR()) where T + return SequentialSampler{T}(rng, iter, wfunc, n, W, method) end -function StreamSampler{T}(iter, n, N, method::StreamAlgorithm = AlgD()) where T - return StreamSampler{T}(Random.default_rng(), iter, n, N, method) +function SequentialSampler{T}(iter, n, N, method::StreamAlgorithm = AlgD()) where T + return SequentialSampler{T}(Random.default_rng(), iter, n, N, method) end -function StreamSampler{T}(iter, n, method::StreamAlgorithm = AlgD()) where T - return StreamSampler{T}(Random.default_rng(), iter, n, length(iter), method) +function SequentialSampler{T}(iter, n, method::StreamAlgorithm = AlgD()) where T + return SequentialSampler{T}(Random.default_rng(), iter, n, length(iter), method) end -function StreamSampler{T}(rng::AbstractRNG, iter, n, method::StreamAlgorithm = AlgD()) where T - return StreamSampler{T}(rng, iter, n, length(iter), method) +function SequentialSampler{T}(rng::AbstractRNG, iter, n, method::StreamAlgorithm = AlgD()) where T + return SequentialSampler{T}(rng, iter, n, length(iter), method) end -function StreamSampler{T}(rng::AbstractRNG, iter, n, N, method::StreamAlgorithm = AlgD()) where T - return StreamSampler{T}(rng, iter, n, N, method) +function SequentialSampler{T}(rng::AbstractRNG, iter, n, N, method::StreamAlgorithm = AlgD()) where T + return SequentialSampler{T}(rng, iter, n, N, method) end -""" - SequentialSampler([rng], n, N, method = AlgD()) - -Initializes a sequential sampler, which can then be iterated over -to return `n` ordered indices between 1 and `N`, respecting the sampling -scheme of the selected method, which can be `AlgD()`, `AlgHiddenShuffle()` -or `AlgORDSWR()`. -""" -struct SequentialSampler{S} - s::S - SequentialSampler(n, N) = SequentialSampler(Random.default_rng(), n, N, AlgD()) - SequentialSampler(n, N, alg) = SequentialSampler(Random.default_rng(), n, N, alg) - SequentialSampler(rng::AbstractRNG, n, N) = SequentialSampler(rng, n, N, AlgD()) - function SequentialSampler(rng, n, N, ::AlgD) - return new{SeqSampleIter{typeof(rng)}}(SeqSampleIter(rng, N, n)) - end - function SequentialSampler(rng, n, N, ::AlgHiddenShuffle) - return new{SeqIterHiddenShuffleSampler{typeof(rng)}}(SeqIterHiddenShuffleSampler(rng, N, n)) - end - function SequentialSampler(rng, n, N, ::AlgORDSWR) - return new{SeqIterWRSampler{typeof(rng)}}(SeqIterWRSampler(rng, N, n)) - end +function SequentialSampler(rng, n::Integer, N::Integer, ::AlgD) + return SeqSampleIter(rng, N, n) +end +function SequentialSampler(rng, n::Integer, N::Integer, ::AlgHiddenShuffle) + return SeqIterHiddenShuffleSampler(rng, N, n) +end +function SequentialSampler(rng, n::Integer, N::Integer, ::AlgORDSWR) + return SeqIterWRSampler(rng, N, n) end -Base.iterate(s::SequentialSampler) = iterate(s.s) -Base.iterate(s::SequentialSampler, state) = iterate(s.s, state) -Base.IteratorEltype(::SequentialSampler) = Base.HasEltype() -Base.eltype(::SequentialSampler) = Int -Base.IteratorSize(::SequentialSampler) = Base.HasLength() -Base.length(s::SequentialSampler) = s.s.n +SequentialSampler(n::Integer, N::Integer) = SequentialSampler(Random.default_rng(), n, N, AlgD()) +SequentialSampler(n::Integer, N::Integer, alg) = SequentialSampler(Random.default_rng(), n, N, alg) +SequentialSampler(rng::AbstractRNG, n::Integer, N::Integer) = SequentialSampler(rng, n, N, AlgD()) """ itsample([rng], iter, method = AlgRSWRSKIP()) @@ -255,7 +248,7 @@ Base.@constprop :aggressive function itsample(rng::AbstractRNG, iter, n::Int, me return update_all!(s, iter, ordered) else m = method isa AlgL || method isa AlgR || method isa AlgD ? AlgD() : AlgORDSWR() - s = collect(StreamSampler{iter_type}(rng, iter, n, length(iter), m)) + s = collect(SequentialSampler{iter_type}(rng, iter, n, length(iter), m)) return ordered ? s : fshuffle!(rng, s) end end diff --git a/src/SamplingReduction.jl b/src/SamplingReduction.jl index 045dfa7..20b1174 100644 --- a/src/SamplingReduction.jl +++ b/src/SamplingReduction.jl @@ -18,7 +18,7 @@ function reduce_samples(ps::AbstractArray, rngs, t::Union{TypeS,TypeUnion}, ss:: Threads.@threads for i in 1:nt s = ss[i] vi = Vector{T}(undef, ns[i]) - @inbounds for (q, j) in enumerate(SequentialSampler(extract_rng(rngs, i), + @inbounds for (q, j) in enumerate(SequentialSampler(extract_rng(rngs, 1), ns[i], length(s), AlgHiddenShuffle())) vi[q] = s[j] end diff --git a/src/SortedSamplingMulti.jl b/src/SortedSamplingMulti.jl index 9988ca7..a3f6bab 100644 --- a/src/SortedSamplingMulti.jl +++ b/src/SortedSamplingMulti.jl @@ -46,7 +46,7 @@ Base.eltype(::MultiAlgWeightedORDSampler{T}) where T = T Base.IteratorSize(::MultiAlgWeightedORDSampler) = Base.HasLength() Base.length(s::MultiAlgWeightedORDSampler) = s.n -struct MultiAlgORDSampler{T,R,I,D} <: AbstractStreamSampler +struct MultiAlgORDSampler{T,R,I,D} <: AbstractSequentialSampler rng::R it::I n::Int @@ -56,16 +56,16 @@ struct MultiAlgORDSampler{T,R,I,D} <: AbstractStreamSampler end end -function StreamSampler{T}(rng::AbstractRNG, iter, wfunc::Function, n, W, ::AlgORDWSWR) where T +function SequentialSampler{T}(rng::AbstractRNG, iter, wfunc::Function, n, W, ::AlgORDWSWR) where T return MultiAlgWeightedORDSampler{T}(rng, iter, wfunc, n, W) end -function StreamSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgD) where T +function SequentialSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgD) where T return MultiAlgORDSampler{T}(rng, iter, min(n, N), SeqSampleIter(rng, N, min(n, N))) end -function StreamSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgHiddenShuffle) where T +function SequentialSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgHiddenShuffle) where T return MultiAlgORDSampler{T}(rng, iter, min(n, N), SeqIterHiddenShuffleSampler(rng, N, min(n, N))) end -function StreamSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgORDSWR) where T +function SequentialSampler{T}(rng::AbstractRNG, iter, n, N, ::AlgORDSWR) where T return MultiAlgORDSampler{T}(rng, iter, n, SeqIterWRSampler(rng, N, n)) end diff --git a/src/StreamSampling.jl b/src/StreamSampling.jl index 9d153c8..e76c89e 100644 --- a/src/StreamSampling.jl +++ b/src/StreamSampling.jl @@ -18,7 +18,7 @@ using Random using StatsBase export fit!, merge!, combine, value, ordvalue, nobs, itsample -export AbstractReservoirSampler, ReservoirSampler, StreamSampler, SequentialSampler +export AbstractReservoirSampler, ReservoirSampler, SequentialSampler, SequentialSampler export AlgL, AlgR, AlgRSWRSKIP, AlgARes, AlgAExpJ, AlgWRSWRSKIP, AlgD, AlgHiddenShuffle, AlgORDSWR, AlgORDWSWR @@ -28,7 +28,7 @@ struct MutSampler end struct Ord end struct Unord end -abstract type AbstractStreamSampler end +abstract type AbstractSequentialSampler end abstract type AbstractReservoirSampler <: OnlineStat{Any} end abstract type AbstractWeightedReservoirSampler <: AbstractReservoirSampler end @@ -88,7 +88,7 @@ Replacement, A. Meligrana, 2024". struct AlgWRSWRSKIP <: ReservoirAlgorithm end """ -Implements random stream sampling without replacement. To be used with [`StreamSampler`](@ref) +Implements random stream sampling without replacement. To be used with [`SequentialSampler`](@ref) or [`itsample`](@ref). Adapted from algorithm D described in "An Efficient Algorithm for Sequential Random Sampling, @@ -97,7 +97,7 @@ J. S. Vitter, 1987". struct AlgD <: StreamAlgorithm end """ -Implements random stream sampling without replacement. To be used with [`StreamSampler`](@ref) +Implements random stream sampling without replacement. To be used with [`SequentialSampler`](@ref) or [`itsample`](@ref). Adapted from algorithm HiddenShuffle described in "Sequential Random Sampling Revisited: Hidden Shuffle Method, @@ -106,7 +106,7 @@ M. Shekelyan, G. Cormode, 2021". struct AlgHiddenShuffle <: StreamAlgorithm end """ -Implements random stream sampling with replacement. To be used with [`StreamSampler`](@ref) +Implements random stream sampling with replacement. To be used with [`SequentialSampler`](@ref) or [`itsample`](@ref). Adapted from algorithm 4 described in "Generating Sorted Lists of Random Numbers, J. L. Bentley, @@ -115,7 +115,7 @@ J. B. Saxe, 1980". struct AlgORDSWR <: StreamAlgorithm end """ -Implements weighted random stream sampling with replacement. To be used with [`StreamSampler`](@ref). +Implements weighted random stream sampling with replacement. To be used with [`SequentialSampler`](@ref). Adapted from algorithm 3 described in "An asymptotically optimal, online algorithm for weighted random sampling with replacement, M. Startek, 2016". diff --git a/test/runtests.jl b/test/runtests.jl index 552eff7..ed7ef16 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -16,8 +16,8 @@ using StreamSampling include("unweighted_sampling_multi_tests.jl") include("weighted_sampling_single_tests.jl") include("weighted_sampling_multi_tests.jl") - include("stream_sampling_tests.jl") include("sequential_sampling_tests.jl") + include("sequential_inds_sampling_tests.jl") include("merge_tests.jl") include("empty_tests.jl") include("benchmark_tests.jl") diff --git a/test/sequential_inds_sampling_tests.jl b/test/sequential_inds_sampling_tests.jl new file mode 100644 index 0000000..bc87b7d --- /dev/null +++ b/test/sequential_inds_sampling_tests.jl @@ -0,0 +1,64 @@ + +@testset "SequentialSampler tests" begin + rng = StableRNG(42) + N = 10 + n = 2 + reps = 10000 + + for alg in (AlgD(), AlgHiddenShuffle()) + s = SequentialSampler(rng, n, N, alg) + out = collect(s) + @test length(out) == n + @test issorted(out) + @test allunique(out) + @test all(1 .<= out .<= N) + + dict_res = Dict{Vector{Int}, Int}() + for _ in 1:reps + s = SequentialSampler(rng, n, N, alg) + out = collect(s) + dict_res[out] = get(dict_res, out, 0) + 1 + end + + valid_couples = 0 + for i in 1:N, j in i+1:N + valid_couples += 1 + end + + count_est = Int[] + for i in 1:N, j in i+1:N + push!(count_est, get(dict_res, [i, j], 0)) + end + + chisq_test = ChisqTest(count_est, fill(1/valid_couples, valid_couples)) + @test pvalue(chisq_test) > 0.05 + end + + s = SequentialSampler(rng, n, N, AlgORDSWR()) + out = collect(s) + @test length(out) == n + @test issorted(out) + @test all(1 .<= out .<= N) + + dict_res = Dict{Vector{Int}, Int}() + for _ in 1:reps + s = SequentialSampler(rng, n, N, AlgORDSWR()) + out = collect(s) + dict_res[out] = get(dict_res, out, 0) + 1 + end + + count_est = Int[] + ps_exact = Float64[] + + for i in 1:N, j in i:N + push!(count_est, get(dict_res, [i, j], 0)) + if i == j + push!(ps_exact, 1/(N^2)) + else + push!(ps_exact, 2/(N^2)) + end + end + + chisq_test = ChisqTest(count_est, ps_exact) + @test pvalue(chisq_test) > 0.05 +end diff --git a/test/sequential_sampling_tests.jl b/test/sequential_sampling_tests.jl index bc87b7d..50ce74d 100644 --- a/test/sequential_sampling_tests.jl +++ b/test/sequential_sampling_tests.jl @@ -1,61 +1,74 @@ - @testset "SequentialSampler tests" begin - rng = StableRNG(42) + rng = StableRNG(45) N = 10 - n = 2 - reps = 10000 + n = 3 + reps = 100000 for alg in (AlgD(), AlgHiddenShuffle()) - s = SequentialSampler(rng, n, N, alg) - out = collect(s) - @test length(out) == n - @test issorted(out) - @test allunique(out) - @test all(1 .<= out .<= N) - dict_res = Dict{Vector{Int}, Int}() for _ in 1:reps - s = SequentialSampler(rng, n, N, alg) + s = SequentialSampler{Int}(rng, 1:N, n, N, alg) out = collect(s) dict_res[out] = get(dict_res, out, 0) + 1 end - - valid_couples = 0 - for i in 1:N, j in i+1:N - valid_couples += 1 - end + valid_triples = 120 count_est = Int[] - for i in 1:N, j in i+1:N - push!(count_est, get(dict_res, [i, j], 0)) + for i in 1:N, j in i+1:N, k in j+1:N + push!(count_est, get(dict_res, [i, j, k], 0)) end - chisq_test = ChisqTest(count_est, fill(1/valid_couples, valid_couples)) + chisq_test = ChisqTest(count_est, fill(1/valid_triples, valid_triples)) @test pvalue(chisq_test) > 0.05 end - s = SequentialSampler(rng, n, N, AlgORDSWR()) - out = collect(s) - @test length(out) == n - @test issorted(out) - @test all(1 .<= out .<= N) - dict_res = Dict{Vector{Int}, Int}() for _ in 1:reps - s = SequentialSampler(rng, n, N, AlgORDSWR()) + s = SequentialSampler{Int}(rng, 1:N, n, N, AlgORDSWR()) out = collect(s) dict_res[out] = get(dict_res, out, 0) + 1 end - + count_est = Int[] ps_exact = Float64[] + for i in 1:N, j in i:N, k in j:N + push!(count_est, get(dict_res, [i, j, k], 0)) + if i == j == k + push!(ps_exact, 1/(N^3)) + elseif i == j || j == k + push!(ps_exact, 3/(N^3)) + else + push!(ps_exact, 6/(N^3)) + end + end - for i in 1:N, j in i:N - push!(count_est, get(dict_res, [i, j], 0)) - if i == j - push!(ps_exact, 1/(N^2)) + chisq_test = ChisqTest(count_est, ps_exact) + @test pvalue(chisq_test) > 0.05 + + weights = [i <= 5 ? 1.0 : 2.0 for i in 1:N] + W = sum(weights) + wfunc(i) = weights[i] + + dict_res = Dict{Vector{Int}, Int}() + for _ in 1:reps + s = SequentialSampler{Int}(rng, 1:N, wfunc, n, W, AlgORDWSWR()) + out = collect(s) + dict_res[out] = get(dict_res, out, 0) + 1 + end + + count_est = Int[] + ps_exact = Float64[] + for i in 1:N, j in i:N, k in j:N + push!(count_est, get(dict_res, [i, j, k], 0)) + wi, wj, wk = weights[i], weights[j], weights[k] + if i == j == k + push!(ps_exact, (wi^3) / (W^3)) + elseif i == j + push!(ps_exact, 3 * (wi^2 * wk) / (W^3)) + elseif j == k + push!(ps_exact, 3 * (wi * wj^2) / (W^3)) else - push!(ps_exact, 2/(N^2)) + push!(ps_exact, 6 * (wi * wj * wk) / (W^3)) end end diff --git a/test/stream_sampling_tests.jl b/test/stream_sampling_tests.jl deleted file mode 100644 index f2069bb..0000000 --- a/test/stream_sampling_tests.jl +++ /dev/null @@ -1,77 +0,0 @@ -@testset "StreamSampler tests" begin - rng = StableRNG(45) - N = 10 - n = 3 - reps = 100000 - - for alg in (AlgD(), AlgHiddenShuffle()) - dict_res = Dict{Vector{Int}, Int}() - for _ in 1:reps - s = StreamSampler{Int}(rng, 1:N, n, N, alg) - out = collect(s) - dict_res[out] = get(dict_res, out, 0) + 1 - end - - valid_triples = 120 - count_est = Int[] - for i in 1:N, j in i+1:N, k in j+1:N - push!(count_est, get(dict_res, [i, j, k], 0)) - end - - chisq_test = ChisqTest(count_est, fill(1/valid_triples, valid_triples)) - @test pvalue(chisq_test) > 0.05 - end - - dict_res = Dict{Vector{Int}, Int}() - for _ in 1:reps - s = StreamSampler{Int}(rng, 1:N, n, N, AlgORDSWR()) - out = collect(s) - dict_res[out] = get(dict_res, out, 0) + 1 - end - - count_est = Int[] - ps_exact = Float64[] - for i in 1:N, j in i:N, k in j:N - push!(count_est, get(dict_res, [i, j, k], 0)) - if i == j == k - push!(ps_exact, 1/(N^3)) - elseif i == j || j == k - push!(ps_exact, 3/(N^3)) - else - push!(ps_exact, 6/(N^3)) - end - end - - chisq_test = ChisqTest(count_est, ps_exact) - @test pvalue(chisq_test) > 0.05 - - weights = [i <= 5 ? 1.0 : 2.0 for i in 1:N] - W = sum(weights) - wfunc(i) = weights[i] - - dict_res = Dict{Vector{Int}, Int}() - for _ in 1:reps - s = StreamSampler{Int}(rng, 1:N, wfunc, n, W, AlgORDWSWR()) - out = collect(s) - dict_res[out] = get(dict_res, out, 0) + 1 - end - - count_est = Int[] - ps_exact = Float64[] - for i in 1:N, j in i:N, k in j:N - push!(count_est, get(dict_res, [i, j, k], 0)) - wi, wj, wk = weights[i], weights[j], weights[k] - if i == j == k - push!(ps_exact, (wi^3) / (W^3)) - elseif i == j - push!(ps_exact, 3 * (wi^2 * wk) / (W^3)) - elseif j == k - push!(ps_exact, 3 * (wi * wj^2) / (W^3)) - else - push!(ps_exact, 6 * (wi * wj * wk) / (W^3)) - end - end - - chisq_test = ChisqTest(count_est, ps_exact) - @test pvalue(chisq_test) > 0.05 -end