Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

## [0.7.0]

### Added
- Support for Google Gemini API streaming responses (`flavor = GoogleStream()`).

## [0.6.1]

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "StreamCallbacks"
uuid = "c1b9e933-98a0-46fc-8ea7-3b58b195fb0a"
authors = ["J S <49557684+svilupp@users.noreply.github.com> and contributors"]
version = "0.6.1"
version = "0.7.0"

[deps]
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
Expand Down
30 changes: 30 additions & 0 deletions examples/gemini_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Calling Google Gemini with StreamCallbacks
using HTTP, JSON3
using StreamCallbacks

# Prepare target and auth
api_key = get(ENV, "GOOGLE_API_KEY", "")
isempty(api_key) && @warn "GOOGLE_API_KEY is not set -- the request below will fail without it"
# Pick ONE model (the original had two assignments; the first was a dead store)
# model = "gemini-2.0-flash"
model = "gemini-2.5-pro-exp-03-25"
# `alt=sse` requests Server-Sent Events so the response streams chunk by chunk
url = "https://generativelanguage.googleapis.com/v1beta/models/$model:streamGenerateContent?alt=sse&key=$api_key"
headers = ["Content-Type" => "application/json"]

# Prepare the callback (prints streamed text to stdout) and the request payload
cb = StreamCallback(; out = stdout, flavor = GoogleStream(), verbose = true)
payload = IOBuffer()
JSON3.write(payload, Dict(
    :contents => [Dict(
        :parts => [Dict(
            :text => "Count from 1 to 20."
        )]
    )]
))

# Send the request
resp = streamed_request!(cb, url, headers, payload)

## Check the response
resp # should be a `HTTP.Response` object with a message body as if we hadn't used streaming

## Check the callback
cb.chunks # should be a vector of `StreamChunk` objects, each with a `json` field with received data from the API
7 changes: 5 additions & 2 deletions src/StreamCallbacks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module StreamCallbacks
using HTTP, JSON3
using PrecompileTools

export StreamCallback, StreamChunk, OpenAIStream, AnthropicStream, OllamaStream,
streamed_request!
export StreamCallback, StreamChunk, streamed_request!
export OpenAIStream, AnthropicStream, GoogleStream, OllamaStream

include("interface.jl")

include("shared_methods.jl")
Expand All @@ -15,6 +16,8 @@ include("stream_anthropic.jl")

include("stream_ollama.jl")

include("stream_gemini.jl")

@compile_workload begin
include("precompilation.jl")
end
Expand Down
1 change: 1 addition & 0 deletions src/interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Available flavors:
abstract type AbstractStreamFlavor end
struct OpenAIStream <: AbstractStreamFlavor end
struct AnthropicStream <: AbstractStreamFlavor end
struct GoogleStream <: AbstractStreamFlavor end
struct OllamaStream <: AbstractStreamFlavor end

## Default implementations
Expand Down
18 changes: 17 additions & 1 deletion src/precompilation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ blob = "event: start\ndata: {\"key\": \"value\"}\n\nevent: end\ndata: {\"status\
chunks, spillover = extract_chunks(OpenAIStream(), blob)
blob = "{\"key\":\"value\", \"done\":true}"
chunks, spillover = extract_chunks(OllamaStream(), blob)
# Add Gemini precompilation
gemini_blob = "{\"candidates\": [{\"content\": {\"parts\": [{\"text\": \"Hello\"}],\"role\": \"model\"}}],\"usageMetadata\": {\"promptTokenCount\": 9,\"totalTokenCount\": 9},\"modelVersion\": \"gemini-2.0-flash\"}\r\n\r\n"
chunks, spillover = extract_chunks(GoogleStream(), gemini_blob)

# Chunk examples
io = IOBuffer()
Expand Down Expand Up @@ -30,4 +33,17 @@ build_response_body(flavor, cb)
flavor = OllamaStream()
is_done(flavor, example_chunk)
extract_content(flavor, example_chunk)
build_response_body(flavor, cb)
build_response_body(flavor, cb)

# GoogleStream examples
flavor = GoogleStream()
gemini_chunk = StreamChunk(
nothing,
"""{"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"}}]}""",
JSON3.read("""{"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"}}]}""")
)
is_done(flavor, gemini_chunk)
extract_content(flavor, gemini_chunk)
cb_gemini = StreamCallback(out = io, flavor = GoogleStream())
callback(cb_gemini, gemini_chunk)
build_response_body(flavor, cb_gemini)
138 changes: 138 additions & 0 deletions src/stream_gemini.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# This file defines the methods for the GoogleStream flavor

"""
extract_chunks(::GoogleStream, blob::AbstractString; kwargs...)

Extract the chunks from the received SSE blob for Google Gemini API.
Returns a list of `StreamChunk` and the next spillover (if message was incomplete).
"""
function extract_chunks(::GoogleStream, blob::AbstractString;
spillover::AbstractString = "", verbose::Bool = false, kwargs...)
chunks = StreamChunk[]
next_spillover = ""

# Gemini uses simpler SSE format with just "data:" prefix
# Split by double newlines which separate SSE events
blob_split = split(blob, r"\r?\n\r?\n")

for chunk_data in blob_split
isempty(chunk_data) && continue

# Extract data after "data:" prefix
if startswith(chunk_data, "data: ")
json_str = chunk_data[7:end] # Skip "" prefix

# Try to parse JSON
json_obj = nothing
try
json_obj = JSON3.read(json_str)
catch e
verbose && @warn "Cannot parse Gemini JSON: $json_str" exception=e
end

# Create chunk
push!(chunks, StreamChunk(nothing, json_str, json_obj))
end
end

return chunks, next_spillover
end

"""
is_done(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)

Check if the stream is done for Google Gemini API.
"""
function is_done(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
if !isnothing(chunk.json)
# Check for completion markers in Gemini response
if haskey(chunk.json, :candidates) && length(chunk.json.candidates) > 0
candidate = chunk.json.candidates[1]
return haskey(candidate, :finishReason) && candidate.finishReason == "STOP"
end
end
return false
end

"""
extract_content(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)

Extract the content from the chunk for Google Gemini API.
"""
function extract_content(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
if !isnothing(chunk.json) && haskey(chunk.json, :candidates)
candidates = chunk.json.candidates
if length(candidates) > 0 && haskey(candidates[1], :content) &&
haskey(candidates[1].content, :parts) && length(candidates[1].content.parts) > 0
part = candidates[1].content.parts[1]
if haskey(part, :text)
return part.text
end
end
end
return nothing
end
"""
build_response_body(::GoogleStream, cb::AbstractStreamCallback; kwargs...)

Build the response body from the chunks for Google Gemini API.
Returns an OpenAI-compatible response format to ensure compatibility with code expecting OpenAI responses.
"""
function build_response_body(::GoogleStream, cb::AbstractStreamCallback; kwargs...)
# Extract all non-empty chunks with JSON data
valid_chunks = filter(c -> !isnothing(c.json), cb.chunks)

if isempty(valid_chunks)
return nothing
end

# Use the last chunk as the base for our response
last_chunk = valid_chunks[end].json

# Combine text from all chunks
combined_text = ""
for chunk in valid_chunks
if haskey(chunk.json, :candidates) && length(chunk.json.candidates) > 0 &&
haskey(chunk.json.candidates[1], :content) &&
haskey(chunk.json.candidates[1].content, :parts) &&
length(chunk.json.candidates[1].content.parts) > 0 &&
haskey(chunk.json.candidates[1].content.parts[1], :text)
combined_text *= chunk.json.candidates[1].content.parts[1].text
end
end

# Create an OpenAI-compatible response
openai_resp = Dict{Symbol, Any}(
:choices => [],
:created => round(Int, time()),
:model => get(last_chunk, :modelVersion, "gemini"),
:object => "chat.completion",
:usage => Dict{Symbol, Any}()
)

# Extract usage information
if haskey(last_chunk, :usageMetadata)
usage = last_chunk.usageMetadata
openai_resp[:usage] = Dict{Symbol, Any}(
:prompt_tokens => get(usage, :promptTokenCount, 0),
:completion_tokens => get(usage, :candidatesTokenCount, 0),
:total_tokens => get(usage, :totalTokenCount, 0)
)
end

# Add the choice with the combined text
if haskey(last_chunk, :candidates) && !isempty(last_chunk.candidates)
finish_reason = get(last_chunk.candidates[1], :finishReason, "stop")
choice = Dict{Symbol, Any}(
:index => 0,
:finish_reason => lowercase(finish_reason),
:message => Dict{Symbol, Any}(
:role => "assistant",
:content => combined_text
)
)
push!(openai_resp[:choices], choice)
end

return openai_resp
end
Loading