Clovisor ONS demo related fixes 86/67486/1
authorStephen Wong <stephen.kf.wong@gmail.com>
Fri, 5 Apr 2019 06:16:53 +0000 (06:16 +0000)
committerStephen Wong <stephen.kf.wong@gmail.com>
Fri, 5 Apr 2019 06:19:29 +0000 (06:19 +0000)
Change-Id: I9449ee5f699a3cdf471dc8b405de650325ae09f6
Signed-off-by: Stephen Wong <stephen.kf.wong@gmail.com>
12 files changed:
clover/clovisor/bin/clovisor
clover/clovisor/build-docker
clover/clovisor/build.sh
clover/clovisor/clovisor_main.go
clover/clovisor/libclovisor/clovisor_bcc.go
clover/clovisor/libclovisor/clovisor_cfg.go
clover/clovisor/libclovisor/clovisor_k8s.go
clover/clovisor/libclovisor/ebpf/node_interface.c [new file with mode: 0755]
clover/clovisor/libclovisor/libproto/build-plugin [new file with mode: 0755]
clover/clovisor/libclovisor/libproto/clovisor_http.go [new file with mode: 0644]
clover/clovisor/libclovisor/libproto/http_alt.go [new file with mode: 0644]
clover/clovisor/proto/http.so [new file with mode: 0644]

index 240e0bc..a683ee6 100755 (executable)
Binary files a/clover/clovisor/bin/clovisor and b/clover/clovisor/bin/clovisor differ
index 99668d7..4f776ec 100755 (executable)
@@ -14,7 +14,7 @@ if [ -z "$1" ]
 fi
 cp bin/clovisor .
 docker build --build-arg TARGET_KERNEL_VER=$kernel_ver -t clovisor .
-#docker tag clovisor localhost:5000/clovisor
-#docker push localhost:5000/clovisor
-docker tag clovisor s3wong/clovisor
-docker push s3wong/clovisor
+docker tag clovisor localhost:5000/clovisor
+docker push localhost:5000/clovisor
+#docker tag clovisor s3wong/clovisor
+#docker push s3wong/clovisor
index 4503d5a..4a9cfe6 100755 (executable)
@@ -5,11 +5,11 @@
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 
-GOVERSION=1.10.3
+GOVERSION=1.12
 OS=linux
 ARCH=amd64
-GOPATH=/home/ubuntu/go
-CLIENTGOVERSION=v8.0.0
+GOPATH=/home/s3wong/go
+CLIENTGOVERSION=v10.0.0
 
 SRCDIR=`pwd`
 
@@ -28,6 +28,7 @@ go get github.com/google/gopacket
 go get github.com/iovisor/gobpf
 go get github.com/opentracing/opentracing-go
 go get github.com/pkg/errors
+go get github.com/go-redis/redis
 go get github.com/uber/jaeger-client-go
 go get github.com/vishvananda/netlink
 go get github.com/vishvananda/netns
@@ -39,7 +40,7 @@ cd $GOPATH/src/k8s.io/client-go
 git checkout $CLIENTGOVERSION
 godep restore ./...
 
-cd $SRCDIR/libclovisor
-go build .
-cd ../
-go build -o clovisor .
+#cd $SRCDIR/libclovisor
+#go build .
+#cd ../
+#go build -o clovisor .
index e235c50..b8e6508 100644 (file)
@@ -9,8 +9,10 @@ package main
 
 import (
     "fmt"
+    "io/ioutil"
     "os"
     "os/signal"
+    "path/filepath"
     "syscall"
 
     clovisor "./libclovisor"
@@ -21,6 +23,23 @@ var podMonitoringMap map[string]*clovisor.ClovisorBCC
 func main() {
     node_name := os.Getenv("MY_NODE_NAME")
 
+    ex, err := os.Executable()
+    if err != nil {
+        fmt.Println(err.Error())
+    } else {
+        exPath := filepath.Dir(ex)
+        fmt.Printf("Current Working Directory is %v\n", exPath)
+        files, _ := ioutil.ReadDir(exPath)
+        for _, f := range files {
+            fmt.Printf("%v ",f.Name())
+        }
+        fmt.Printf("\n")
+    }
+
+    clovisor.Monitor_proto_plugin_cfg()
+
+    clovisor.ClovisorPhyInfSetup()
+
     podMonitoringMap = make(map[string]*clovisor.ClovisorBCC)
 
     clovisor_k8s_client, err := clovisor.K8s_client_init(node_name)
index ab5bc33..a6c74ef 100644 (file)
@@ -9,15 +9,20 @@ package clovisor
 
 import (
     "encoding/hex"
-    "bufio"
+    //"encoding/json"
     "bytes"
     "encoding/binary"
+    "errors"
     "fmt"
     "io/ioutil"
-    "net/http"
+    "net"
+    //"net/http"
+    "plugin"
     "strconv"
+    "strings"
     "time"
 
+    //"github.com/go-redis/redis"
     "github.com/google/gopacket"
     "github.com/google/gopacket/layers"
     "github.com/iovisor/gobpf/bcc"
@@ -30,11 +35,15 @@ import (
 /*
 #cgo CFLAGS: -I/usr/include/bcc/compat
 #cgo LDFLAGS: -lbcc
-#include <bcc/bpf_common.h>
+#include <bcc/bcc_common.h>
 #include <bcc/libbpf.h>
 */
 import "C"
 
+type Parser interface {
+    Parse(session_key string, is_req bool, data []byte)([]byte, map[string]string)
+}
+
 type ClovisorBCC struct {
     stopChan    chan bool
     // TODO(s3wong): remove once k8s watcher available
@@ -65,8 +74,12 @@ type egress_match_cfg struct {
 }
 
 type session_info_t struct {
-    session     map[string]string
-    buf         []byte
+    done        bool
+    service     string
+    generalInfo map[string]string
+    traces      []map[string]string
+    reqBuf      []byte
+    respBuf     []byte
 }
 
 const (
@@ -77,7 +90,12 @@ const (
 )
 
 //var sessionMap map[string]map[string]string;
-var sessionMap map[string]session_info_t;
+var sessionMap map[string]*session_info_t;
+var protocolParser = map[string]Parser{};
+var defaultModPath = map[string]string{
+    "http":     "/proto/http.so",
+}
+var tracerMap = map[string]opentracing.Tracer{};
 
 var veth_ifidx_command = "cat /sys/class/net/eth0/iflink";
 
@@ -88,6 +106,20 @@ var protocolMap = map[string]int{
     "udp":      4,
 }
 
+var traceTable string = "NetworkTraces"
+
+/*
+ * redisConnect: redis client connecting to redis server
+func redisConnect() *redis.Client {
+    client := redis.NewClient(&redis.Options{
+        Addr:       fmt.Sprintf("%s:6379", redisServer),
+        Password:   "",
+        DB:         0,
+    })
+    return client
+}
+ */
+
 func linkSetup(ifname string) netlink.Link {
     link, err := netlink.LinkByName(ifname)
     netlink.LinkSetUp(link)
@@ -114,43 +146,117 @@ func dumpBPFTable(table *bcc.Table) {
     }
 }
 
-func print_network_traces(tracer opentracing.Tracer) {
-    for key, sess_info := range sessionMap {
-        value := sess_info.session
-        if _, ok := value["done"]; ok {
-            span := tracer.StartSpan("http-tracing")
-            span.SetTag("Node-Name", value["nodename"])
-            span.SetTag("Pod-Name", value["podname"])
-            span.SetTag("Source-IP", value["srcip"])
-            span.SetTag("Destination-IP", value["dstip"])
-            span.SetTag("Source-Port", value["srcport"])
-            span.SetTag("Destination-Port", value["dstport"])
-            span.SetTag("HTTP Request Method", value["reqmethod"])
-            span.SetTag("HTTP Request URL", value["requrl"])
-            span.SetTag("HTTP Request Protocol", value["reqproto"])
-            if _, exist := value["host"]; exist {
-                span.SetTag("HTTP Request Host", value["host"])
-            }
-            if _, exist := value["useragent"]; exist {
-                span.SetTag("HTTP Client User Agent", value["useragent"])
+func loadProtoParser(protocol string, update bool) error {
+    var modPath = ""
+
+    if !update {
+        if _, ok := protocolParser[protocol]; ok {
+            fmt.Printf("Found parse function for protocol %s\n", protocol)
+            return nil
+        }
+    }
+
+    client := redisConnect()
+
+    redisResult := client.HGet(ProtoCfg, protocol)
+    if redisResult.Err() == nil {
+        if len(redisResult.Val()) > 0 {
+            modPath = redisResult.Val()
+        }
+    }
+    if len(modPath) == 0 {
+        if _, ok := defaultModPath[protocol]; ok {
+            modPath = defaultModPath[protocol]
+        } else {
+            return errors.New(fmt.Sprintf("Unable to find module path for protocol %s", protocol))
+        }
+    }
+
+    fmt.Printf("Loading plugin for protocol %v with %v\n", protocol, modPath)
+
+    plug, err := plugin.Open(modPath)
+    if err != nil {
+        fmt.Println(err)
+        return err
+    }
+
+    symParse, err := plug.Lookup("Parser")
+    if err != nil {
+        fmt.Println(err)
+        return err
+    }
+
+    var parser Parser
+    parser, ok := symParse.(Parser)
+    if !ok {
+        fmt.Printf("Unexpected type from mod %s symbol parse\n", modPath)
+        return errors.New(fmt.Sprintf("Wrong type for func parse from %s", modPath))
+    }
+
+    protocolParser[protocol] = parser
+    return nil
+}
+
+func print_network_traces() {
+    /*
+    client := redisConnect()
+
+    traces, err := client.HGetAll(traceTable).Result()
+    if err != nil {
+        fmt.Printf("Error retriving traces from redis: %v\n", err.Error())
+        return
+    }
+    */
+    /*
+        structure:
+        "done": "true",
+        "traces": array of protocol traces
+                  [0] : "admin": map[string]string
+                  [1] : "ipv4 or ipv6": map[string]string
+                  [2] : "tcp or udp": map[string]string
+                  [3] : "http"...
+     */
+     /*
+    for key, value := range traces {
+        traceMap := map[string]interface{}
+        json.Unmarshal([]byte(value), &traceMap)
+        if _, ok := traceMap["done"]; ok {
+            span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+            for idx, trace := range traceMap["traces"] {
+                span.SetTag(fmt.Sprintf("protocol-%d", idx), trace['protocol'])
+                for tag, tagVal := range trace {
+                    if tag == "protocol" {
+                        continue
+                    }
+                    span.SetTag(tag, tagVal)
+                }
             }
-            if _, exist := value["requestid"]; exist {
-                span.SetTag("OpenTracing Request ID", value["requestid"])
+            span.Finish()
+            ret := client.HDel(traceTable, key)
+            if ret.Err() != nil {
+                fmt.Printf("Error deleting %v from %v: %v\n", key, traceTable, ret.Err())
             }
-            if _, exist := value["envoydecorator"]; exist {
-                span.SetTag("Envoy Decorator", value["envoydecorator"])
+        }
+    }
+    */
+    for key, value := range sessionMap {
+        if value.done {
+            tracer := tracerMap[value.service]
+            span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+            for genTag, genVal := range value.generalInfo {
+                fmt.Printf("general info writing %v: %v\n", genTag, genVal)
+                span.SetTag(genTag, genVal)
             }
-            if _, exist := value["traceid"]; exist {
-                span.SetTag("Trace ID", value["traceid"])
+            for idx, trace := range value.traces {
+                span.SetTag(fmt.Sprintf("protocol-%d", idx), trace["protocol"])
+                for tag, tagVal := range trace {
+                    if tag == "protocol" {
+                        continue
+                    }
+                    fmt.Printf("%v writing %v: %v\n", trace["protocol"], tag, tagVal)
+                    span.SetTag(tag, tagVal)
+                }
             }
-            span.SetTag("HTTP Request Packet Count", value["reqpakcount"])
-            span.SetTag("HTTP Request Byte Count", value["reqbytecount"])
-            span.SetTag("HTTP Response Status", value["respstatus"])
-            span.SetTag("HTTP Response Status Code", value["respstatuscode"])
-            span.SetTag("HTTP Response Protocol", value["respproto"])
-            span.SetTag("HTTP Response Packet Count", value["resppakcount"])
-            span.SetTag("HTTP Response Byte Count", value["respbytecount"])
-            span.SetTag("HTTP Session Duration", value["duration"])
             span.Finish()
             delete(sessionMap, key)
         }
@@ -160,12 +266,15 @@ func print_network_traces(tracer opentracing.Tracer) {
 func handle_skb_event(data *[]byte, node_name string, pod_name string,
                       session_table *bcc.Table,
                       monitoring_info *monitoring_info_t,
-                      egress_match_list []egress_match_t) (error) {
+                      egress_match_list []egress_match_t,
+                      svc_name string) (error) {
     //fmt.Printf("monitoring info has %v\n", monitoring_info)
-    fmt.Printf("\n\nnode[%s] pod[%s]\n%s", node_name, pod_name, hex.Dump(*data))
+    fmt.Printf("\n\nnode[%s] pod[%s]\n%s\n", node_name, pod_name, hex.Dump(*data))
+    var ipproto layers.IPProtocol
     var src_ip, dst_ip uint32
     var src_port, dst_port uint16
     var session_key, src_ip_str, dst_ip_str string
+    is_req := false
     proto := HTTP
     is_ingress:= binary.LittleEndian.Uint32((*data)[0:4])
     packet := gopacket.NewPacket((*data)[4:len(*data)],
@@ -175,6 +284,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
         ipv4, _ := ipv4_layer.(*layers.IPv4)
         src_ip_str = ipv4.SrcIP.String()
         dst_ip_str = ipv4.DstIP.String()
+        ipproto = ipv4.Protocol
         fmt.Printf("Source: %s      Dest: %s\n", src_ip_str, dst_ip_str)
         // Note: the src_ip and dst_ip var here are ONLY being used as
         // lookup key to eBPF hash table, hence preserving network
@@ -210,79 +320,128 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
             break
         }
     }
-    app_layer := packet.ApplicationLayer()
-    if app_layer == nil {
-        fmt.Printf("No application layer, TCP packet\n")
-        proto = TCP
-    }
     if dst_port == uint16(monitoring_info.port_num) || egress_port_req {
-        session_key = fmt.Sprintf("%x.%x:%d:%d", src_ip, dst_ip, src_port,
-                                  dst_port)
-        if _, ok := sessionMap[session_key]; !ok {
-            sessionMap[session_key] = session_info_t{}
-            sess_map := sessionMap[session_key]
-            sess_map.session = make(map[string]string)
-            sess_map.buf = []byte{}
-            sessionMap[session_key] = sess_map
-            zero := strconv.Itoa(0)
-            sessionMap[session_key].session["reqpakcount"] = zero
-            sessionMap[session_key].session["reqbytecount"] = zero
-            sessionMap[session_key].session["resppakcount"] = zero
-            sessionMap[session_key].session["respbytecount"] = zero
+        is_req = true
+    }
+    if is_req {
+        session_key = fmt.Sprintf("%x:%x:%d:%d:%d", src_ip, dst_ip, ipproto,
+                                  src_port, dst_port)
+    } else {
+        session_key = fmt.Sprintf("%x:%x:%d:%d:%d", dst_ip, src_ip, ipproto,
+                                  dst_port, src_port)
+    }
+    var sess_map *session_info_t
+    if _, ok := sessionMap[session_key]; !ok {
+        sess_map = &session_info_t{}
+        sess_map.done = false
+        sess_map.service = svc_name
+        sess_map.generalInfo = make(map[string]string)
+        sess_map.traces = []map[string]string{}
+        sess_map.reqBuf = []byte{}
+        sess_map.respBuf = []byte{}
+        sessionMap[session_key] = sess_map
+        zero := strconv.Itoa(0)
+        sessionMap[session_key].generalInfo["reqpakcount"] = zero
+        sessionMap[session_key].generalInfo["reqbytecount"] = zero
+        sessionMap[session_key].generalInfo["resppakcount"] = zero
+        sessionMap[session_key].generalInfo["respbytecount"] = zero
+        sessionMap[session_key].generalInfo["nodename"] = node_name
+        sessionMap[session_key].generalInfo["podname"] = pod_name
+    } else {
+        sess_map = sessionMap[session_key]
+    }
+
+    curr_pak_count := 0
+    curr_byte_count := 0
+    map_val := sess_map.generalInfo
+    if is_req {
+        curr_pak_count, _ = strconv.Atoi(map_val["reqpakcount"])
+        curr_byte_count, _ = strconv.Atoi(map_val["reqbytecount"])
+    } else {
+        curr_pak_count, _ = strconv.Atoi(map_val["resppakcount"])
+        curr_byte_count, _ = strconv.Atoi(map_val["respbytecount"])
+    }
+    curr_pak_count++
+    curr_byte_count += len(packet.Data())
+    if is_req {
+        map_val["reqpakcount"] = strconv.Itoa(curr_pak_count)
+        map_val["reqbytecount"] = strconv.Itoa(curr_byte_count)
+    } else {
+        map_val["resppakcount"] = strconv.Itoa(curr_pak_count)
+        map_val["respbytecount"] = strconv.Itoa(curr_byte_count)
+    }
+
+    if is_req {
+        // TODO (s3wong): just do IPv4 and TCP without using the plugin for now
+        // the condition check itself is cheating also...
+        if len(sess_map.traces) <= 1 {
+            ipv4Map := make(map[string]string)
+            ipv4Map["protocol"] = "IPv4"
+            ipv4Map["srcip"] = src_ip_str
+            ipv4Map["dstip"] = dst_ip_str
+            sess_map.traces = append(sess_map.traces, ipv4Map)
+            tcpMap := make(map[string]string)
+            tcpMap["protocol"] = "TCP"
+            tcpMap["srcport"] = fmt.Sprintf("%d", src_port)
+            tcpMap["dstport"] = fmt.Sprintf("%d", dst_port)
+            sess_map.traces = append(sess_map.traces, tcpMap)
         }
-        map_val := sessionMap[session_key].session
-        map_val["nodename"] = node_name
-        map_val["podname"] = pod_name
-        map_val["srcip"] = src_ip_str
-        map_val["dstip"] = dst_ip_str
-        map_val["srcport"] = fmt.Sprintf("%d", src_port)
-        map_val["dstport"] = fmt.Sprintf("%d", dst_port)
-        curr_pak_count, _ := strconv.Atoi(map_val["reqpakcount"])
-        curr_byte_count, _ := strconv.Atoi(map_val["reqbytecount"])
-        curr_pak_count++
-        if proto == HTTP {
-            curr_byte_count += len(app_layer.Payload())
-            reader := bytes.NewReader(app_layer.Payload())
-            buf := bufio.NewReader(reader)
-            req, err := http.ReadRequest(buf)
-            if err != nil {
-                fmt.Printf("Request error: ")
-                fmt.Println(err)
-            } else if req == nil {
-                fmt.Println("request is nil")
-            } else {
-                fmt.Printf("HTTP Request Method %s url %v proto %v\n",
-                            req.Method, req.URL, req.Proto)
-                map_val["reqmethod"] = req.Method
-                map_val["requrl"] = fmt.Sprintf("%v", req.URL)
-                map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
-                if user_agent := req.UserAgent(); len(user_agent) > 0 {
-                    map_val["useragent"] = user_agent
-                }
-                if len(req.Host) > 0 {
-                    map_val["host"] = req.Host
-                }
-                header := req.Header
-                if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
-                    map_val["requestid"] = req_id
-                }
-                if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
-                    map_val["envoydecorator"] = envoy_dec
-                }
-                if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
-                    map_val["traceid"] = trace_id
-                }
-                if _, ok := map_val["respstatus"]; ok {
-                    map_val["done"] = "true"
+    }
+
+    var dataptr []byte
+    app_layer := packet.ApplicationLayer()
+    errStr := ""
+    if app_layer != nil {
+        if is_req {
+            dataptr = append(sess_map.reqBuf, app_layer.Payload()...)
+        } else {
+            dataptr = append(sess_map.respBuf, app_layer.Payload()...)
+        }
+        for _, protocol := range monitoring_info.protocols {
+            if _, ok := protocolParser[protocol]; ok {
+                parser := protocolParser[protocol]
+                new_dataptr, parseMap := parser.Parse(session_key, is_req,
+                                                      dataptr)
+                if parseMap != nil {
+                    protocolTag := strings.ToUpper(protocol)
+                    merged := false
+                    for _, existing := range sess_map.traces {
+                        if existing["protocol"] == protocolTag {
+                            for k, v := range parseMap {
+                                existing[k] = v
+                            }
+                            merged = true
+                            break
+                        }
+                    }
+                    if !merged {
+                        parseMap["protocol"] = strings.ToUpper(protocol)
+                        sess_map.traces = append(sess_map.traces, parseMap)
+                    }
+                    dataptr = new_dataptr
+                } else {
+                    // offset to packet is off, no need to continue
+                    // parsing, return error
+                    errStr = fmt.Sprintf("Error: unable to parse protocol %v", protocol)
+                    fmt.Println(errStr)
+                    //return errors.New(errStr)
+                    break
                 }
             }
+        }
+    } else {
+        fmt.Printf("No application layer, TCP packet\n")
+        return nil
+    }
+
+    if len(errStr) > 0 {
+        // buffer
+        if is_req {
+            sess_map.reqBuf = append([]byte(nil), dataptr...)
         } else {
-            // TODO(s3wong): TCP assumed for now
-            curr_byte_count += (len(*data) - 4)
+            sess_map.respBuf = append([]byte(nil), dataptr...)
         }
-        map_val["reqpakcount"] = strconv.Itoa(curr_pak_count)
-        map_val["reqbytecount"] = strconv.Itoa(curr_byte_count)
-        fmt.Printf("Current session packet count %v and byte count %v\n", map_val["reqpakcount"], map_val["reqbytecount"])
+        //sessionMap[session_key] = sess_map
     } else {
         session_key := session_key_t {
             src_ip: dst_ip,
@@ -304,7 +463,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
             var duration uint64 = 0
             leaf_buf := bytes.NewBuffer(leaf)
             if leaf_buf == nil {
-                fmt.Println("Error: leaf is nil")
+                fmt.Println("Error: unable to allocate new byte buffer")
                 return nil
             }
             session := session_t{}
@@ -314,89 +473,31 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
                 return nil
             }
             if session.Resp_time == 0 {
-                fmt.Printf("session response time not set?\n")
+                fmt.Printf("session response time not set yet\n")
             } else {
                 duration = (session.Resp_time - session.Req_time)/1000
-                fmt.Printf("Leaf %v\n", leaf)
+                fmt.Printf("session time : %v\n", session)
                 fmt.Printf("Duration: %d usec\n", duration)
             }
-            sess_key := fmt.Sprintf("%x.%x:%d:%d", dst_ip, src_ip,
-                                    dst_port, src_port)
-            if _, ok := sessionMap[sess_key]; !ok {
-                //sessionMap[sess_key] = make(map[string]string)
-                sessionMap[sess_key] = session_info_t{}
-                sess_map := sessionMap[sess_key]
-                sess_map.session = make(map[string]string)
-                sess_map.buf = []byte{}
-                sessionMap[sess_key] = sess_map
-                zero := strconv.Itoa(0)
-                sessionMap[sess_key].session["reqpakcount"] = zero
-                sessionMap[sess_key].session["reqbytecount"] = zero
-                sessionMap[sess_key].session["resppakcount"] = zero
-                sessionMap[sess_key].session["respbytecount"] = zero
-            }
-            var map_val = sessionMap[sess_key].session
-            map_val["nodename"] = node_name
-            map_val["podname"] = pod_name
-            map_val["srcip"] = dst_ip_str
-            map_val["dstip"] = src_ip_str
-            map_val["srcport"] = fmt.Sprintf("%d", dst_port)
-            map_val["dstport"] = fmt.Sprintf("%d", src_port)
             map_val["duration"] = fmt.Sprintf("%v usec", duration)
-            curr_pak_count, _ := strconv.Atoi(map_val["resppakcount"])
-            curr_byte_count, _ := strconv.Atoi(map_val["respbytecount"])
-            curr_pak_count++
-
-            if proto == HTTP {
-                curr_byte_count += len(app_layer.Payload())
-                reader := bytes.NewReader(app_layer.Payload())
-                buf := bufio.NewReader(reader)
-                resp, err := http.ReadResponse(buf, nil)
-                read_http := true
-                if err != nil {
-                    fmt.Printf("Response error: ")
-                    fmt.Println(err)
-                    sess_map := sessionMap[sess_key]
-                    sess_map.buf = append(sess_map.buf, app_layer.Payload()...)
-                    reader = bytes.NewReader(sess_map.buf)
-                    buf = bufio.NewReader(reader)
-                    resp, err = http.ReadResponse(buf, nil)
-                    if err != nil || resp == nil {
-                        if err != nil {
-                            fmt.Printf("Response error: %v\n", err)
-                        }
-                        read_http = false
-                    }
-                    sessionMap[sess_key] = sess_map
-                } else if resp == nil {
-                    fmt.Println("response is nil")
-                    read_http = false
-                }
-                if read_http {
-                    fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
-                                resp.Status, resp.StatusCode, resp.Proto)
-                    map_val["respstatus"] = resp.Status
-                    map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
-                    map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
-                    //map_val["duration"] = fmt.Sprintf("%v usec", duration)
-                    /*
-                    if _, ok := map_val["reqmethod"]; ok {
-                        map_val["done"] = "true"
-                    }
-                    */
-                }
-                if resp != nil {
-                    resp.Body.Close()
-                }
+
+            node, node_session, err := getNodeIntfSession(session_key)
+            if err == nil {
+                map_val["node-interface"] = node
+                map_val["node-request-ts"] = fmt.Sprintf("%v", node_session.Req_time)
+                map_val["node-response-ts"] = fmt.Sprintf("%v", node_session.Resp_time)
+                delNodeIntfSession(node, key)
             } else {
-                // TODO(s3wong): TCP assumed for now
-                curr_byte_count += (len(*data) - 4)
+                fmt.Printf("Session not found in any node interface... posssibly local?")
             }
-            map_val["resppakcount"] = strconv.Itoa(curr_pak_count)
-            map_val["respbytecount"] = strconv.Itoa(curr_byte_count)
-            fmt.Printf("Current session packet count %v and byte count %v\n", map_val["resppakcount"], map_val["respbytecount"])
+
             if duration > 0 {
-                map_val["done"] = "true"
+                sess_map.done = true
+                err := session_table.Delete(key)
+                if err != nil {
+                    fmt.Printf("Error deleting key %v: %v\n", key, err)
+                    return err
+                }
             }
         }
     }
@@ -441,6 +542,207 @@ func setEgressTable(egress_table *bcc.Table,
     return nil
 }
 
+var nodeintfFilterList = [...]string {"lo", "veth", "docker", "flannel"}
+
+func filterNodeIntf(intf string) bool {
+    for _, substring := range nodeintfFilterList {
+        if strings.Contains(intf, substring) {
+            return false
+        }
+    }
+    return true
+}
+
+type nodeIntf struct {
+    bpfMod          *bcc.Module
+    ipTrackTable    *bcc.Table
+    sessionTable    *bcc.Table
+}
+
+var nodeIntfMap = map[string]*nodeIntf{}
+
+func setupNodeIntf(ifindex int) (*nodeIntf, error) {
+    buf, err := ioutil.ReadFile("libclovisor/ebpf/node_interface.c")
+    if err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+    code := string(buf)
+
+    bpf_mod := bcc.NewModule(code, []string{})
+
+    ingress_fn, err := bpf_mod.Load("handle_ingress",
+                                    C.BPF_PROG_TYPE_SCHED_CLS,
+                                    1, 65536)
+    if err != nil {
+        fmt.Printf("Failed to load node interface ingress func: %v\n", err)
+        return nil, err
+    }
+
+    egress_fn, err := bpf_mod.Load("handle_egress",
+                                   C.BPF_PROG_TYPE_SCHED_CLS,
+                                   1, 65536)
+    if err != nil {
+        fmt.Printf("Failed to load node interface egress func: %v\n", err)
+        return nil, err
+    }
+
+    ip_track_table := bcc.NewTable(bpf_mod.TableId("ip2track"), bpf_mod)
+    node_sess_table := bcc.NewTable(bpf_mod.TableId("node_sessions"), bpf_mod)
+
+    // check if qdisc clsact filter for this interface already exists
+    link, err := netlink.LinkByIndex(ifindex)
+    if err != nil {
+        fmt.Println(err)
+    } else {
+        qdiscs, err := netlink.QdiscList(link)
+        if err == nil {
+            for _, qdisc_ := range qdiscs {
+                if qdisc_.Type() == "clsact" {
+                    netlink.QdiscDel(qdisc_)
+                    break
+                }
+            }
+        }
+    }
+
+    attrs := netlink.QdiscAttrs {
+        LinkIndex: ifindex,
+        Handle: netlink.MakeHandle(0xffff, 0),
+        Parent: netlink.HANDLE_CLSACT,
+    }
+
+    qdisc := &netlink.GenericQdisc {
+        QdiscAttrs: attrs,
+        QdiscType:  "clsact",
+    }
+
+    if err := netlink.QdiscAdd(qdisc); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    ingress_filter_attrs := netlink.FilterAttrs{
+        LinkIndex:  ifindex,
+        Parent:     netlink.MakeHandle(0xffff, 0xfff3),
+        Priority:   1,
+        Protocol:   unix.ETH_P_ALL,
+    }
+    ingress_filter := &netlink.BpfFilter{
+        FilterAttrs:    ingress_filter_attrs,
+        Fd:             ingress_fn,
+        Name:           "handle_ingress",
+        DirectAction:   true,
+    }
+    if ingress_filter.Fd < 0 {
+        fmt.Println("Failed to load node interface ingress bpf program")
+        return nil, err
+    }
+
+    if err := netlink.FilterAdd(ingress_filter); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    egress_filter_attrs := netlink.FilterAttrs{
+        LinkIndex:  ifindex,
+        Parent:     netlink.MakeHandle(0xffff, 0xfff2),
+        Priority:   1,
+        Protocol:   unix.ETH_P_ALL,
+    }
+    egress_filter := &netlink.BpfFilter{
+        FilterAttrs:    egress_filter_attrs,
+        Fd:             egress_fn,
+        Name:           "handle_egress",
+        DirectAction:   true,
+    }
+    if egress_filter.Fd < 0 {
+        fmt.Println("Failed to load node interface egress bpf program")
+        return nil, err
+    }
+
+    if err := netlink.FilterAdd(egress_filter); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    return &nodeIntf{
+        bpfMod:         bpf_mod,
+        ipTrackTable:   ip_track_table,
+        sessionTable:   node_sess_table,
+    }, nil
+}
+
+func ClovisorPhyInfSetup() error {
+    intfList, err := net.Interfaces()
+    if err != nil {
+        fmt.Printf("Failed to get node interfaces: %v\n", err)
+        return err
+    }
+
+    for _, f := range intfList {
+        if !filterNodeIntf(f.Name) {
+            continue
+        }
+        fmt.Printf("Tracking node interface %v w/ index %v\n", f.Name, f.Index)
+        bpf_node_intf, err := setupNodeIntf(f.Index)
+        if err != nil {
+            fmt.Printf("Failed to set up node interface %v: %v\n", f.Name, err)
+            return err
+        }
+        nodeIntfMap[f.Name] = bpf_node_intf
+    }
+    return nil
+}
+
+func setIPTrackingTable(table *bcc.Table, ipaddr uint32, action int) error {
+    key, _ := table.KeyStrToBytes(strconv.Itoa(int(ipaddr)))
+    leaf, _ := table.LeafStrToBytes(strconv.Itoa(action))
+    if err := table.Set(key, leaf); err != nil {
+        fmt.Printf("Failed to set IP tracking table: %v\n", err)
+        return err
+    }
+    dumpBPFTable(table)
+    return nil
+}
+
+func setNodeIntfTrackingIP(ipaddr uint32) {
+    for name, node_intf := range nodeIntfMap {
+        err := setIPTrackingTable(node_intf.ipTrackTable, ipaddr, 1)
+        if err != nil {
+            fmt.Printf("Failed to add ip address %v to node interface %v: %v\n", backtoIP4(int64(ipaddr)), name, err)
+        }
+    }
+}
+
+func getNodeIntfSession(session_key session_key_t) (string, *session_t, error) {
+    key_buf := &bytes.Buffer{}
+    binary.Write(key_buf, binary.LittleEndian, session_key)
+    key := append([]byte(nil), key_buf.Bytes()...)
+
+    for node, node_intf := range nodeIntfMap {
+        fmt.Printf("For node interface %v... ", node)
+        //dumpBPFTable(node_intf.sessionTable)
+        if leaf, err := node_intf.sessionTable.Get(key); err == nil {
+            leaf_buf := bytes.NewBuffer(leaf)
+            session := session_t{}
+            binary.Read(leaf_buf, binary.LittleEndian, &session)
+            return node, &session, nil
+        }
+    }
+    return "", nil, errors.New("session not found")
+}
+
+func delNodeIntfSession(node_iname string, key []byte) error {
+    nodeIntf := nodeIntfMap[node_iname]
+    err := nodeIntf.sessionTable.Delete(key)
+    if err != nil {
+        fmt.Printf("Error deleting session %v from node interface %v: %v\n",
+                   key, node_iname, err)
+    }
+    return err
+}
+
 func ClovisorNewPodInit(k8s_client *ClovisorK8s,
                         node_name string,
                         pod_name string,
@@ -457,7 +759,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
         return nil, err
     }
 
-    sessionMap = map[string]session_info_t{};
+    sessionMap = map[string]*session_info_t{};
 
     fmt.Printf("Beginning network tracing for pod %v\n", pod_name)
 
@@ -492,7 +794,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
 
     traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod)
     if err := setTrafficTable(traffic_table, int(monitoring_info.port_num),
-                              monitoring_info.protocol, true);
+                              monitoring_info.protocols[0], true);
         err != nil {
         fmt.Printf("Error on setting traffic port")
         return nil, err
@@ -587,6 +889,8 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
         return nil, err
     }
 
+    setNodeIntfTrackingIP(ip2Long(monitoring_info.pod_ip))
+
     table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod)
 
     skb_rev_chan := make(chan []byte)
@@ -597,17 +901,20 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
         return nil, err
     }
 
-    tracer, closer := initJaeger(monitoring_info.svc_name)
-    ticker := time.NewTicker(500 * time.Millisecond)
     stop := make(chan bool)
     go func() {
+        fmt.Printf("Start tracing to Jaeger with service %v\n", monitoring_info.svc_name)
+        tracer, closer := initJaeger(monitoring_info.svc_name)
+        tracerMap[monitoring_info.svc_name] = tracer
+        ticker := time.NewTicker(500 * time.Millisecond)
         for {
             select {
                 case <- ticker.C:
-                    print_network_traces(tracer)
+                    print_network_traces()
                 case data := <-skb_rev_chan:
                     err = handle_skb_event(&data, node_name, pod_name, session_table,
-                                           monitoring_info, egress_match_list)
+                                           monitoring_info, egress_match_list,
+                                           monitoring_info.svc_name)
                     if err != nil {
                         fmt.Printf("failed to decode received data: %s\n", err)
                     }
index f3c631a..9c552da 100644 (file)
@@ -25,6 +25,8 @@ import (
 var redisServer string = "redis.clover-system"
 var jaegerCollector string = "jaeger-collector.clover-system:14268"
 var jaegerAgent string = "jaeger-agent.clover-system:6831"
+var ProtoCfg string = "clovisor_proto_cfg"
+var protoPluginCfgChan string = "clovisor_proto_plugin_cfg_chan"
 
 /*
  * redisConnect: redis client connecting to redis server
@@ -40,6 +42,7 @@ func redisConnect() *redis.Client {
 
 func get_cfg_labels(node_name string) ([]string, error) {
     client := redisConnect()
+    defer client.Close()
     labels_list, err := client.LRange("clovisor_labels", 0, -1).Result()
     if err != nil {
         fmt.Println(err.Error())
@@ -51,6 +54,7 @@ func get_cfg_labels(node_name string) ([]string, error) {
 
 func get_egress_match_list(pod_name string) ([]egress_match_t) {
     client := redisConnect()
+    defer client.Close()
     egress_cfg_list, err := client.LRange("clovior_egress_match", 0, -1).Result()
     if err != nil {
         fmt.Println(err.Error())
@@ -85,13 +89,24 @@ func get_egress_match_list(pod_name string) ([]egress_match_t) {
 // https://www.socketloop.com/tutorials/golang-convert-ip-address-string-to-long-unsigned-32-bit-integer
 func ip2Long(ip string) uint32 {
     var long uint32
-    binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long)
+    //binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long)
+    binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.BigEndian, &long)
     return long
 }
 
+func backtoIP4(ipInt int64) string {
+    // need to do two bit shifting and “0xff” masking
+    b0 := strconv.FormatInt((ipInt>>24)&0xff, 10)
+    b1 := strconv.FormatInt((ipInt>>16)&0xff, 10)
+    b2 := strconv.FormatInt((ipInt>>8)&0xff, 10)
+    b3 := strconv.FormatInt((ipInt & 0xff), 10)
+    return b0 + "." + b1 + "." + b2 + "." + b3
+}
+
 func get_cfg_session_match() ([]egress_match_cfg, error) {
     var ret_list []egress_match_cfg
     client := redisConnect()
+    defer client.Close()
     keys, err := client.HKeys("clovisor_session_match").Result()
     if err != nil {
         fmt.Println(err.Error())
@@ -140,5 +155,27 @@ func initJaeger(service string) (opentracing.Tracer, io.Closer) {
 
 func get_jaeger_server() (string, error) {
     client := redisConnect()
+    defer client.Close()
     return client.Get("clovisor_jaeger_server").Result()
 }
+
+func Monitor_proto_plugin_cfg() {
+    client := redisConnect()
+    //defer client.Close()
+
+    pubsub := client.Subscribe(protoPluginCfgChan)
+    //defer pubsub.Close()
+
+    go func() {
+        for {
+            msg, err := pubsub.ReceiveMessage()
+            if err != nil {
+                fmt.Printf("Error getting protocol plugin configuration: %v\n", err)
+                return
+            }
+
+            fmt.Printf("Update on protocol %v notification received\n", msg.Payload)
+            loadProtoParser(msg.Payload, true)
+        }
+    }()
+}
index 8f4b481..9587437 100644 (file)
@@ -33,8 +33,9 @@ type monitoring_info_t struct {
     svc_name    string
     pod_name    string
     container_name  string
-    protocol    string
+    protocols   []string
     port_num    uint32
+    pod_ip      string
 }
 
 var DEFAULT_NAMESPACE = "default"
@@ -87,7 +88,10 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon
 func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service,
                                          namespace string) (*core_v1.PodList, error) {
     set := labels.Set(svc.Spec.Selector)
+    //label := strings.Split(set.AsSelector().String(), ",")[0]
+    //fmt.Printf("Trying to get pods for service %v with label %v from %v\n", svc.GetName(), label, set.AsSelector().String())
     listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
+    //listOptions := metav1.ListOptions{LabelSelector: label}
     return client.client.CoreV1().Pods(namespace).List(listOptions)
 }
 
@@ -104,7 +108,8 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
     monitoring_info := make(map[string]*monitoring_info_t)
     if len(labels_list) == 0 {
         // TODO(s3wong): set it to 'default'
-        namespace = "linux-foundation-gke"
+        //namespace = "linux-foundation-gke"
+        namespace = "default"
         if svcs_list, err :=
             client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
                         err != nil {
@@ -151,14 +156,25 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
     for ns, svc_slice := range ns_svc_map {
         for _, svc_list_ := range svc_slice {
             for _, svc := range svc_list_.Items {
-                fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
-                var svc_port_map = map[string]core_v1.ServicePort{}
+                if ns == "default" && svc.GetName() == "kubernetes" {
+                    continue
+                }
+                //fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
+                //var svc_port_map = map[string]core_v1.ServicePort{}
+                var svc_port_map = map[string][]string{}
                 for _, svc_port := range svc.Spec.Ports {
                     if len(svc_port.Name) > 0 {
-                        for _, sp := range SUPPORTED_PROTOCOLS {
-                            if strings.Contains(svc_port.Name, sp) {
-                                target_port := svc_port.TargetPort.String()
-                                svc_port_map[target_port] = svc_port
+                        svc_protos := strings.Split(svc_port.Name, "-")
+                        for _, proto := range svc_protos {
+                            if err := loadProtoParser(proto, false); err == nil {
+                                for _, sp := range SUPPORTED_PROTOCOLS {
+                                    if strings.Contains(proto, sp) {
+                                        target_port := svc_port.TargetPort.String()
+                                        svc_port_map[target_port] = append(svc_port_map[target_port], proto)
+                                    }
+                                }
+                            } else {
+                                fmt.Printf("Unsupported protocol: %v\n", proto)
                             }
                         }
                     }
@@ -166,13 +182,23 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
                 if len(svc_port_map) == 0 {
                     fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName())
                     continue
+                } else {
+                    fmt.Printf("svc_port_map for service %v is %v\n", svc.GetName(), svc_port_map)
                 }
-                fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
+                //fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
                 pod_list, err := client.getPodsForSvc(&svc, ns)
                 if err != nil {
                     fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
                     continue
                 }
+                /*
+                labelSet := labels.Set(svc.Spec.Selector)
+                pod_list, err := client.client.CoreV1().Pods(ns).List(metav1.ListOptions{})
+                if err != nil {
+                    fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+                    continue
+                }
+                */
                 for _, pod := range pod_list.Items {
                     if pod.Spec.NodeName == nodeName {
                         for _, container := range pod.Spec.Containers {
@@ -191,12 +217,8 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
                                 monitoring_info[pod_name].pod_name = pod_name
                                 monitoring_info[pod_name].container_name = container.Name
                                 monitoring_info[pod_name].port_num = port_num
-                                svc_port_name := svc_port_map[tp_string].Name
-                                if (strings.Contains(svc_port_name, "-")) {
-                                    monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")]
-                                } else {
-                                    monitoring_info[pod_name].protocol = svc_port_name
-                                }
+                                monitoring_info[pod_name].protocols = svc_port_map[tp_string]
+                                monitoring_info[pod_name].pod_ip = pod.Status.PodIP
                             }
                         }
                     }
diff --git a/clover/clovisor/libclovisor/ebpf/node_interface.c b/clover/clovisor/libclovisor/ebpf/node_interface.c
new file mode 100755 (executable)
index 0000000..cd14a50
--- /dev/null
@@ -0,0 +1,158 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+#include <uapi/linux/if_ether.h>
+#include <uapi/linux/in.h>
+#include <uapi/linux/ip.h>
+#include <uapi/linux/tcp.h>
+#include <uapi/linux/pkt_cls.h>
+#include <uapi/linux/bpf.h>
+
+#include <bcc/proto.h>
+
+#define MAX_SESSION_TABLE_ENTRIES 8192
+
+typedef struct session_key_ {
+    u32 src_ip;
+    u32 dst_ip;
+    unsigned short src_port;
+    unsigned short dst_port;
+} session_key_t;
+
+typedef struct session_ {
+    u64 req_time;
+    u64 resp_time;
+} session_t;
+
+BPF_HASH(ip2track, u32, u32);
+BPF_HASH(node_sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES);
+
+struct eth_hdr {
+       unsigned char   h_dest[ETH_ALEN];
+       unsigned char   h_source[ETH_ALEN];
+       unsigned short  h_proto;
+};
+
+static inline int ipv4_hdrlen(struct iphdr *ip4)
+{
+    return ip4->ihl * 4;
+}
+
+static inline int tcp_doff(struct tcphdr *tcp_hdr)
+{
+    return tcp_hdr->doff * 4;
+}
+
+static inline void fill_up_sess_key(session_key_t *key, u32 src_ip,
+                                    u32 dst_ip, u16 src_port, u16 dst_port)
+{
+    key->src_ip = src_ip;
+    key->dst_ip = dst_ip;
+    key->src_port = src_port;
+    key->dst_port = dst_port;
+}
+
+static inline void process_response(u32 src_ip, u32 dst_ip, u16 src_port,
+                                    u16 dst_port)
+{
+    session_key_t sess_key = {};
+    session_t *session_ptr = NULL;
+    fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+    session_ptr = node_sessions.lookup(&sess_key);
+    if (session_ptr != NULL) {
+        u64 resp_time = bpf_ktime_get_ns();
+        session_t update_session = {
+            session_ptr->req_time,
+            resp_time
+        };
+        node_sessions.update(&sess_key, &update_session);
+    }
+}
+
+static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port,
+                                   u16 dst_port)
+{
+    session_key_t sess_key = {};
+    session_t *session_ptr = NULL;
+    session_t new_session = {
+        bpf_ktime_get_ns(),
+        0
+    };
+    fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+    session_ptr = node_sessions.lookup(&sess_key);
+    if (! session_ptr) {
+        node_sessions.insert(&sess_key, &new_session);
+    }
+}
+
+static inline void ingress_parsing(struct tcphdr *tcp_hdr,
+                                   struct iphdr *ipv4_hdr)
+{
+    unsigned int dst_ip = ntohl(ipv4_hdr->daddr);
+    int ret = 0;
+
+    unsigned int *proto = ip2track.lookup(&dst_ip);
+    if (proto != NULL) {
+        process_response(ntohl(ipv4_hdr->daddr),
+                         ntohl(ipv4_hdr->saddr),
+                         ntohs(tcp_hdr->dest),
+                         ntohs(tcp_hdr->source));
+    }
+}
+
+static inline void egress_parsing(struct tcphdr *tcp_hdr,
+                                  struct iphdr *ipv4_hdr)
+{
+    unsigned int src_ip = ntohl(ipv4_hdr->saddr);
+
+    unsigned int *proto = ip2track.lookup(&src_ip);
+
+    if (proto != NULL) {
+        process_request(ntohl(ipv4_hdr->saddr),
+                        ntohl(ipv4_hdr->daddr),
+                        ntohs(tcp_hdr->source),
+                        ntohs(tcp_hdr->dest));
+    }
+}
+
+static inline int handle_packet(struct __sk_buff *skb, int is_ingress)
+{
+       void *data = (void *)(long)skb->data;
+       void *data_end = (void *)(long)skb->data_end;
+       struct eth_hdr *eth = data;
+    struct iphdr *ipv4_hdr = data + sizeof(*eth);
+    struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr);
+
+    /* TODO(s3wong): assuming TCP only for now */
+       /* single length check */
+       if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end)
+               return TC_ACT_OK;
+
+    if (eth->h_proto != htons(ETH_P_IP))
+        return TC_ACT_OK;
+
+    // TODO(s3wong): no support for IP options
+    if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5)
+        return TC_ACT_OK;
+
+    if (is_ingress == 1) {
+        ingress_parsing(tcp_hdr, ipv4_hdr);
+    } else{
+        egress_parsing(tcp_hdr, ipv4_hdr);
+    }
+
+       return TC_ACT_OK;
+}
+
+int handle_ingress(struct __sk_buff *skb)
+{
+    return handle_packet(skb, 1);
+}
+
+int handle_egress(struct __sk_buff *skb)
+{
+    return handle_packet(skb, 0);
+}
diff --git a/clover/clovisor/libclovisor/libproto/build-plugin b/clover/clovisor/libclovisor/libproto/build-plugin
new file mode 100755 (executable)
index 0000000..743071b
--- /dev/null
@@ -0,0 +1,9 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+go build --buildmode=plugin -o ../../proto/http.so clovisor_http.go
diff --git a/clover/clovisor/libclovisor/libproto/clovisor_http.go b/clover/clovisor/libclovisor/libproto/clovisor_http.go
new file mode 100644 (file)
index 0000000..6440d5a
--- /dev/null
@@ -0,0 +1,84 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package main
+
+import (
+    "bufio"
+    "bytes"
+    "fmt"
+    "io/ioutil"
+    "net/http"
+)
+
+type httpparser string
+
+func (p httpparser) Parse(session_key string,
+                          is_req bool,
+                          data []byte) ([]byte, map[string]string) {
+    map_val := make(map[string]string)
+    reader := bytes.NewReader(data)
+    buf := bufio.NewReader(reader)
+    if is_req == true {
+        req, err := http.ReadRequest(buf)
+        if err != nil {
+            fmt.Printf("Request error: ")
+            fmt.Println(err)
+            return nil, nil
+        } else if req == nil {
+            fmt.Println("request is nil")
+            return nil, nil
+        } else {
+            fmt.Printf("HTTP Request Method %s url %v proto %v\n",
+                       req.Method, req.URL, req.Proto)
+            map_val["reqmethod"] = req.Method
+            map_val["requrl"] = fmt.Sprintf("%v", req.URL)
+            map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
+            if user_agent := req.UserAgent(); len(user_agent) > 0 {
+                map_val["useragent"] = user_agent
+            }
+            if len(req.Host) > 0 {
+                map_val["host"] = req.Host
+            }
+            header := req.Header
+            if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
+                map_val["requestid"] = req_id
+            }
+            if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
+                map_val["envoydecorator"] = envoy_dec
+            }
+            if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
+                map_val["traceid"] = trace_id
+            }
+            body, err := ioutil.ReadAll(req.Body)
+            if err != nil {
+                fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+            }
+            return body, map_val
+        }
+    } else {
+        // response
+        resp, err := http.ReadResponse(buf, nil)
+        if err != nil {
+            fmt.Printf("Response error: ")
+            fmt.Println(err)
+            return nil, nil
+        }
+        fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
+                    resp.Status, resp.StatusCode, resp.Proto)
+        map_val["respstatus"] = resp.Status
+        map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
+        map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
+        body, err := ioutil.ReadAll(resp.Body)
+        if err != nil {
+            fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+        }
+        return body, map_val
+    }
+}
+
+var Parser httpparser
diff --git a/clover/clovisor/libclovisor/libproto/http_alt.go b/clover/clovisor/libclovisor/libproto/http_alt.go
new file mode 100644 (file)
index 0000000..f7581fb
--- /dev/null
@@ -0,0 +1,93 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package main
+
+import (
+    "bufio"
+    "bytes"
+    "fmt"
+    "io/ioutil"
+    "net/http"
+)
+
+type httpparser string
+
+func (p httpparser) Parse(session_key string,
+                          is_req bool,
+                          data []byte) ([]byte, map[string]string) {
+    map_val := make(map[string]string)
+    reader := bytes.NewReader(data)
+    buf := bufio.NewReader(reader)
+    if is_req == true {
+        req, err := http.ReadRequest(buf)
+        if err != nil {
+            fmt.Printf("Request error: ")
+            fmt.Println(err)
+            return nil, nil
+        } else if req == nil {
+            fmt.Println("request is nil")
+            return nil, nil
+        } else {
+            fmt.Printf("HTTP Request Method %s url %v proto %v\n",
+                       req.Method, req.URL, req.Proto)
+            map_val["reqmethod"] = req.Method
+            map_val["requrl"] = fmt.Sprintf("%v", req.URL)
+            map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
+            if user_agent := req.UserAgent(); len(user_agent) > 0 {
+                map_val["useragent"] = user_agent
+            }
+            if len(req.Host) > 0 {
+                map_val["host"] = req.Host
+            }
+            header := req.Header
+            if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
+                map_val["requestid"] = req_id
+            }
+            if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
+                map_val["envoydecorator"] = envoy_dec
+            }
+            if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
+                map_val["traceid"] = trace_id
+            }
+            body, err := ioutil.ReadAll(req.Body)
+            if err != nil {
+                fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+            }
+            return body, map_val
+        }
+    } else {
+        // response
+        resp, err := http.ReadResponse(buf, nil)
+        if err != nil {
+            fmt.Printf("Response error: ")
+            fmt.Println(err)
+            return nil, nil
+        }
+        fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
+                    resp.Status, resp.StatusCode, resp.Proto)
+        map_val["respstatus"] = resp.Status
+        map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
+        map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
+        header := resp.Header
+        //fmt.Printf("Response Header contains %v\n", header)
+        if contentType := header.Get("Content-Type"); len(contentType) > 0 {
+            map_val["content-type"] = contentType
+        }
+        if lastMod := header.Get("Last-Modified"); len(lastMod) > 0 {
+            map_val["last-modified"] = lastMod
+        }
+
+        body, err := ioutil.ReadAll(resp.Body)
+        if err != nil {
+            fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+        }
+        return body, map_val
+    }
+}
+
+var Parser httpparser
diff --git a/clover/clovisor/proto/http.so b/clover/clovisor/proto/http.so
new file mode 100644 (file)
index 0000000..1a61fb2
Binary files /dev/null and b/clover/clovisor/proto/http.so differ