-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbasic.js
More file actions
163 lines (136 loc) · 4.42 KB
/
basic.js
File metadata and controls
163 lines (136 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
'use strict'
/**
* Basic usage example for @fastify/sse
*
* Test endpoints with curl:
*
* curl -N -H "Accept: text/event-stream" http://localhost:3000/events
* curl -N -H "Accept: text/event-stream" http://localhost:3000/stream
* curl -N -H "Accept: text/event-stream" http://localhost:3000/live
* curl -N -H "Accept: text/event-stream" http://localhost:3000/headers
* curl -N -H "Accept: text/event-stream" http://localhost:3000/replay
*
**/
/**
 * Create a Fastify instance (with logging enabled), register the SSE plugin
 * from `../index.js`, and attach the example routes:
 *
 *   GET /events  - a greeting followed by five `counter` events
 *   GET /stream  - ten `tick` events produced by an async generator
 *   GET /live    - kept-alive connection with a periodic `heartbeat` event
 *   GET /headers - custom response headers sent before the first event
 *   GET /replay  - replays stored messages, then sends one new message
 *
 * The returned instance is not listening yet; the caller starts it.
 *
 * @returns {Promise<object>} the configured Fastify instance
 */
async function buildServer () {
  // Delay between two events of the /stream generator.
  const TICK_DELAY_MS = 1000
  // Period of the /live heartbeat timer.
  const HEARTBEAT_INTERVAL_MS = 2000
  // Upper bound on the number of messages kept for /replay.
  const MAX_HISTORY_LENGTH = 100

  const fastify = require('fastify')({ logger: true })

  // Register the SSE plugin
  await fastify.register(require('../index.js'))

  // Basic SSE endpoint
  fastify.get('/events', { sse: true }, async (request, reply) => {
    // Send a simple message
    await reply.sse.send({ data: 'Hello SSE!' })

    // Send multiple events
    for (let i = 0; i < 5; i++) {
      await reply.sse.send({
        id: String(i),
        event: 'counter',
        data: { count: i, timestamp: Date.now() }
      })
    }
    // Connection will close automatically when handler ends
  })

  // Stream endpoint with async generator
  fastify.get('/stream', { sse: true }, async (request, reply) => {
    // Yields one `tick` event per TICK_DELAY_MS, ten in total.
    async function * generateEvents () {
      for (let i = 0; i < 10; i++) {
        await new Promise(resolve => setTimeout(resolve, TICK_DELAY_MS))
        yield {
          id: String(i),
          event: 'tick',
          data: { tick: i, time: new Date().toISOString() }
        }
      }
    }

    await reply.sse.send(generateEvents())
  })

  // Persistent connection with keepAlive
  fastify.get('/live', { sse: true }, async (request, reply) => {
    // Keep connection alive
    reply.sse.keepAlive()

    // Send initial event
    await reply.sse.send({ data: 'Connected to live stream' })

    // Send periodic updates
    const interval = setInterval(async () => {
      try {
        if (reply.sse.isConnected) {
          await reply.sse.send({
            event: 'heartbeat',
            data: { timestamp: Date.now() }
          })
        } else {
          // Client is gone: stop producing heartbeats.
          clearInterval(interval)
        }
      } catch (error) {
        // A failed send ends the heartbeat loop; log the cause instead of
        // discarding it so the failure can be diagnosed.
        clearInterval(interval)
        request.log.warn({ err: error }, 'Failed to send heartbeat; stopping live stream updates')
      }
    }, HEARTBEAT_INTERVAL_MS)

    // Clean up on close
    reply.sse.onClose(() => {
      clearInterval(interval)
      console.log('Live stream connection closed')
    })
  })

  // Custom headers with sendHeaders
  fastify.get('/headers', { sse: true }, async (request, reply) => {
    // Set custom headers using Fastify's header methods
    reply.header('X-Session-ID', 'session-12345')
    reply.header('X-API-Version', 'v1.2.3')
    reply.headers({
      'X-User-Agent': request.headers['user-agent'] || 'unknown',
      'X-Request-Time': new Date().toISOString()
    })

    // Manually send headers before any SSE data
    reply.sse.sendHeaders()

    // Now send SSE data
    await reply.sse.send({
      data: 'Headers sent manually before this message',
      event: 'custom-headers'
    })
  })

  // Replay functionality: state shared by every /replay request
  const messageHistory = []
  let eventId = 0

  fastify.get('/replay', { sse: true }, async (request, reply) => {
    // Handle replay if client reconnects
    reply.sse.keepAlive()

    await reply.sse.replay(async (lastEventId) => {
      // Resume right after the last message the client saw; if that id is
      // not in the history, send the whole history.
      const startIndex = messageHistory.findIndex(msg => msg.id === lastEventId)
      const messagesToReplay = startIndex !== -1
        ? messageHistory.slice(startIndex + 1)
        : messageHistory

      for (const message of messagesToReplay) {
        await reply.sse.send(message)
      }
    })

    // Send new message
    const newMessage = {
      id: String(++eventId),
      data: { message: `New event ${eventId}`, timestamp: Date.now() }
    }
    messageHistory.push(newMessage)

    // Keep only the most recent MAX_HISTORY_LENGTH messages
    if (messageHistory.length > MAX_HISTORY_LENGTH) {
      messageHistory.shift()
    }

    await reply.sse.send(newMessage)
  })

  return fastify
}
// Start the server
/**
 * Build the example server and start listening on port 3000, host 0.0.0.0.
 *
 * A failure at any stage (building the server or binding the port) is logged
 * and ends the process with exit code 1.
 *
 * @returns {Promise<void>}
 */
const start = async () => {
  // Declared outside the try block so the catch handler can tell whether the
  // server (and therefore its logger) was created before the failure.
  let server
  try {
    server = await buildServer()
    await server.listen({ port: 3000, host: '0.0.0.0' })
    console.log('Server listening on http://localhost:3000')
    console.log('Try these endpoints:')
    console.log(' GET /events - Basic SSE messages')
    console.log(' GET /stream - Streaming with async generator')
    console.log(' GET /live - Persistent connection with heartbeat')
    console.log(' GET /headers - Custom headers with sendHeaders()')
    console.log(' GET /replay - Messages with replay support')
  } catch (err) {
    if (server) {
      server.log.error(err)
    } else {
      // buildServer itself failed, so there is no server logger to use.
      console.error(err)
    }
    process.exit(1)
  }
}
// Launch the example server when this file is run; the promise returned by
// start() is not awaited here.
start()