View source with raw comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@cs.vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (C): 2022, VU University Amsterdam
    7			 CWI Amsterdam
    8                         SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(swish_redis,
   38          [ reinit_redis/0,
   39            redis_swish_stream/2,       % +Name, -Key
   40            redis_consumer/1,           % -Consumer
   41            swish_cluster/1             % -Pairs
   42          ]).   43:- use_module(library(redis)).   44:- use_module(library(redis_streams)).   45:- use_module(library(broadcast)).   46:- use_module(library(option)).   47:- use_module(library(socket)).   48:- use_module(library(apply)).   49:- use_module(library(pairs)).   50:- use_module(library(http/http_path)).   51:- use_module(library(http/http_dispatch)).   52:- use_module(library(http/http_json)).   53
   54:- use_module(config).

Redis stream connection

Setup to listening to redis events. We need all the push facilities of Redis:

Note that config-available sets up the redis server using the alias swish. Streams (redis keys) to listen on are registered using the multifile predicate stream/2. */

   70:- multifile
   71    stream/2.   72
   73:- listen(http(pre_server_start(Port)),
   74          init_redis(Port)).   75
   76:- dynamic
   77    port/1,                             % Server port
   78    thread/1.                           % Listener thread.
   79
   80init_redis(_Port) :-
   81    \+ swish_config:config(redis, _),
   82    !.
   83init_redis(_Port) :-
   84    catch(thread_property(redis_listener, id(_)), error(_,_), fail),
   85    !.
   86init_redis(Port) :-
   87    init_pubsub,
   88    retractall(port(_)),
   89    asserta(port(Port)),
   90    findall(Group-S, group_stream(S, Group), Pairs),
   91    keysort(Pairs, Sorted),
   92    group_pairs_by_key(Sorted, Grouped),
   93    consumer(Port, Consumer),
   94    maplist(create_listener(Consumer), Grouped),
   95    publish_consumer(Consumer).
   96
   97create_listener(_, (-)-Streams) :-
   98    !,
   99    thread_create(xlisten(swish, Streams, []),
  100                  Id, [ alias(redis_no_group)
  101                      ]),
  102    assertz(thread(Id)).
  103create_listener(Consumer, Group-Streams) :-
  104    atom_concat(redis_, Group, Alias),
  105    thread_create(xlisten_group(swish, Group, Consumer, Streams,
  106                                [ block(1)
  107                                ]),
  108                  Id, [ alias(Alias)
  109                      ]),
  110    assertz(thread(Id)).
 reinit_redis
Stop and start the redis thread. May be used to reconfigure it or restart when crashed.
  117reinit_redis :-
  118    forall(retract(thread(Id)),
  119           catch(stop_listener(Id), error(_,_), true)),
  120    port(Port),
  121    init_redis(Port).
  122
  123stop_listener(Id) :-
  124    thread_signal(Id, redis(stop(false))),
  125    thread_join(Id, _).
  126
  127group_stream(Key, Group) :-
  128    stream(Name, Options),
  129    redis_swish_stream(Name, Key),
  130    option(max_len(MaxLen), Options, 1000),
  131    option(group(Group), Options, -),
  132    add_consumer_group(Group, Key),
  133    xstream_set(swish, Key, maxlen(MaxLen)).
  134
  135add_consumer_group(-, _) :-
  136    !.
  137add_consumer_group(Group, Key) :-
  138    catch(redis(swish, xgroup(create, Key, Group, $, mkstream), _),
  139          error(redis_error(busygroup,_),_),
  140          true).
  141
  142redis_swish_stream(Name, Key) :-
  143    swish_config(redis_prefix, Prefix),
  144    atomic_list_concat([Prefix, Name], :, Key).
 consumer(+Address, -Consumer) is det
Find the name of this node in the redis network. Each node needs to have a name to be part of a Redis consumer node, as well as to know which sessions reside on which node.
  152:- dynamic consumer/1.  153
  154consumer(_, Consumer) :-
  155    consumer(Consumer0), !,
  156    Consumer = Consumer0.
  157consumer(Address, Consumer) :-
  158    address_consumer(Address, Consumer0),
  159    asserta(consumer(Consumer0)),
  160    Consumer = Consumer0.
  161
  162address_consumer(_, Consumer) :-
  163    swish_config(redis_consumer, Consumer),
  164    !.
  165address_consumer(Host:Port, Consumer) :-
  166    !,
  167    atomic_list_concat([Host,Port], :, Consumer).
  168address_consumer(Port, Consumer) :-
  169    gethostname(Host),
  170    atomic_list_concat([Host,Port], :, Consumer).
 redis_consumer(-Consumer) is det
True when Consumer is the name of this redis node.
  176redis_consumer(Consumer) :-
  177    consumer(Consumer).
  178
  179publish_consumer(Consumer) :-
  180    http_absolute_uri(swish(.), URL),
  181    consumer_key(Server, Key),
  182    redis(Server, hset(Key:url, Consumer, URL)),
  183    redis(Server, publish(swish:swish, joined(Consumer, URL) as prolog), Count),
  184    print_message(informational, swish(redis_peers(Count))),
  185    at_halt(publish_halt).
  186
  187% More reliable than at_halt/1.
  188:- listen(http(shutdown), publish_halt).  189
  190publish_halt :-
  191    redis_consumer(Consumer),
  192    consumer_key(Server, Key),
  193    (   redis(Server, hdel(Key:url, Consumer), 0)
  194    ->  true
  195    ;   redis(Server, publish(swish:swish, left(Consumer) as prolog), _Count)
  196    ).
  197
  198consumer_key(swish, Key) :-
  199    swish_config(redis_prefix, Prefix),
  200    atomic_list_concat([Prefix, consumer], :, Key).
 swish_cluster(-Pairs) is det
True when Pairs is a list Consumer-URL of peer SWISH servers in this cluster.
  207swish_cluster(Pairs) :-
  208    consumer_key(Server, Key),
  209    redis(Server, hgetall(Key:url), Pairs).
  210
  211:- http_handler(swish(backends), backends, [id(backends)]).  212
  213backends(_Request) :-
  214    swish_cluster(Pairs),
  215    maplist(backend_stats, Pairs, Pairs1),
  216    dict_pairs(Dict, json, Pairs1),
  217    reply_json(Dict).
  218
  219backend_stats(Consumer-URL, Consumer-Stat) :-
  220    broadcast_request(swish(backend_status(Consumer, Stat0))),
  221    !,
  222    Stat = Stat0.put(url, URL).
  223backend_stats(Consumer-URL, Consumer-json{url:URL}).
 init_pubsub is det
Prepare to listen to the SWISH pubsub channels.
  230init_pubsub :-
  231    redis_current_subscription(redis_pubsub, _),
  232    !.
  233init_pubsub :-
  234    redis_subscribe(swish,
  235                    [ swish:swish,    % Overall control
  236                      swish:chat,     % Chat broadcast messages
  237                      swish:gitty     % Gitty sync requests
  238                    ],
  239                    _,
  240                    [ alias(redis_pubsub)
  241                    ]).
  242
  243:- initialization
  244    listen(redis(_, 'swish:swish', Message),
  245           swish_message(swish(Message))).  246
  247swish_message(Message) :-
  248    print_message(informational, Message).
  249
  250:- multifile prolog:message//1.  251
  252prolog:message(swish(redis_peers(Count))) -->
  253    [ 'Redis: the are ~d peers in the cluster'-[Count] ].
  254prolog:message(swish(joined(Consumer, URL))) -->
  255    (   { redis_consumer(Consumer) }
  256    ->  []
  257    ;   [ 'Redis: ~w joined the cluster, at ~w'-[Consumer, URL] ]
  258    ).
  259prolog:message(swish(left(Consumer))) -->
  260    (   { redis_consumer(Consumer) }
  261    ->  []
  262    ;   [ 'Redis: ~w left the cluster'-[Consumer] ]
  263    )