Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

## [0.6.2]

### Fixed
- Fixed a bug when processing SSE messages whose payload contains `data: ` strings

## [0.6.1]

### Fixed
Expand Down
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
name = "StreamCallbacks"
uuid = "c1b9e933-98a0-46fc-8ea7-3b58b195fb0a"
authors = ["J S <49557684+svilupp@users.noreply.github.com> and contributors"]
version = "0.6.1"
version = "0.6.2"

[deps]
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21"
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"

[compat]
Aqua = "0.8"
HTTP = "1.10"
JSON3 = "1.14"
LibCURL = "0.6.4"
PrecompileTools = "1.2"
Test = "1"
julia = "1.9"
Expand Down
46 changes: 46 additions & 0 deletions examples/error_handling_test.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Test error handling with custom IO that fails on "5"
using HTTP, JSON3
using StreamCallbacks
using StreamCallbacks: OpenAIStream, libcurl_streamed_request!, streamed_request_http!, streamed_request_libcurl!

# Prepare target and auth
url = "https://api.openai.com/v1/chat/completions"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(get(ENV, "OPENAI_API_KEY", ""))"
]

# Custom IO sink: records every streamed text fragment and raises an error the
# moment a fragment contains the digit "5" (exercises throw_on_error handling).
struct ErrorOnFiveIO <: IO
    buffer::Vector{String}
end
ErrorOnFiveIO() = ErrorOnFiveIO(String[])

function StreamCallbacks.print_content(out::ErrorOnFiveIO, text::AbstractString; kwargs...)
    push!(out.buffer, text)
    occursin("5", text) &&
        error("Custom IO error: Found forbidden number '5' in: $(text)")
    return nothing
end

# Build the chat payload once; both transports reuse the same JSON string
# (JSON3.write without an IO argument returns the serialized String directly).
messages = [Dict("role" => "user", "content" => "Count from 1 to 10.")]
payload_str = JSON3.write(
    (; stream = true, messages, model = "gpt-4o-mini",
        stream_options = (; include_usage = true)))

println("=== Testing Error Handling ===")

# Test 1: HTTP.jl with error handling
println("\n1. Testing HTTP.jl error handling...")
cb_http = StreamCallback(; out = ErrorOnFiveIO(), flavor = OpenAIStream(), throw_on_error = true)
# resp_http = streamed_request_http!(cb_http, url, headers, IOBuffer(payload_str))
# println("HTTP: No error occurred (unexpected)")

# Test 2: LibCURL with error handling
println("\n2. Testing LibCURL error handling...")
cb_curl = StreamCallback(; out = ErrorOnFiveIO(), flavor = OpenAIStream(), throw_on_error = true)

resp_curl = streamed_request_libcurl!(cb_curl, url, headers, payload_str)
println("LibCURL: No error occurred (unexpected)")

println("\n=== Error Handling Test Complete ===")
23 changes: 23 additions & 0 deletions examples/google_openai_streaming_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Calling Google AI (AI Studio) through its OpenAI-compatible endpoint
# using StreamCallbacks.
using HTTP, JSON3
using StreamCallbacks

# Prepare target and auth for Google AI Studio
url = "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(get(ENV, "GOOGLE_API_KEY", ""))"
]

# Google's OpenAI-compatible schema streams with the OpenAIStream flavor.
callback = StreamCallback(; out = stdout, flavor = OpenAIStream())
chat_messages = [Dict("role" => "user",
    "content" => "Count from 1 to 10. Start with numbers only.")]
request_body = IOBuffer()
JSON3.write(request_body,
    (; stream = true, messages = chat_messages, model = "gemini-2.5-flash",
        stream_options = (; include_usage = true)))

response = streamed_request!(callback, url, headers, request_body);

println("Response status: ", response.status)
println("Collected chunks: ", length(callback.chunks))
38 changes: 38 additions & 0 deletions examples/long_context_test.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Test long context with HTTP vs LibCURL performance comparison.
using HTTP, JSON3
using StreamCallbacks
using StreamCallbacks: OpenAIStream, streamed_request_libcurl!

# Prepare target and auth
url = "https://api.openai.com/v1/chat/completions"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(get(ENV, "OPENAI_API_KEY", ""))"
]

# Create a very long context (~2.4 MB). NOTE(review): a debug leftover used to
# immediately overwrite this with a 1-chunk string, silently disabling the
# long-context test; use the commented single-chunk variant for quick smoke runs.
very_long_text = ["(Random text chunk $i.) " for i in 1:100_000] |> join
# very_long_text = ["(Random text chunk $i.) " for i in 1:1] |> join
messages = [Dict("role" => "user", "content" => very_long_text * "Count from 1 to 10.")]

payload = IOBuffer()
JSON3.write(payload, (; stream = true, messages, model = "gpt-4o-mini", stream_options = (; include_usage = true)))
payload_str = String(take!(payload))

println("=== Testing Long Context Performance ===")
println("Context length: $(length(very_long_text)) characters")

# Test 1: HTTP.jl based streaming
println("\n1. Testing HTTP.jl streaming...")
# cb_http = StreamCallback(; out = stdout, flavor = OpenAIStream(), throw_on_error = true)
# resp_http = @time streamed_request!(cb_http, url, headers, IOBuffer(payload_str))
# @show resp_http
# println("HTTP chunks received: $(length(cb_http.chunks))")

# Test 2: LibCURL based streaming
println("\n2. Testing LibCURL streaming...")
cb_curl = StreamCallback(; out = stdout, flavor = OpenAIStream(), throw_on_error = true)
resp_curl = @time streamed_request_libcurl!(cb_curl, url, headers, payload_str)
println("LibCURL chunks received: $(length(cb_curl.chunks))")

println("\n=== Performance Comparison Complete ===")
48 changes: 41 additions & 7 deletions examples/openai_example.jl
Original file line number Diff line number Diff line change
@@ -1,24 +1,58 @@
# Calling OpenAI with StreamCallbacks
using HTTP, JSON3
using StreamCallbacks
using StreamCallbacks: OpenAIStream
using StreamCallbacks: streamed_request_libcurl!

## Prepare target and auth
# Prepare target and auth
url = "https://api.openai.com/v1/chat/completions"
headers = [
"Content-Type" => "application/json",
"Authorization" => "Bearer $(get(ENV, "OPENAI_API_KEY", ""))"
];
# Custom IO type that throws when it sees "5"
struct ErrorOnFiveIO <: IO
buffer::Vector{String}
end
ErrorOnFiveIO() = ErrorOnFiveIO(String[])

## Send the request
cb = StreamCallback(; out = stdout, flavor = OpenAIStream())
messages = [Dict("role" => "user",
"content" => "Count from 1 to 100.")]
function StreamCallbacks.print_content(out::ErrorOnFiveIO, text::AbstractString; kwargs...)
push!(out.buffer, text)
if occursin("5", text)
error("Custom IO error: Found forbidden number '5' in: $(text)")
end
end

# Send the request
cb = StreamCallback(; out = stdout, flavor = OpenAIStream(), throw_on_error = true)
# cb = StreamCallback(; out = ErrorOnFiveIO(), flavor = OpenAIStream(), throw_on_error = true)
very_long_text = ["(Just some random text $i.) " for i in 1:1] |> join
# very_long_text = ""
messages = [Dict("role" => "user", "content" => very_long_text * "Count from 1 to 100.")]
using LLMRateLimiters
# @show LLMRateLimiters.estimate_tokens(messages[1]["content"])

#
payload = IOBuffer()
JSON3.write(payload,
(; stream = true, messages, model = "gpt-4o-mini",
(; stream = true, messages, model = "gpt-5-mini",
stream_options = (; include_usage = true)))
resp = streamed_request!(cb, url, headers, payload);

# Test different streaming methods:
# 1. HTTP.jl based (default)
# payload_str = String(take!(payload))
# resp = @time streamed_request!(cb, url, headers, IOBuffer(payload_str));
# @show resp

# 2. Socket-based streaming
# resp = socket_streamed_request!(cb, url, headers, String(take!(payload)));

# 3. LibCURL-based streaming (recommended)
# Clear chunks from previous test to avoid accumulation
empty!(cb.chunks)
resp = @time streamed_request_libcurl!(cb, url, headers, payload);
@show resp
;
## Check the response
resp # should be a `HTTP.Response` object with a message body like if we wouldn't use streaming

Expand Down
24 changes: 24 additions & 0 deletions examples/promptingtools_aigenerate_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Stream a PromptingTools `aigenerate` call (Google schema) through StreamCallbacks,
# then list Google models that support context caching.
using PromptingTools
using StreamCallbacks
using GoogleGenAI
using PromptingTools: GoogleSchema

# Stream to stdout with a callback collecting chunks
cb = StreamCallback(out = stdout, verbose = false)

# Use a real model id; adjust as needed
msg = @time aigenerate(GoogleSchema(), "Tell me a short story of humanity:";
    model = "gemini-2.5-pro-preview-06-05",
    streamcallback = cb)

println("\n\nFinal content:\n", msg.content)
#%%
# GoogleGenAI is already loaded above (the duplicate `using GoogleGenAI` was removed).
# Print models that support the createCachedContent generation method.
models = list_models()
for m in models
    if "createCachedContent" in m[:supported_generation_methods]
        println(m[:name])
    end
end

21 changes: 21 additions & 0 deletions examples/responses_stream_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# OpenAI Responses API example: streams the answer to stdout via StreamCallbacks.
using HTTP
using JSON3
using OpenAI
using PromptingTools
const PT = PromptingTools
using PromptingTools: OpenAIResponseSchema, AbstractResponseSchema, airespond
using StreamCallbacks: StreamCallback

# Demonstrates the OpenAI Responses API with schema support and streaming.
# Requires the OPENAI_API_KEY environment variable to be set.

# Build the Responses-API schema and a callback that prints tokens as they arrive.
schema = OpenAIResponseSchema()
cb = StreamCallback(out = stdout)

response = airespond(schema,
    "What is the 6th largest city in the Czech Republic? you can think, but in the answer I only want to see the city.";
    model = "gpt-5.1-codex", streamcallback = cb)

# Inspect token usage reported by the API.
@show response.tokens
@show response.extras[:usage]
72 changes: 72 additions & 0 deletions examples/stream_comparison_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Calling OpenAI with StreamCallbacks — compares streaming transports.
# NOTE(review): this script previously referenced `body` and `kwargs`, which were
# only defined by a commented-out `@load` from JLD2, so it failed with
# UndefVarError; it also called `streamed_request_libcurl!` while importing
# `libcurl_streamed_request!` (the exported name). Both are fixed below.
using HTTP, JSON3
using StreamCallbacks
using StreamCallbacks: OpenAIStream
using StreamCallbacks: libcurl_streamed_request!

# Prepare target and auth
url = "https://api.openai.com/v1/chat/completions"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(get(ENV, "OPENAI_API_KEY", ""))"
];
# Custom IO type that collects streamed text; the error on "5" is disabled.
struct ErrorOnFiveIO <: IO
    buffer::Vector{String}
end
ErrorOnFiveIO() = ErrorOnFiveIO(String[])

function StreamCallbacks.print_content(out::ErrorOnFiveIO, text::AbstractString; kwargs...)
    push!(out.buffer, text)
    print(text)
    if occursin("5", text)
        # error("Custom IO error: Found forbidden number '5' in: $(text)")
    end
end

# Send the request
# cb = StreamCallback(; out = stdout, flavor = OpenAIStream(), throw_on_error = false)
cb = StreamCallback(; out = ErrorOnFiveIO(), flavor = OpenAIStream(), throw_on_error = true)
# cb = StreamCallback(; out = ErrorOnFiveIO(), flavor = AnthropicStream(), throw_on_error = true)

very_long_text = ["(Just some random text $i.) " for i in 1:1] |> join
# very_long_text = ""
messages = [Dict("role" => "user", "content" => very_long_text * "Count from 1 to 10.")]

payload = IOBuffer()
JSON3.write(payload,
    (; stream = true, messages, model = "gpt-4o-mini",
        stream_options = (; include_usage = true)))

# Test different streaming methods:
# 1. HTTP.jl based (default)
# payload_str = String(take!(payload))
# resp = @time streamed_request!(cb, url, headers, IOBuffer(payload_str));
# @show resp

# 2. Socket-based streaming
# resp = socket_streamed_request!(cb, url, headers, String(take!(payload)));

# 3. LibCURL-based streaming (recommended) — stream the payload built above.
body_str = String(take!(payload))
@show body_str
resp = @time libcurl_streamed_request!(cb, url, headers, body_str);
@show resp
;
## Check the response
resp # should be a `HTTP.Response` object with a message body like if we wouldn't use streaming

## Check the callback
cb.chunks # should be a vector of `StreamChunk` objects, each with a `json` field with received data from the API

# TIP: For debugging, use `cb.verbose = true` in the `StreamCallback` constructor to get more details on each chunk and enable DEBUG loglevel.
9 changes: 7 additions & 2 deletions src/StreamCallbacks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ module StreamCallbacks

using HTTP, JSON3
using PrecompileTools
using LibCURL

export StreamCallback, StreamChunk, OpenAIStream, AnthropicStream, OllamaStream, ResponseStream,
streamed_request!, libcurl_streamed_request!

export StreamCallback, StreamChunk, OpenAIStream, AnthropicStream, OllamaStream,
streamed_request!
include("interface.jl")

include("shared_methods.jl")
include("shared_methods_libcurl.jl")

include("stream_openai.jl")

include("stream_anthropic.jl")

include("stream_ollama.jl")

include("stream_response.jl")

@compile_workload begin
include("precompilation.jl")
end
Expand Down
3 changes: 3 additions & 0 deletions src/interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ Abstract type for the stream flavor, ie, the API provider.
Available flavors:
- `OpenAIStream` for OpenAI API
- `AnthropicStream` for Anthropic API
- `OllamaStream` for Ollama API
- `ResponseStream` for OpenAI Response API
"""
abstract type AbstractStreamFlavor end
struct OpenAIStream <: AbstractStreamFlavor end
struct AnthropicStream <: AbstractStreamFlavor end
struct OllamaStream <: AbstractStreamFlavor end
struct ResponseStream <: AbstractStreamFlavor end

## Default implementations
"""
Expand Down
2 changes: 1 addition & 1 deletion src/precompilation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ build_response_body(flavor, cb)
flavor = OllamaStream()
is_done(flavor, example_chunk)
extract_content(flavor, example_chunk)
build_response_body(flavor, cb)
build_response_body(flavor, cb)
Loading
Loading