1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: jan@swi-prolog.org 5 WWW: https://www.swi-prolog.org 6 Copyright (c) 2025, SWI-Prolog Solutions b.v. 7 All rights reserved. 8 9 Redistribution and use in source and binary forms, with or without 10 modification, are permitted provided that the following conditions 11 are met: 12 13 1. Redistributions of source code must retain the above copyright 14 notice, this list of conditions and the following disclaimer. 15 16 2. Redistributions in binary form must reproduce the above copyright 17 notice, this list of conditions and the following disclaimer in 18 the documentation and/or other materials provided with the 19 distribution. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 32 POSSIBILITY OF SUCH DAMAGE. 33*/ 34 35:- module(json_rpc_client, 36 [ json_call/4, % +Stream, +Goal, -Result, +Options 37 json_notify/3, % +Stream, +Goal, +Options 38 json_batch/5, % +Stream, +Notifications, +Calls, -Results, +Options 39 json_full_duplex/2 % +Stream, :Options 40 ]). 41:- autoload(library(json), [json_write_dict/3, json_read_dict/3]). 42:- autoload(library(option), [option/2]). 43:- use_module(library(debug), [debug/3]). 44:- autoload(library(apply), [maplist/4, maplist/3]). 
45:- autoload(library(lists), [append/3, member/2]). 46:- autoload(library(terms), [mapsubterms/3]). 47:- autoload(library(http/http_stream), [stream_range_open/3]). 48 49:- meta_predicate 50 json_full_duplex(+, :).
60:- dynamic 61 json_result_queue/2, % Stream, Queue 62 failed_id/2. % Queue, Id
If Stream is closed this library terminates the thread and related message queue.
%!  json_call(+Stream, +Goal, -Result, +Options)
%
%   Send Goal as a JSON-RPC 2.0 request over Stream and wait for the
%   reply that carries the same id.  The functor name of Goal is used
%   as `method`; its arguments become `params` (see call_args/2).
%
%   Options are passed on to json_send/3 and to thread_get_message/3
%   (so a timeout(Seconds) option bounds the wait).  Options used here:
%
%     - id(Id)
%       Use Id as request id rather than generating one.
%     - value_string_as(atom)
%       Convert all strings in the reply to atoms.
%
%   Fails if no reply arrives (thread_get_message/3 failed) or if the
%   reply does not unify with Result.
%
%   @throws error(json_rpc_error(Dict), _) if the server replied with
%           an `error` member.

json_call(Stream, Goal, Result, Options) :-
    Goal =.. [Name|Args0],
    call_args(Args0, Args),
    client_id(Id, Options),
    debug(json_rpc, 'Sending request ~p', [Id]),
    json_send(Stream,
              #{ jsonrpc: "2.0",
                 id: Id,
                 method: Name,
                 params: Args
               }, Options),
    setup_call_catcher_cleanup(
        true,
        json_wait_reply(Stream, Id, Result, Options),
        Catcher,
        client_cleanup(Catcher, Stream, Id)).

%!  call_args(+GoalArgs, -Params)
%
%   Translate the argument list of a goal into the JSON `params`
%   value.  A single dict argument is sent as is (named parameters), a
%   single list argument is sent as that list and anything else is
%   sent as the list of arguments.

call_args([Arg], Args), is_dict(Arg) =>
    Args = Arg.
call_args([Args0], Args), is_list(Args0) =>
    Args = Args0.
call_args(Args0, Args) =>
    Args = Args0.

%!  json_wait_reply(+Stream, +Id, -Result, +Options)
%
%   Wait on the result queue of Stream for done(Id, Reply).  The queue
%   (and the thread reading Stream) is created on first use, under the
%   `json_rpc_client` mutex.  If thread_get_message/3 fails, Id is
%   recorded in failed_id/2 such that a late reply is discarded.

json_wait_reply(Stream, Id, Result, Options) :-
    with_mutex(json_rpc_client,
               get_json_result_queue(Stream, Queue, Options)),
    debug(json_rpc, 'Waiting for reply', []),
    (   thread_get_message(Queue, done(Id, Result0), Options)
    ->  map_reply(Result0, Result1, Options),
        debug(json_rpc, 'Got reply for ~p', [Id]),
        (   Result1 = throw(Error)
        ->  throw(Error)
        ;   Result1 = true(Result)
        )
    ;   assertz(failed_id(Queue, Id)),
        fail
    ).

%!  map_reply(+Reply0, -Reply, +Options)
%
%   If Options contains value_string_as(atom), replace every string
%   inside Reply0 by the corresponding atom.  Otherwise Reply is
%   Reply0.

map_reply(Reply0, Reply, Options) :-
    option(value_string_as(atom), Options),
    !,
    mapsubterms(map_string, Reply0, Reply).
map_reply(Reply, Reply, _).

map_string(String, Atom) :-
    string(String),
    atom_string(Atom,String).

%!  client_id(-Id, +Options)
%
%   Id is taken from the option id(Id) or else from the global flag
%   `json_client_id`, which is incremented.

client_id(Id, Options) :-
    option(id(Id), Options),
    !.
client_id(Id, _Options) :-
    flag(json_client_id, Id, Id+1).

%!  client_cleanup(+Catcher, +Stream, +Id)
%
%   Cleanup handler for json_call/4.  Records Id in failed_id/2 when
%   the wait was abandoned while a reply may still arrive.  Nothing is
%   recorded if
%
%     - the call succeeded (`exit`);
%     - the call failed: either thread_get_message/3 failed, in which
%       case json_wait_reply/4 already recorded Id, or the reply was
%       received but did not unify with Result;
%     - a json_rpc_error was raised, which only happens after the
%       reply was taken from the queue.
%
%   Recording Id in the latter two cases would leave a failed_id/2
%   fact behind that no future reply can ever remove.

client_cleanup(exit, _, _) =>
    true.
client_cleanup(fail, _, _) =>
    true.
client_cleanup(exception(error(json_rpc_error(_), _)), _, _) =>
    true.
client_cleanup(_, Stream, Id) =>
    json_result_queue(Stream, Queue),
    assertz(failed_id(Queue, Id)).
%!  json_notify(+Stream, +Goal, +Options)
%
%   Send Goal over Stream as a JSON-RPC 2.0 notification, i.e., a
%   message without `id` member.  The functor name of Goal is the
%   `method` and its arguments, translated by call_args/2, are the
%   `params`.  This predicate does not wait for any reply.

json_notify(Stream, Goal, Options) :-
    Goal =.. [Method|GoalArgs],
    call_args(GoalArgs, Params),
    Notification = #{ jsonrpc: "2.0",
                      method: Method,
                      params: Params
                    },
    json_send(Stream, Notification, Options).

% error(Dict), where Dict holds the
code, message and optional data field. Note that error(Dict)
is not a valid JSON type and this is thus unambiguous. While the
JSON RPC standard allows the server to process the messages in any
order and allows for concurrent processing, all results are sent in
one message and this client ensures the elements of the Results list
are in the same order as the Calls list. If the Calls list is empty
% this predicate does not wait for a reply.

%!  json_batch(+Stream, +Notifications, +Calls, -Results, +Options)
%
%   Send Calls and Notifications over Stream as a single JSON-RPC
%   batch (a JSON array, requests first).  If Calls is empty we do not
%   wait for a reply.  Otherwise we wait for the batch reply, order its
%   elements by `id` and translate each element using batch_result/2.
%
%   The elements are ordered using the standard order of terms (`@<`),
%   which is the order specification documented for sort/4.

json_batch(Stream, Notifications, Calls, Results, Options) :-
    maplist(call_to_json_request, Calls, IDs, Requests1),
    maplist(call_to_json_notification, Notifications, Requests2),
    append(Requests1, Requests2, Batch),
    json_send(Stream, Batch, Options),
    flush_output(Stream),
    (   IDs == []
    ->  true
    ;   batch_id(IDs, Id),
        json_wait_reply(Stream, Id, Results0, Options),
        sort(id, @<, Results0, Results1),
        maplist(batch_result, Results1, Results)
    ).

%!  call_to_json_request(+Goal, -Id, -Request)
%
%   Request is the JSON-RPC request dict for Goal, using a freshly
%   generated Id.  The arguments of Goal are sent as a list.

call_to_json_request(Goal, Id, Request) :-
    Goal =.. [Name|Args],
    client_id(Id, []),
    Request = #{ jsonrpc: "2.0",
                 id: Id,
                 method: Name,
                 params: Args
               }.

%!  call_to_json_notification(+Goal, -Notification)
%
%   Notification is the JSON-RPC notification dict (no `id`) for Goal.

call_to_json_notification(Goal, Notification) :-
    Goal =.. [Name|Args],
    Notification = #{ jsonrpc: "2.0",
                      method: Name,
                      params: Args
                    }.

%!  batch_id(+IDs, -Id)
%
%   Id is a hash that identifies a batch from the ids of its members.
%   IDs is sorted first, so the hash does not depend on the order in
%   which the ids are given.

batch_id(IDs, Id) :-
    sort(IDs, Canonical),
    variant_sha1(Canonical, Id).

%!  batch_result(+Reply, -Result)
%
%   Result is the `result` member of Reply or error(Error) if Reply
%   has an `error` member instead.

batch_result(Reply, Result), Result0 = Reply.get(result) =>
    Result = Result0.
batch_result(Reply, Result), Result0 = Reply.get(error) =>
    Result = error(Result0).
%!  json_send(+Stream, +Dict, +Options)
%
%   Write Dict as JSON to Stream and flush the stream.  If Options
%   contains header(true), the JSON text is first written to a string
%   and sent preceded by a `Content-Length` header that holds the
%   UTF-8 byte length of that text.  Otherwise the JSON text is written
%   directly to Stream.

json_send(Stream, Dict, Options) :-
    (   option(header(true), Options)
    ->  with_output_to(string(Msg),
                       json_write_dict(current_output, Dict, Options)),
        utf8_length(Msg, Len),
        format(Stream,
               'Content-Length: ~d\r\n\r\n~s', [Len, Msg])
    ;   with_output_to(Stream,
                       json_write_dict(Stream, Dict, Options))
    ),
    flush_output(Stream).

%!  utf8_length(+String, -Len)
%
%   Len is the number of bytes needed to represent String in UTF-8.
%   Computed by writing String to a UTF-8 encoded null stream and
%   asking for the byte count of that stream.

utf8_length(String, Len) :-
    setup_call_cleanup(
        open_null_stream(Sink),
        count_utf8_bytes(Sink, String, Len),
        close(Sink)).

count_utf8_bytes(Sink, String, Len) :-
    set_stream(Sink, encoding(utf8)),
    format(Sink, '~s', [String]),
    flush_output(Sink),
    byte_count(Sink, Len).

                /*******************************
                *        INCOMING DATA         *
                *******************************/
library(json_rpc_server) in the module derived from the Options
% list.

%!  json_full_duplex(+Stream, :Options)
%
%   Prepare Stream for full duplex use.  Runs under the
%   `json_rpc_client` mutex.  Creates the result queue and the thread
%   reading Stream, adding server_module(M) to the options, where M is
%   the module qualification of Options.
%
%   @throws permission_error(json, full_duplex, Stream) if Stream
%           already has a result queue.

json_full_duplex(Stream, Options) :-
    with_mutex(json_rpc_client, json_full_duplex_(Stream, Options)).

json_full_duplex_(Stream, QOptions) :-
    (   json_result_queue(Stream, _Existing)
    ->  permission_error(json, full_duplex, Stream)
    ;   QOptions = Module:Options,
        get_json_result_queue(Stream, _Queue,
                              [server_module(Module)|Options])
    ).
%!  get_json_result_queue(+Stream, -Queue, +Options)
%
%   Queue is the message queue on which replies read from Stream are
%   delivered.  If Stream has no queue yet, create one, register it in
%   json_result_queue/2 and start a detached thread that runs
%   handle_result_loop/2.  When that thread exits it calls
%   cleanup_client/1.  Callers in this file run this predicate under
%   the `json_rpc_client` mutex.

get_json_result_queue(Stream, Queue, Options) :-
    (   json_result_queue(Stream, Queue)
    ->  true
    ;   message_queue_create(Queue),
        asserta(json_result_queue(Stream, Queue)),
        flag(json_rpc_client_dispatcher, Seq, Seq+1),
        format(atom(Alias), 'json_rpc_client:~w', [Seq]),
        thread_create(
            handle_result_loop(Stream, Options),
            _Thread,
            [ detached(true),
              alias(Alias),
              inherit_from(main),
              at_exit(cleanup_client(Stream))
            ])
    ).

%!  handle_result_loop(+Stream, +Options)
%
%   Process incoming messages from Stream until handle_result/3
%   reports end of input.

handle_result_loop(Stream, Options) :-
    handle_result(Stream, EOF, Options),
    (   EOF \== true
    ->  handle_result_loop(Stream, Options)
    ;   true
    ).

%!  handle_result(+Stream, -EOF, +Options)
%
%   Read and process one message.  EOF is unified with `true` if the
%   end of the input was reached.  Exceptions of the shape error(_,_)
%   raised while reading are passed to handle_error/2.

handle_result(Stream, EOF, Options) :-
    Caught = error(Formal, _),
    catch(json_receive(Stream, Message, Options),
          Caught,
          true),
    debug(json_rpc, 'Received ~p', [Message]),
    (   Message == end_of_file(true)
    ->  EOF = true
    ;   var(Formal)                     % nothing was caught
    ->  handle_reply(Stream, Message, Options)
    ;   handle_error(Caught, EOF)
    ).

%!  json_receive(+Stream, -Reply, +Options)
%
%   Read one JSON message from Stream.  With option header(true), first
%   read the header lines and then read the JSON document from the
%   next `Content-Length` bytes.  Reply is end_of_file(true) if there
%   is no more input.

json_receive(Stream, Reply, Options) :-
    (   option(header(true), Options)
    ->  read_header(Stream, Lines),
        (   Lines == []
        ->  Reply = end_of_file(true)
        ;   header_content_length(Lines, Length),
            setup_call_cleanup(
                stream_range_open(Stream, Data, [size(Length)]),
                json_read_dict(Data, Reply, Options),
                close(Data))
        )
    ;   json_read_dict(Stream, Reply,
                       [ end_of_file(end_of_file(true))
                       | Options
                       ])
    ).

%!  read_header(+Stream, -Lines)
%
%   Lines is the list of header lines, as strings, up to the first
%   empty line or the end of the input.

read_header(Stream, Lines) :-
    read_string(Stream, "\n", "\r\t ", Sep, Line),
    (   Line \== "",
        Sep \== -1
    ->  Lines = [Line|More],
        read_header(Stream, More)
    ;   Lines = []
    ).

%!  header_content_length(+Lines, -Length)
%
%   Length is the numeric value of the first line in Lines whose field
%   name is `Content-Length`, compared case insensitively.  Fails if
%   there is no such line.

header_content_length(Lines, Length) :-
    member(Header, Lines),
    split_string(Header, ":", "\t\s", [Name,Value]),
    string_lower(Name, "content-length"),
    !,
    number_string(Length, Value).
%!  handle_reply(+Stream, +Message, +Options)
%
%   Dispatch one message that was read from Stream:
%
%     - A list is a batch reply.  It is delivered as true(Batch) under
%       the batch id computed from the ids of its elements.
%     - A dict with `result` and `id` is delivered as true(Result).
%     - A dict with `error` and `id` is delivered as
%       throw(error(json_rpc_error(Error), _)).
%     - A dict with `method` and `params` is a request from the peer.
%       It is handed to json_rpc_server:json_rpc_dispatch_request/4
%       using the module from the option server_module(M).

handle_reply(Stream, Batch, _Options),
    is_list(Batch) =>
    maplist(get_dict(id), Batch, MemberIds),
    batch_id(MemberIds, BatchId),
    json_result_queue(Stream, Queue),
    send_done(Queue, BatchId, true(Batch)).
handle_reply(Stream, Reply, _Options),
    #{ jsonrpc: "2.0",
       result: Value,
       id: Id } :< Reply =>
    json_result_queue(Stream, Queue),
    send_done(Queue, Id, true(Value)).
handle_reply(Stream, Reply, _Options),
    #{ jsonrpc: "2.0",
       error: Failure,
       id: Id } :< Reply =>
    json_result_queue(Stream, Queue),
    send_done(Queue, Id, throw(error(json_rpc_error(Failure), _))).
handle_reply(Stream, Request, Options),
    #{ jsonrpc: "2.0",
       method: _Method,
       params: _Params } :< Request =>
    option(server_module(Module), Options),
    json_rpc_server:json_rpc_dispatch_request(Module, Stream, Request, Options).

%!  send_done(+Queue, +Id, +Data)
%
%   Deliver done(Id, Data) on Queue.  If Id is recorded in failed_id/2
%   the record is removed and Data is discarded.  After delivering a
%   message, clean_dead_requests/1 is called.

send_done(Queue, Id, Data) :-
    (   retract(failed_id(Queue, Id))
    ->  true
    ;   thread_send_message(Queue, done(Id, Data)),
        clean_dead_requests(Queue)
    ).

%!  clean_dead_requests(+Queue)
%
%   For each id recorded in failed_id/2 for Queue, remove a reply for
%   that id that is waiting in Queue.

clean_dead_requests(Queue) :-
    forall(failed_id(Queue, DeadId),
           cleanup_dead_id(Queue, DeadId)).

cleanup_dead_id(Queue, Id) :-
    thread_get_message(Queue, done(Id, _), [timeout(0)]),
    !,
    retract(failed_id(Queue, Id)).
cleanup_dead_id(_, _).

%!  handle_error(+Error, -EOF)
%
%   Handle an exception raised while reading a message.  If the stream
%   no longer exists, EOF is unified with `true`.  Any other error is
%   printed and EOF is left unbound.

handle_error(error(existence_error(stream, _), _), AtEnd) =>
    AtEnd = true.
handle_error(Exception, _AtEnd) =>
    print_message(error, Exception).
%!  cleanup_client(+Stream)
%
%   Release the resources associated with Stream.  Used as at_exit
%   handler of the thread that reads Stream.  Each json_result_queue/2
%   registration of Stream is removed and cleaned by do_cleanup/2.

cleanup_client(Stream) :-
    forall(retract(json_result_queue(Stream, Queue)),
           do_cleanup(Stream, Queue)).

%!  do_cleanup(+Stream, +Queue)
%
%   Close Stream, ignoring errors (force(true)), forget all ids
%   recorded in failed_id/2 for Queue and destroy Queue.  The
%   failed_id/2 facts refer to the queue that is being destroyed; no
%   reply can arrive on it anymore, so keeping them would only leak
%   clauses.

do_cleanup(Stream, Queue) :-
    close(Stream, [force(true)]),
    retractall(failed_id(Queue, _)),
    message_queue_destroy(Queue).
JSON RPC client
This module implements a JSON RPC compliant client. The three predicates require a stream pair (see stream_pair/2) that connects us to a JSON RPC server.
*/