-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathPodHandler.cs
More file actions
301 lines (269 loc) · 13.4 KB
/
PodHandler.cs
File metadata and controls
301 lines (269 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
using BencodeNET.Objects;
using BencodeNET.Parsing;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace pod.xledger.sql_server {
public class PodHandler {
Stream _inputStream;
Stream _outputStream;
PipeReader _reader;
PipeWriter _writer;
BencodeParser _parser;
public PodHandler(Stream inputStream, Stream outputStream) {
_inputStream = inputStream;
_outputStream = outputStream;
_reader = PipeReader.Create(inputStream);
_writer = PipeWriter.Create(outputStream, new StreamPipeWriterOptions(leaveOpen: true));
_parser = new BencodeParser();
}
public async Task HandleMessages() {
var cts = new CancellationTokenSource();
while (!cts.IsCancellationRequested && _inputStream.CanRead && _outputStream.CanWrite) {
try {
var msg = await _parser.ParseAsync<BDictionary>(_reader, cts.Token);
if (msg.TryGetValue("op", out IBObject op)) {
var s = ((BString)op).ToString();
await HandleMessage(s, msg, cts);
}
} catch (BencodeNET.Exceptions.BencodeException ex) when /* HACK */ (ex.Message.Contains("but reached end of stream")) {
// ^ This message filter appears to be the only way to check if the stream is closed, because
// the BencodeNET does not expose an inner exception, specific code, etc when the stream closes.
await SendException(null, "Reached end of stream");
return;
} catch (OperationCanceledException) {
} catch (Exception ex) {
await SendException(null, ex.Message);
}
}
}
async Task HandleMessage(string operation, BDictionary msg, CancellationTokenSource cts) {
switch (operation) {
case "describe":
var resp = new BDictionary {
["format"] = new BString("json"),
["namespaces"] = new BList {
new BDictionary {
["name"] = new BString("pod.xledger.sql-server"),
["vars"] = new BList {
new BDictionary { ["name"] = new BString("execute!") },
new BDictionary { ["name"] = new BString("execute-one!") }
}
}
},
["ops"] = new BDictionary {
["shutdown"] = new BDictionary()
}
};
await resp.EncodeToAsync(_writer);
break;
case "shutdown":
cts.Cancel();
break;
case "invoke":
await HandleInvoke(msg, cts);
break;
default:
break;
}
}
static readonly SqlTypesJsonConverter _sqlTypesConverter = new SqlTypesJsonConverter();
public static string JSON(object o) {
var s = JsonConvert.SerializeObject(o, _sqlTypesConverter);
return s;
}
public static JToken ParseJson(string s) {
var reader = new JsonTextReader(new StringReader(s));
// We don't need/want NewtonSoft to tamper with our data:
reader.DateParseHandling = DateParseHandling.None;
reader.DateTimeZoneHandling = DateTimeZoneHandling.RoundtripKind;
return JToken.Load(reader);
}
public static class StatusMessages {
public static readonly BList DONE_ERROR = new BList(new[] { "done", "error" });
public static readonly BList DONE = new BList(new[] { "done" });
}
async Task SendException(string id, string exMessage, object exData = null) {
var resp = new BDictionary {
["ex-message"] = new BString(exMessage),
["status"] = StatusMessages.DONE_ERROR
};
if (id != null) { resp["id"] = new BString(id); }
if (exData != null) { resp["ex-data"] = new BString(JSON(exData)); }
await resp.EncodeToAsync(_writer);
}
async Task SendResult(string id, object result, bool isJson = false) {
var json = isJson ? (string)result : JSON(result);
var resp = new BDictionary {
["id"] = new BString(id),
["value"] = new BString(json),
["status"] = StatusMessages.DONE
};
await resp.EncodeToAsync(_writer);
}
async Task HandleInvoke(BDictionary msg, CancellationTokenSource cts) {
if (!(msg.TryGetNonBlankString("id", out var id)
&& msg.TryGetNonBlankString("var", out var varname))) {
await SendException(id, "Missing \"id\" and/or \"var\" keys in \"invoke\" operation payload");
return;
}
switch (varname) {
case "pod.xledger.sql-server/execute!":
await HandleVar_Execute(id, msg);
break;
case "pod.xledger.sql-server/execute-one!":
await HandleVar_ExecuteOne(id, msg);
break;
case "pod.xledger.sql-server/execute-raw!":
await HandleVar_ExecuteRaw(id, msg);
break;
default:
await SendException(id, $"No such var: \"{varname}\"");
break;
}
}
async Task HandleVar_Execute_Internal(string id, BDictionary msg, bool expectOne = false, bool rawResults = false) {
if (!msg.TryGetValue("args", out var beArgs) || !(beArgs is BString beArgsStr)) {
await SendException(id, $"Missing required \"args\" argument.");
return;
}
IReadOnlyDictionary<string, JToken> argMap;
try {
argMap = JsonConvert.DeserializeObject<IList<IReadOnlyDictionary<string, JToken>>>(beArgsStr.ToString()).First();
} catch (Exception ex) {
await SendException(id, $"Couldn't deserialize json payload. Expected a map. Error: {ex.Message}");
return;
}
if (!argMap.TryGetNonBlankString("connection-string", out var connStr)) {
await SendException(id, $"Missing required \"connection-string\" argument.");
return;
}
if (!argMap.TryGetNonBlankString("command-text", out var commandText)) {
await SendException(id, $"Missing required \"command-text\" argument.");
return;
}
try {
using (var conn = new SqlConnection(connStr))
using (var cmd = conn.CreateCommand()) {
await conn.OpenAsync();
cmd.CommandText = commandText;
if (argMap.TryGetValue("command-type", out JToken commandTypeTok)
&& commandTypeTok.Type != JTokenType.Null) {
if (commandTypeTok.Type != JTokenType.String) {
await SendException(id, $"Expected string. Failing key: \"$.command-type\"");
return;
}
var commandType = commandTypeTok.Value<string>();
switch (commandType) {
case "stored-procedure":
cmd.CommandType = System.Data.CommandType.StoredProcedure;
break;
case "text":
break; // This is the default
default:
await SendException(id, $"Expected \"stored-procedure\" | \"text\". Failing key: \"$.command-type\"");
return;
}
}
if (argMap.TryGetValue("parameters", out JToken paramTok)
&& paramTok is JObject paramObj) {
foreach (var item in paramObj) {
if (!(item.Value is JValue v)) {
await SendException(id, $"Can only accept simple values (integers, strings, etc) for parameters. Failing key: \"$.parameters.{item.Key}\"");
return;
}
cmd.Parameters.AddWithValue(item.Key, v.Value);
}
}
var results = new List<object>();
bool multiResultSet;
argMap.TryGetBool("multi-rs", out multiResultSet); // same key as next.jdbc
using (var rdr = await cmd.ExecuteReaderAsync()) {
do {
var fieldCount = rdr.FieldCount;
var rs = new ResultSet {
columns =
Enumerable.Range(0, fieldCount)
.Select(i => rdr.GetName(i))
.ToArray()
};
var isJson = rs.columns.Length == 1 && rs.columns[0] == "JSON_F52E2B61-18A1-11d1-B105-00805F49916B";
if (isJson) {
var sb = new StringBuilder();
while (rdr.Read()) { sb.Append(rdr.GetString(0)); }
if (expectOne || !multiResultSet) {
await SendResult(id, sb.ToString(), isJson: true);
return;
} else {
// @PERF - Think of a way to eliminate deserialize -> serialize for this case
var obj = ParseJson(sb.ToString());
results.Add(obj);
}
} else {
var rows = rs.rows = new List<object[]>();
while (rdr.Read()) {
var row = new object[fieldCount];
for (int i = 0; i < fieldCount; i++) {
rdr.GetValues(row);
}
rows.Add(row);
}
if (expectOne) {
if (rows.Count > 0) {
await SendResult(id, ResultSet2DictArray(rs)[0]);
} else {
await SendResult(id, null);
}
return;
}
results.Add(rawResults ? (object)rs : ResultSet2DictArray(rs));
}
} while (rdr.NextResult() && multiResultSet);
}
object result = null;
if (rawResults || multiResultSet) {
result = results;
} else if (results.Count > 0) {
result = results[0];
}
await SendResult(id, result);
}
} catch (Exception ex) {
await SendException(id, ex.Message);
}
}
Dictionary<string, object>[] ResultSet2DictArray(ResultSet resultSet) {
var ret = new Dictionary<string, object>[resultSet.rows.Count];
for (int i = 0; i < resultSet.rows.Count; i++) {
var d = ret[i] = new Dictionary<string, object>(resultSet.columns.Length);
for (int j = 0; j < resultSet.columns.Length; j++) {
var k = resultSet.columns[j];
var v = resultSet.rows[i][j];
if (Convert.IsDBNull(v)) {
v = null;
}
d[k] = v;
}
}
return ret;
}
async Task HandleVar_Execute(string id, BDictionary msg) =>
await HandleVar_Execute_Internal(id, msg);
async Task HandleVar_ExecuteOne(string id, BDictionary msg) =>
await HandleVar_Execute_Internal(id, msg, expectOne: true);
async Task HandleVar_ExecuteRaw(string id, BDictionary msg) =>
await HandleVar_Execute_Internal(id, msg, rawResults: true);
}
public class ResultSet {
public string[] columns;
public List<object[]> rows;
}
}