Big renaming
[cilux] / src / drivers / op / protocol.c
diff --git a/src/drivers/op/protocol.c b/src/drivers/op/protocol.c
new file mode 100644 (file)
index 0000000..ce964a9
--- /dev/null
@@ -0,0 +1,562 @@
+
+/* -}{----------------------------------------------------------------------- */
+
+#include <kernelapi.h>
+#include <notification.h>
+
+/* -}{----------------------------------------------------------------------- */
+
+#define TMPBUFSIZE  4096
+static char         tmpbuf[TMPBUFSIZE];
+static k_hashtable* own_resources;
+
+/* -}{----------------------------------------------------------------------- */
+
+extern void  run_tests(void);
+
+extern char*   make_cache_path(char* uri);
+extern void    look_in_file_cache(ni_event* evq);
+extern void    save_in_file_cache(ni_resource* res);
+
+extern void    init_uri2chan(void);
+extern char*   get_host_for(char* uri);
+extern char*   get_channel_for(char* host);
+extern char*   use_ping_info(k_hashtable*, k_channel*);
+extern void    use_from_info(k_hashtable*, k_channel*);
+extern void    ping_tunnels(void);
+extern void    send_ping(k_channel* chan, char* firstline, char* to);
+
+/* -}{----------------------------------------------------------------------- */
+
+static int           handles_resource(char* name);
+static void          sync_resource(ni_resource* res);
+       int           connection_writable(k_channel* chan, int bufpos, int len);
+       int           connection_readable(k_channel* chan, int bufpos, int len);
+static int           recv_next_event(    k_channel* chan);
+static void          recv_request(       k_channel* chan, char* header);
+static void          recv_response(      k_channel* chan, char* header);
+static void          got_mmap(char*, char*, char*, int, k_stat, void*);
+static void          set_read_buffer(k_channel*, char*, size_t, ni_event*);
+static int           recv_entity(        k_channel* chan, int bufpos, int eof);
+static int           expecting_response(char* pub, ni_event* evt, k_channel*);
+static void          do_request(   ni_event* evq);
+       void          ensure_self_sub(ni_event* evq);
+static void          ping_resource_subs(void* arg, char* key, void* val);
+static void          ping_sub(ni_resource* res, k_hashtable* sub);
+static ni_resource* own_resource(char* uri);
+static void          send_request(ni_event* evq);
+static void          send_response(ni_event* evt);
+static k_channel*    ensure_chan(char* chanm);
+
+/* -}{----------------------------------------------------------------------- */
+
+EXPORT int op_module_loaded(void)
+{
+       ni_register_driver("op", handles_resource, sync_resource);
+
+       if(strstr(k_version, "test")){
+               //run_tests();
+       }
+
+       init_uri2chan();
+
+       own_resources=k_hashtable_new("Own Resources", 0);
+
+       k_log_out("OP Driver initialised");
+
+       return 0;
+}
+
+EXPORT void op_module_tick(void)
+{
+       static long tix;
+       tix++;
+       if(!(tix % 1000)){
+               k_hashtable_apply(own_resources, ping_resource_subs, 0);
+       }
+       if(!(tix % 3000)){
+               ping_tunnels();
+       }
+}
+
+EXPORT int op_module_event(void* data)
+{
+       ni_event* evt=data;
+       if(!k_hashtable_get(evt->ent_head, "Status:")){
+               do_request(evt);
+       }
+       else{
+               send_response(evt);
+       }
+       return 0;
+}
+
+/* -}{----------------------------------------------------------------------- */
+
+int handles_resource(char* name)
+{
+       return 0;
+}
+
+void sync_resource(ni_resource* res)
+{
+       save_in_file_cache(res);
+}
+
+/* -}{----------------------------------------------------------------------- */
+
+int connection_readable(k_channel* chan, int bufpos, int len)
+{
+       if(0) k_log_out("connection_readable %s %p %d %d %p",
+                                chan->name, chan, bufpos, len, chan->context);
+       int sof=(len==  0);
+       int eof=(len== -1);
+
+       if(sof) return 0;
+
+       do{
+               ni_event* evt=chan->context;
+               if(!evt){
+                       int n=recv_next_event(chan);
+                       if(n<0) break;
+                       bufpos-=n;
+               }
+               else{
+                       int n=recv_entity(chan, bufpos, eof);
+                       if(n<0) break;
+                       bufpos-=n;
+               }
+
+       } while(1);
+
+       if(eof && chan->context){
+               ni_event* evt=chan->context;
+               evt->entity=0;
+               ni_event_delete(evt);
+               chan->context=0;
+       }
+
+       return 0;
+}
+
+int connection_writable(k_channel* chan, int bufpos, int len)
+{
+       if(0) k_log_out("connection_writable %p %d %d %p",
+                                      chan, bufpos, len, chan->context);
+//if(len>20000) exit(1);
+       int sof=(len==  0);
+       int eof=(len== -1);
+
+       if(sof){
+               send_ping(chan, "PING OP/0.5", 0);
+               return 0;
+       }
+
+       if(eof && chan->context){
+               ni_event* evt=chan->context;
+               evt->entity=0;
+               ni_event_delete(evt);
+               chan->context=0;
+       }
+
+       return 0;
+}
+
+/* -}{---- Receiving -------------------------------------------------------- */
+
+int recv_next_event(k_channel* chan)
+{
+       char* header=k_channel_chop_div(chan, CRLF CRLF);
+       if(!header) return -1;
+       int n=strlen(header)+strlen(CRLF CRLF);
+
+       if(!strncmp(header, "GET",  3) ||
+          !strncmp(header, "SUB",  3) ||
+          !strncmp(header, "UNSUB",5) ||
+          !strncmp(header, "HEAD", 4) ||
+          !strncmp(header, "PING", 4)   ){
+
+               recv_request(chan, header);
+               return n;
+       }
+       if(!strncmp(header, "OP/", 4)   ){
+
+               recv_response(chan, header);
+               return n;
+       }
+       k_free(header);
+       k_log_err("Failed reading request or response - closing connection");
+       k_channel_close(chan);
+       return n;
+}
+
+void recv_request(k_channel* chan, char* header)
+{
+       ni_event* evq;
+       evq=ni_get_request_headers(header);
+       if(!evq){
+               k_log_err("Failed reading request headers - closing connection");
+               k_channel_close(chan);
+               return;
+       }
+       if(!k_hashtable_isn(evq->evt_head, "Protocol:", "OP/", 4)){
+               ni_event_delete(evq);
+               k_log_err("Failed reading request not OP - closing connection");
+               k_channel_close(chan);
+               return;
+       }
+
+       k_hashtable* ent_head=evq->ent_head;
+       int ping=k_hashtable_is(ent_head, "Method:", "PING");
+
+       if(!evq->uri && !ping){
+               evq->uri=k_strdup(chan->name);
+               k_hashtable_put_dup(ent_head, "URI:", chan->name);
+       }
+
+       ni_event_show(evq, "OP Protocol Request");
+
+       if(k_hashtable_isi(evq->evt_head, "Connection:", "Keep-Alive")){
+               chan->linger=1;
+               if(k_hashtable_isn(ent_head, "Sub-To:", "./test", 6)){
+                       chan->linger=0;
+               }
+       }
+       if(ping){
+               char* from=use_ping_info(ent_head, chan);
+               if(from) send_ping(chan, "OP/0.5 270 PING", from);
+               ni_event_delete(evq);
+               return;
+       }
+       //use_from_info(ent_head, chan);
+
+       ni_event* evp=ni_event_new(evq->uri, 0, k_hashtable_dup(ent_head), 0);
+
+       ni_event_delete(evq);
+
+       k_event_post("on", evp);
+}
+
+void recv_response(k_channel* chan, char* header)
+{
+       ni_event* evt=ni_get_response_headers(header);
+       if(!evt){
+               k_log_err("recv_response: headers failed but doing nothing!");
+               return;
+       }
+       char*        pub=     evt->uri;
+       k_hashtable* ent_head=evt->ent_head;
+
+       if(!expecting_response(pub, evt, chan)) return;
+
+       ni_event_show(evt, "Response");
+
+       int head=k_hashtable_is(     ent_head, "Status:", "260");
+       int nmod=k_hashtable_is(     ent_head, "Status:", "304");
+       int ping=k_hashtable_is(     ent_head, "Status:", "270");
+       int cl  =k_hashtable_get_int(ent_head, "Content-Length:");
+       int entity=!(head || nmod || ping || cl==0);
+
+       if(ping){
+               use_ping_info(ent_head, chan);
+               ni_event_delete(evt);
+               return;
+       }
+       use_from_info(ent_head, chan);
+
+       if(entity){
+               k_hashtable_set(ent_head, "Status:",      "260");
+               k_hashtable_set(ent_head, "Status-Text:", "Headers Only");
+       }
+       k_event_post("on", evt);
+
+       if(entity){
+
+               k_hashtable* eh=k_hashtable_new("nHeaders/recv_response", 1);
+               char* from      =k_hashtable_get(ent_head, "From:");
+               char* contlen   =k_hashtable_get(ent_head, "Content-Length:");
+               char* cux       =k_hashtable_get(ent_head, "CUX:");
+               k_hashtable_set(eh, "Status:",        "206");
+               k_hashtable_set(eh, "Status-Text:",   "Partial Content");
+               k_hashtable_put_dup(eh, "From:",           from);
+               k_hashtable_put_dup(eh, "Content-Length:", contlen);
+               k_hashtable_put_dup(eh, "CUX:",            cux);
+               ni_event* evc=ni_event_new(pub, 0, eh, 0);
+               chan->context=evc;
+
+               int constant=k_hashtable_is(ent_head, "CUX:", "C");
+               if(constant){
+                       char* path=make_cache_path(pub); if(!path) return;
+                       k_file_read(".", path, USE_MMAP, cl, got_mmap, chan);
+               }
+               else{
+                       char* data=k_malloc(cl);
+                       set_read_buffer(chan, data, cl, evc);
+               }
+       }
+}
+
+void got_mmap(char*  basedir,
+              char*  path,
+              char*  data,
+              int    usedmmap,
+              k_stat kstat,
+              void*  context){
+
+       k_free(path);
+       k_channel* chan=context;
+       ni_event* evt=chan->context;
+       if(!evt){ k_log_err("got_mmap: evt=0"); return; }
+       if(!data || !usedmmap){ k_log_err("got_mmap: mmap failed"); return; }
+
+       size_t cl=k_hashtable_get_int(evt->ent_head, "Content-Length:");
+       set_read_buffer(chan, data, cl, evt);
+}
+
+void set_read_buffer(k_channel* chan, char* data, size_t cl, ni_event* evt)
+{
+       evt->entity=data;
+       int r=k_channel_setbuf(chan, data, cl);
+       if(0) k_log_out("k_channel_setbuf %d", r);
+       if(r==BUFFER_ALREADY_SET){
+               k_log_err("oops! k_channel_setbuf BUFFER_ALREADY_SET");
+               return;
+       }
+       if(r==BUFFER_FILLED){
+               k_hashtable_set(evt->ent_head, "Status:",      "200");
+               k_hashtable_set(evt->ent_head, "Status-Text:", "OK");
+               chan->context=0;
+               k_event_post("on", evt);
+       }
+}
+
+int recv_entity(k_channel* chan, int bufpos, int eof)
+{
+       ni_event*   evt=chan->context;
+       k_hashtable* ent_head=evt->ent_head;
+
+       char* cls=k_hashtable_get(    ent_head, "Content-Length:");
+       int   cl =k_hashtable_get_int(ent_head, "Content-Length:");
+
+       if(!cls && !eof) return -1;
+
+       int partial=0;
+       int eofcontlen=eof && (!cls || bufpos < cl);
+       if(eofcontlen){
+               if(cls){
+                       char* clg=k_strdup(cls);
+                       k_hashtable_put(ent_head, "Content-Length-Given:", clg);
+                       partial=1;
+               }
+               cl=bufpos;
+               char b[32]; snprintf(b, 32, "%d", cl);
+               k_hashtable_put_dup(ent_head, "Content-Length:", b);
+       }
+
+       if(bufpos < cl){
+               if(bufpos){
+                       ni_event* evp=ni_event_dup(evt);
+                       snprintf(tmpbuf, TMPBUFSIZE, "0-%d", bufpos);
+                       char* cr=k_strdup(tmpbuf);
+                       k_hashtable_put(evp->ent_head, "Content-Range:", cr);
+                       k_event_post("on", evp);
+               }
+               return -1;
+       }
+
+       static char dummy_empty_entity[0];
+       if(!k_channel_getbuf(chan)){
+               int cn=k_hashtable_is(ent_head, "CUX:", "C");
+               if(cl) evt->entity=k_channel_chop_len(chan, cl);
+               else   evt->entity=cn? dummy_empty_entity: k_malloc(1);
+       }
+
+       if(!partial){
+               k_hashtable_set(ent_head, "Status:",      "200");
+               k_hashtable_set(ent_head, "Status-Text:", "OK");
+       }
+       chan->context=0;
+       k_event_post("on", evt);
+
+       return cl;
+}
+
+int expecting_response(char* pub, ni_event* evt, k_channel* chan)
+{
+       if(pub && 0){
+               k_log_err("unwanted response: %s", pub);
+               ni_event_delete(evt);
+               k_channel_close(chan);
+               return 0;
+       }
+       return 1;
+}
+
+/* -}{---- Sending ---------------------------------------------------------- */
+
+void do_request(ni_event* evq)
+{
+       k_hashtable* sub=evq->ent_head;
+       int tc=k_hashtable_isi(sub, "Sub-Type:", "Cache");
+       int to=k_hashtable_isi(sub, "Sub-Type:", "Original");
+
+       if(tc){
+               char* ims=k_hashtable_get(sub, "If-Modified-Since:");
+               if(ims) ensure_self_sub(evq);
+               else    look_in_file_cache(evq);
+       }
+       else
+       if(to){
+               send_request(evq);
+       }
+}
+
+void ensure_self_sub(ni_event* evq)
+{
+       k_hashtable* sub=evq->ent_head;
+       char*        pub=k_hashtable_get(sub, "Sub-To:");
+
+       ni_resource* res=own_resource(pub);
+       k_hashtable*  enh=res->ent_head;
+       k_hashtable*  selfsub=k_hashtable_get(enh, "Sub-To:");
+       if(selfsub && !k_hashtable_is(selfsub, "Status-Cache:", "OK")){
+               k_log_err("cancel selfsub as new one needed");
+       }
+
+       k_hashtable* ss=k_hashtable_dup(sub);
+       k_hashtable_remove( ss, "From:");
+       k_hashtable_put_dup(ss, "URI:",       pub);
+       k_hashtable_set(    ss, "Sub-Type:", "Original");
+       k_hashtable_put_dup(ss, "Via:",       get_host_for(pub));
+       if(k_hashtable_get( ss, "If-Modified-Since:")){
+               char* lm=k_hashtable_get(enh, "Last-Modified:");
+               if(!res->entity) lm=0;
+               k_hashtable_set(ss, "If-Modified-Since:", lm? lm: "0");
+       }
+       ni_event* evs=ni_event_new(0, 0, ss, 0);
+       k_event_post("on", evs);
+
+       ni_event_delete(evq);
+}
+
+void ping_resource_subs(void* arg, char* key, void* val)
+{
+       ni_resource* res=val;
+       k_hashtable* pubcache=k_hashtable_get(res->ent_head, "Pub-Cache:");
+       if(!pubcache || !k_hashtable_get(pubcache, "Method:")) return;
+       k_hashtable* subs=k_hashtable_get(res->ent_head, "Sub-To:");
+       k_hashtable* sub;
+       for(sub=subs; sub; sub=sub->next){
+               if(!k_hashtable_is(sub, "Status-Cache:", "OK")){
+                       if(!k_hashtable_get(sub, "Status:")){
+                               ping_sub(res, sub);
+                       }
+                       else{
+                               int ts=k_hashtable_get_int(sub, "Timestamp:");
+                               if(0) k_log_out("check dried-up request: %d", ts);
+                       }
+               }
+       }
+}
+
+void ping_sub(ni_resource* res, k_hashtable* sub)
+{
+       ni_resource_show(res, "ping_resource_subs");
+
+       k_hashtable* ss=k_hashtable_dup(sub);
+
+       char* subto=k_hashtable_extract(ss, "URI:");
+       k_hashtable_put_dup(ss, "URI:",    res->uri);
+       k_hashtable_put(    ss, "Sub-To:", subto);
+       k_hashtable_set(    ss, "Sub-Type:", "Original");
+       k_hashtable_put_dup(ss, "Via:",    get_host_for(res->uri));
+
+       ni_event* evs=ni_event_new(0, 0, ss, 0);
+       k_event_post("on", evs);
+}
+
+ni_resource* own_resource(char* uri)
+{
+       ni_resource* res=k_hashtable_get(own_resources, uri);
+       if(!res){
+               res=ni_resource_get(uri);
+               k_hashtable_set(own_resources, uri, res);
+       }
+       return res;
+}
+
+void send_request(ni_event* evt)
+{
+       k_hashtable* eh=evt->ent_head;
+       char* method=k_strdup(k_hashtable_get(eh, "Method:"));
+       char* to    =k_strdup(k_hashtable_get(eh, "Sub-To:"));
+       char* via   =k_strdup(k_hashtable_get(eh, "Via:"));
+
+       char* chanm=get_channel_for(via);
+       if(!chanm) goto free_and_return;
+
+       k_channel* chan=ensure_chan(chanm);
+       if(!chan) goto free_and_return;
+
+       ni_fix_ni_headers(eh, 0);
+       ni_request(evt, to, method, chan);
+
+       free_and_return:
+       k_free(method); k_free(to); k_free(via);
+       ni_event_delete(evt);
+}
+
+void send_response(ni_event* evt)
+{
+       ni_event_show(evt, "send_response");
+
+       k_hashtable* eh=evt->ent_head;
+
+       k_hashtable* sub=k_hashtable_get(eh,  "Pub-To:");
+       char* uri       =k_hashtable_get(sub, "URI:");
+       char* from      =k_hashtable_get(sub, "From:");
+       char* method    =k_hashtable_get(sub, "Method:");
+       int   methead   =k_hashtable_is( sub, "Method:", "HEAD");
+
+       char* to=from? from: uri;
+
+       char* host=from? from: get_host_for(uri);
+       char* chanm=get_channel_for(host);
+       if(!chanm){
+               if(0) k_log_out("no OP protocol channel %s", to);
+               ni_event_delete(evt);
+               return;
+       }
+
+       k_channel* chan=ensure_chan(chanm);
+       if(!chan){
+               if(0) k_log_out("no OP protocol channel %s", to);
+               ni_event_delete(evt);
+               return;
+       }
+
+       k_hashtable_extract(eh, "Pub-To:");
+
+       char* protocol="OP/0.5";
+
+       ni_fix_ni_headers(eh, methead);
+       ni_response(evt, to, method, protocol, 0, chan);
+
+       k_hashtable_delete(sub);
+       evt->entity=0;
+       ni_event_delete(evt);
+}
+
+k_channel* ensure_chan(char* chanm)
+{
+       k_channel* chan=k_channel_get_name(chanm);
+       if(!chan){
+               k_log_err("Cannot find current channel for %s", chanm);
+               k_channel_connect_name(chanm, connection_readable,
+                                             connection_writable);
+       }
+       return chan;
+}
+
+/* -}{----------------------------------------------------------------------- */
+