CLOVER-43: Initial commit for Clovisor 17/62117/9
author Stephen Wong <stephen.kf.wong@gmail.com>
Tue, 11 Sep 2018 23:37:33 +0000 (23:37 +0000)
committer Stephen Wong <stephen.kf.wong@gmail.com>
Thu, 1 Nov 2018 06:42:03 +0000 (06:42 +0000)
Part of Clover's initial task (from the project proposal) is to explore
cloud native network tracing tools. The Clover project proposes Clovisor
as a way to use IOvisor (https://www.iovisor.org/) to perform network
tracing. This is the first commit of this module.

Please note that the BPF code used in this commit has only been tested
with Linux kernel 4.14+ --- hence, if Ubuntu is used and an LTS release
is preferred, the requirement is 18.04+.
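
As a quick sanity check on a target node (a sketch, assuming Ubuntu and
that a headers package matching the running kernel is available, which
is what the build scripts expect):

    uname -r                                  # expect 4.14 or newer
    dpkg -l | grep linux-headers-$(uname -r)  # headers needed by BCC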

The redis and k8s API server watcher code will be tracked in a separate
patch.
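
Until that patch lands, the Redis keys read by
libclovisor/clovisor_cfg.go can be seeded by hand for testing, e.g. with
redis-cli from a pod that can resolve the redis.clover-system service; a
rough sketch (the label and port values below are made-up examples):

    # monitor pods labelled app=proxy in the default namespace
    redis-cli -h redis.clover-system rpush clovisor_labels "default:app:proxy"
    # record egress sessions towards port 9080 on any destination IP
    redis-cli -h redis.clover-system rpush clovior_egress_match "0:9080"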

Also, a design doc will be added in the future (TBD)
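
For reference, a rough end-to-end flow to try this out on a single-node
cluster (a sketch, assuming kubectl access and the local registry at
localhost:5000 that build-docker pushes to):

    kubectl apply -f libclovisor/redis.yaml                      # config store
    kubectl apply -f libclovisor/jaeger-all-in-one-template.yml  # trace backend
    ./build.sh                        # install deps and build the clovisor binary
    ./build-docker                    # docker build, tag and push to localhost:5000
    kubectl apply -f clovisor.yaml    # run Clovisor as a privileged DaemonSet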

Signed-off-by: Stephen Wong <stephen.kf.wong@gmail.com>
Change-Id: I30d9f9d474b8703097c470d39628e86bc788f9b6

12 files changed:
clover/clovisor/Dockerfile [new file with mode: 0644]
clover/clovisor/bin/clovisor [new file with mode: 0755]
clover/clovisor/build-docker [new file with mode: 0755]
clover/clovisor/build.sh [new file with mode: 0755]
clover/clovisor/clovisor.yaml [new file with mode: 0644]
clover/clovisor/clovisor_main.go [new file with mode: 0644]
clover/clovisor/libclovisor/clovisor_bcc.go [new file with mode: 0644]
clover/clovisor/libclovisor/clovisor_cfg.go [new file with mode: 0644]
clover/clovisor/libclovisor/clovisor_k8s.go [new file with mode: 0644]
clover/clovisor/libclovisor/ebpf/session_tracking.c [new file with mode: 0755]
clover/clovisor/libclovisor/jaeger-all-in-one-template.yml [new file with mode: 0644]
clover/clovisor/libclovisor/redis.yaml [new file with mode: 0644]

diff --git a/clover/clovisor/Dockerfile b/clover/clovisor/Dockerfile
new file mode 100644 (file)
index 0000000..4df4ee5
--- /dev/null
@@ -0,0 +1,17 @@
+FROM ubuntu:18.04
+
+ARG TARGET_KERNEL_VER
+
+RUN set -ex; \
+  echo "deb [trusted=yes] http://repo.iovisor.org/apt/bionic bionic main" > /etc/apt/sources.list.d/iovisor.list; \
+  apt-get update -y; \
+  DEBIAN_FRONTEND=noninteractive apt-get install -y \
+    auditd \
+    bcc-tools \
+    linux-headers-$TARGET_KERNEL_VER \
+    libelf1;
+
+COPY . .
+RUN chmod +x clovisor
+
+CMD ["./clovisor"]
diff --git a/clover/clovisor/bin/clovisor b/clover/clovisor/bin/clovisor
new file mode 100755 (executable)
index 0000000..bd94d65
Binary files /dev/null and b/clover/clovisor/bin/clovisor differ
diff --git a/clover/clovisor/build-docker b/clover/clovisor/build-docker
new file mode 100755 (executable)
index 0000000..c724c8c
--- /dev/null
@@ -0,0 +1,18 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+if [ -z "$1" ]
+  then
+    kernel_ver=`uname -r`
+  else
+    kernel_ver=$1
+fi
+cp bin/clovisor .
+docker build --build-arg TARGET_KERNEL_VER=$kernel_ver -t clovisor .
+docker tag clovisor localhost:5000/clovisor
+docker push localhost:5000/clovisor
diff --git a/clover/clovisor/build.sh b/clover/clovisor/build.sh
new file mode 100755 (executable)
index 0000000..4503d5a
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+GOVERSION=1.10.3
+OS=linux
+ARCH=amd64
+export GOPATH=/home/ubuntu/go
+CLIENTGOVERSION=v8.0.0
+
+SRCDIR=`pwd`
+
+wget https://dl.google.com/go/go$GOVERSION.$OS-$ARCH.tar.gz
+sudo tar -C /usr/local -xzf go$GOVERSION.$OS-$ARCH.tar.gz
+export PATH=$PATH:/usr/local/go/bin
+export PATH=$GOPATH/bin:$PATH
+
+sudo apt install -y gcc
+sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys D4284CDD
+echo "deb https://repo.iovisor.org/apt/bionic bionic main" | sudo tee /etc/apt/sources.list.d/iovisor.list
+sudo apt-get update -y
+sudo apt-get install -y bcc-tools libbcc-examples linux-headers-$(uname -r)
+
+go get github.com/google/gopacket
+go get github.com/iovisor/gobpf
+go get github.com/opentracing/opentracing-go
+go get github.com/pkg/errors
+go get github.com/uber/jaeger-client-go
+go get github.com/vishvananda/netlink
+go get github.com/vishvananda/netns
+go get golang.org/x/sys/unix
+
+go get github.com/tools/godep
+go get k8s.io/client-go/...
+cd $GOPATH/src/k8s.io/client-go
+git checkout $CLIENTGOVERSION
+godep restore ./...
+
+cd $SRCDIR/libclovisor
+go build .
+cd ../
+go build -o clovisor .
diff --git a/clover/clovisor/clovisor.yaml b/clover/clovisor/clovisor.yaml
new file mode 100644 (file)
index 0000000..7d5e43b
--- /dev/null
@@ -0,0 +1,55 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: clovisor
+  labels:
+    name: clovisor
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: clovisor
+  namespace: clovisor
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: serv-account-rbac-clovisor
+subjects:
+  - kind: ServiceAccount
+    # Note: this binds the namespace's `default` service account;
+    name: default
+    # the `clovisor` ServiceAccount defined above is not yet referenced
+    namespace: clovisor
+roleRef:
+  kind: ClusterRole
+  name: cluster-admin
+  apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+  name: clovisor
+  namespace: clovisor
+spec:
+  selector:
+    matchLabels:
+      app: clovisor
+  template:
+    metadata:
+      name: clovisor
+      labels:
+        app: clovisor
+    spec:
+      hostNetwork: true
+      dnsPolicy: ClusterFirstWithHostNet
+      containers:
+      - name: clovisor
+        image: localhost:5000/clovisor
+        securityContext:
+          privileged: true
+        env:
+        - name: MY_NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: spec.nodeName
diff --git a/clover/clovisor/clovisor_main.go b/clover/clovisor/clovisor_main.go
new file mode 100644 (file)
index 0000000..46b1780
--- /dev/null
@@ -0,0 +1,58 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package main
+
+import (
+    "fmt"
+    "os"
+    "os/signal"
+    "syscall"
+
+    clovisor "./libclovisor"
+)
+
+var podMonitoringMap map[string]*clovisor.ClovisorBCC
+
+func main() {
+    node_name := os.Getenv("MY_NODE_NAME")
+
+    podMonitoringMap = make(map[string]*clovisor.ClovisorBCC)
+
+    clovisor_k8s_client, err := clovisor.K8s_client_init(node_name)
+    if err != nil {
+        fmt.Printf("Clovisor to Kubernetes connectivity failed: %v\n", err)
+        return
+    }
+    fmt.Printf("Clovisor got k8s client succeed\n")
+
+    monitoring_info_map, err := clovisor_k8s_client.Get_monitoring_info(node_name)
+    if err != nil {
+        fmt.Printf("Clovisor getting monitoring info failed: %v\n", err)
+        return
+    }
+    fmt.Printf("Clovisor get monitoring info succeed: %v\n", monitoring_info_map)
+
+    for pod := range monitoring_info_map {
+        podMon, err := clovisor.ClovisorNewPodInit(clovisor_k8s_client, pod,
+                                                   monitoring_info_map[pod])
+        if err != nil {
+            fmt.Printf("Clovisor monitoring pod %s failed: %v\n", pod, err)
+            continue
+        }
+        podMonitoringMap[pod] = podMon
+    }
+
+    sig := make(chan os.Signal, 1)
+    signal.Notify(sig, os.Interrupt, os.Kill, syscall.SIGTERM)
+    <-sig
+    for pod := range podMonitoringMap {
+        fmt.Printf("Send stop pod to pod %v\n", pod)
+        podMonitoringMap[pod].StopPod()
+    }
+}
+
diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go
new file mode 100644 (file)
index 0000000..4dc936d
--- /dev/null
@@ -0,0 +1,552 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+    "encoding/hex"
+    "bufio"
+    "bytes"
+    "encoding/binary"
+    "fmt"
+    "io/ioutil"
+    "net/http"
+    "strconv"
+    "time"
+
+    "github.com/google/gopacket"
+    "github.com/google/gopacket/layers"
+    "github.com/iovisor/gobpf/bcc"
+    opentracing "github.com/opentracing/opentracing-go"
+    "github.com/vishvananda/netlink"
+
+    "golang.org/x/sys/unix"
+)
+
+/*
+#cgo CFLAGS: -I/usr/include/bcc/compat
+#cgo LDFLAGS: -lbcc
+#include <bcc/bpf_common.h>
+#include <bcc/libbpf.h>
+*/
+import "C"
+
+type ClovisorBCC struct {
+    stopChan    chan bool
+    // TODO(s3wong): remove once k8s watcher available
+    qdisc       *netlink.GenericQdisc
+}
+
+type session_key_t struct {
+    src_ip      uint32
+    dst_ip      uint32
+    src_port    uint16
+    dst_port    uint16
+}
+
+type session_t struct {
+    Req_time    uint64
+    Resp_time   uint64
+}
+
+type egress_match_t struct {
+    dst_ip      uint32
+    dst_port    uint16
+}
+
+type egress_match_cfg struct {
+    egress_match    egress_match_t
+    action          string
+    params          string
+}
+
+type session_info_t struct {
+    session     map[string]string
+    buf         []byte
+}
+
+const (
+    HTTP = 1 << iota
+    HTTP2 = 1 << iota
+    TCP = 1 << iota
+    UDP = 1 << iota
+)
+
+//var sessionMap map[string]map[string]string;
+var sessionMap map[string]session_info_t
+
+var veth_ifidx_command = "cat /sys/class/net/eth0/iflink"
+
+var protocolMap = map[string]int{
+    "http":     1,
+    "http2":    2,
+    "tcp":      3,
+    "udp":      4,
+}
+
+func linkSetup(ifname string) netlink.Link {
+    link, err := netlink.LinkByName(ifname)
+    if err != nil {
+        fmt.Println(err)
+        return nil
+    }
+    netlink.LinkSetUp(link)
+    return link
+}
+
+/*
+ * dumpBPFTable: for debug purpose
+ */
+func dumpBPFTable(table *bcc.Table) {
+    iterator := table.Iter()
+    if iterator == nil {
+        fmt.Printf("Table %v does not exist\n", table.Name())
+    } else {
+        for iterator.Next() {
+            key_str, _ := table.KeyBytesToStr(iterator.Key())
+            leaf_str, _ := table.LeafBytesToStr(iterator.Leaf())
+            fmt.Printf("table %v key: %v  leaf: %v\n", table.Name(), key_str, leaf_str)
+        }
+    }
+}
+
+func print_network_traces(tracer opentracing.Tracer) {
+    for key, sess_info := range sessionMap {
+        value := sess_info.session
+        if _, ok := value["done"]; ok {
+            span := tracer.StartSpan("http-tracing")
+            span.SetTag("Pod-Name", value["podname"])
+            span.SetTag("Source-IP", value["srcip"])
+            span.SetTag("Destination-IP", value["dstip"])
+            span.SetTag("Source-Port", value["srcport"])
+            span.SetTag("Destination-Port", value["dstport"])
+            span.SetTag("HTTP Request Method", value["reqmethod"])
+            span.SetTag("HTTP Request URL", value["requrl"])
+            span.SetTag("HTTP Request Protocol", value["reqproto"])
+            span.SetTag("HTTP Response Status", value["respstatus"])
+            span.SetTag("HTTP Response Status Code", value["respstatuscode"])
+            span.SetTag("HTTP Response Protocol", value["respproto"])
+            span.SetTag("HTTP Session Duration", value["duration"])
+            span.Finish()
+            delete(sessionMap, key)
+        }
+    }
+}
+
+func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table,
+                      monitoring_info *monitoring_info_t,
+                      egress_match_list []egress_match_t) (error) {
+    //fmt.Printf("monitoring info has %v\n", monitoring_info)
+    fmt.Printf("\n\n%s", hex.Dump(*data))
+    var src_ip, dst_ip uint32
+    var src_port, dst_port uint16
+    var session_key, src_ip_str, dst_ip_str string
+    proto := HTTP
+    is_ingress := binary.LittleEndian.Uint32((*data)[0:4])
+    packet := gopacket.NewPacket((*data)[4:len(*data)],
+                                 layers.LayerTypeEthernet,
+                                 gopacket.Default)
+    if ipv4_layer := packet.Layer(layers.LayerTypeIPv4); ipv4_layer != nil {
+        ipv4, _ := ipv4_layer.(*layers.IPv4)
+        src_ip_str = ipv4.SrcIP.String()
+        dst_ip_str = ipv4.DstIP.String()
+        fmt.Printf("Source: %s      Dest: %s\n", src_ip_str, dst_ip_str)
+        // Note: the src_ip and dst_ip var here are ONLY being used as
+        // lookup key to eBPF hash table, hence preserving network
+        // byte order
+        src_ip = binary.BigEndian.Uint32(ipv4.SrcIP)
+        dst_ip = binary.BigEndian.Uint32(ipv4.DstIP)
+    }
+    tcp_layer := packet.Layer(layers.LayerTypeTCP)
+    if tcp_layer != nil {
+        tcp, _ := tcp_layer.(*layers.TCP)
+        fmt.Printf("From src port %d to dst port %d [%v]: FIN:%v|SYN:%v|RST:%v|PSH:%v|ACK:%v|URG:%v|ECE:%v|CWR:%v|NS:%v\n",
+                    tcp.SrcPort, tcp.DstPort, tcp.DataOffset, tcp.FIN, tcp.SYN,
+                    tcp.RST, tcp.PSH, tcp.ACK, tcp.URG, tcp.ECE, tcp.CWR, tcp.NS)
+        //src_port := binary.LittleEndian.Uint16(uint16(tcp.SrcPort))
+        //dst_port := binary.LittleEndian.Uint16(uint16(tcp.DstPort))
+        src_port = uint16(tcp.SrcPort)
+        dst_port = uint16(tcp.DstPort)
+    } else {
+        fmt.Printf("Non-TCP packet, skip tracing...\n")
+        return nil
+    }
+    fmt.Printf("proto: %d is_ingress: %d data length %v\n", proto, is_ingress, len(*data))
+    fmt.Println("dst_port is ", dst_port)
+    if dst_port == 0 {
+        return nil
+    }
+    // TODO(s3wong): dump table
+    dumpBPFTable(session_table)
+    egress_port_req := false
+    for _, port := range egress_match_list {
+        if port.dst_port == dst_port {
+            egress_port_req = true
+            break
+        }
+    }
+    app_layer := packet.ApplicationLayer()
+    if app_layer == nil {
+        fmt.Printf("No application layer, TCP packet\n")
+        proto = TCP
+    }
+    if dst_port == uint16(monitoring_info.port_num) || egress_port_req {
+        session_key = fmt.Sprintf("%x.%x:%d:%d", src_ip, dst_ip, src_port,
+                                  dst_port)
+        if _, ok := sessionMap[session_key]; !ok {
+            sessionMap[session_key] = session_info_t{}
+            sess_map := sessionMap[session_key]
+            sess_map.session = make(map[string]string)
+            sess_map.buf = []byte{}
+            sessionMap[session_key] = sess_map
+        }
+        map_val := sessionMap[session_key].session
+        map_val["podname"] = pod_name
+        map_val["srcip"] = src_ip_str
+        map_val["dstip"] = dst_ip_str
+        map_val["srcport"] = fmt.Sprintf("%d", src_port)
+        map_val["dstport"] = fmt.Sprintf("%d", dst_port)
+        if proto == HTTP {
+            reader := bytes.NewReader(app_layer.Payload())
+            buf := bufio.NewReader(reader)
+            req, err := http.ReadRequest(buf)
+            if err != nil {
+                fmt.Printf("Request error: ")
+                fmt.Println(err)
+            } else if req == nil {
+                fmt.Println("request is nil")
+            } else {
+                fmt.Printf("HTTP Request Method %s url %v proto %v\n",
+                            req.Method, req.URL, req.Proto)
+                map_val["reqmethod"] = req.Method
+                map_val["requrl"] = fmt.Sprintf("%v", req.URL)
+                map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
+                if _, ok := map_val["respstatus"]; ok {
+                    map_val["done"] = "true"
+                }
+            }
+        }
+    } else {
+        session_key := session_key_t {
+            src_ip: dst_ip,
+            dst_ip: src_ip,
+            src_port:   dst_port,
+            dst_port:   src_port,
+        }
+        key_buf := &bytes.Buffer{}
+        err := binary.Write(key_buf, binary.LittleEndian, session_key)
+        if err != nil {
+            fmt.Println(err)
+            return nil
+        }
+        key := append([]byte(nil), key_buf.Bytes()...)
+        if leaf, err := session_table.Get(key); err != nil {
+            fmt.Printf("Failed to lookup key %v with err %v\n", session_key, err)
+            return nil
+        } else {
+            var duration uint64 = 0
+            leaf_buf := bytes.NewBuffer(leaf)
+            if leaf_buf == nil {
+                fmt.Println("Error: leaf is nil")
+                return nil
+            }
+            session := session_t{}
+            if err = binary.Read(leaf_buf, binary.LittleEndian, &session);
+                err != nil {
+                fmt.Println(err)
+                return nil
+            }
+            if session.Resp_time == 0 {
+                fmt.Printf("session response time not set?\n")
+            } else {
+                duration = (session.Resp_time - session.Req_time)/1000
+                fmt.Printf("Leaf %v\n", leaf)
+                fmt.Printf("Duration: %d usec\n", duration)
+            }
+            sess_key := fmt.Sprintf("%x.%x:%d:%d", dst_ip, src_ip,
+                                    dst_port, src_port)
+            if _, ok := sessionMap[sess_key]; !ok {
+                //sessionMap[sess_key] = make(map[string]string)
+                sessionMap[sess_key] = session_info_t{}
+                sess_map := sessionMap[sess_key]
+                sess_map.session = make(map[string]string)
+                sess_map.buf = []byte{}
+                sessionMap[sess_key] = sess_map
+            }
+            var map_val = sessionMap[sess_key].session
+            map_val["podname"] = pod_name
+            map_val["srcip"] = dst_ip_str
+            map_val["dstip"] = src_ip_str
+            map_val["srcport"] = fmt.Sprintf("%d", dst_port)
+            map_val["dstport"] = fmt.Sprintf("%d", src_port)
+            map_val["duration"] = fmt.Sprintf("%v usec", duration)
+
+            if proto == HTTP {
+                reader := bytes.NewReader(app_layer.Payload())
+                buf := bufio.NewReader(reader)
+                resp, err := http.ReadResponse(buf, nil)
+                read_http := true
+                if err != nil {
+                    fmt.Printf("Response error: ")
+                    fmt.Println(err)
+                    sess_map := sessionMap[sess_key]
+                    sess_map.buf = append(sess_map.buf, app_layer.Payload()...)
+                    reader = bytes.NewReader(sess_map.buf)
+                    buf = bufio.NewReader(reader)
+                    resp, err = http.ReadResponse(buf, nil)
+                    if err != nil || resp == nil {
+                        if err != nil {
+                            fmt.Printf("Response error: %v\n", err)
+                        }
+                        read_http = false
+                    }
+                    sessionMap[sess_key] = sess_map
+                } else if resp == nil {
+                    fmt.Println("response is nil")
+                    read_http = false
+                }
+                if read_http {
+                    fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
+                                resp.Status, resp.StatusCode, resp.Proto)
+                    map_val["respstatus"] = resp.Status
+                    map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
+                    map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
+                    //map_val["duration"] = fmt.Sprintf("%v usec", duration)
+                    /*
+                    if _, ok := map_val["reqmethod"]; ok {
+                        map_val["done"] = "true"
+                    }
+                    */
+                }
+                if resp != nil {
+                    resp.Body.Close()
+                }
+            }
+            if duration > 0 {
+                map_val["done"] = "true"
+            }
+        }
+    }
+
+    return nil
+}
+
+func setTrafficTable(traffic_table *bcc.Table, port_num int, protocol_id string, dump_table bool) error {
+    key, _ := traffic_table.KeyStrToBytes(strconv.Itoa(port_num))
+    leaf, _ := traffic_table.LeafStrToBytes(strconv.Itoa(protocolMap[protocol_id]))
+    if err := traffic_table.Set(key, leaf); err != nil {
+        fmt.Printf("Failed to set traffic table tcpdports: %v\n", err)
+        return err
+    }
+    if dump_table {
+        dumpBPFTable(traffic_table)
+    }
+    return nil
+}
+
+func setEgressTable(egress_table *bcc.Table,
+                    egress_match_list []egress_match_t,
+                    action int,
+                    dump_table bool) error {
+    for _, egress_match := range egress_match_list {
+        key_buf := &bytes.Buffer{}
+        err := binary.Write(key_buf, binary.LittleEndian, egress_match)
+        if err != nil {
+            fmt.Printf("Error converting key %v into binary: %v\n", egress_match, err)
+            continue
+        }
+        key := append([]byte(nil), key_buf.Bytes()...)
+        leaf, _ := egress_table.LeafStrToBytes(strconv.Itoa(action))
+        if err := egress_table.Set(key, leaf); err != nil {
+            fmt.Printf("Failed to add key %v:%v to egress table: %v\n", key,leaf,err)
+            return err
+        }
+    }
+    if dump_table {
+        dumpBPFTable(egress_table)
+    }
+    return nil
+}
+
+func ClovisorNewPodInit(k8s_client *ClovisorK8s,
+                        pod_name string,
+                        monitoring_info *monitoring_info_t) (*ClovisorBCC, error) {
+
+    output, err := k8s_client.exec_command(veth_ifidx_command, monitoring_info)
+    if err != nil {
+        return nil, err
+    }
+
+    ifindex , err := strconv.Atoi(output)
+    if err != nil {
+        fmt.Printf("Error converting %v to ifindex, error: %v\n", output, err.Error())
+        return nil, err
+    }
+
+    sessionMap = map[string]session_info_t{}
+
+    fmt.Printf("Beginning network tracing for pod %v\n", pod_name)
+
+    buf, err := ioutil.ReadFile("libclovisor/ebpf/session_tracking.c")
+    if err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+    code := string(buf)
+
+    bpf_mod := bcc.NewModule(code, []string{})
+    //defer bpf_mod.Close()
+
+    ingress_fn, err := bpf_mod.Load("handle_ingress",
+                                    C.BPF_PROG_TYPE_SCHED_CLS,
+                                    1, 65536)
+    if err != nil {
+        fmt.Println("Failed to load ingress func: %v\n", err)
+        return nil, err
+    }
+    fmt.Println("Loaded Ingress func to structure")
+
+    egress_fn, err := bpf_mod.Load("handle_egress",
+                                   C.BPF_PROG_TYPE_SCHED_CLS,
+                                   1, 65536)
+    if err != nil {
+        fmt.Println("Failed to load egress func: %v\n", err)
+        return nil, err
+    }
+
+    fmt.Println("Loaded Egress func to structure")
+
+    traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod)
+    if err := setTrafficTable(traffic_table, int(monitoring_info.port_num),
+                              monitoring_info.protocol, true);
+        err != nil {
+        fmt.Printf("Error on setting traffic port")
+        return nil, err
+    }
+
+    egress_match_list := get_egress_match_list(pod_name)
+
+    egress_table := bcc.NewTable(bpf_mod.TableId("egress_lookup_table"), bpf_mod)
+    if egress_match_list != nil {
+        if err := setEgressTable(egress_table, egress_match_list, 1, true); err != nil {
+            return nil, err
+        }
+    }
+
+    session_table := bcc.NewTable(bpf_mod.TableId("sessions"), bpf_mod)
+
+    attrs := netlink.QdiscAttrs {
+        LinkIndex: ifindex,
+        Handle: netlink.MakeHandle(0xffff, 0),
+        Parent: netlink.HANDLE_CLSACT,
+    }
+
+    qdisc := &netlink.GenericQdisc {
+        QdiscAttrs: attrs,
+        QdiscType:  "clsact",
+    }
+
+    if err := netlink.QdiscAdd(qdisc); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    fmt.Printf("Qdisc for clsact added for index %v\n", ifindex)
+
+    ingress_filter_attrs := netlink.FilterAttrs{
+        LinkIndex:  ifindex,
+        Parent:     netlink.MakeHandle(0xffff, 0xfff3),
+        Priority:   1,
+        Protocol:   unix.ETH_P_ALL,
+    }
+    ingress_filter := &netlink.BpfFilter{
+        FilterAttrs:    ingress_filter_attrs,
+        Fd:             ingress_fn,
+        Name:           "handle_ingress",
+        DirectAction:   true,
+    }
+    if ingress_filter.Fd < 0 {
+        fmt.Println("Failed to load ingress bpf program")
+        return nil, fmt.Errorf("failed to load ingress bpf program")
+    }
+
+    if err := netlink.FilterAdd(ingress_filter); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    egress_filter_attrs := netlink.FilterAttrs{
+        LinkIndex:  ifindex,
+        Parent:     netlink.MakeHandle(0xffff, 0xfff2),
+        Priority:   1,
+        Protocol:   unix.ETH_P_ALL,
+    }
+    egress_filter := &netlink.BpfFilter{
+        FilterAttrs:    egress_filter_attrs,
+        Fd:             egress_fn,
+        Name:           "handle_egress",
+        DirectAction:   true,
+    }
+    if egress_filter.Fd < 0 {
+        fmt.Println("Failed to load egress bpf program")
+        return nil, fmt.Errorf("failed to load egress bpf program")
+    }
+
+    if err := netlink.FilterAdd(egress_filter); err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod)
+
+    skb_rev_chan := make(chan []byte)
+
+    perfMap, err := bcc.InitPerfMap(table, skb_rev_chan)
+    if err != nil {
+        fmt.Println(err)
+        return nil, err
+    }
+
+    tracer, closer := initJaeger(monitoring_info.svc_name)
+    ticker := time.NewTicker(500 * time.Millisecond)
+    stop := make(chan bool)
+    go func() {
+        for {
+            select {
+                case <- ticker.C:
+                    print_network_traces(tracer)
+                case data := <-skb_rev_chan:
+                    err = handle_skb_event(&data, pod_name, session_table,
+                                           monitoring_info, egress_match_list)
+                    if err != nil {
+                        fmt.Printf("failed to decode received data: %s\n", err)
+                    }
+                case <- stop:
+                    fmt.Printf("Receiving stop for pod %v\n", pod_name)
+                    ticker.Stop()
+                    perfMap.Stop()
+                    closer.Close()
+                    // TODO(s3wong): uncomment remove qdisc del once k8s watcher implemented
+                    //netlink.QdiscDel(qdisc)
+                    bpf_mod.Close()
+                    return
+            }
+        }
+    }()
+
+    perfMap.Start()
+    return &ClovisorBCC{
+        stopChan:   stop,
+        qdisc:      qdisc,
+    }, nil
+}
+
+func (clovBcc *ClovisorBCC) StopPod() {
+    // TODO(s3wong): remove qdisc del once k8s watcher implemented
+    netlink.QdiscDel(clovBcc.qdisc)
+    clovBcc.stopChan <- true
+}
diff --git a/clover/clovisor/libclovisor/clovisor_cfg.go b/clover/clovisor/libclovisor/clovisor_cfg.go
new file mode 100644 (file)
index 0000000..f3c631a
--- /dev/null
@@ -0,0 +1,144 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+    "bytes"
+    "encoding/binary"
+    "fmt"
+    "io"
+    "net"
+    "strconv"
+    "strings"
+
+    "github.com/go-redis/redis"
+    opentracing "github.com/opentracing/opentracing-go"
+    jaeger "github.com/uber/jaeger-client-go"
+    jaeger_config "github.com/uber/jaeger-client-go/config"
+)
+
+var redisServer string = "redis.clover-system"
+var jaegerCollector string = "jaeger-collector.clover-system:14268"
+var jaegerAgent string = "jaeger-agent.clover-system:6831"
+
+/*
+ * redisConnect: redis client connecting to redis server
+ */
+func redisConnect() *redis.Client {
+    client := redis.NewClient(&redis.Options{
+        Addr:       fmt.Sprintf("%s:6379", redisServer),
+        Password:   "",
+        DB:         0,
+    })
+    return client
+}
+
+func get_cfg_labels(node_name string) ([]string, error) {
+    client := redisConnect()
+    labels_list, err := client.LRange("clovisor_labels", 0, -1).Result()
+    if err != nil {
+        fmt.Println(err.Error())
+        return nil, err
+    }
+
+    return labels_list, err
+}
+
+func get_egress_match_list(pod_name string) ([]egress_match_t) {
+    client := redisConnect()
+    egress_cfg_list, err := client.LRange("clovior_egress_match", 0, -1).Result()
+    if err != nil {
+        fmt.Println(err.Error())
+        return nil
+    }
+    ret_list := make([]egress_match_t, 0, len(egress_cfg_list))
+    for _, em_cfg_str := range(egress_cfg_list) {
+        fmt.Printf("egress match cfg == %v\n", em_cfg_str)
+        em_cfg_slice := strings.Split(em_cfg_str, ":")
+        if len(em_cfg_slice) < 2 {
+            fmt.Printf("egress match config requires at least two fields [%v]\n", em_cfg_slice)
+            continue
+        } else if len(em_cfg_slice) == 3 {
+            if strings.Contains(pod_name, em_cfg_slice[2]) {
+                fmt.Printf("%v != %v, filtering out this config for pod %v\n",
+                           em_cfg_slice[2], pod_name, pod_name)
+                continue
+            }
+        }
+        var ip uint32 = 0
+        if em_cfg_slice[0] != "0" {
+            ip = ip2Long(em_cfg_slice[0])
+        }
+        port_32, _ := strconv.Atoi(em_cfg_slice[1])
+        port := uint16(port_32)
+        ret_list = append(ret_list,  egress_match_t{ip, port})
+    }
+    return ret_list
+}
+
+// following function comes from
+// https://www.socketloop.com/tutorials/golang-convert-ip-address-string-to-long-unsigned-32-bit-integer
+func ip2Long(ip string) uint32 {
+    var long uint32
+    binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long)
+    return long
+}
+
+func get_cfg_session_match() ([]egress_match_cfg, error) {
+    var ret_list []egress_match_cfg
+    client := redisConnect()
+    keys, err := client.HKeys("clovisor_session_match").Result()
+    if err != nil {
+        fmt.Println(err.Error())
+        return nil, err
+    }
+    for _, key := range keys {
+        value, err := client.HGet("clovisor_session_match", key).Result()
+        if err != nil {
+            fmt.Println(err.Error())
+            continue
+        }
+        match_slice := strings.Split(key, "-")
+        dst_ip := ip2Long(match_slice[0])
+        dst_port, _ := strconv.Atoi(match_slice[1])
+        egress_match := egress_match_t{
+                dst_ip:     dst_ip,
+                dst_port:   uint16(dst_port),
+        }
+        // organize into internally understandable struct
+        ret_list = append(ret_list, egress_match_cfg{
+                                        egress_match:   egress_match,
+                                        action:         value,
+                                    })
+    }
+    return ret_list, nil
+}
+
+func initJaeger(service string) (opentracing.Tracer, io.Closer) {
+    cfg := &jaeger_config.Configuration{
+        Sampler: &jaeger_config.SamplerConfig{
+            Type:  "const",
+            Param: 1,
+        },
+        Reporter: &jaeger_config.ReporterConfig{
+            LogSpans: true,
+            CollectorEndpoint: fmt.Sprintf("http://%s/api/traces", jaegerCollector),
+            LocalAgentHostPort: fmt.Sprintf("%s", jaegerAgent),
+        },
+    }
+    tracer, closer, err := cfg.New(service, jaeger_config.Logger(jaeger.StdLogger))
+    if err != nil {
+        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
+    }
+    return tracer, closer
+}
+
+func get_jaeger_server() (string, error) {
+    client := redisConnect()
+    return client.Get("clovisor_jaeger_server").Result()
+}
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
new file mode 100644 (file)
index 0000000..85b0ea4
--- /dev/null
@@ -0,0 +1,251 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+    "bytes"
+    "fmt"
+    "strconv"
+    "strings"
+
+    core_v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/remotecommand"
+)
+
+type ClovisorK8s struct {
+    client      *kubernetes.Clientset
+    config      *rest.Config
+}
+
+type monitoring_info_t struct {
+    namespace   string
+    svc_name    string
+    pod_name    string
+    container_name  string
+    protocol    string
+    port_num    uint32
+}
+
+var DEFAULT_NAMESPACE = "default"
+
+func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
+    config, err := rest.InClusterConfig()
+    if err != nil {
+        fmt.Println(err.Error())
+        return nil, err
+    }
+
+    client, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        fmt.Println(err.Error())
+        return nil, err
+    }
+
+    return &ClovisorK8s{
+        client:     client,
+        config:     config,
+    }, nil
+}
+
+func parse_label_cfg(label_cfg string) (string, string, string) {
+    label_slice := strings.Split(label_cfg, ":")
+    if len(label_slice) < 3 {
+        return label_slice[0], "", ""
+    }
+    return label_slice[0], label_slice[1], label_slice[2]
+}
+
+func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t,
+                                               error) {
+
+    labels_list, err := get_cfg_labels(nodeName)
+    if err != nil {
+        fmt.Printf("Error getting cfg labels: %v\n", err)
+        return nil, err
+    }
+
+    namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list)
+    if err != nil {
+        return nil, err
+    }
+
+    mon_info_map := make(map[string]*monitoring_info_t)
+    for idx := range svcs {
+        svc := svcs[idx]
+        pod := pods[idx]
+        mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod)
+        for k, v := range mon_info {
+            mon_info_map[k] = v
+        }
+    }
+    return mon_info_map, nil
+}
+
+func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
+                                           labels_list []string) (string,
+                                                                  []*core_v1.ServiceList,
+                                                                  []*core_v1.PodList, error) {
+    namespace := DEFAULT_NAMESPACE
+    /*
+     * Three cases:
+     * 1.) no configured namespaces: monitor all pods in the default namespace
+     * 2.) a config with only a namespace: monitor all pods in that namespace
+     * 3.) a label is configured: only monitor pods carrying that label
+     */
+    var svcs []*core_v1.ServiceList
+    var pods []*core_v1.PodList
+    if len(labels_list) == 0 {
+        if svcs_list, err :=
+            client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
+                        err != nil {
+            fmt.Printf("Error fetching service list for namespace %s\n",
+                        namespace)
+            return namespace, nil, nil, err
+        } else {
+            svcs = append(svcs, svcs_list)
+        }
+
+        if pods_list, err :=
+            client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{});
+                        err != nil {
+            fmt.Printf("Error fetching pods list for namespace %s\n",
+                        namespace)
+            return namespace, nil, nil, err
+        } else {
+            pods = append(pods, pods_list)
+        }
+    } else {
+        for _, label_str := range labels_list {
+            var label_selector string
+            namespace, key, value := parse_label_cfg(label_str)
+            if len(namespace) == 0 {
+                fmt.Printf("Error in config: %s not a valid config\n", label_str)
+                continue
+            }
+            if len(key) == 0 {
+                fmt.Printf("Namespace only config for %s\n", namespace)
+            } else {
+                label_selector = fmt.Sprintf("%s=%s", key, value)
+            }
+            if svc_list, err :=
+                    client.client.CoreV1().Services(namespace).List(metav1.ListOptions{
+                        LabelSelector: label_selector,
+                                }); err != nil {
+                fmt.Printf("Error listing services with label %v:%v:%v - %v\n",
+                            key, value, namespace, err.Error())
+                continue
+            } else {
+                svcs = append(svcs, svc_list)
+            }
+            if pod_list, err :=
+                client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{
+                        LabelSelector: label_selector,
+                                }); err != nil {
+                fmt.Printf("Error listing pods with label %v:%v:%v - %v\n",
+                            key, value, namespace, err.Error())
+                continue
+            } else {
+                pods = append(pods, pod_list)
+            }
+        }
+    }
+    return namespace, svcs, pods, nil
+}
+
+func (client *ClovisorK8s) get_monitoring_pods(
+                    namespace string,
+                    node_name string,
+                    svcs *core_v1.ServiceList,
+                    pods *core_v1.PodList) (map[string]*monitoring_info_t) {
+    monitoring_info := make(map[string]*monitoring_info_t)
+    svc_map := make(map[string][]string)
+
+    for _, svc := range svcs.Items {
+        svc_port := svc.Spec.Ports[0]
+        target_port := svc_port.TargetPort.String()
+        svc_port_name := svc_port.Name
+        svc_map[target_port] = append(svc_map[target_port], svc.GetName())
+        if len(svc_port_name) > 0 {
+            svc_map[target_port] = append(svc_map[target_port], svc_port_name)
+        } else {
+            svc_map[target_port] = append(svc_map[target_port], "tcp")
+        }
+    }
+    for _, v := range pods.Items {
+        if v.Spec.NodeName == node_name {
+            pod_name := v.GetName()
+            monitoring_info[pod_name] = &(monitoring_info_t{})
+            monitoring_info[pod_name].namespace = namespace
+            monitoring_info[pod_name].pod_name = pod_name
+            monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name
+            monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort)
+            tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num))
+            svc_array := svc_map[tp_string]
+            monitoring_info[pod_name].svc_name = svc_array[0]
+            if (strings.Contains(svc_array[1], "-")) {
+                monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")]
+            } else {
+                monitoring_info[pod_name].protocol = svc_array[1]
+            }
+        }
+    }
+    return monitoring_info
+}
+
+func (client *ClovisorK8s) exec_command(command string,
+                                        monitoring_info *monitoring_info_t) (string, error) {
+
+    // Following code based on:
+    // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client
+    // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go
+    exec_req := client.client.CoreV1().RESTClient().Post().
+            Resource("pods").
+            Name(monitoring_info.pod_name).
+            Namespace(monitoring_info.namespace).
+            SubResource("exec")
+    scheme := runtime.NewScheme()
+    if err := core_v1.AddToScheme(scheme); err != nil {
+        fmt.Printf("Error in exec pods: %v\n", err.Error())
+        return "", err
+    }
+
+    parameterCodec := runtime.NewParameterCodec(scheme)
+    exec_req.VersionedParams(&core_v1.PodExecOptions{
+            Command:    strings.Fields(command),
+            Container:  monitoring_info.container_name,
+            Stdin:      false,
+            Stdout:     true,
+            Stderr:     true,
+            TTY:        false,
+        }, parameterCodec)
+
+    exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL())
+    if err != nil {
+        fmt.Printf("Error in remotecommand exec: %v\n", err.Error())
+        return "", err
+    }
+
+    var stdout, stderr bytes.Buffer
+    err = exec.Stream(remotecommand.StreamOptions{
+            Stdin:  nil,
+            Stdout: &stdout,
+            Stderr: &stderr,
+            Tty:    false,
+        })
+    if err != nil {
+        fmt.Printf("Error in exec stream: %v\n", err.Error())
+        return "", err
+    }
+
+    stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n")
+    return stdout_no_newline, nil
+}
diff --git a/clover/clovisor/libclovisor/ebpf/session_tracking.c b/clover/clovisor/libclovisor/ebpf/session_tracking.c
new file mode 100755 (executable)
index 0000000..99f704a
--- /dev/null
@@ -0,0 +1,275 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+#include <uapi/linux/if_ether.h>
+#include <uapi/linux/in.h>
+#include <uapi/linux/ip.h>
+#include <uapi/linux/tcp.h>
+#include <uapi/linux/pkt_cls.h>
+#include <uapi/linux/bpf.h>
+
+#include <bcc/proto.h>
+
+#define HTTP_HDR_MIN_LEN 7
+#define MAX_SESSION_TABLE_ENTRIES 8192
+
+typedef enum {
+    HTTP = 1,
+    HTTP2 = 2,
+    TCP = 3,
+    UDP = 4,
+} app_proto_t;
+
+typedef struct session_key_ {
+    u32 src_ip;
+    u32 dst_ip;
+    unsigned short src_port;
+    unsigned short dst_port;
+} session_key_t;
+
+typedef struct session_ {
+    u64 req_time;
+    u64 resp_time;
+} session_t;
+
+typedef struct egress_match_ {
+    u32 dst_ip;
+    unsigned short dst_port;
+} egress_match_t;
+
+typedef enum policy_action_ {
+    RECORD = 1,
+} policy_action_t;
+
+BPF_PERF_OUTPUT(skb_events);
+BPF_HASH(dports2proto, u16, u32);
+BPF_HASH(egress_lookup_table, egress_match_t, policy_action_t);
+BPF_HASH(sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES);
+
+struct eth_hdr {
+       unsigned char   h_dest[ETH_ALEN];
+       unsigned char   h_source[ETH_ALEN];
+       unsigned short  h_proto;
+};
+
+static inline int ipv4_hdrlen(struct iphdr *ip4)
+{
+    return ip4->ihl * 4;
+}
+
+static inline int tcp_doff(struct tcphdr *tcp_hdr)
+{
+    return tcp_hdr->doff * 4;
+}
+
+static inline int http_parsing(void *data, void *data_end)
+{
+
+    int is_http = 1;
+    if (data + HTTP_HDR_MIN_LEN > data_end) {
+        bpf_trace_printk("No HTTP Header in TCP segment");
+        return 0;
+    }
+    if (strncmp((char*)data, "HTTP", 4)) {
+        if (strncmp((char*)data, "GET", 3)) {
+            if (strncmp((char*)data, "POST", 4)) {
+                if (strncmp((char*)data, "PUT", 3)) {
+                    if (strncmp((char*)data, "HEAD", 4)) {
+                        is_http = 0;
+                    }
+                }
+            }
+        }
+    }
+    return is_http;
+}
+
+static inline void fill_up_sess_key(session_key_t *key, u32 src_ip,
+                                    u32 dst_ip, u16 src_port, u16 dst_port)
+{
+    key->src_ip = src_ip;
+    key->dst_ip = dst_ip;
+    key->src_port = src_port;
+    key->dst_port = dst_port;
+}
+
+static inline int process_response(u32 src_ip, u32 dst_ip, u16 src_port,
+                                   u16 dst_port)
+{
+    session_key_t sess_key = {};
+    session_t *session_ptr = NULL;
+    fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+    session_ptr = sessions.lookup(&sess_key);
+    if (session_ptr != NULL) {
+        u64 resp_time = bpf_ktime_get_ns();
+        session_t update_session = {
+            session_ptr->req_time,
+            resp_time
+        };
+        sessions.update(&sess_key, &update_session);
+        return 1;
+    }
+    return 0;
+}
+
+static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port,
+                                   u16 dst_port)
+{
+    session_key_t sess_key = {};
+    session_t *session_ptr = NULL;
+    session_t new_session = {
+        bpf_ktime_get_ns(),
+        0
+    };
+    fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+    session_ptr = sessions.lookup(&sess_key);
+    if (! session_ptr) {
+        sessions.insert(&sess_key, &new_session);
+    }
+    /*
+    if (session_ptr != NULL) {
+        sessions.update(&sess_key, &new_session);
+    } else {
+        sessions.insert(&sess_key, &new_session);
+    }
+    */
+}
+
+static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr,
+                                              struct iphdr *ipv4_hdr,
+                                              void *data_end)
+{
+    unsigned short dest_port = htons(tcp_hdr->dest);
+    egress_match_t egress_match = {};
+    policy_action_t *policy_ptr = NULL;
+
+    unsigned int *proto = dports2proto.lookup(&dest_port);
+    if (proto != NULL) {
+        if (tcp_hdr->syn && !tcp_hdr->ack) {
+            return TCP;
+        }
+        if (tcp_hdr->fin || tcp_hdr->rst) {
+            process_response(ntohl(ipv4_hdr->saddr),
+                             ntohl(ipv4_hdr->daddr),
+                             ntohs(tcp_hdr->source),
+                             ntohs(tcp_hdr->dest));
+            return TCP;
+        }
+        process_request(ntohl(ipv4_hdr->saddr),
+                        ntohl(ipv4_hdr->daddr),
+                        ntohs(tcp_hdr->source),
+                        ntohs(tcp_hdr->dest));
+    } else {
+        egress_match.dst_ip = ntohl(ipv4_hdr->saddr);
+        egress_match.dst_port = ntohs(tcp_hdr->source);
+        policy_ptr = egress_lookup_table.lookup(&egress_match);
+        if (policy_ptr == NULL) {
+            egress_match.dst_ip = 0;
+            policy_ptr = egress_lookup_table.lookup(&egress_match);
+        }
+
+        if (policy_ptr != NULL) {
+            if (*policy_ptr == RECORD) {
+                if (tcp_hdr->fin || tcp_hdr->rst) {
+                    process_response(ntohl(ipv4_hdr->daddr),
+                                     ntohl(ipv4_hdr->saddr),
+                                     ntohs(tcp_hdr->dest),
+                                     ntohs(tcp_hdr->source));
+                }
+            }
+        }
+    }
+
+    // everything else is handed up to userspace
+    // for potential HTTP parsing
+    return HTTP;
+}
+
+static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr,
+                                             struct iphdr *ipv4_hdr,
+                                             void *data_end)
+{
+    unsigned short src_port = htons(tcp_hdr->source);
+    app_proto_t ret = TCP;
+    egress_match_t egress_match = {};
+    policy_action_t *policy_ptr = NULL;
+
+    unsigned int *proto = dports2proto.lookup(&src_port);
+
+    if (proto != NULL) {
+        if (tcp_hdr->fin || tcp_hdr->rst) {
+            process_response(ntohl(ipv4_hdr->daddr),
+                             ntohl(ipv4_hdr->saddr),
+                             ntohs(tcp_hdr->dest),
+                             ntohs(tcp_hdr->source));
+        }
+    } else {
+
+        egress_match.dst_ip = ntohl(ipv4_hdr->daddr);
+        egress_match.dst_port = ntohs(tcp_hdr->dest);
+        policy_ptr = egress_lookup_table.lookup(&egress_match);
+        if (policy_ptr == NULL) {
+            egress_match.dst_ip = 0;
+            policy_ptr = egress_lookup_table.lookup(&egress_match);
+        }
+
+        if (policy_ptr != NULL) {
+            if (*policy_ptr == RECORD) {
+                process_request(ntohl(ipv4_hdr->saddr),
+                                ntohl(ipv4_hdr->daddr),
+                                ntohs(tcp_hdr->source),
+                                ntohs(tcp_hdr->dest));
+            }
+        }
+    }
+    //return(ret_hdr);
+    return HTTP;
+}
+
+static inline int handle_packet(struct __sk_buff *skb, int is_ingress)
+{
+    void *data = (void *)(long)skb->data;
+    void *data_end = (void *)(long)skb->data_end;
+    struct eth_hdr *eth = data;
+    struct iphdr *ipv4_hdr = data + sizeof(*eth);
+    struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr);
+    app_proto_t proto = TCP;
+
+    /* TODO(s3wong): assuming TCP only for now */
+    /* single length check */
+    if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end)
+        return TC_ACT_OK;
+
+    if (eth->h_proto != htons(ETH_P_IP))
+        return TC_ACT_OK;
+
+    // TODO(s3wong): no support for IP options
+    if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5)
+        return TC_ACT_OK;
+
+    if (is_ingress == 1) {
+        proto = ingress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end);
+    } else{
+        proto = egress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end);
+    }
+
+    if (proto == HTTP) {
+        int offset = is_ingress;
+        skb_events.perf_submit_skb(skb, skb->len, &offset, sizeof(offset));
+    }
+
+    return TC_ACT_OK;
+}
+
+int handle_ingress(struct __sk_buff *skb)
+{
+    return handle_packet(skb, 1);
+}
+
+int handle_egress(struct __sk_buff *skb)
+{
+    return handle_packet(skb, 0);
+}
diff --git a/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml b/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml
new file mode 100644 (file)
index 0000000..0ae3870
--- /dev/null
@@ -0,0 +1,151 @@
+#
+# Copyright 2017-2018 The Jaeger Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+#
+
+apiVersion: v1
+kind: List
+items:
+- apiVersion: extensions/v1beta1
+  kind: Deployment
+  metadata:
+    name: jaeger-deployment
+    namespace: clover-system
+    labels:
+      app: jaeger
+      jaeger-infra: jaeger-deployment
+  spec:
+    replicas: 1
+    strategy:
+      type: Recreate
+    template:
+      metadata:
+        labels:
+          app: jaeger
+          jaeger-infra: jaeger-pod
+        annotations:
+          prometheus.io/scrape: "true"
+          prometheus.io/port: "16686"
+      spec:
+          containers:
+          -   env:
+              - name: COLLECTOR_ZIPKIN_HTTP_PORT
+                value: "9411"
+              image: jaegertracing/all-in-one
+              name: jaeger
+              ports:
+                - containerPort: 5775
+                  protocol: UDP
+                - containerPort: 6831
+                  protocol: UDP
+                - containerPort: 6832
+                  protocol: UDP
+                - containerPort: 5778
+                  protocol: TCP
+                - containerPort: 16686
+                  protocol: TCP
+                - containerPort: 9411
+                  protocol: TCP
+              readinessProbe:
+                httpGet:
+                  path: "/"
+                  port: 14269
+                initialDelaySeconds: 5
+- apiVersion: v1
+  kind: Service
+  metadata:
+    name: jaeger-query
+    namespace: clover-system
+    labels:
+      app: jaeger
+      jaeger-infra: jaeger-service
+  spec:
+    ports:
+      - name: query-http
+        port: 80
+        protocol: TCP
+        targetPort: 16686
+    selector:
+      jaeger-infra: jaeger-pod
+    type: LoadBalancer
+- apiVersion: v1
+  kind: Service
+  metadata:
+    name: jaeger-collector
+    namespace: clover-system
+    labels:
+      app: jaeger
+      jaeger-infra: collector-service
+  spec:
+    ports:
+    - name: jaeger-collector-tchannel
+      port: 14267
+      protocol: TCP
+      targetPort: 14267
+    - name: jaeger-collector-http
+      port: 14268
+      protocol: TCP
+      targetPort: 14268
+    - name: jaeger-collector-zipkin
+      port: 9411
+      protocol: TCP
+      targetPort: 9411
+    selector:
+      jaeger-infra: jaeger-pod
+    type: ClusterIP
+- apiVersion: v1
+  kind: Service
+  metadata:
+    name: jaeger-agent
+    namespace: clover-system
+    labels:
+      app: jaeger
+      jaeger-infra: agent-service
+  spec:
+    ports:
+    - name: agent-zipkin-thrift
+      port: 5775
+      protocol: UDP
+      targetPort: 5775
+    - name: agent-compact
+      port: 6831
+      protocol: UDP
+      targetPort: 6831
+    - name: agent-binary
+      port: 6832
+      protocol: UDP
+      targetPort: 6832
+    - name: agent-configs
+      port: 5778
+      protocol: TCP
+      targetPort: 5778
+    clusterIP: None
+    selector:
+      jaeger-infra: jaeger-pod
+- apiVersion: v1
+  kind: Service
+  metadata:
+    name: zipkin
+    namespace: clover-system
+    labels:
+      app: jaeger
+      jaeger-infra: zipkin-service
+  spec:
+    ports:
+    - name: jaeger-collector-zipkin
+      port: 9411
+      protocol: TCP
+      targetPort: 9411
+    clusterIP: None
+    selector:
+      jaeger-infra: jaeger-pod
+
diff --git a/clover/clovisor/libclovisor/redis.yaml b/clover/clovisor/libclovisor/redis.yaml
new file mode 100644 (file)
index 0000000..8f99326
--- /dev/null
@@ -0,0 +1,53 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: clover-system
+  labels:
+    name: clover-system
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    name: redis
+    redis-sentinel: "true"
+    role: master
+  name: redis
+  namespace: clover-system
+spec:
+  containers:
+    - name: redis
+      image: k8s.gcr.io/redis:v1
+      env:
+        - name: MASTER
+          value: "true"
+      ports:
+        - containerPort: 6379
+      resources:
+        limits:
+          cpu: "0.1"
+      volumeMounts:
+        - mountPath: /redis-master-data
+          name: data
+    - name: sentinel
+      image: kubernetes/redis:v1
+      env:
+        - name: SENTINEL
+          value: "true"
+      ports:
+        - containerPort: 26379
+  volumes:
+    - name: data
+      emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: redis
+  namespace: clover-system
+spec:
+  ports:
+  - port: 6379
+  selector:
+    name: redis
+---