35
   36:- module(swish_data_source,
   37          [ data_source/2,                 38            data_record/2,                 39            record/2,                      40            data_property/2,               41            data_row/2,                    42            data_row/4,                    43            data_dump/3,                   44
   45            data_flush/1,                  46            'data assert'/1,               47            'data materialized'/3,	   48            'data failed'/2		   49          ]).   50:- use_module(library(error)).   51:- use_module(library(lists)).   52:- use_module(library(settings)).   53:- use_module(library(solution_sequences)).   54:- use_module(library(pengines)).   55
   56:- setting(max_memory, integer, 8000,
   57           "Max memory used for cached data store (Mb)").
   67:- meta_predicate
   68    data_source(:, +),
   69    data_record(:, -),
   70    record(:, -),
   71    data_row(:, -),
   72    data_row(:, +, +, -),
   73    data_dump(:, +, -),
   74    data_property(:, -).   75
   76:- multifile
   77    source/2.                              78
   79
   80		    83
   84:- dynamic
   85    data_source_db/3,                      86    data_signature_db/2,                   87    data_materialized/5,                   88    data_last_access/3.                    89
   %!  'data assert'(+Term) is det.
   %
   %   Store Term by adding it as the last clause of its predicate.
   %   This is a thin wrapper around assertz/1.

   'data assert'(Clause) :-
       assertz(Clause).
  %!  'data materialized'(+Hash, +Signature, +SourceVersion) is semidet.
  %
  %   Register that the table identified by Hash is available. Stores
  %   Signature for Hash and a data_materialized/5 record holding the
  %   current time, SourceVersion and the CPU and wall time elapsed
  %   since the start statistics that were saved in the global variable
  %   '$data_source_materalize'. Fails if that variable is not set.

  'data materialized'(Hash, Signature, SourceVersion) :-
      statistics(cputime, CPUNow),
      get_time(Now),
      nb_current('$data_source_materalize', stats(WallStart, CPUStart)),
      Wall is Now - WallStart,
      CPU  is CPUNow - CPUStart,
      assertz(data_signature_db(Hash, Signature)),
      assertz(data_materialized(Hash, Now, SourceVersion, CPU, Wall)).
  113
  %!  'data failed'(+Hash, +Signature) is det.
  %
  %   Remove every clause of the predicate that has the same name and
  %   arity as Signature. Hash is not used.

  'data failed'(_Hash, Signature) :-
      Signature =.. [Name|Columns],
      same_length(Columns, Args),        % one fresh variable per column
      Row =.. [Name|Args],
      retractall(Row).
  %!  data_source(:Id, +Source) is det.
  %
  %   Make Id, local to the calling module M, refer to the data
  %   described by Source. Sources are identified by the variant SHA1
  %   hash of the Source term. If the source is already registered in
  %   data_source_db/3, only the link M:'$data'(Id, Hash) is added
  %   (unless it is already there). Otherwise Source is validated using
  %   valid_source/1 and registered together with a fresh mutex.

  data_source(M:Id, Source) :-
      variant_sha1(Source, Hash),
      (   data_source_db(Hash, Source, _)
      ->  (   clause(M:'$data'(Id, Hash), true)
          ->  true
          ;   assertz(M:'$data'(Id, Hash))
          )
      ;   valid_source(Source),
          mutex_create(Lock),
          assertz(data_source_db(Hash, Source, Lock)),
          assertz(M:'$data'(Id, Hash))
      ).
  %!  record(:Id, -Record) is nondet.
  %
  %   Same as data_record/2; all work is delegated to it.

  record(DataId, Record) :- data_record(DataId, Record).
  150
  %!  data_record(:Id, -Record) is nondet.
  %
  %   True when Record is a dict, tagged Id, that represents one row of
  %   the data set Id. The data is materialized first if needed. Raises
  %   an existence error (through data_hash/2) if Id is unknown in the
  %   calling module.

  data_record(Module:Id, Record) :-
      data_hash(Module:Id, Hash),
      materialize(Hash),
      data_signature_db(Hash, Signature),
      data_record(Signature, Id, Record, RowGoal),
      call(RowGoal).                     % enumerate the stored rows
  157
  %!  data_record(+Signature, ?Tag, -Record, -Head) is det.
  %
  %   Relate a table Signature (Name(Col1, Col2, ...)) to a dict Record
  %   tagged Tag and a goal Head on the same predicate name, such that
  %   the dict value for each column key is the corresponding argument
  %   of Head.

  data_record(Signature, Tag, Record, Head) :-
      Signature =.. [Name|Columns],
      pairs_keys_values(Cells, Columns, Args),
      Head =.. [Name|Args],
      dict_pairs(Record, Tag, Cells).
  163
  %!  data_hash(:Id, -Hash) is det.
  %
  %   Hash is the hash linked to data set Id in module M by a
  %   '$data'/2 fact. Only the first matching fact is used.
  %
  %   @error existence_error(dataset, Id) if there is no such fact.

  data_hash(M:Id, Hash) :-
      (   clause(M:'$data'(Id, Hash), true)
      ->  true
      ;   existence_error(dataset, Id)
      ).
  %!  data_row(:Id, -Row) is nondet.
  %!  data_row(:Id, +Range, +Header, -Row) is nondet.
  %
  %   True when Row is a term Id(Col1, Col2, ...) holding one row of the
  %   data set Id. Range selects the rows (see range/3). Header must be
  %   a boolean; if it is `true`, the first solution is a row holding
  %   the column names, followed by the data rows. data_row/2 returns
  %   the header and all rows.

  data_row(Id, Row) :-
      data_row(Id, all, true, Row).

  data_row(M:Id, Range, Header, Row) :-
      must_be(boolean, Header),
      data_hash(M:Id, Hash),
      materialize(Hash),
      data_signature_db(Hash, Signature),
      Signature =.. [_|ColNames],
      length(ColNames, Width),
      length(Cells, Width),              % one fresh variable per column
      Goal =.. [Hash|Cells],
      Row  =.. [Id|Cells],
      (   Header == true,
          Cells = ColNames               % header row comes first
      ;   range(Range, M:Id, Goal)
      ).
  195
  %!  range(+Range, :Id, :Goal) is nondet.
  %
  %   Run Goal, restricting its solutions according to Range:
  %
  %     - all
  %       All solutions.
  %     - From-To
  %       Solutions From up to and including To (1-based).
  %     - Limit (>= 0)
  %       The first Limit solutions.
  %     - Limit (< 0)
  %       The last -Limit solutions, computed from the `rows` property
  %       of data set Id. If the table holds fewer rows than requested,
  %       all rows are returned.

  range(all, _Id, Goal) :-
      !,
      call(Goal).
  range(From-To, _Id, Goal) :-
      !,
      Skip is From - 1,
      Size is To - Skip,
      limit(Size, offset(Skip, call(Goal))).
  range(Limit, _Id, Goal) :-
      Limit >= 0,
      !,
      limit(Limit, call(Goal)).
  range(Limit, Id, Goal) :-
      Limit < 0,
      data_property(Id, rows(Rows)),
      % Clamp: offset/2 raises a domain error on a negative count,
      % which happened whenever -Limit exceeded the number of rows.
      Skip is max(0, Rows+Limit),
      offset(Skip, call(Goal)).
  %!  data_dump(:Id, +Range, -Table) is det.
  %
  %   Table is the list of all rows produced by data_row/4 for data set
  %   Id and Range. The header row is included as the first element.

  data_dump(Id, Range, Table) :-
      findall(Line,
              data_row(Id, Range, true, Line),
              Table).
  %!  data_property(:Id, ?Property) is nondet.
  %
  %   True when Property is a property of the data set Id. The data is
  %   materialized before any property is computed. The supported
  %   properties are enumerated by property/1 and computed by
  %   property/2.
  %
  %   @error existence_error(dataset, Id) through data_hash/2.

  data_property(Module:Id, Prop) :-
      data_hash(Module:Id, Hash),
      materialize(Hash),
      property(Prop),
      property(Prop, Hash).
  260
  %!  property(?Property) is nondet.
  %
  %   Enumerates the property terms known to data_property/2.

  property(columns(_)).
  property(column_names(_)).
  property(rows(_)).
  property(hash(_)).
  property(source_version(_)).
  property(materialized(_)).
  property(source(_)).

  %!  property(+Property, +Hash) is nondet.
  %
  %   Compute the value of Property for the materialized table Hash.

  % Number of columns: the arity of the stored signature.
  property(columns(Count), Hash) :-
      data_signature_db(Hash, Signature),
      Signature =.. [_|Columns],
      length(Columns, Count).
  % Column names: the arguments of the stored signature.
  property(column_names(Names), Hash) :-
      data_signature_db(Hash, Signature),
      Signature =.. [_Name|Names].
  % Number of rows: the clause count of the table predicate.
  property(rows(Count), Hash) :-
      data_signature_db(Hash, Signature),
      predicate_property(Signature, number_of_clauses(Count)).
  property(hash(Hash), Hash).
  property(source_version(Version), Hash) :-
      data_materialized(Hash, _When, Version, _CPU, _Wall).
  property(materialized(When), Hash) :-
      data_materialized(Hash, When, _Version, _CPU, _Wall).
  property(source(Source), Hash) :-
      data_source_db(Hash, Source, _Lock).
  293:- multifile
  294    swish:goal_expansion/2.  295
  % Expand a goal written as a dict whose tag names a data set of the
  % module being loaded into a call on the table predicate in this
  % module. The data is materialized at expansion time; the keys of the
  % dict are matched against the columns of the table using :</2.

  swish:goal_expansion(Dict, swish_data_source:RowGoal) :-
      is_dict(Dict, Tag),
      prolog_load_context(module, Module),
      clause(Module:'$data'(Tag, Hash), true),
      materialize(Hash),
      data_signature_db(Hash, Signature),
      data_record(Signature, Tag, FullRecord, RowGoal),
      Dict :< FullRecord.                % Dict may name a subset of the columns
  304
  305
  306		   309
  %!  valid_source(+Source) is det.
  %
  %   Succeeds if Source is handled by some clause of the multifile
  %   hook source/2.
  %
  %   @error instantiation_error if Source is unbound.
  %   @error existence_error(data_source, Source) if no source/2 clause
  %          matches.

  valid_source(Source) :-
      must_be(nonvar, Source),
      (   source(Source, _Loader)
      ->  true
      ;   existence_error(data_source, Source)
      ).
  %!  materialize(+Hash) is semidet.
  %
  %   Make sure the data for Hash is loaded. If a data_materialized/5
  %   record exists, only the last access time is updated. Otherwise
  %   the source registered for Hash is looked up, the access time is
  %   updated, gc_data/0 is run and the data is loaded by
  %   materialize_sync/2 while holding the mutex of the source. Fails
  %   if Hash is neither materialized nor registered.

  materialize(Hash) :-
      must_be(atom, Hash),
      (   data_materialized(Hash, _When, _From, _CPU, _Wall)
      ->  update_last_access(Hash)
      ;   data_source_db(Hash, Source, Lock),
          update_last_access(Hash),
          gc_data,
          with_mutex(Lock, materialize_sync(Hash, Source))
      ).
  341
  %!  materialize_sync(+Hash, +Source) is semidet.
  %
  %   Load the data for Hash from Source. materialize/1 calls this
  %   while holding the mutex of the source. Does nothing if the data
  %   has been materialized in the meanwhile. Otherwise the loader goal
  %   associated with Source by source/2 is called with Hash as extra
  %   argument. While it runs, the start wall and CPU time are available
  %   in the global variable '$data_source_materalize'. Finally, the
  %   resulting table predicate is declared public.

  materialize_sync(Hash, Source) :-
      (   data_materialized(Hash, _When, _From, _CPU, _Wall)
      ->  true
      ;   source(Source, Loader),
          get_time(WallStart),
          statistics(cputime, CPUStart),
          setup_call_cleanup(
              b_setval('$data_source_materalize',
                       stats(WallStart, CPUStart)),
              call(Loader, Hash),
              nb_delete('$data_source_materalize')),
          data_signature_db(Hash, Signature),
          functor(Signature, Name, Arity),
          public(Name/Arity)
      ).
  356
  357
  358		 
  %!  update_last_access(+Hash) is det.
  %
  %   Maintain data_last_access(Hash, Time, Count). Time is the current
  %   time rounded down to a whole minute, so at most one database
  %   update happens per minute for each Hash. When the stored time is
  %   older, the record is replaced and Count is incremented; the new
  %   record is added before the old one is erased.

  update_last_access(Hash) :-
      get_time(Now),
      Minute is 60*floor(Now/60),
      (   data_last_access(Hash, Minute, _)
      ->  true                           % already counted for this minute
      ;   clause(data_last_access(Hash, _, Count0), true, OldRef)
      ->  Count is Count0 + 1,
          asserta(data_last_access(Hash, Minute, Count)),
          erase(OldRef)
      ;   asserta(data_last_access(Hash, Minute, 1))
      ).
  378
  %!  gc_stats(?Hash, -Stats:dict) is nondet.
  %
  %   Stats describes the materialized table Hash: when it was
  %   materialized, the CPU and wall time that took, how long ago it was
  %   last accessed (using times rounded to the minute), its access
  %   count and a size figure computed as (88+16*Arity)*Rows.

  gc_stats(Hash, Stats) :-
      Stats = _{ hash:Hash,
                 materialized:When, cpu:CPU, wall:Wall,
                 bytes:Size,
                 last_accessed_ago:Ago,
                 access_frequency:AccessCount
               },
      data_materialized(Hash, When, _From, CPU, Wall),
      data_signature_db(Hash, Signature),
      data_last_access(Hash, LastAccess, AccessCount),
      get_time(Now),
      Ago is floor(Now/60)*60 - LastAccess,
      predicate_property(Signature, number_of_clauses(Rows)),
      functor(Signature, _, Arity),
      Size is Rows*(88 + 16*Arity).
  %!  gc_data is det.
  %
  %   Keep the cached data within the `max_memory` setting (in Mb):
  %   flush tables using gc_data/1 and then set the program space limit
  %   of the module to the same number of bytes.

  gc_data :-
      setting(max_memory, MB),
      Bytes is MB*1024*1024,
      gc_data(Bytes),
      set_module(program_space(Bytes)).

  %!  gc_data(+MaxSize) is det.
  %
  %   If the program size of module swish_data_source is not below
  %   MaxSize bytes, flush materialized tables, longest unused first,
  %   until it is. Succeeds even if the target cannot be reached.

  gc_data(MaxSize) :-
      module_property(swish_data_source, program_size(Size)),
      Size < MaxSize,
      !.
  gc_data(MaxSize) :-
      findall(Stat, gc_stats(_, Stat), Stats),
      sort(last_accessed_ago, >=, Stats, ByTime),
      member(Stat, ByTime),
         % Take the hash from the current element. The original used
         % ByTime.hash, i.e. dict access on the list, which raises a
         % type error instead of flushing anything.
         get_dict(hash, Stat, Hash),
         data_flush(Hash),
         module_property(swish_data_source, program_size(Size)),
         Size < MaxSize,
      !.
  gc_data(_).
  %!  data_flush(+Hash) is semidet.
  %
  %   Drop the materialized data for Hash: all rows of the table
  %   predicate, its signature, the materialization record and the
  %   access statistics. Fails if no signature is stored for Hash.
  %
  %   The most general row is built with functor/3 from the name and
  %   arity of the signature. Building it through a dict of column
  %   names would raise a duplicate_key error for tables whose column
  %   names are not unique, so such tables could never be flushed.

  data_flush(Hash) :-
      data_signature_db(Hash, Signature),
      functor(Signature, Name, Arity),
      functor(Head, Name, Arity),
      retractall(Head),
      retractall(data_signature_db(Hash, Head)),
      retractall(data_materialized(Hash, _When1, _From, _CPU, _Wall)),
      retractall(data_last_access(Hash, _When2, _Count)).
  434
  435
  436		   439
  440:- multifile
  441    sandbox:safe_meta/2.  442
  % Sandbox declarations: each public data predicate is safe to call
  % provided its data set identifier passes safe_id/1. No goals are
  % reported as being called by these predicates.

  sandbox:safe_meta(swish_data_source:data_source(DataId, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:data_record(DataId, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:record(DataId, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:data_row(DataId, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:data_row(DataId, _, _, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:data_dump(DataId, _, _), []) :-
      safe_id(DataId).
  sandbox:safe_meta(swish_data_source:data_property(DataId, _), []) :-
      safe_id(DataId).
  450
  451safe_id(M:_) :- !, pengine_self(M).
  452safe_id(_)
 
/** <module> Cached data access

This module provides access to external data by caching it as a Prolog
predicate. The data itself is kept in a global data module, so it is
preserved beyond a single SWISH Pengine invocation.
*/