diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f777af..f4dfe71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
+## [0.7.0]
+
+### Added
+- Support for Google Gemini API streaming responses (`flavor = GoogleStream()`).
+
 ## [0.6.1]
 
 ### Fixed
 
diff --git a/Project.toml b/Project.toml
index 7dfc0b7..1f3dd16 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,7 +1,7 @@
 name = "StreamCallbacks"
 uuid = "c1b9e933-98a0-46fc-8ea7-3b58b195fb0a"
 authors = ["J S <49557684+svilupp@users.noreply.github.com> and contributors"]
-version = "0.6.1"
+version = "0.7.0"
 
 [deps]
 HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
diff --git a/examples/gemini_example.jl b/examples/gemini_example.jl
new file mode 100644
index 0000000..bd6896d
--- /dev/null
+++ b/examples/gemini_example.jl
@@ -0,0 +1,30 @@
+# Calling Google Gemini with StreamCallbacks
+using HTTP, JSON3
+using StreamCallbacks
+
+# Prepare target and auth
+api_key = get(ENV, "GOOGLE_API_KEY", "")
+# model = "gemini-2.0-flash"  # smaller/faster alternative (kept commented out; was a dead assignment)
+model = "gemini-2.5-pro-exp-03-25"
+url = "https://generativelanguage.googleapis.com/v1beta/models/$model:streamGenerateContent?alt=sse&key=$api_key"
+headers = ["Content-Type" => "application/json"]
+
+# Prepare the request payload
+cb = StreamCallback(; out = stdout, flavor = GoogleStream(), verbose = true)
+payload = IOBuffer()
+JSON3.write(payload, Dict(
+    :contents => [Dict(
+        :parts => [Dict(
+            :text => "Count from 1 to 20."
+        )]
+    )]
+))
+
+# Send the request
+resp = streamed_request!(cb, url, headers, payload)
+
+## Check the response
+resp # should be an `HTTP.Response` object with a message body as if streaming had not been used
+
+## Check the callback
+cb.chunks # should be a vector of `StreamChunk` objects, each with a `json` field with received data from the API
diff --git a/src/StreamCallbacks.jl b/src/StreamCallbacks.jl
index 8230bb7..3735a0a 100644
--- a/src/StreamCallbacks.jl
+++ b/src/StreamCallbacks.jl
@@ -3,8 +3,9 @@ module StreamCallbacks
 using HTTP, JSON3
 using PrecompileTools
 
-export StreamCallback, StreamChunk, OpenAIStream, AnthropicStream, OllamaStream,
-    streamed_request!
+export StreamCallback, StreamChunk, streamed_request!
+export OpenAIStream, AnthropicStream, GoogleStream, OllamaStream
+
 
 include("interface.jl")
 include("shared_methods.jl")
@@ -15,6 +16,8 @@
 include("stream_anthropic.jl")
 include("stream_ollama.jl")
 
+include("stream_gemini.jl")
+
 @compile_workload begin
     include("precompilation.jl")
 end
diff --git a/src/interface.jl b/src/interface.jl
index d150772..615a3bb 100644
--- a/src/interface.jl
+++ b/src/interface.jl
@@ -70,6 +70,7 @@ Available flavors:
 abstract type AbstractStreamFlavor end
 struct OpenAIStream <: AbstractStreamFlavor end
 struct AnthropicStream <: AbstractStreamFlavor end
+struct GoogleStream <: AbstractStreamFlavor end
 struct OllamaStream <: AbstractStreamFlavor end
 
 ## Default implementations
diff --git a/src/precompilation.jl b/src/precompilation.jl
index 5e860ea..55d0145 100644
--- a/src/precompilation.jl
+++ b/src/precompilation.jl
@@ -3,6 +3,9 @@ blob = "event: start\ndata: {\"key\": \"value\"}\n\nevent: end\ndata: {\"status\
 chunks, spillover = extract_chunks(OpenAIStream(), blob)
 blob = "{\"key\":\"value\", \"done\":true}"
 chunks, spillover = extract_chunks(OllamaStream(), blob)
+# Add Gemini precompilation
+gemini_blob = "{\"candidates\": [{\"content\": {\"parts\": [{\"text\": \"Hello\"}],\"role\": \"model\"}}],\"usageMetadata\": {\"promptTokenCount\": 9,\"totalTokenCount\": 9},\"modelVersion\": \"gemini-2.0-flash\"}\r\n\r\n"
+chunks, spillover = extract_chunks(GoogleStream(), gemini_blob)
 
 # Chunk examples
 io = IOBuffer()
@@ -30,4 +33,17 @@
 flavor = OllamaStream()
 is_done(flavor, example_chunk)
 extract_content(flavor, example_chunk)
-build_response_body(flavor, cb)
\ No newline at end of file
+build_response_body(flavor, cb)
+
+# GoogleStream examples
+flavor = GoogleStream()
+gemini_chunk = StreamChunk(
+    nothing,
+    """{"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"}}]}""",
+    JSON3.read("""{"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"}}]}""")
+)
+is_done(flavor, gemini_chunk)
+extract_content(flavor, gemini_chunk)
+cb_gemini = StreamCallback(out = io, flavor = GoogleStream())
+callback(cb_gemini, gemini_chunk)
+build_response_body(flavor, cb_gemini)
diff --git a/src/stream_gemini.jl b/src/stream_gemini.jl
new file mode 100644
index 0000000..b2d38ee
--- /dev/null
+++ b/src/stream_gemini.jl
@@ -0,0 +1,138 @@
+# This file defines the methods for the GoogleStream flavor
+
+"""
+    extract_chunks(::GoogleStream, blob::AbstractString; kwargs...)
+
+Extract the chunks from the received SSE blob for Google Gemini API.
+Returns a list of `StreamChunk` and the next spillover (if message was incomplete).
+"""
+function extract_chunks(::GoogleStream, blob::AbstractString;
+        spillover::AbstractString = "", verbose::Bool = false, kwargs...)
+    chunks = StreamChunk[]
+    next_spillover = ""  # NOTE(review): never populated below, so a trailing incomplete SSE event is dropped rather than carried over — TODO confirm/handle
+
+    # Gemini uses simpler SSE format with just "data:" prefix
+    # Split by double newlines which separate SSE events
+    blob_split = split(blob, r"\r?\n\r?\n")
+
+    for chunk_data in blob_split
+        isempty(chunk_data) && continue
+
+        # Extract data after "data:" prefix
+        if startswith(chunk_data, "data: ")
+            json_str = chunk_data[7:end] # Skip the 6-char "data: " prefix
+
+            # Try to parse JSON
+            json_obj = nothing
+            try
+                json_obj = JSON3.read(json_str)
+            catch e
+                verbose && @warn "Cannot parse Gemini JSON: $json_str" exception=e
+            end
+
+            # Create chunk
+            push!(chunks, StreamChunk(nothing, json_str, json_obj))
+        end
+    end
+
+    return chunks, next_spillover
+end
+
+"""
+    is_done(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
+
+Check if the stream is done for Google Gemini API.
+"""
+function is_done(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
+    if !isnothing(chunk.json)
+        # Check for completion markers in Gemini response
+        if haskey(chunk.json, :candidates) && length(chunk.json.candidates) > 0
+            candidate = chunk.json.candidates[1]
+            return haskey(candidate, :finishReason) && candidate.finishReason == "STOP"
+        end
+    end
+    return false
+end
+
+"""
+    extract_content(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
+
+Extract the content from the chunk for Google Gemini API.
+"""
+function extract_content(::GoogleStream, chunk::AbstractStreamChunk; kwargs...)
+    if !isnothing(chunk.json) && haskey(chunk.json, :candidates)
+        candidates = chunk.json.candidates
+        if length(candidates) > 0 && haskey(candidates[1], :content) &&
+           haskey(candidates[1].content, :parts) && length(candidates[1].content.parts) > 0
+            part = candidates[1].content.parts[1]
+            if haskey(part, :text)
+                return part.text
+            end
+        end
+    end
+    return nothing
+end
+"""
+    build_response_body(::GoogleStream, cb::AbstractStreamCallback; kwargs...)
+
+Build the response body from the chunks for Google Gemini API.
+Returns an OpenAI-compatible response format to ensure compatibility with code expecting OpenAI responses.
+"""
+function build_response_body(::GoogleStream, cb::AbstractStreamCallback; kwargs...)
+    # Extract all non-empty chunks with JSON data
+    valid_chunks = filter(c -> !isnothing(c.json), cb.chunks)
+
+    if isempty(valid_chunks)
+        return nothing
+    end
+
+    # Use the last chunk as the base for our response
+    last_chunk = valid_chunks[end].json
+
+    # Combine text from all chunks
+    combined_text = ""
+    for chunk in valid_chunks
+        if haskey(chunk.json, :candidates) && length(chunk.json.candidates) > 0 &&
+           haskey(chunk.json.candidates[1], :content) &&
+           haskey(chunk.json.candidates[1].content, :parts) &&
+           length(chunk.json.candidates[1].content.parts) > 0 &&
+           haskey(chunk.json.candidates[1].content.parts[1], :text)
+            combined_text *= chunk.json.candidates[1].content.parts[1].text
+        end
+    end
+
+    # Create an OpenAI-compatible response
+    openai_resp = Dict{Symbol, Any}(
+        :choices => [],
+        :created => round(Int, time()),
+        :model => get(last_chunk, :modelVersion, "gemini"),
+        :object => "chat.completion",
+        :usage => Dict{Symbol, Any}()
+    )
+
+    # Extract usage information
+    if haskey(last_chunk, :usageMetadata)
+        usage = last_chunk.usageMetadata
+        openai_resp[:usage] = Dict{Symbol, Any}(
+            :prompt_tokens => get(usage, :promptTokenCount, 0),
+            :completion_tokens => get(usage, :candidatesTokenCount, 0),
+            :total_tokens => get(usage, :totalTokenCount, 0)
+        )
+    end
+
+    # Add the choice with the combined text
+    if haskey(last_chunk, :candidates) && !isempty(last_chunk.candidates)
+        finish_reason = get(last_chunk.candidates[1], :finishReason, "stop")
+        choice = Dict{Symbol, Any}(
+            :index => 0,
+            :finish_reason => lowercase(finish_reason),
+            :message => Dict{Symbol, Any}(
+                :role => "assistant",
+                :content => combined_text
+            )
+        )
+        push!(openai_resp[:choices], choice)
+    end
+
+    return openai_resp
+end
\ No newline at end of file