View source with raw comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2015-2020, VU University Amsterdam
    7                              CWI Amsterdam
    8                              SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(gitty_driver_files,
   38          [ gitty_open/2,               % +Store, +Options
   39            gitty_close/1,              % +Store
   40            gitty_file/4,               % +Store, ?Name, ?Ext, ?Hash
   41
   42            gitty_update_head/5,        % +Store, +Name, +OldCommit, +NewCommit
   43					% +DataHash
   44            delete_head/2,              % +Store, +Name
   45            set_head/3,                 % +Store, +Name, +Hash
   46            store_object/4,             % +Store, +Hash, +Header, +Data
   47            delete_object/2,            % +Store, +Hash
   48
   49            gitty_hash/2,               % +Store, ?Hash
   50            load_plain_commit/3,        % +Store, +Hash, -Meta
   51            load_object/5,              % +Store, +Hash, -Data, -Type, -Size
   52            gitty_object_file/3,        % +Store, +Hash, -File
   53
   54            repack_objects/2,           % +Store, +Options
   55            pack_objects/6,             % +Store, +Objs, +Packs, +PackDir,
   56                                        % -File, +Opts
   57            unpack_packs/1,             % +Store
   58            unpack_pack/2,              % +Store, +PackFile
   59
   60            attach_pack/2,              % +Store, +PackFile
   61            gitty_fsck/1,               % +Store
   62            fsck_pack/1,                % +PackFile
   63            load_object_from_pack/4,    % +Hash, -Data, -Type, -Size
   64
   65            gitty_rescan/1              % Store
   66          ]).   67:- use_module(library(apply)).   68:- use_module(library(zlib)).   69:- use_module(library(filesex)).   70:- use_module(library(lists)).   71:- use_module(library(apply)).   72:- use_module(library(error)).   73:- use_module(library(debug)).   74:- use_module(library(zlib)).   75:- use_module(library(hash_stream)).   76:- use_module(library(option)).   77:- use_module(library(dcg/basics)).   78:- use_module(library(redis)).   79:- use_module(library(redis_streams)).   80:- use_module(gitty, [is_gitty_hash/1]).   81
   82:- use_module(swish_redis).

Gitty plain files driver

This version of the driver uses plain files to store the gitty data. It consists of a nested directory structure with files named after the hash. Objects and hash computation are the same as for git. The heads (files) are computed on startup by scanning all objects. There is a file ref/head that is updated whenever a head is updated. Other clients can watch this file and update their notion of the head. This implies that the store can handle multiple clients that access a shared file system, optionally shared using NFS from different machines.

The store is simple and robust. The main disadvantages are startup times that grow as the store holds more objects, and relatively high disk usage because small objects are rounded up to disk allocation units.

bug
- Shared access does not work on Windows. */
  102:- dynamic
  103    head/4,                             % Store, Name, Ext, Hash
  104    store/2,                            % Store, Updated
  105    commit/3,                           % Store, Hash, Meta
  106    heads_input_stream_cache/2,         % Store, Stream
  107    pack_object/6,                      % Hash, Type, Size, Offset, Store,PackFile
  108    attached_packs/1,                   % Store
  109    attached_pack/2,                    % Store, PackFile
  110    redis_db/4.                         % Store, DB, RO, Prefix
  111
  112:- volatile
  113    head/4,
  114    store/2,
  115    commit/3,
  116    heads_input_stream_cache/2,
  117    pack_object/6,
  118    attached_packs/1,
  119    attached_pack/2.  120
  121:- multifile
  122    gitty:check_object/4.  123
  124% enable/disable syncing remote servers running on  the same file store.
  125% This facility requires shared access to files and thus doesn't work on
  126% Windows.
  127
  128:- if(current_prolog_flag(windows, true)).  129remote_sync(false).
  130:- else.  131remote_sync(true).
  132:- endif.
 gitty_open(+Store, +Options) is det
Driver-specific initialization. Sets up a Redis connection when requested. The following options are processed:
redis(+DB)
Name of the redis DB to connect to. See redis_server/3.
redis_ro(+DB)
Read-only redis DB.
redis_prefix(+Prefix)
Prefix for all keys. This can be used to host multiple SWISH servers on the same redis cluster. Default is swish.
  147gitty_open(Store, Options) :-
  148    option(redis(DB), Options),
  149    !,
  150    option(redis_ro(RO), Options, DB),
  151    option(redis_prefix(Prefix), Options, swish),
  152    asserta(redis_db(Store, DB, RO, Prefix)),
  153    thread_create(gitty_scan(Store), _, [detached(true)]).
  154gitty_open(_, _).
 gitty_close(+Store) is det
Close resources associated with a store.
  161gitty_close(Store) :-
  162    (   retract(heads_input_stream_cache(Store, In))
  163    ->  close(In)
  164    ;   true
  165    ),
  166    retractall(head(Store,_,_,_)),
  167    retractall(store(Store,_)),
  168    retractall(pack_object(_,_,_,_,Store,_)).
 gitty_file(+Store, ?File, ?Ext, ?Head) is nondet
True when File is an entry in the gitty store with extension Ext and Head is its HEAD revision.
  176gitty_file(Store, Head, Ext, Hash) :-
  177    redis_db(Store, _, _, _),
  178    !,
  179    gitty_scan(Store),
  180    redis_file(Store, Head, Ext, Hash).
  181gitty_file(Store, Head, Ext, Hash) :-
  182    gitty_scan(Store),
  183    head(Store, Head, Ext, Hash).
 load_plain_commit(+Store, +Hash, -Meta:dict) is semidet
Load the commit data as a dict. Loaded commits are cached in commit/3. Note that only adding a fact to the cache is synchronized. This means that during a race we may load the same object multiple times from disk. This is harmless, whereas a lock around the whole predicate would needlessly serialize loading different objects.
  194load_plain_commit(Store, Hash, Meta) :-
  195    must_be(atom, Store),
  196    must_be(atom, Hash),
  197    commit(Store, Hash, Meta),
  198    !.
  199load_plain_commit(Store, Hash, Meta) :-
  200    load_object(Store, Hash, String, _, _),
  201    term_string(Meta0, String, []),
  202    with_mutex(gitty_commit_cache,
  203               assert_cached_commit(Store, Hash, Meta0)),
  204    Meta = Meta0.
  205
  206assert_cached_commit(Store, Hash, Meta) :-
  207    commit(Store, Hash, Meta0),
  208    !,
  209    assertion(Meta0 =@= Meta).
  210assert_cached_commit(Store, Hash, Meta) :-
  211    assertz(commit(Store, Hash, Meta)).
 store_object(+Store, +Hash, +Header:string, +Data:string) is det
Store the actual object. The store must associate Hash with the concatenation of Hdr and Data.
  218store_object(Store, Hash, _Hdr, _Data) :-
  219    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  220    !.
  221store_object(Store, Hash, Hdr, Data) :-
  222    gitty_object_file(Store, Hash, Path),
  223    with_mutex(gitty_file, exists_or_create(Path, Out0)),
  224    (   var(Out0)
  225    ->  true
  226    ;   setup_call_cleanup(
  227            zopen(Out0, Out, [format(gzip)]),
  228            format(Out, '~s~s', [Hdr, Data]),
  229            close(Out))
  230    ).
  231
  232exists_or_create(Path, _Out) :-
  233    exists_file(Path),
  234    !.
  235exists_or_create(Path, Out) :-
  236    file_directory_name(Path, Dir),
  237    make_directory_path(Dir),
  238    open(Path, write, Out, [encoding(utf8), lock(write)]).
  239
  240:- if(\+current_predicate(ensure_directory/1)).  241% in Library as of SWI-Prolog 9.1.20
  242ensure_directory(Dir) :-
  243    exists_directory(Dir),
  244    !.
  245ensure_directory(Dir) :-
  246    make_directory(Dir).
  247:- endif.
 store_object_raw(+Store, +Hash, +Bytes:string, -New) is det
Store an object from raw bytes. This is used for replicating objects.
  254store_object_raw(Store, Hash, _Bytes, false) :-
  255    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  256    !.
  257store_object_raw(Store, Hash, Bytes, New) :-
  258    gitty_object_file(Store, Hash, Path),
  259    with_mutex(gitty_file, exists_or_create(Path, Out)),
  260    (   var(Out)
  261    ->  New = false
  262    ;   call_cleanup(
  263            ( set_stream(Out, type(binary)),
  264              write(Out, Bytes)
  265            ),
  266            close(Out)),
  267        New = true
  268    ).
load_object(+Store, +Hash, -Data, -Type, -Size) is semidet
Load the given object.
  274load_object(_Store, Hash, Data, Type, Size) :-
  275    load_object_from_pack(Hash, Data0, Type0, Size0),
  276    !,
  277    f(Data0, Type0, Size0) = f(Data, Type, Size).
  278load_object(Store, Hash, Data, Type, Size) :-
  279    load_object_file(Store, Hash, Data0, Type0, Size0),
  280    !,
  281    f(Data0, Type0, Size0) = f(Data, Type, Size).
  282load_object(Store, Hash, Data, Type, Size) :-
  283    redis_db(Store, _, _, _),
  284    redis_replicate_get(Store, Hash),
  285    load_object_file(Store, Hash, Data, Type, Size).
  286
  287load_object_file(Store, Hash, Data, Type, Size) :-
  288    gitty_object_file(Store, Hash, Path),
  289    exists_file(Path),
  290    !,
  291    setup_call_cleanup(
  292        gzopen(Path, read, In, [encoding(utf8)]),
  293        read_object(In, Data, Type, Size),
  294        close(In)).
has_object(+Store, +Hash) is semidet
True when Hash exists in store.
  300has_object(Store, Hash) :-
  301    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  302    !.
  303has_object(Store, Hash) :-
  304    gitty_object_file(Store, Hash, Path),
  305    exists_file(Path).
 load_object_raw(+Store, +Hash, -Data)
Load the compressed data for an object. Intended for replication.
  311load_object_raw(_Store, Hash, Bytes) :-
  312    load_object_from_pack(Hash, Data, Type, Size),
  313    !,
  314    object_bytes(Type, Size, Data, Bytes).
  315load_object_raw(Store, Hash, Data) :-
  316    gitty_object_file(Store, Hash, Path),
  317    exists_file(Path),
  318    !,
  319    setup_call_cleanup(
  320        open(Path, read, In, [type(binary)]),
  321        read_string(In, _, Data),
  322        close(In)).
 object_bytes(+Type, +Size, +Data, -Bytes) is det
Encode an object with the given parameters in memory.
  328object_bytes(Type, Size, Data, Bytes) :-
  329    setup_call_cleanup(
  330        new_memory_file(MF),
  331        ( setup_call_cleanup(
  332              open_memory_file(MF, write, Out, [encoding(octet)]),
  333              setup_call_cleanup(
  334                  zopen(Out, ZOut, [format(gzip), close_parent(false)]),
  335                  ( set_stream(ZOut, encoding(utf8)),
  336                    format(ZOut, '~w ~d\u0000~w', [Type, Size, Data])
  337                  ),
  338                  close(ZOut)),
  339              close(Out)),
  340          memory_file_to_string(MF, Bytes, octet)
  341        ),
  342        free_memory_file(MF)).
 load_object_header(+Store, +Hash, -Type, -Size) is det
Load the header of an object
  349load_object_header(Store, Hash, Type, Size) :-
  350    gitty_object_file(Store, Hash, Path),
  351    setup_call_cleanup(
  352        gzopen(Path, read, In, [encoding(utf8)]),
  353        read_object_hdr(In, Type, Size),
  354        close(In)).
  355
  356read_object(In, Data, Type, Size) :-
  357    read_object_hdr(In, Type, Size),
  358    read_string(In, _, Data).
  359
  360read_object_hdr(In, Type, Size) :-
  361    get_code(In, C0),
  362    read_hdr(C0, In, Hdr),
  363    phrase((nonblanks(TypeChars), " ", integer(Size)), Hdr),
  364    atom_codes(Type, TypeChars).
  365
  366read_hdr(C, In, [C|T]) :-
  367    C > 0,
  368    !,
  369    get_code(In, C1),
  370    read_hdr(C1, In, T).
  371read_hdr(_, _, []).
 gitty_rescan(?Store) is det
Update our view of the shared storage for all stores matching Store.
  378gitty_rescan(Store) :-
  379    retractall(store(Store, _)).
 gitty_scan(+Store) is det
Scan the gitty store for files (entries), filling head/4. This is performed lazily at first access to the store.

@tbd Possibly we need to maintain a cached version of this index to avoid having to open all objects of the gitty store.

  390gitty_scan(Store) :-
  391    store(Store, _),
  392    !,
  393    remote_updates(Store).
  394gitty_scan(Store) :-
  395    with_mutex(gitty, gitty_scan_sync(Store)).
  396
  397:- thread_local
  398    latest/3.  399
  400gitty_scan_sync(Store) :-
  401    store(Store, _),
  402    !.
  403gitty_scan_sync(Store) :-
  404    redis_db(Store, _, _, _),
  405    !,
  406    gitty_attach_packs(Store),
  407    redis_ensure_heads(Store),
  408    get_time(Now),
  409    assertz(store(Store, Now)).
  410:- if(remote_sync(true)).  411gitty_scan_sync(Store) :-
  412    remote_sync(true),
  413    !,
  414    gitty_attach_packs(Store),
  415    restore_heads_from_remote(Store).
  416:- endif.  417gitty_scan_sync(Store) :-
  418    gitty_attach_packs(Store),
  419    read_heads_from_objects(Store).
 read_heads_from_objects(+Store) is det
Establish the head(Store,File,Ext,Hash) relation by reading all objects and adding a fact for the most recent commit.
  426read_heads_from_objects(Store) :-
  427    gitty_scan_latest(Store),
  428    forall(retract(latest(Name, Hash, _Time)),
  429           assert_head(Store, Name, Hash)),
  430    get_time(Now),
  431    assertz(store(Store, Now)).
  432
  433assert_head(Store, Name, Hash) :-
  434    file_name_extension(_, Ext, Name),
  435    assertz(head(Store, Name, Ext, Hash)).
 gitty_scan_latest(+Store)
Scans the gitty store, extracting the latest version of each named entry.
  443gitty_scan_latest(Store) :-
  444    retractall(head(Store, _, _, _)),
  445    retractall(latest(_, _, _)),
  446    (   gitty_hash(Store, Hash),
  447        load_object(Store, Hash, Data, commit, _Size),
  448        term_string(Meta, Data, []),
  449        _{name:Name, time:Time} :< Meta,
  450        (   latest(Name, _, OldTime),
  451            OldTime > Time
  452        ->  true
  453        ;   retractall(latest(Name, _, _)),
  454            assertz(latest(Name, Hash, Time))
  455        ),
  456        fail
  457    ;   true
  458    ).
 gitty_hash(+Store, ?Hash) is nondet
True when Hash is an object in the store.
  465gitty_hash(Store, Hash) :-
  466    var(Hash),
  467    !,
  468    (   gitty_attach_packs(Store),
  469        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  470    ;   gitty_file_object(Store, Hash)
  471    ).
  472gitty_hash(Store, Hash) :-
  473    (   gitty_attach_packs(Store),
  474        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  475    ->  true
  476    ;   gitty_object_file(Store, Hash, File),
  477        exists_file(File)
  478    ).
  479
  480gitty_file_object(Store, Hash) :-
  481    access_file(Store, exist),
  482    directory_files(Store, Level0),
  483    member(E0, Level0),
  484    E0 \== '..',
  485    atom_length(E0, 2),
  486    directory_file_path(Store, E0, Dir0),
  487    directory_files(Dir0, Level1),
  488    member(E1, Level1),
  489    E1 \== '..',
  490    atom_length(E1, 2),
  491    directory_file_path(Dir0, E1, Dir),
  492    directory_files(Dir, Files),
  493    member(File, Files),
  494    atom_length(File, 36),
  495    atomic_list_concat([E0,E1,File], Hash).
 delete_object(+Store, +Hash)
Delete an existing object
  501delete_object(Store, Hash) :-
  502    gitty_object_file(Store, Hash, File),
  503    delete_file(File).
 gitty_object_file(+Store, +Hash, -Path) is det
True when Path is the file at which the object with Hash is stored.
  510gitty_object_file(Store, Hash, Path) :-
  511    sub_string(Hash, 0, 2, _, Dir0),
  512    sub_string(Hash, 2, 2, _, Dir1),
  513    sub_string(Hash, 4, _, 0, File),
  514    atomic_list_concat([Store, Dir0, Dir1, File], /, Path).
  515
  516
  517                 /*******************************
  518                 *            SYNCING           *
  519                 *******************************/
 gitty_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash) is det
Update the head of a gitty store for Name. OldCommit is the current head and NewCommit is the new head. If Name is created, and thus there is no head, OldCommit must be -.

This operation can raise an exception because another writer has updated the head. That writer can be in the same process or in another process.

  531gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
  532    redis_db(Store, _, _, _),
  533    !,
  534    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
  535gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
  536    with_mutex(gitty,
  537               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).
  538
  539:- if(remote_sync(true)).  540gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  541    remote_sync(true),
  542    !,
  543    setup_call_cleanup(
  544        heads_output_stream(Store, HeadsOut),
  545        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
  546        close(HeadsOut)).
  547:- endif.  548gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  549    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).
  550
  551gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
  552    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
  553    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
  554
  555gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
  556    gitty_scan(Store),              % fetch remote changes
  557    (   OldCommit == (-)
  558    ->  (   head(Store, Name, _, _)
  559        ->  throw(error(gitty(file_exists(Name),_)))
  560        ;   assert_head(Store, Name, NewCommit)
  561        )
  562    ;   (   retract(head(Store, Name, _, OldCommit))
  563        ->  assert_head(Store, Name, NewCommit)
  564        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
  565        )
  566    ).
 remote_updates(+Store)
Watch for remote updates to the store. We only do this if we did not already do so within the last second.
  573:- dynamic
  574    last_remote_sync/2.  575
  576:- if(remote_sync(false)).  577remote_updates(_) :-
  578    remote_sync(false),
  579    !.
  580:- endif.  581remote_updates(Store) :-
  582    remote_up_to_data(Store),
  583    !.
  584remote_updates(Store) :-
  585    with_mutex(gitty, remote_updates_sync(Store)).
  586
  587remote_updates_sync(Store) :-
  588    remote_up_to_data(Store),
  589    !.
  590remote_updates_sync(Store) :-
  591    retractall(last_remote_sync(Store, _)),
  592    get_time(Now),
  593    asserta(last_remote_sync(Store, Now)),
  594    remote_update(Store).
  595
  596remote_up_to_data(Store) :-
  597    last_remote_sync(Store, Last),
  598    get_time(Now),
  599    Now-Last < 1.
  600
  601remote_update(Store) :-
  602    remote_updates(Store, List),
  603    maplist(update_head(Store), List).
  604
  605update_head(Store, head(Name, OldCommit, NewCommit)) :-
  606    (   OldCommit == (-)
  607    ->  \+ head(Store, Name, _, _)
  608    ;   retract(head(Store, Name, _, OldCommit))
  609    ),
  610    !,
  611    assert_head(Store, Name, NewCommit).
  612update_head(_, _).
 remote_updates(+Store, -List) is det
Find updates from other gitties on the same filesystem. Note that we have to push/pop the input context to avoid creating an input context that could incorrectly relate messages to the sync file.
  621remote_updates(Store, List) :-
  622    heads_input_stream(Store, Stream),
  623    setup_call_cleanup(
  624        '$push_input_context'(gitty_sync),
  625        read_new_terms(Stream, List),
  626        '$pop_input_context').
  627
  628read_new_terms(Stream, Terms) :-
  629    read(Stream, First),
  630    read_new_terms(First, Stream, Terms).
  631
  632read_new_terms(end_of_file, _, List) :-
  633    !,
  634    List = [].
  635read_new_terms(Term, Stream, [Term|More]) :-
  636    read(Stream, Term2),
  637    read_new_terms(Term2, Stream, More).
  638
  639heads_output_stream(Store, Out) :-
  640    heads_file(Store, HeadsFile),
  641    open(HeadsFile, append, Out,
  642         [ encoding(utf8),
  643           lock(exclusive)
  644         ]).
  645
  646heads_input_stream(Store, Stream) :-
  647    heads_input_stream_cache(Store, Stream0),
  648    !,
  649    Stream = Stream0.
  650heads_input_stream(Store, Stream) :-
  651    heads_file(Store, File),
  652    between(1, 2, _),
  653    catch(open(File, read, In,
  654               [ encoding(utf8),
  655                 eof_action(reset)
  656               ]),
  657          _,
  658          create_heads_file(Store)),
  659    !,
  660    assert(heads_input_stream_cache(Store, In)),
  661    Stream = In.
  662
  663create_heads_file(Store) :-
  664    call_cleanup(
  665        heads_output_stream(Store, Out),
  666        close(Out)),
  667    fail.                                   % always fail!
  668
  669heads_file(Store, HeadsFile) :-
  670    ensure_directory(Store),
  671    directory_file_path(Store, ref, RefDir),
  672    ensure_directory(RefDir),
  673    directory_file_path(RefDir, head, HeadsFile).
 restore_heads_from_remote(Store)
Restore the known heads by reading the remote sync file.
  679restore_heads_from_remote(Store) :-
  680    heads_file(Store, File),
  681    exists_file(File),
  682    setup_call_cleanup(
  683        open(File, read, In, [encoding(utf8)]),
  684        restore_heads(Store, In),
  685        close(In)),
  686    !,
  687    get_time(Now),
  688    assertz(store(Store, Now)).
  689restore_heads_from_remote(Store) :-
  690    read_heads_from_objects(Store),
  691    heads_file(Store, File),
  692    setup_call_cleanup(
  693        open(File, write, Out, [encoding(utf8)]),
  694        save_heads(Store, Out),
  695        close(Out)),
  696    !.
  697
  698restore_heads(Store, In) :-
  699    read(In, Term0),
  700    Term0 = epoch(_),
  701    read(In, Term1),
  702    restore_heads(Term1, In, Store).
  703
  704restore_heads(end_of_file, _, _) :- !.
  705restore_heads(head(File, _, Hash), In, Store) :-
  706    retractall(head(Store, File, _, _)),
  707    assert_head(Store, File, Hash),
  708    read(In, Term),
  709    restore_heads(Term, In, Store).
  710
  711save_heads(Store, Out) :-
  712    get_time(Now),
  713    format(Out, 'epoch(~0f).~n~n', [Now]),
  714    forall(head(Store, File, _, Hash),
  715           format(Out, '~q.~n', [head(File, -, Hash)])).
 delete_head(+Store, +Head) is det
Delete Head from Store. Used by gitty_fsck/1 to remove heads that have no commits. Should we forward this to remotes, or should they do their own thing?
  724delete_head(Store, Head) :-
  725    redis_db(Store, _, _, _),
  726    !,
  727    redis_delete_head(Store, Head).
  728delete_head(Store, Head) :-
  729    retractall(head(Store, Head, _, _)).
 set_head(+Store, +File, +Hash) is det
Set the head of the given File to Hash
  735set_head(Store, File, Hash) :-
  736    redis_db(Store, _, _, _),
  737    !,
  738    redis_set_head(Store, File, Hash).
  739set_head(Store, File, Hash) :-
  740    file_name_extension(_, Ext, File),
  741    (   head(Store, File, _, Hash0)
  742    ->  (   Hash == Hash0
  743        ->  true
  744        ;   asserta(head(Store, File, Ext, Hash)),
  745            retractall(head(Store, File, _, Hash0))
  746        )
  747    ;   asserta(head(Store, File, Ext, Hash))
  748    ).
  749
  750
  751                 /*******************************
  752                 *            PACKS             *
  753                 *******************************/
  754
  755/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  756
  757<pack file> := <header>
  758               <file>*
  759<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
  760<object>    := obj(Hash, Type, Size, FileSize)
  761- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  762
  763pack_version(1).
 repack_objects(+Store, +Options) is det
Repack objects of Store for reduced disk usage and enhanced performance. By default this picks up all file objects of the store and all existing small pack files. Options:
small_pack(+Bytes)
Consider all packs smaller than Bytes as small and repack them. Default 10Mb
min_files(+Count)
Do not repack if there are fewer than Count new files. Default 1,000.
  778:- debug(gitty(pack)).  779
  780repack_objects(Store, Options) :-
  781    option(min_files(MinFiles), Options, 1_000),
  782    findall(Object, gitty_file_object(Store, Object), Objects),
  783    length(Objects, NewFiles),
  784    debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
  785    (   NewFiles >= MinFiles
  786    ->  pack_files(Store, ExistingPacks),
  787        option(small_pack(MaxSize), Options, 10_000_000),
  788        include(small_file(MaxSize), ExistingPacks, PackFiles),
  789        (   debugging(gitty(pack))
  790        ->  length(PackFiles, PackCount),
  791            debug(gitty(pack), 'Found ~D small packs', [PackCount])
  792        ;   true
  793        ),
  794        directory_file_path(Store, pack, PackDir),
  795        make_directory_path(PackDir),
  796        pack_objects(Store, Objects, PackFiles, PackDir, _PackFile, Options)
  797    ;   debug(gitty(pack), 'Nothing to do', [])
  798    ).
  799
  800small_file(MaxSize, File) :-
  801    size_file(File, Size),
  802    Size < MaxSize.
 pack_objects(+Store, +Objects, +Packs, +PackDir, -PackFile, +Options) is det
Pack the given objects and pack files into a new pack.
  809pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
  810    with_mutex(gitty_pack,
  811               pack_objects_sync(Store, Objects, Packs, PackDir,
  812                                 PackFile, Options)).
  813
  814pack_objects_sync(_Store, [], [Pack], _, [Pack], _) :-
  815    !.
  816pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
  817    length(Objects, ObjCount),
  818    length(Packs, PackCount),
  819    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
  820          [ObjCount, PackCount]),
  821    maplist(object_info(Store), Objects, FileInfo),
  822    maplist(pack_info(Store), Packs, PackInfo),
  823    append([FileInfo|PackInfo], Info0),
  824    sort(1, @<, Info0, Info),           % remove possible duplicates
  825    (   debugging(gitty(pack))
  826    ->  (   PackCount > 0
  827        ->  length(Info, FinalObjCount),
  828            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
  829        ;   true
  830        )
  831    ;   true
  832    ),
  833    directory_file_path(PackDir, 'pack-create', TmpPack),
  834    setup_call_cleanup(
  835        (   open(TmpPack, write, Out0, [type(binary), lock(write)]),
  836            open_hash_stream(Out0, Out, [algorithm(sha1)])
  837        ),
  838        (   write_signature(Out),
  839            maplist(write_header(Out), Info),
  840            format(Out, 'end_of_header.~n', []),
  841            maplist(add_file(Out, Store), Info),
  842            stream_hash(Out, SHA1)
  843        ),
  844        close(Out)),
  845    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
  846    directory_file_path(PackDir, PackFile, PackFilePath),
  847    rename_file(TmpPack, PackFilePath),
  848    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
  849    attach_pack(Store, PackFilePath),
  850    remove_objects_after_pack(Store, Objects, Options),
  851    delete(Packs, PackFilePath, RmPacks),
  852    remove_repacked_packs(Store, RmPacks, Options),
  853    debug(gitty(pack), 'Packing completed', []).
  854
  855object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
  856    gitty_object_file(Store, Object, File),
  857    load_object_header(Store, Object, Type, Size),
  858    size_file(File, FileSize).
  859
  860pack_info(Store, PackFile, Objects) :-
  861    attach_pack(Store, PackFile),
  862    pack_read_header(PackFile, _Version, _DataOffset, Objects).
  863
  864write_signature(Out) :-
  865    pack_version(Version),
  866    format(Out, "gitty(~d).~n", [Version]).
  867
  868write_header(Out, obj(Object, Type, Size, FileSize)) :-
  869    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
 add_file(+Out, +Store, +Object) is det
Add Object from Store to the pack stream Out.
  875add_file(Out, Store, obj(Object, _Type, _Size, _FileSize)) :-
  876    gitty_object_file(Store, Object, File),
  877    exists_file(File),
  878    !,
  879    setup_call_cleanup(
  880        open(File, read, In, [type(binary)]),
  881        copy_stream_data(In, Out),
  882        close(In)).
  883add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
  884    pack_object(Object, Type, Size, Offset, Store, PackFile),
  885    setup_call_cleanup(
  886        open(PackFile, read, In, [type(binary)]),
  887        (   seek(In, Offset, bof, Offset),
  888            copy_stream_data(In, Out, FileSize)
  889        ),
  890        close(In)).
 gitty_fsck(+Store) is det
Validate all packs associated with Store
  897gitty_fsck(Store) :-
  898    pack_files(Store, PackFiles),
  899    maplist(fsck_pack, PackFiles).
 fsck_pack(+File) is det
Validate the integrity of the pack file File.
  905fsck_pack(File) :-
  906    debug(gitty(pack), 'fsck ~p', [File]),
  907    check_pack_hash(File),
  908    debug(gitty(pack), 'fsck ~p: checking objects', [File]),
  909    check_pack_objects(File),
  910    debug(gitty(pack), 'fsck ~p: done', [File]).
  911
  912check_pack_hash(File) :-
  913    file_base_name(File, Base),
  914    file_name_extension(Plain, Ext, Base),
  915    must_be(oneof([pack]), Ext),
  916    atom_concat('pack-', Hash, Plain),
  917    setup_call_cleanup(
  918        (   open(File, read, In0, [type(binary)]),
  919            open_hash_stream(In0, In, [algorithm(sha1)])
  920        ),
  921        (   setup_call_cleanup(
  922                open_null_stream(Null),
  923                copy_stream_data(In, Null),
  924                close(Null)),
  925            stream_hash(In, SHA1)
  926        ),
  927        close(In)),
  928    assertion(Hash == SHA1).
  929
  930check_pack_objects(PackFile) :-
  931    setup_call_cleanup(
  932        open(PackFile, read, In, [type(binary)]),
  933        (  read_header(In, Version, DataOffset, Objects),
  934           set_stream(In, encoding(utf8)),
  935           foldl(check_object(In, PackFile, Version), Objects, DataOffset, _)
  936        ),
  937        close(In)).
  938
  939check_object(In, PackFile, _Version,
  940             obj(Object, Type, Size, FileSize),
  941             Offset0, Offset) :-
  942    Offset is Offset0+FileSize,
  943    byte_count(In, Here),
  944    (   Here == Offset0
  945    ->  true
  946    ;   print_message(warning, pack(reposition(Here, Offset0))),
  947        seek(In, Offset0, bof, Offset0)
  948    ),
  949    (   setup_call_cleanup(
  950            zopen(In, In2, [multi_part(false), close_parent(false)]),
  951            catch(read_object(In2, Data, _0RType, _0RSize), E,
  952                  ( print_message(error,
  953                                  gitty(PackFile, fsck(read_object(Object, E)))),
  954                    fail)),
  955            close(In2))
  956    ->  byte_count(In, End),
  957        (   End == Offset
  958        ->  true
  959        ;   print_message(error,
  960                          gitty(PackFile, fsck(object_end(Object, End,
  961                                                          Offset0, Offset,
  962                                                          Data))))
  963        ),
  964        assertion(Type == _0RType),
  965        assertion(Size == _0RSize),
  966        gitty:check_object(Object, Data, Type, Size)
  967    ;   true
  968    ).
 gitty_attach_packs(+Store) is det
Attach all packs for Store
  975gitty_attach_packs(Store) :-
  976    attached_packs(Store),
  977    !.
  978gitty_attach_packs(Store) :-
  979    with_mutex(gitty_attach_packs,
  980               gitty_attach_packs_sync(Store)).
  981
  982gitty_attach_packs_sync(Store) :-
  983    attached_packs(Store),
  984    !.
  985gitty_attach_packs_sync(Store) :-
  986    pack_files(Store, PackFiles),
  987    maplist(attach_pack(Store), PackFiles),
  988    asserta(attached_packs(Store)).
  989
  990pack_files(Store, Packs) :-
  991    directory_file_path(Store, pack, PackDir),
  992    exists_directory(PackDir),
  993    !,
  994    directory_files(PackDir, Files),
  995    convlist(is_pack(PackDir), Files, Packs).
  996pack_files(_, []).
  997
  998is_pack(PackDir, File, Path) :-
  999    file_name_extension(_, pack, File),
 1000    directory_file_path(PackDir, File, Path).
 attach_pack(+Store, +PackFile)
Load the index of Pack into memory.
 1006attach_pack(Store, PackFile) :-
 1007    attached_pack(Store, PackFile),
 1008    !.
 1009attach_pack(Store, PackFile) :-
 1010    retractall(pack_object(_,_,_,_,_,PackFile)),
 1011    pack_read_header(PackFile, Version, DataOffset, Objects),
 1012    foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
 1013    assertz(attached_pack(Store, PackFile)).
 1014
 1015pack_read_header(PackFile, Version, DataOffset, Objects) :-
 1016    setup_call_cleanup(
 1017        open(PackFile, read, In, [type(binary)]),
 1018        read_header(In, Version, DataOffset, Objects),
 1019        close(In)).
 1020
 1021read_header(In, Version, DataOffset, Objects) :-
 1022    read(In, Signature),
 1023    (   Signature = gitty(Version)
 1024    ->  true
 1025    ;   domain_error(gitty_pack_file, Objects)
 1026    ),
 1027    read(In, Term),
 1028    read_index(Term, In, Objects),
 1029    get_code(In, Code),
 1030    assertion(Code == 0'\n),
 1031    byte_count(In, DataOffset).
 1032
 1033read_index(end_of_header, _, []) :-
 1034    !.
 1035read_index(Object, In, [Object|T]) :-
 1036    read(In, Term2),
 1037    read_index(Term2, In, T).
 1038
 1039assert_object(Store, Pack, _Version,
 1040              obj(Object, Type, Size, FileSize),
 1041              Offset0, Offset) :-
 1042    Offset is Offset0+FileSize,
 1043    assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
 detach_pack(+Store, +Pack) is det
Remove a pack file from the memory index.
 1049detach_pack(Store, Pack) :-
 1050    retractall(pack_object(_, _, _, _, Store, Pack)),
 1051    retractall(attached_pack(Store, Pack)).
 load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet
True when Hash is in a pack and can be loaded.
 1057load_object_from_pack(Hash, Data, Type, Size) :-
 1058    pack_object(Hash, Type, Size, Offset, _Store, Pack),
 1059    setup_call_cleanup(
 1060        open(Pack, read, In, [type(binary)]),
 1061        read_object_at(In, Offset, Data, Type, Size),
 1062        close(In)).
 1063
 1064read_object_at(In, Offset, Data, Type, Size) :-
 1065    seek(In, Offset, bof, Offset),
 1066    read_object_here(In, Data, Type, Size).
 1067
 1068read_object_here(In, Data, Type, Size) :-
 1069    stream_property(In, encoding(Enc)),
 1070    setup_call_cleanup(
 1071        ( set_stream(In, encoding(utf8)),
 1072          zopen(In, In2, [multi_part(false), close_parent(false)])
 1073        ),
 1074        read_object(In2, Data, Type, Size),
 1075        ( close(In2),
 1076          set_stream(In, encoding(Enc))
 1077        )).
 unpack_packs(+Store) is det
Unpack all packs.
 1084unpack_packs(Store) :-
 1085    absolute_file_name(Store, AbsStore, [file_type(directory),
 1086                                         access(read)]),
 1087    pack_files(AbsStore, Packs),
 1088    maplist(unpack_pack(AbsStore), Packs).
 unpack_pack(+Store, +Pack) is det
Turn a pack back into plain object files.
 1094unpack_pack(Store, PackFile) :-
 1095    pack_read_header(PackFile, _Version, DataOffset, Objects),
 1096    setup_call_cleanup(
 1097        open(PackFile, read, In, [type(binary)]),
 1098        foldl(create_file(Store, In), Objects, DataOffset, _),
 1099        close(In)),
 1100    detach_pack(Store, PackFile),
 1101    delete_file(PackFile).
 1102
 1103create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
 1104    Offset is Offset0+FileSize,
 1105    gitty_object_file(Store, Object, Path),
 1106    with_mutex(gitty_file, exists_or_recreate(Path, Out)),
 1107        (   var(Out)
 1108        ->  true
 1109        ;   setup_call_cleanup(
 1110                seek(In, Offset0, bof, Offset0),
 1111                copy_stream_data(In, Out, FileSize),
 1112                close(Out))
 1113        ).
 1114
 1115exists_or_recreate(Path, _Out) :-
 1116    exists_file(Path),
 1117    !.
 1118exists_or_recreate(Path, Out) :-
 1119    file_directory_name(Path, Dir),
 1120    make_directory_path(Dir),
 1121    open(Path, write, Out, [type(binary), lock(write)]).
 remove_objects_after_pack(+Store, +Objects, +Options) is det
Remove the indicated (file) objects from Store.
 1128remove_objects_after_pack(Store, Objects, Options) :-
 1129    debug(gitty(pack), 'Deleting plain files', []),
 1130    maplist(delete_object(Store), Objects),
 1131    (   option(prune_empty_directories(true), Options, true)
 1132    ->  debug(gitty(pack), 'Pruning empty directories', []),
 1133        prune_empty_directories(Store)
 1134    ;   true
 1135    ).
 remove_repacked_packs(+Store, +Packs, +Options)
Remove packs that have been repacked.
 1141remove_repacked_packs(Store, Packs, Options) :-
 1142    maplist(remove_pack(Store, Options), Packs).
 1143
 1144remove_pack(Store, _Options, Pack) :-
 1145    detach_pack(Store, Pack),
 1146    delete_file(Pack).
 prune_empty_directories(+Dir) is det
Prune directories that are empty below Dir. Dir itself is not removed, even if it is empty.
 1153prune_empty_directories(Dir) :-
 1154    prune_empty_directories(Dir, 0).
 1155
 1156prune_empty_directories(Dir, Level) :-
 1157    directory_files(Dir, AllFiles),
 1158    exclude(hidden, AllFiles, Files),
 1159    (   Files == [],
 1160        Level > 0
 1161    ->  delete_directory_async(Dir)
 1162    ;   convlist(prune_empty_directories(Dir, Level), Files, Left),
 1163        (   Left == [],
 1164            Level > 0
 1165        ->  delete_directory_async(Dir)
 1166        ;   true
 1167        )
 1168    ).
 1169
 1170hidden(.).
 1171hidden(..).
 1172
 1173prune_empty_directories(Parent, Level0, File, _) :-
 1174    directory_file_path(Parent, File, Path),
 1175    exists_directory(Path),
 1176    !,
 1177    Level is Level0 + 1,
 1178    prune_empty_directories(Path, Level),
 1179    fail.
 1180prune_empty_directories(_, _, File, File).
 1181
 1182delete_directory_async(Dir) :-
 1183    with_mutex(gitty_file, delete_directory_async2(Dir)).
 1184
 1185delete_directory_async2(Dir) :-
 1186    catch(delete_directory(Dir), E,
 1187          (   \+ exists_directory(Dir)
 1188          ->  true
 1189          ;   \+ empty_directory(Dir)
 1190          ->  true
 1191          ;   throw(E)
 1192          )).
 1193
 1194empty_directory(Dir) :-
 1195    directory_files(Dir, AllFiles),
 1196    exclude(hidden, AllFiles, []).
 1197
 1198
 1199		 /*******************************
 1200		 *        REDIS PRIMITIVES	*
 1201		 *******************************/
 1202
 1203redis_head_db(Store, DB, Key) :-
 1204    redis_db(Store, DB, _, Prefix),
 1205    string_concat(Prefix, ":gitty:head", Key).
 1206
 1207redis_head_db_ro(Store, DB, Key) :-
 1208    redis_db(Store, _, DB, Prefix),
 1209    string_concat(Prefix, ":gitty:head", Key).
 redis_file(+Store, ?Name, ?Ext, ?Hash)
 1214redis_file(Store, Name, Ext, Hash) :-
 1215    nonvar(Name),
 1216    !,
 1217    file_name_extension(_Base, Ext, Name),
 1218    redis_head_db_ro(Store, DB, Heads),
 1219    redis(DB, hget(Heads, Name), Hash as atom).
 1220redis_file(Store, Name, Ext, Hash) :-
 1221    nonvar(Ext),
 1222    !,
 1223    string_concat("*.", Ext, Pattern),
 1224    redis_head_db_ro(Store, DB, Heads),
 1225    redis_hscan(DB, Heads, LazyList, [match(Pattern)]),
 1226    member(NameS-HashS, LazyList),
 1227    atom_string(Name, NameS),
 1228    atom_string(Hash, HashS).
 1229redis_file(Store, Name, Ext, Hash) :-
 1230    nonvar(Hash),
 1231    !,
 1232    load_plain_commit(Store, Hash, Commit),
 1233    Name = Commit.name,
 1234    file_name_extension(_Base, Ext, Name).
 1235redis_file(Store, Name, Ext, Hash) :-
 1236    redis_head_db_ro(Store, DB, Heads),
 1237    redis(DB, hgetall(Heads), Pairs as pairs(atom,atom)),
 1238    member(Name-Hash, Pairs),
 1239    file_name_extension(_Base, Ext, Name).
 redis_ensure_heads(+Store)
Ensure the redis db contains a hashmap mapping all file names to their head hashes.
 1246redis_ensure_heads(Store) :-
 1247    redis_head_db_ro(Store, DB, Key),
 1248    redis(DB, exists(Key), 1),
 1249    !.
 1250redis_ensure_heads(Store) :-
 1251    redis_head_db(Store, DB, Key),
 1252    debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
 1253    gitty_scan_latest(Store),
 1254    forall(retract(latest(Name, Hash, _Time)),
 1255           redis(DB, hset(Key, Name, Hash))),
 1256    debug(gitty(redis), '... finished gitty heads', []).
 redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
 1260redis_update_head(Store, Name, -, NewCommit, DataHash) :-
 1261    !,
 1262    redis_head_db(Store, DB, Key),
 1263    redis(DB, hset(Key, Name, NewCommit)),
 1264    publish_objects(Store, [NewCommit, DataHash]).
 1265redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
 1266    redis_head_db(Store, DB, Key),
 1267    redis_hcas(DB, Key, Name, OldCommit, NewCommit),
 1268    publish_objects(Store, [NewCommit, DataHash]).
redis_delete_head(+Store, +Head) is det
Unregister Head
 1274redis_delete_head(Store, Head) :-
 1275    redis_head_db(Store, DB, Key),
 1276    redis(DB, hdel(Key, Head)).
 redis_set_head(+Store, +File, +Hash) is det
 1280redis_set_head(Store, File, Hash) :-
 1281    redis_head_db(Store, DB, Key),
 1282    redis(DB, hset(Key, File, Hash)).
 1283
 1284		 /*******************************
 1285		 *           REPLICATE		*
 1286		 *******************************/
 redis_replicate_get(+Store, +Hash)
Try to get an object from another SWISH server in the network. We implement replication using the PUB/SUB protocol of Redis. This is not ideal, as this synchronisation route is only used if for some reason this server lacks some object. This typically happens if this node is new to the cluster or has been offline for a long time. In a large cluster, most nodes will have the objects and each of them will send the object around. A consumer group based solution is not ideal either, as the message may be picked up by a node that does not have this object, after which we need the failure recovery protocol to get it right. This is particularly the case with two nodes, where there is a fair chance that we ourselves are asked for the hash we are missing.

We could improve on this in two ways: (1) put the published hash in a short-lived key on Redis and make others check that, which is likely to prevent many nodes from sending the same object, or (2) see how many nodes are in the pool and switch to a consumer group based approach if this number is high (and thus we are unlikely to be asked ourselves for the missing hash).

See also
- publish_objects/2 for the incremental replication
 1312:- multifile
 1313    swish_redis:stream/2. 1314
 1315swish_redis:stream('gitty:replicate', [maxlen(100)]).
 1316
 1317:- listen(http(pre_server_start(_)),
 1318          init_replicator). 1319
 1320init_replicator :-
 1321    redis_swish_stream('gitty:replicate', ReplKey),
 1322    listen(redis(_Redis, ReplKey, _Id, Data),
 1323           replicate(Data)),
 1324    listen(redis(_, 'swish:gitty', Message),
 1325           gitty_message(Message)),
 1326    message_queue_create(_, [alias(gitty_queue)]).
 1327
 1328:- debug(gitty(replicate)). 1329
 1330gitty_message(discover(Hash)) :-
 1331    debug(gitty(replicate), 'Discover: ~p', [Hash]),
 1332    store(Store, _),
 1333    load_object_raw(Store, Hash, Data),
 1334    debug(gitty(replicate), 'Sending object ~p', [Hash]),
 1335    redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
 1336gitty_message(object(Hash, Data)) :-
 1337    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
 1338    redis_db(Store, _DB, _RO, _Prefix),
 1339    store_object_raw(Store, Hash, Data, New),
 1340    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
 1341    (   New == true
 1342    ->  thread_send_message(gitty_queue, Hash)
 1343    ;   true
 1344    ).
 redis_replicate_get(+Store, +Hash) is semidet
True if Hash, which we do not have locally, could be obtained from another node. This initiates a Redis discover request for the hash. The replies are picked up by gitty_message/1 above.

The code may be subject to various race conditions, but fortunately objects are immutable. It also seems possible that the Redis stream gets lost; it is not clear when or how. For now, we restart the Redis pub/sub listener if we get no reply, but no more than once per minute.

 1357redis_replicate_get(Store, Hash) :-
 1358    is_gitty_hash(Hash),
 1359    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Count),
 1360    Count > 1,                          % If I'm alone it won't help ...
 1361    between(1, 100, _),
 1362    (   thread_get_message(gitty_queue, Hash,
 1363                           [ timeout(0.1)
 1364                           ])
 1365    ->  !
 1366    ;   has_object(Store, Hash)
 1367    ->  !
 1368    ;   restart_pubsub,
 1369        fail
 1370    ).
 1371
 1372:- dynamic
 1373    restarted/1. 1374
 1375restart_pubsub :-
 1376    (   restarted(When)
 1377    ->  get_time(Now),
 1378        Now-When < 60,
 1379        !
 1380    ).
 1381restart_pubsub :-
 1382    get_time(Now),
 1383    transaction(( retractall(restarted(_)),
 1384                  asserta(restarted(Now)))),
 1385    thread_signal(redis_pubsub, throw(error(io_error(read, _),_))),
 1386    sleep(0.05).
 publish_objects(+Store, +Hashes)
Make the objects we just stored globally known. These are added to the Redis stream gitty:replicate and received by replicate/1 below.

This realizes eager replication, as opposed to the above code (redis_replicate_get/2), which performs lazy replication. Eager replication ensures the object is in multiple places in case the node on which it was saved dies shortly after.

Note that we also receive the object we just saved. That is unavoidable in a network where all nodes are equal.

 1403publish_objects(Store, Hashes) :-
 1404    redis_swish_stream('gitty:replicate', ReplKey),
 1405    maplist(publish_object(Store, ReplKey), Hashes).
 1406
 1407publish_object(Store, Stream, Hash) :-
 1408    load_object_raw(Store, Hash, Data),
 1409    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
 1410    xadd(swish, Stream, _, _{hash:Hash, data:Data}).
 replicate(+Data) is det
Act on a message sent to the gitty:replicate stream. Add the object to our store unless we already have it. Note that we receive our own objects as well.
 1418replicate(Data) :-
 1419    redis_db(Store, _DB, _RO, _Prefix),
 1420    atom_string(Hash, Data.hash),
 1421    store_object_raw(Store, Hash, Data.data, _0New),
 1422    debug(gitty(replicate), 'Received object ~p (new=~p)',
 1423          [Hash, _0New]).
 1424
 1425
 1426		 /*******************************
 1427		 *         REDIS BASICS		*
 1428		 *******************************/
 redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet
Update Hash.Key to New provided the current value is Old.
 1434redis_hcas(DB, Hash, Key, Old, New) :-
 1435    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
 1436                      redis.call('HSET', KEYS[1],  ARGV[1], ARGV[3]); \c
 1437                      return 1; \c
 1438                      end; \c
 1439                    return 0\c
 1440                   ",
 1441                   1, Hash, Key, Old, New),
 1442          1)