jasmine(Just A Simple Multicast engINE) Initial merge 93/33093/4 v1.0.2
authorZhijiang Hu <hu.zhijiang@zte.com.cn>
Fri, 7 Apr 2017 03:21:42 +0000 (23:21 -0400)
committerZhijiang Hu <hu.zhijiang@zte.com.cn>
Fri, 7 Apr 2017 07:52:25 +0000 (03:52 -0400)
Change-Id: I7a543019c8d92314ef549bf72369b7276f39577d
Signed-off-by: Zhijiang Hu <hu.zhijiang@zte.com.cn>
21 files changed:
code/jasmine/Makefile.am [new file with mode: 0755]
code/jasmine/README [new file with mode: 0644]
code/jasmine/autogen.sh [new file with mode: 0755]
code/jasmine/buffer.c [new file with mode: 0755]
code/jasmine/buffer.h [new file with mode: 0755]
code/jasmine/build-aux/git-version-gen [new file with mode: 0755]
code/jasmine/client.c [new file with mode: 0755]
code/jasmine/configure.ac [new file with mode: 0755]
code/jasmine/jasmine.spec.in [new file with mode: 0755]
code/jasmine/misc.c [new file with mode: 0755]
code/jasmine/misc.h [new file with mode: 0755]
code/jasmine/server-tcp.c [new file with mode: 0755]
code/jasmine/server-udp.c [new file with mode: 0755]
code/jasmine/server.c [new file with mode: 0755]
code/jasmine/server.h [new file with mode: 0755]
code/jasmine/tcp-common.c [new file with mode: 0755]
code/jasmine/tcp-common.h [new file with mode: 0755]
code/jasmine/tcp-queue.c [new file with mode: 0755]
code/jasmine/tcp-queue.h [new file with mode: 0755]
code/jasmine/udp-common.c [new file with mode: 0755]
code/jasmine/udp-common.h [new file with mode: 0755]

diff --git a/code/jasmine/Makefile.am b/code/jasmine/Makefile.am
new file mode 100755 (executable)
index 0000000..41acfd6
--- /dev/null
@@ -0,0 +1,83 @@
+# Copyright (c) 2015 ZTE, Inc.
+
+SPEC                   = $(PACKAGE_NAME).spec
+
+TARFILE                        = $(PACKAGE_NAME)-$(VERSION).tar.gz
+
+EXTRA_DIST             = autogen.sh $(SPEC).in \
+                         build-aux/git-version-gen \
+                         .version
+
+AUTOMAKE_OPTIONS       = foreign
+
+ACLOCAL_AMFLAGS                = -I m4
+
+MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \
+                         config.guess config.sub missing install-sh \
+                         autoheader automake autoconf \
+                         autoscan.log configure.scan ltmain.sh test-driver config.h.in
+
+noinst_HEADERS         = buffer.h misc.h server.h tcp-common.h tcp-queue.h udp-common.h
+
+dist-clean-local:
+       rm -f autoconf automake autoheader
+
+clean-generic:
+       rm -rf $(SPEC) $(TARFILE)
+
+## make rpm/srpm section.
+
+$(SPEC): $(SPEC).in
+       rm -f $@-t $@
+       ver="$(VERSION)" && \
+       sed \
+               -e "s#@version@#$$ver#g" \
+       $< > $@-t; \
+       chmod a-w $@-t
+       mv $@-t $@
+
+$(TARFILE):
+       $(MAKE) dist
+
+RPMBUILDOPTS   = --define "_sourcedir $(abs_builddir)" \
+                 --define "_specdir $(abs_builddir)" \
+                 --define "_builddir $(abs_builddir)" \
+                 --define "_srcrpmdir $(abs_builddir)" \
+                 --define "_rpmdir $(abs_builddir)"
+
+srpm: clean
+       $(MAKE) $(SPEC) $(TARFILE)
+       rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) --nodeps -bs $(SPEC)
+
+rpm: clean _version
+       $(MAKE) $(SPEC) $(TARFILE)
+       rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) -ba $(SPEC)
+
+# release/versioning
+BUILT_SOURCES  = .version
+.version:
+       echo $(VERSION) > $@-t && mv $@-t $@
+
+dist-hook:
+       echo $(VERSION) > $(distdir)/.tarball-version
+
+.PHONY: _version
+
+_version:
+       cd $(srcdir) && rm -rf autom4te.cache .version && autoreconf -i
+       $(MAKE) $(AM_MAKEFLAGS) Makefile
+
+maintainer-clean-local:
+       rm -rf m4
+
+###
+
+bin_PROGRAMS = jasmines jasminec
+
+DEFAULT_INCLUDES = -I. -I/usr/include
+
+jasmines_SOURCES = udp-common.c tcp-common.c buffer.c misc.c server.c server-udp.c server-tcp.c tcp-queue.c
+jasminec_SOURCES = udp-common.c tcp-common.c buffer.c misc.c client.c
+
+jasmines_LDADD = -lpthread
+jasminec_LDADD = -lpthread
diff --git a/code/jasmine/README b/code/jasmine/README
new file mode 100644 (file)
index 0000000..221caea
--- /dev/null
@@ -0,0 +1,17 @@
+jasmine: Just A Small Multicast engINE
+
+Installation
+------------
+
+./autogen.sh
+./configure
+make && make install
+
+Usage
+-----
+jasmine server:
+Usage: jasmines local_ip num_of_clients [port]
+
+jasmine client:
+Usage: jasminec <local_ip> <server_ip> [port]
+
diff --git a/code/jasmine/autogen.sh b/code/jasmine/autogen.sh
new file mode 100755 (executable)
index 0000000..5bf25ec
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh
+# Run this to generate all the initial makefiles, etc.
+mkdir -p m4
+echo Building configuration system...
+autoreconf -i
diff --git a/code/jasmine/buffer.c b/code/jasmine/buffer.c
new file mode 100755 (executable)
index 0000000..0567026
--- /dev/null
@@ -0,0 +1,152 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "buffer.h"
+#include "misc.h"
+
+struct buffer_ctl buffctl;
+struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+struct packet_ctl empty;
+
+void buffer_init()
+{
+    int i;
+
+    for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+        packetctl[i] = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+        memset(packetctl[i], 0, PACKET_SIZE);
+        packetctl[i]->data_size = 0;
+        packetctl[i]->seq = 0;
+    }
+
+    buffctl.buffer_id = 0;
+    buffctl.packet_id_base = 0;
+    buffctl.pkt_count = 0;
+    empty.data_size = 0;
+}
+
+void packetctl_precheck()
+{
+    int i;
+
+    for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+        if (packetctl[i]->data_size != 0) {
+            crit("Error precheck packet slot %d,%d", i, packetctl[i]->data_size);
+        }
+    }
+}
+
+// Calculate packet checksum
+uint32_t packet_csum(uint8_t *data, int len)
+{
+    uint32_t *item, sum = 0;
+    int i = 0;
+
+    while (len % 4) {
+        data[len] = 0; len++;
+    }
+
+    while (i < len) {
+        item = (uint32_t *)((char *)data + i);
+        sum += *item;
+        i += 4;
+    }
+
+    return sum;
+}
+
+// Fill buffers from file descriptor
+long buffer_fill(int fd)
+{
+    int s = 0;
+    int r = PACKET_PAYLOAD_SIZE;
+
+    buffctl.buffer_id++;
+    buffctl.packet_id_base += buffctl.pkt_count;
+    buffctl.buffer_size = 0;
+    while (r > 0 && s < PACKETS_PER_BUFFER) {
+        if ((r = read(fd, packetctl[s]->data, PACKET_PAYLOAD_SIZE)) < 0) {
+            crit("Error reading data from stdin");
+        }
+
+        // r == 0 means EOF
+
+        if (r > 0) {
+            buffctl.buffer_size += r;
+            packetctl[s]->data_size = r;
+            packetctl[s]->seq = buffctl.packet_id_base + s;
+            packetctl[s]->crc = packet_csum(packetctl[s]->data, r);
+            s++;
+        }
+    }
+
+    log(6, "input %d bytes of data in %d packets", buffctl.buffer_size, s);
+    buffctl.pkt_count = s;
+    return s;
+}
+
+long buffer_flush(int fd)
+{
+    int s = 0;
+    while (buffctl.pkt_count--) {
+        write(fd, packetctl[s]->data, packetctl[s]->data_size);
+        buffctl.buffer_size -= packetctl[s]->data_size;
+        packetctl[s]->data_size = 0;
+        packetctl[s]->seq = 0;
+        packetctl[s]->crc = 0;
+        s++;
+    }
+
+    return s;
+}
+
+/* Caller should use new_pkt as packet buffer again if this returns NULL */
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt)
+{
+    struct packet_ctl *freed_pkt;
+    uint32_t i;
+
+    if (new_pkt->seq < buffctl.packet_id_base ||
+        new_pkt->seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+        return NULL;
+    }
+
+    if (new_pkt->data_size == 0) {
+        return NULL;
+    }
+
+    i = packet_csum(new_pkt->data, new_pkt->data_size);
+    if (new_pkt->crc != i) {
+        return NULL;
+    }
+
+    i = new_pkt->seq - buffctl.packet_id_base;
+    freed_pkt = packetctl[i];
+    packetctl[i] = new_pkt;
+    return freed_pkt;
+}
+
+struct packet_ctl *packet_get(uint32_t seq)
+{
+    empty.seq = seq;
+
+    if (seq < buffctl.packet_id_base ||
+        seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+        return &empty;
+    }
+
+    if (packetctl[seq - buffctl.packet_id_base]->data_size == 0) {
+        return &empty;
+    }
+
+    return packetctl[seq - buffctl.packet_id_base];
+}
diff --git a/code/jasmine/buffer.h b/code/jasmine/buffer.h
new file mode 100755 (executable)
index 0000000..e899fe6
--- /dev/null
@@ -0,0 +1,64 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_BUFFER_H
+#define _MCAST_BUFFER_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MTU 1500
+
+#define PACKET_SIZE ((MTU) - sizeof(struct iphdr) - sizeof(struct udphdr))
+
+/* Exclude padding */
+#define PACKET_PAYLOAD_SIZE (((PACKET_SIZE) - sizeof(struct packet_ctl)) & ~0x3)
+
+#define PACKETS_PER_BUFFER 1024
+
+#define DEF_PORT 18383 /* for both UDP and TCP */
+
+/* Buffer header (align to 4 Byte) */
+struct buffer_ctl {
+    uint32_t buffer_id;
+    uint32_t buffer_size;
+    uint32_t packet_id_base;
+    uint32_t pkt_count;
+};
+
+/* Packet header (align to 4 Byte) */
+struct packet_ctl {
+    uint32_t seq;
+    uint32_t crc;
+    uint32_t data_size;
+    uint8_t data[0];
+};
+
+#define CLIENT_READY 0x1
+#define CLIENT_REQ 0x2
+#define CLIENT_DONE 0x4
+#define SERVER_SENT 0x8
+
+/* Retransmition Request Header (align to 4 Byte) */
+struct request_ctl {
+    uint32_t req_count; /* Requested packet slot count */
+};
+
+extern struct buffer_ctl buffctl;
+extern struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+
+void buffer_init();
+long buffer_fill(int fd);
+long buffer_flush(int fd);
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt);
+struct packet_ctl *packet_get(uint32_t seq);
+void packetctl_precheck();
+
+#endif
diff --git a/code/jasmine/build-aux/git-version-gen b/code/jasmine/build-aux/git-version-gen
new file mode 100755 (executable)
index 0000000..c3d53f3
--- /dev/null
@@ -0,0 +1,170 @@
+#!/bin/sh
+# Print a version string.
+scriptversion=2010-10-13.20; # UTC
+
+# Copyright (C) 2007-2010 Free Software Foundation, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+# This script is derived from GIT-VERSION-GEN from GIT: http://git.or.cz/.
+# It may be run two ways:
+# - from a git repository in which the "git describe" command below
+#   produces useful output (thus requiring at least one signed tag)
+# - from a non-git-repo directory containing a .tarball-version file, which
+#   presumes this script is invoked like "./git-version-gen .tarball-version".
+
+# In order to use intra-version strings in your project, you will need two
+# separate generated version string files:
+#
+# .tarball-version - present only in a distribution tarball, and not in
+#   a checked-out repository.  Created with contents that were learned at
+#   the last time autoconf was run, and used by git-version-gen.  Must not
+#   be present in either $(srcdir) or $(builddir) for git-version-gen to
+#   give accurate answers during normal development with a checked out tree,
+#   but must be present in a tarball when there is no version control system.
+#   Therefore, it cannot be used in any dependencies.  GNUmakefile has
+#   hooks to force a reconfigure at distribution time to get the value
+#   correct, without penalizing normal development with extra reconfigures.
+#
+# .version - present in a checked-out repository and in a distribution
+#   tarball.  Usable in dependencies, particularly for files that don't
+#   want to depend on config.h but do want to track version changes.
+#   Delete this file prior to any autoconf run where you want to rebuild
+#   files to pick up a version string change; and leave it stale to
+#   minimize rebuild time after unrelated changes to configure sources.
+#
+# It is probably wise to add these two files to .gitignore, so that you
+# don't accidentally commit either generated file.
+#
+# Use the following line in your configure.ac, so that $(VERSION) will
+# automatically be up-to-date each time configure is run (and note that
+# since configure.ac no longer includes a version string, Makefile rules
+# should not depend on configure.ac for version updates).
+#
+# AC_INIT([GNU project],
+#         m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+#         [bug-project@example])
+#
+# Then use the following lines in your Makefile.am, so that .version
+# will be present for dependencies, and so that .tarball-version will
+# exist in distribution tarballs.
+#
+# BUILT_SOURCES = $(top_srcdir)/.version
+# $(top_srcdir)/.version:
+#      echo $(VERSION) > $@-t && mv $@-t $@
+# dist-hook:
+#      echo $(VERSION) > $(distdir)/.tarball-version
+
+case $# in
+    1|2) ;;
+    *) echo 1>&2 "Usage: $0 \$srcdir/.tarball-version" \
+         '[TAG-NORMALIZATION-SED-SCRIPT]'
+       exit 1;;
+esac
+
+tarball_version_file=$1
+tag_sed_script="${2:-s/x/x/}"
+nl='
+'
+
+# Avoid meddling by environment variable of the same name.
+v=
+
+svn log --non-interactive -q --limit 1 > /dev/null  2>&1
+svnwork=$?
+
+# First see if there is a tarball-only version file.
+# then try "git describe", then default.
+if test -f $tarball_version_file
+then
+    v=`cat $tarball_version_file` || exit 1
+    case $v in
+       *$nl*) v= ;; # reject multi-line output
+       [0-9]*) ;;
+       *) v= ;;
+    esac
+    test -z "$v" \
+       && echo "$0: WARNING: $tarball_version_file seems to be damaged" 1>&2
+fi
+
+if test -n "$v"
+then
+    : # use $v
+# Otherwise, if there is at least one git commit involving the working
+# directory, and "git describe" output looks sensible, use that to
+# derive a version string.
+elif test "`git log -1 --pretty=format:x . 2>&1`" = x \
+    && v=`git describe --abbrev=4 --match='v*' HEAD 2>/dev/null \
+         || git describe --abbrev=4 HEAD 2>/dev/null` \
+    && v=`printf '%s\n' "$v" | sed "$tag_sed_script"` \
+    && case $v in
+        v[0-9]*) ;;
+        *) (exit 1) ;;
+       esac
+then
+    # Is this a new git that lists number of commits since the last
+    # tag or the previous older version that did not?
+    #   Newer: v6.10-77-g0f8faeb
+    #   Older: v6.10-g0f8faeb
+    case $v in
+       *-*-*) : git describe is okay three part flavor ;;
+       *-*)
+           : git describe is older two part flavor
+           # Recreate the number of commits and rewrite such that the
+           # result is the same as if we were using the newer version
+           # of git describe.
+           vtag=`echo "$v" | sed 's/-.*//'`
+           numcommits=`git rev-list "$vtag"..HEAD | wc -l`
+           v=`echo "$v" | sed "s/\(.*\)-\(.*\)/\1-$numcommits-\2/"`;
+           ;;
+    esac
+
+    # Change the first '-' to a '.', so version-comparing tools work properly.
+    # Remove the "g" in git describe's output string, to save a byte.
+    v=`echo "$v" | sed 's/-/./;s/\(.*\)-g/\1-/'`;
+elif [ "$svnwork" -eq "0" ] ;
+then
+    vtag="1.0.0" # we do not have git tag in svn, so here just hard coded it.
+    v=`svn log -q --limit 1 | grep "|" | awk '{print $1}'`
+    v=`echo "$v" |sed 's/^r//'`
+    v="$vtag.$v"
+else
+    v=UNKNOWN
+fi
+
+v=`echo "$v" |sed 's/^v//'`
+
+# Don't declare a version "dirty" merely because a time stamp has changed.
+git update-index --refresh > /dev/null 2>&1
+
+dirty=`sh -c 'git diff-index --name-only HEAD' 2>/dev/null` || dirty=
+case "$dirty" in
+    '') ;;
+    *) # Append the suffix only if there isn't one already.
+       case $v in
+         *-dirty) ;;
+         *) v="$v-dirty" ;;
+       esac ;;
+esac
+
+# Omit the trailing newline, so that m4_esyscmd can use the result directly.
+echo "$v" | tr -d "$nl"
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-time-zone: "UTC"
+# time-stamp-end: "; # UTC"
+# End:
diff --git a/code/jasmine/client.c b/code/jasmine/client.c
new file mode 100755 (executable)
index 0000000..42d996a
--- /dev/null
@@ -0,0 +1,313 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+/* Global statistics */
+long tot_dups = 0;
+long tot_reps = 0;
+long tot_pkts = 0;
+long long tot_size = 0;
+
+void recv_mcinfo(int tcp_socket)
+{
+    int res;
+
+    res = read_all(tcp_socket, &mcinfo, sizeof(struct mc_info));
+    if (res != sizeof(struct mc_info)) {
+        crit("Error while reading initial data");
+    }
+}
+
+void recv_buffctl(int tcp_socket)
+{
+    int res;
+
+    res = read_all(tcp_socket, &buffctl, sizeof(struct buffer_ctl));
+    if (res != sizeof(struct buffer_ctl)) {
+        crit("Error reading buffer control");
+    }
+}
+
+void send_client_ready(int tcp_socket)
+{
+    uint8_t cmd;
+
+    /* Tell TCP server we are ready to receive */
+    cmd = CLIENT_READY;
+    write(tcp_socket, &cmd, 1);
+}
+
+/* Read out the SERVER_SENT signal */
+void recv_server_sent(int tcp_socket)
+{
+    uint8_t cmd;
+    int res;
+
+    res = read_all(tcp_socket, &cmd, 1);
+    if (res != 1) {
+    //if (read(tcp_socket, &cmd, 1) < 1) {
+        crit("Error reading SERVER_SENT");
+    }
+
+    if (cmd != SERVER_SENT) {
+        crit("Error reading SERVER_SENT %d", cmd);
+    }
+}
+
+void send_client_request(int tcp_socket, struct request_ctl *req)
+{
+    uint8_t cmd;
+
+    /* Tell TCP server we are ready to receive */
+    cmd = CLIENT_REQ;
+    write(tcp_socket, &cmd, 1);
+    write(tcp_socket, req,
+          sizeof(struct request_ctl) + (req->req_count) * sizeof(uint32_t));
+}
+
+void recv_server_ack(int tcp_socket, struct packet_ctl *pkt)
+{
+    int res;
+
+    res = read_all(tcp_socket, pkt, sizeof(struct packet_ctl));
+    if (res != sizeof(struct packet_ctl)) {
+        crit("Error on tcp socket");
+    }
+
+    res = read_all(tcp_socket,
+                   ((char *)pkt) + sizeof(struct packet_ctl),
+                   pkt->data_size);
+    if (res != pkt->data_size) {
+        crit("Error on tcp socket received %d of %d",
+             res, pkt->data_size);
+    }
+}
+
+void tcp_retransmition(int tcp_socket,
+                       struct packet_ctl **curr_pkt,
+                       struct packet_ctl **freed_pkt)
+{
+    struct request_ctl *reqctl;
+    uint32_t *reqbody;
+    uint8_t rqbuf[sizeof(struct request_ctl) + PACKETS_PER_BUFFER * sizeof(uint32_t)];
+    uint32_t l;
+
+    reqctl = (struct request_ctl *)rqbuf;
+    reqbody = (uint32_t *)(rqbuf + sizeof(struct request_ctl));
+
+    reqctl->req_count = 0;
+    for (l = 0; l < buffctl.pkt_count; l++) {
+        if (!packetctl[l]->data_size) {
+            log(6, "Requesting packet %u", l + buffctl.packet_id_base);
+            reqbody[reqctl->req_count] = l + buffctl.packet_id_base;
+            reqctl->req_count++;
+        }
+    }
+
+    if (reqctl->req_count > 0) {
+        send_client_request(tcp_socket, reqctl);
+
+        /* read retransmitted blocks via TCP */
+        for (l = 0; l < reqctl->req_count; l++) {
+            if (*freed_pkt) {
+                *curr_pkt = *freed_pkt;
+            }
+
+            recv_server_ack(tcp_socket, *curr_pkt);
+
+            *freed_pkt = packet_put(*curr_pkt);
+            if (!(*freed_pkt)) {
+                crit("Malformed packet on tcp socket");
+            }
+            if ((*freed_pkt)->data_size != 0) {
+                crit("Malformed free packet slot or TCP data");
+            }
+
+            log(6, "Received retran packet %u", (*curr_pkt)->seq);
+        }
+
+        tot_reps += reqctl->req_count;
+    }
+}
+
+/* Returns how many good packets received from UDP */
+int recv_mcast(int tcp_socket, int udp_socket,
+               struct packet_ctl **curr_pkt,
+               struct packet_ctl **freed_pkt)
+{
+    int rcv_pkt_count;
+    int got_sent;
+    int maxfd;
+    fd_set rfds;
+    struct timeval tv;
+    int res;
+
+    maxfd = tcp_socket;
+    if (maxfd < udp_socket) {
+        maxfd = udp_socket;
+    }
+    maxfd++;
+    FD_ZERO(&rfds);
+
+    rcv_pkt_count = 0;
+    got_sent = 0;
+
+    if (buffctl.pkt_count != 0) {
+        do {
+            FD_SET(tcp_socket, &rfds);
+            FD_SET(udp_socket, &rfds);
+            tv.tv_sec = 5;
+            tv.tv_usec = 0;
+
+            res = select(maxfd, &rfds, 0, 0, &tv);
+            if (res < 0) {
+                crit("select error");
+            }
+
+            if (res == 0) {
+                crit("select timed out");
+            }
+
+            /* Read multicast packet */
+            if (FD_ISSET(udp_socket, &rfds)) {
+                log(7, "Reading multicast packet");
+                if (*freed_pkt) {
+                    *curr_pkt = *freed_pkt;
+                }
+
+                res = recv(udp_socket, *curr_pkt, PACKET_SIZE, 0);
+                if (res <= 0) {
+                    crit("error on multicast socket");
+                }
+
+                if (res < sizeof(struct packet_ctl) ) {
+                    log(7, "Truncated packet received (%d bytes)", res);
+                } else if (res != (*curr_pkt)->data_size + sizeof(struct packet_ctl)) {
+                    log(7,
+                        "Truncated packet received (%d of %ld bytes)",
+                        res, (*curr_pkt)->data_size + sizeof(struct packet_ctl));
+                } else {
+                    log(9, "Normal packet seq:%d", (*curr_pkt)->seq);
+                    (*freed_pkt) = packet_put((*curr_pkt));
+                    if (!(*freed_pkt)) {
+                        log(5, "Malformed packet");
+                    } else {
+                        if ((*freed_pkt)->data_size == 0) {
+                            rcv_pkt_count++;
+                        } else {
+                            log(6, "Duplicated packet");
+                            tot_dups++;
+                        }
+                    }
+                }
+            } else if (FD_ISSET(tcp_socket, &rfds)) {
+                /* Check TCP, only if there was no more data from UDP */
+                log(6, "No more data path1");
+                recv_server_sent(tcp_socket);
+                got_sent = 1;
+                break;
+            }
+        } while (rcv_pkt_count < buffctl.pkt_count);
+    }
+
+    if (got_sent == 0) {
+        log(6, "No more data path2");
+        recv_server_sent(tcp_socket);
+    }
+
+    return rcv_pkt_count;
+}
+
+void send_client_done(int tcp_socket)
+{
+    uint8_t cmd;
+
+    /* Tell TCP server we are ready to receive */
+    cmd = CLIENT_DONE;
+    write(tcp_socket, &cmd, 1);
+}
+
+int main(int argc, char *argv[])
+{
+    int ms, ts;
+    struct packet_ctl *alloc_pkt, *curr_pkt, *freed_pkt;
+    int udp_rcv_count;
+    u_short port = DEF_PORT;
+    struct in_addr local_addr;
+
+    if (argc < 3) {
+        printf("Usage: %s <local_ip> <server_ip> [port]\n", argv[0]);
+        return 0;
+    }
+
+    if (!inet_aton(argv[1], &local_addr)) {
+        crit("can not resolve address: %s", argv[1]);
+    }
+
+    if (argc > 3) {
+        port = atoi(argv[3]);
+        if (!port) {
+            port = DEF_PORT;
+        }
+    }
+
+    buffer_init();
+    /* Init first time packet slot */
+    alloc_pkt = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+    memset(alloc_pkt, 0, PACKET_SIZE);
+    freed_pkt = curr_pkt = alloc_pkt;
+
+    ts = init_tcp_client_socket(argv[2], port);
+    recv_mcinfo(ts);
+    ms = init_mcast_socket(&local_addr, &mcinfo.group);
+    /* Do we need set_nonblock(ms)??? ; */
+
+    /* Will do dummy run even if buffctl.pkt_count is zero at the first round */
+    do { /* one buffer a round */
+        packetctl_precheck();
+        recv_buffctl(ts);
+        send_client_ready(ts);
+
+        udp_rcv_count = recv_mcast(ts, ms, &curr_pkt,&freed_pkt);
+        if (udp_rcv_count == buffctl.pkt_count) {
+            log(6, "All packets of current buffer received from UDP");
+        } else {
+            tcp_retransmition(ts, &curr_pkt, &freed_pkt);
+        }
+
+        tot_pkts += buffctl.pkt_count;
+        tot_size += buffctl.buffer_size;
+        log(1, "\rBuffer received %lld Bytes, %ld Packets(%ld Repeats %ld Dups)",
+            tot_size, tot_pkts, tot_reps, tot_dups);
+
+        if (buffctl.pkt_count) {
+            buffer_flush(STDOUT_FILENO);
+        }
+
+        send_client_done(ts);
+    } while (buffctl.pkt_count != 0);
+
+    shutdown(ts, 2);
+    close(ts);
+    close(ms);
+    log(1, "All buffers receive done.\n");
+    return 0;
+}
diff --git a/code/jasmine/configure.ac b/code/jasmine/configure.ac
new file mode 100755 (executable)
index 0000000..77870c9
--- /dev/null
@@ -0,0 +1,65 @@
+#                                               -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+# bootstrap / init
+AC_PREREQ([2.61])
+
+AC_INIT([jasmine],
+       m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+       [hu.zhijiang@zte.com.cn])
+
+AC_USE_SYSTEM_EXTENSIONS
+
+AM_INIT_AUTOMAKE([-Wno-portability])
+
+AC_CONFIG_HEADER([config.h])
+
+AC_CONFIG_MACRO_DIR([m4])
+
+AC_SUBST(WITH_LIST, [""])
+
+dnl Fix default variables - "prefix" variable if not specified
+if test "$prefix" = "NONE"; then
+       prefix="/usr"
+
+       dnl Fix "localstatedir" variable if not specified
+       if test "$localstatedir" = "\${prefix}/var"; then
+               localstatedir="/var"
+       fi
+       dnl Fix "sysconfdir" variable if not specified
+       if test "$sysconfdir" = "\${prefix}/etc"; then
+               sysconfdir="/etc"
+       fi
+       dnl Fix "libdir" variable if not specified
+       if test "$libdir" = "\${exec_prefix}/lib"; then
+               if test -e /usr/lib64; then
+                       libdir="/usr/lib64"
+               else
+                       libdir="/usr/lib"
+               fi
+       fi
+fi
+
+# Checks for programs.
+AC_PATH_PROG([BASHPATH], [bash])
+
+AC_CONFIG_FILES([Makefile])
+
+PACKAGE_FEATURES=""
+
+ENV_CFLAGS="$CFLAGS"
+OPT_CFLAGS=""
+GDB_FLAGS=""
+EXTRA_WARNINGS="-Wall"
+CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $EXTRA_WARNINGS"
+
+# substitute what we need:
+AC_SUBST([BASHPATH])
+
+AC_OUTPUT
+
+AC_MSG_RESULT([  Version                  = ${PACKAGE_VERSION}])
+AC_MSG_RESULT([  Final        CFLAGS      = ${CFLAGS}])
+AC_MSG_RESULT([  Final        CPPFLAGS    = ${CPPFLAGS}])
+AC_MSG_RESULT([  Final        LDFLAGS     = ${LDFLAGS}])
+AC_MSG_RESULT([  Features                 = ${PACKAGE_FEATURES}])
diff --git a/code/jasmine/jasmine.spec.in b/code/jasmine/jasmine.spec.in
new file mode 100755 (executable)
index 0000000..73172f3
--- /dev/null
@@ -0,0 +1,33 @@
+Summary: Just A Small Multicast engINE
+Name: jasmine
+Version: @version@
+Release: 1%{?dist}
+Vendor: ZTE
+License: Apache-2.0
+Group: System Environment/Base
+URL: https://wiki.opnfv.org/display/DAIS
+Source: %{name}-%{version}%{?gittarver}.tar.gz
+
+BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
+
+%description
+jasmine is used to distribute files over UDP multicast for installers
+
+%prep
+%setup -q -n %{name}-%{version}%{?gittarver}
+
+%build
+if [ ! -f configure ]; then
+       ./autogen.sh
+fi
+
+./configure
+
+%install
+rm -rf %{buildroot}
+make install DESTDIR=%{buildroot}
+
+%files
+%{_bindir}/jasmines
+%{_bindir}/jasminec
+
diff --git a/code/jasmine/misc.c b/code/jasmine/misc.c
new file mode 100755 (executable)
index 0000000..eab367e
--- /dev/null
@@ -0,0 +1,73 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#include "misc.h"
+
+struct sockaddr_in make_addr(char *addr, unsigned short port)
+{
+    struct sockaddr_in sin;
+
+    sin.sin_family = AF_INET;
+    if (!addr) {
+        sin.sin_addr.s_addr = htonl(INADDR_ANY);
+    } else {
+        if (!inet_aton(addr, &sin.sin_addr))
+            crit("cant resolve address: %s", addr);
+    }
+    sin.sin_port = htons(port);
+    return sin;
+}
+
+/* Better than using a macro */
+void set_nonblock(int s)
+{
+    if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) {
+        crit("set_nonblock failed, can't set O_NONBLOCK");
+    }
+}
+
+void * wrapper_malloc(size_t size)
+{
+    void * res;
+
+    if (size == 0) {
+        crit("wrapper_malloc: malloc 0 size not allowed");
+    }
+
+    res = malloc(size);
+    if (res == NULL) {
+        crit("wrapper_malloc: malloc failed");
+    }
+
+    return res;
+}
+
+size_t read_all(int fd, void *buf, size_t count)
+{
+    size_t t = 0;
+    size_t r = 0;
+
+    while (count > 0) {
+        r = read(fd, buf + t, count);
+        if (r <= 0) {
+            return t ? t : r;
+        }
+
+        t += r;
+        count -= r;
+    }
+
+    return t;
+}
diff --git a/code/jasmine/misc.h b/code/jasmine/misc.h
new file mode 100755 (executable)
index 0000000..e60d0b6
--- /dev/null
@@ -0,0 +1,39 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_MISC_H
+#define _MCAST_MISC_H
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <error.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#define gettid() syscall(__NR_gettid)
+
+#define level 6
+
+#define crit(x, args...) do { \
+    fprintf(stderr, "\nERROR: "); fprintf(stderr, x, ##args); \
+    error(-1,errno, "\nERROR: [%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+} while (0);
+
+#define log(l, f, args...) if (l < level) { \
+    fprintf(stderr, "\n[%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+    fprintf(stderr, f, ##args); \
+    fprintf(stderr, "\n"); \
+}
+
+struct sockaddr_in make_addr(char *addr, unsigned short port);
+void * wrapper_malloc(size_t size);
+void set_nonblock(int s);
+size_t read_all(int fd, void *buf, size_t count);
+
+#endif
diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c
new file mode 100755 (executable)
index 0000000..c48de77
--- /dev/null
@@ -0,0 +1,550 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <sys/poll.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "udp-common.h"
+#include "tcp-queue.h"
+#include "misc.h"
+#include "server.h"
+
+#define MAX_CLIENTS 128 /* Clients per TCP server */
+#define TCP_BUFF_SIZE 65536
+
+struct cdata {
+    struct tcpq *tx;
+    struct tcpq *rx;
+    int await; /* non-zero if we are in the middle of receiving clients requests */
+    struct sockaddr_in peer;
+};
+
+struct server_status_data {
+    int state; /* state machine */
+    int sync_count;
+    int ccount;
+    struct semlink sl;
+    pthread_t pchild;
+
+    struct pollfd ds[MAX_CLIENTS + 1];
+    struct cdata cd[MAX_CLIENTS + 1];
+    int cindex;
+    int last_buff_pkt_count;
+};
+
+void init_sdata(struct server_status_data *sdata, void *thread_args)
+{
+    int i;
+
+    sdata->sl.parent = (struct semaphores *)thread_args;
+    sdata->sl.this = NULL;
+
+    sdata->state = S_PREP;
+    sdata->last_buff_pkt_count = 1;
+    sdata->sync_count = 0;
+
+    for (i = 0; i < MAX_CLIENTS + 1; i++) {
+        sdata->ds[i].fd = -1;
+        sdata->ds[i].events = POLLIN;
+        sdata->cd[i].tx = tcpq_queue_init();
+        sdata->cd[i].rx = tcpq_queue_init();
+        sdata->cd[i].await = 0;
+    }
+}
+
+/* disconnect one client and compress table */
+void kill_client(struct server_status_data *sdata)
+{
+    struct pollfd *ds = sdata->ds;
+    struct cdata *cd = sdata->cd;
+
+    close((ds + sdata->cindex)->fd);
+    tcpq_queue_free((cd + sdata->cindex)->rx);
+    tcpq_queue_free((cd + sdata->cindex)->tx);
+
+    /* Move last slot to this free slot and release the last slot */
+    *(ds + sdata->cindex) = *(ds + sdata->ccount - 1);
+    *(cd + sdata->cindex) = *(cd + sdata->ccount - 1);
+    sdata->cindex--;
+    sdata->ccount--;
+    client_count--;
+}
+
+void send_buffctl_to_all_clients(struct server_status_data *sdata)
+{
+    int i;
+    long send_sz;
+    void *cpy;
+
+    /* copy header to all queues */
+    send_sz = sizeof(struct buffer_ctl);
+    for (i = 1; i < sdata->ccount; i++) {
+        cpy = wrapper_malloc(send_sz);
+        memcpy(cpy, &buffctl, send_sz);
+        tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz);
+        sdata->ds[i].events |= POLLOUT;
+    }
+}
+
+void send_sent_to_all_clients(struct server_status_data *sdata)
+{
+    int i;
+    void *cpy;
+    uint8_t byte = SERVER_SENT;
+
+    /* Push data to clients */
+    for (i = 1; i < sdata->ccount; i++) {
+        cpy = wrapper_malloc(1);
+        memcpy(cpy, &byte, 1);
+        tcpq_queue_tail(sdata->cd[i].tx, cpy, 1);
+        sdata->ds[i].events |= POLLOUT;
+    }
+}
+void accept_clients(struct server_status_data *sdata)
+{
+    int res;
+    int new_fd;
+    socklen_t socklen;
+    int poll_events;
+    int timeout = -1;
+    struct pollfd *ds = sdata->ds;
+    struct cdata *cd = sdata->cd;
+
+    ds->events = POLLIN;
+    ds->revents = 0;
+    sdata->ccount = 1;
+
+    while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) {
+        poll_events = poll(ds, 1, timeout);
+        if (poll_events < 0) {
+            crit("poll() failed");
+        } else if (poll_events == 0) {
+            log(2, "poll() returned with no results!");
+            continue;
+        }
+
+        log(9, "poll: %d events", poll_events);
+
+        if (ds->revents) {
+
+            /* new connections come in */
+            if (ds->revents & (POLLIN|POLLPRI)) {
+
+                do {
+                    socklen = sizeof((cd + sdata->ccount)->peer);
+retry_accept:
+                    new_fd = accept(ds->fd,
+                                    (struct sockaddr *)&((cd + sdata->ccount)->peer),
+                                    &socklen);
+                    if (new_fd == -1 && errno == EINTR) {
+                        goto retry_accept;
+                    }
+
+                    if (new_fd == -1) {
+                        log(2, "accept() returned with error %d", errno);
+                        break;
+                    }
+
+                    /* Send group info, before set non block */
+                    res = write(new_fd, &mcinfo, sizeof(struct mc_info));
+                    if (res < 0) {
+                        log(3, "Error opening connection: %s",
+                            inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+                        close(new_fd);
+                        continue;
+                    }
+
+                    /* Can set to non block becasue we will poll it in future. */
+                    res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
+                    if (res == -1) {
+                        log(2, "fcntl() returned with error %d", errno);
+                        close(new_fd);
+                        continue;
+                    }
+
+                    (ds + sdata->ccount)->fd = new_fd;
+
+                    log(5, "New connection %d: %s",
+                        sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+                    sdata->ccount++;
+                    client_count++;
+                } while (sdata->ccount <= MAX_CLIENTS);
+
+            } else {
+                log(2, "Error on tcp socket");
+            }
+
+            poll_events--;
+            ds->revents = 0;
+        }
+    }
+
+    ds->events = 0;
+}
+
+void accept_clients_may_spawn(struct server_status_data *sdata)
+{
+    /* Wait the parent to let us accpet clients */
+    sl_wait_parent(&(sdata->sl));
+
+    sdata->ds[0].fd = sfd;
+    accept_clients(sdata);
+    if (client_count < wait_count) {
+        sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1);
+
+        /* Let child TCP servers accept connections */
+        sl_release_child(&(sdata->sl));
+        /* Wait until all childern TCP servers got all their clients */
+        sl_wait_child(&(sdata->sl));
+    }
+
+    /* Tell parent all client are accepted/connected */
+    sl_release_parent(&(sdata->sl));
+}
+
+void keep_on_receiving_client_request(struct server_status_data *sdata)
+{
+    char buf[TCP_BUFF_SIZE];
+    void *cpy;
+    struct request_ctl *req;
+    uint32_t *rqb;
+    struct packet_ctl *ans;
+    long rq_index;
+    long total_sz;
+
+    long read_out;
+    struct pollfd *ds;
+    struct cdata *cd;
+
+    ds = &(sdata->ds[sdata->cindex]);
+    cd = &(sdata->cd[sdata->cindex]);
+
+    read_out = read(ds->fd, buf, cd->await);
+    if (read_out <= 0) {
+        log(5, "Client disconnected (r): %s",
+            inet_ntoa(cd->peer.sin_addr));
+        kill_client(sdata);
+        return;
+    }
+
+    log(7, "New data (%ld + %ld) on conn %d: %s",
+        read_out, tcpq_queue_dsize(cd->rx), sdata->cindex,
+        inet_ntoa(cd->peer.sin_addr));
+    cpy = wrapper_malloc(read_out);
+    memcpy(cpy, buf, read_out);
+    tcpq_queue_tail(cd->rx, cpy, read_out);
+    cd->await -= read_out;
+
+    /* Full header received */
+    if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) {
+        cpy = tcpq_queue_flat_peek(cd->rx, &read_out);
+        req = (struct request_ctl *)cpy;
+        cd->await = req->req_count * sizeof(uint32_t);
+        log(6, "Client request for %u packets on %d: %s",
+            req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+        /* req->rqc may be zero? So do not return from here */
+    }
+
+    /* Whole request(struct request_ctl + all rq blocks) received */
+    if (cd->await == 0) {
+        cpy = tcpq_dqueue_flat(cd->rx, &read_out);
+        req = (struct request_ctl *)cpy;
+        rqb = (uint32_t *) (cpy + sizeof(struct request_ctl));
+        for (rq_index = 0; rq_index < req->req_count; rq_index++) {
+            ans = packet_get(*(rqb + rq_index));
+            total_sz = ans->data_size + sizeof(struct packet_ctl);
+            log(6, "Send packet %u (%u bytes) on %d",
+                rqb[rq_index], ans->data_size, sdata->cindex);
+            cpy = wrapper_malloc(total_sz);
+            memcpy(cpy, ans, total_sz);
+            tcpq_queue_tail(cd->tx, cpy, total_sz);
+        }
+
+        if (rq_index > 0) {
+            /* Data need to be sent out */
+            ds->events |= POLLOUT;
+        }
+    }
+}
+
+void handle_error_event(struct server_status_data *sdata)
+{
+    struct cdata *cd;
+
+    cd = &(sdata->cd[sdata->cindex]);
+    log(5, "Closing connection %d: %s",
+        sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+    kill_client(sdata);
+}
+
+void handle_client_ready(struct server_status_data *sdata)
+{
+    if (sdata->state == S_SYNC) {
+        sdata->sync_count++;
+
+        log(7, "Client SYNC %d of %d",
+            sdata->sync_count, sdata->ccount - 1);
+
+        if (sdata->sync_count == sdata->ccount - 1) {
+            /* All client SYNC done */
+            sdata->state = S_SEND;
+
+            /* Wait child ready */
+            sl_wait_child(&(sdata->sl));
+            /* Tell parent I am ready, release parent */
+            sl_release_parent(&(sdata->sl));
+
+            /* Wait parent to send mcast */
+            sl_wait_parent(&(sdata->sl));
+            /* tell child mcast done, release child */
+            sl_release_child(&(sdata->sl));
+
+            send_sent_to_all_clients(sdata);
+        }
+    }
+}
+
+void handle_client_done(struct server_status_data *sdata)
+{
+    if (sdata->state == S_SEND) {
+        log(7, "Client START %d of %d",
+            sdata->sync_count, sdata->ccount - 1);
+
+        sdata->sync_count--;
+        if (sdata->sync_count == 0) {
+            /* Got all client START */
+            sdata->state = S_PREP;
+            log(7, "All received, now tell UDP Server prepare next buffer");
+            /* Remeber old buffctl.pkt_count before it change */
+            sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+            /* Wait child clients done */
+            sl_wait_child(&(sdata->sl));
+            /* Tell parent all our clients are done, release parent */
+            sl_release_parent(&(sdata->sl));
+        }
+    }
+}
+
+void handle_client_events(struct server_status_data *sdata)
+{
+    struct pollfd *ds;
+    struct cdata *cd;
+    int read_out;
+    uint8_t msgtype;
+
+    ds = &(sdata->ds[sdata->cindex]);
+    cd = &(sdata->cd[sdata->cindex]);
+
+    /* read message type */
+    read_out = read(ds->fd, &msgtype, 1);
+    if (read_out <= 0) {
+        log(5, "Client disconnected due to read error %d, ret:%d (r): %s",
+            errno, read_out, inet_ntoa(cd->peer.sin_addr));
+        kill_client(sdata);
+        return; /* continue to check other clients */
+    }
+
+    switch (msgtype) {
+    case CLIENT_READY:
+        handle_client_ready(sdata);
+        break;
+    case CLIENT_DONE:
+        handle_client_done(sdata);
+        break;
+    case CLIENT_REQ:
+        /* wait a whole struct request_ctl */
+        cd->await = sizeof(struct request_ctl);
+        break;
+    default:
+        log(4, "Wrong message type from %s",
+            inet_ntoa(cd->peer.sin_addr));
+        break;
+    }
+}
+
+void handle_pullin_event(struct server_status_data *sdata)
+{
+    struct cdata *cd;
+
+    cd = &(sdata->cd[sdata->cindex]);
+
+    /* Await is set only for repeat request */
+    if (cd->await) {
+        keep_on_receiving_client_request(sdata);
+    } else {
+        handle_client_events(sdata);
+    }
+}
+
+void handle_pullout_event(struct server_status_data *sdata)
+{
+    char buf[TCP_BUFF_SIZE];
+    struct pollfd *ds;
+    struct cdata *cd;
+    long transmit_sz;
+    void *cpy;
+    long written_in;
+
+    ds = &(sdata->ds[sdata->cindex]);
+    cd = &(sdata->cd[sdata->cindex]);
+
+    if (cd->tx->count == 0) {
+        ds->events = POLLIN; /* Just make sure */
+        return;
+    }
+
+    log(7, "handle_pullout_event servs No.%d conn", sdata->cindex);
+
+    /* If there is some data to be sent, try to do it now */
+    cpy = tcpq_dequeue_head(cd->tx, &transmit_sz);
+    memcpy(buf, cpy, transmit_sz);
+    free(cpy);
+
+    written_in = write(ds->fd, buf, transmit_sz);
+    if (written_in <= 0) {
+        log(5, "Client disconnected (w): %s",
+            inet_ntoa(cd->peer.sin_addr));
+        kill_client(sdata);
+        return;
+    }
+
+    if (written_in != transmit_sz) {
+        log(6, "Partial wrote: %ld out of %ld bytes sent",
+            written_in, transmit_sz);
+        cpy = wrapper_malloc(transmit_sz - written_in);
+        memcpy(cpy, buf + written_in, transmit_sz - written_in);
+        tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in);
+    } else {
+        log(7, "Sent %ld bytes to %s",
+            written_in, inet_ntoa(cd->peer.sin_addr));
+        if (cd->tx->count == 0) {
+            /* Do not listen pollout event anymore */
+            ds->events = POLLIN;
+        }
+    }
+}
+
+void check_clients(struct server_status_data *sdata)
+{
+    int poll_events;
+    struct pollfd *ds;
+    int timeout = -1;
+
+    if (sdata->ccount == 1) {
+        log(5, "No more clients, start exiting. s,c,p:%d,%d,%d",
+            sdata->state, sdata->ccount, buffctl.pkt_count);
+        /* No client existed */
+        switch(sdata->state) {
+        case S_SYNC:
+            /* Wait all children ready */
+            sl_wait_child(&(sdata->sl));
+            /* Tell parent I am ready, release parent */
+            sl_release_parent(&(sdata->sl));
+
+            /* Wait parent to send mcast */
+            sl_wait_parent(&(sdata->sl));
+            /* Tell child mcast done, release child */
+            sl_release_child(&(sdata->sl));
+        case S_SEND:
+            sdata->state = S_PREP;
+            /* Remeber old buffctl.pkt_count before it change */
+            sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+            /* Wait child clients done */
+            sl_wait_child(&(sdata->sl));
+            /* Tell parent all our clients are done */
+            sl_release_parent(&(sdata->sl));
+        }
+        /* return to main loop to finish the dummy run */
+        return;
+    }
+
+    /* poll clients only */
+    poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout);
+    if (poll_events < 0) {
+        crit("poll() failed");
+    } else if (poll_events == 0) {
+        log(2, "poll() returned with no results!");
+        return;
+    }
+
+    log(9, "poll clients: %d events", poll_events);
+
+    /* check all connected clients */
+    for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) {
+        ds = &(sdata->ds[sdata->cindex]);
+        if (!ds->revents) {
+            continue;
+        }
+
+        if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) {
+            handle_error_event(sdata);
+        } else if (ds->revents & (POLLIN|POLLPRI)) {
+            handle_pullin_event(sdata);
+        } else if (ds->revents & POLLOUT) {
+            handle_pullout_event(sdata);
+        }
+    }
+}
+
+void * tcp_server_main(void *args)
+{
+    int i;
+    struct server_status_data ssdata;
+    struct server_status_data *sdata;
+
+    sdata = &ssdata;
+    init_sdata(sdata, args);
+
+    accept_clients_may_spawn(sdata);
+
+    while (1) {
+        if (sdata->state == S_PREP) {
+            /* Wait UDP server to prepare the buffctl of this round to send */
+            sl_wait_parent(&(sdata->sl));
+            log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d",
+                sdata->state, sdata->ccount - 1, buffctl.pkt_count);
+            /* Tell children buffer pareparation done, release child */
+            sl_release_child(&(sdata->sl));
+            send_buffctl_to_all_clients(sdata);
+            sdata->state = S_SYNC;
+        }
+
+        check_clients(sdata);
+
+        if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) {
+            log(5, "No more output, TCP server finished");
+            if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) {
+                crit("pthread_join()");
+            }
+
+            /* shutdown conn to clients */
+            for (i = sdata->ccount - 1; i > 0; i--) {
+                shutdown((sdata->ds[i]).fd, 2);
+            }
+
+            if (!sdata->sl.this) {
+                /* leaf server */
+                close(sfd);
+                sfd = -1;
+            }
+            return 0;
+        }
+    }
+}
diff --git a/code/jasmine/server-udp.c b/code/jasmine/server-udp.c
new file mode 100755 (executable)
index 0000000..c641289
--- /dev/null
@@ -0,0 +1,95 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <signal.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+#include "server.h"
+
+void * udp_server_main(void *args)
+{
+    pthread_t pchild;
+    struct sockaddr_in maddr;
+    int ms;
+    int i;
+    struct semlink sl;
+    char adr[128];
+    long long total = 0;
+
+    sprintf(adr, "%s%s", MCAST_ADDR_BASE, MCAST_ADDR_SUFFIX);
+    maddr = make_addr(adr, server_port);
+    mcinfo.group = maddr;
+    ms = init_mcast_socket(&local_addr, &maddr);
+
+    sl.this = spawn_thread(&pchild, tcp_server_main, 1);
+    sl.parent = NULL;
+
+    buffer_init();
+
+    /* Let TCP servers accept connections */
+    sl_release_child(&sl);
+    /* Wait until all TCP servers got all their clients */
+    sl_wait_child(&sl);
+
+    log(7, "UDP Server: All clients were accepted");
+
+    do {
+        /* one buffer round */
+
+        i = buffer_fill(STDIN_FILENO);
+
+        if (!client_count) {
+            /* No need to send, make it the last run of main loop, also let
+               TCP Server to do dummy run right away(not waiting until all
+               data sent). */
+            i = 0;
+            buffctl.pkt_count = 0;
+        }
+
+        log(7, "Signal TCP Server to send buffctl");
+
+        /* Tell tcp to Send headers, release child */
+        sl_release_child(&sl);
+        /* Wait all clients ready to start receiving */
+        sl_wait_child(&sl);
+
+        /* multicast data */
+        while (i--) {
+            sendto(ms, packetctl[i],
+                   packetctl[i]->data_size + sizeof(struct packet_ctl), 0,
+                   (struct sockaddr *)&maddr, sizeof(maddr));
+        }
+
+        log(7, "send finish");
+        /* Tell tcp to Send SERVER_SENT, release child */
+        sl_release_child(&sl);
+        /* wait tcp to send SERVER_SENT */
+        sl_wait_child(&sl);
+
+        total = total + buffctl.buffer_size;
+        log(1, "Buffer Done: sent %lld bytes", total);
+    } while (buffctl.pkt_count);
+
+
+    if (pthread_join(pchild, 0) < 0) {
+        crit("pthread_join() TCP Server");
+    }
+
+    log(1, "All Done");
+    close(ms);
+    return 0;
+}
diff --git a/code/jasmine/server.c b/code/jasmine/server.c
new file mode 100755 (executable)
index 0000000..4a9a327
--- /dev/null
@@ -0,0 +1,123 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+#include "server.h"
+
+/* Global variables shared among server*.c */
+int wait_count = 0;
+int client_count = 0;
+int server_port = DEF_PORT; /* Currently for both UDP and TCP */
+struct in_addr local_addr;
+/* listen socket - global for all TCP threads */
+int sfd = -1;
+
+void init_semaphores(struct semaphores *sp)
+{
+    if (sem_init(&sp->wait_parent, 0, 0) < 0) {
+        crit("sem_init() wait_parent");
+    }
+
+    if (sem_init(&sp->wait_child, 0, 0) < 0) {
+        crit("sem_init() wait_child");
+    }
+}
+
+void sl_wait_child(struct semlink *sl)
+{
+    if (sl->this) {
+        P(sl->this->wait_child);
+    }
+}
+
+void sl_release_child(struct semlink *sl)
+{
+    if (sl->this) {
+        V(sl->this->wait_parent);
+    }
+}
+
+void sl_wait_parent(struct semlink *sl)
+{
+    P(sl->parent->wait_parent);
+}
+
+void sl_release_parent(struct semlink *sl)
+{
+    V(sl->parent->wait_child);
+}
+
+/* Wrapper for pthread_create, additionaly may initialize/pass thru
+   a semaphores parameter */
+struct semaphores * spawn_thread(pthread_t *pchild,
+                                 void *(*entry)(void *),
+                                 int create_sems)
+{
+    struct semaphores *sp = NULL;
+
+    if (create_sems != 0) {
+        sp = (struct semaphores *) wrapper_malloc(sizeof(struct semaphores));
+        init_semaphores(sp);
+    }
+
+    if (pthread_create(pchild, 0, entry, (void *)sp) != 0) {
+        crit("pthread_create");
+    }
+    return sp;
+}
+
+int main(int argc, char *argv[])
+{
+    pthread_t pchild;
+
+    log(9, "buffer size:%ld, buffer head size:%ld, data size:%ld",
+        PACKET_SIZE, sizeof(struct packet_ctl), PACKET_PAYLOAD_SIZE);
+
+    if (argc < 3) {
+        printf("Usage: %s local_ip num_of_clients [port]\n", argv[0]);
+        return -1;
+    }
+
+    if (!inet_aton(argv[1], &local_addr)) {
+        crit("can not resolve address: %s", argv[1]);
+    }
+
+    wait_count = atoi(argv[2]);
+    if (!wait_count) {
+        crit("can not serv to 0 client\n");
+    }
+
+    if (argc > 3) {
+        server_port = atoi(argv[3]);
+        if (!server_port) {
+            server_port = DEF_PORT;
+        }
+    }
+
+    /* sfd is shared by all TCP threads as sdata->ds[0].fd */
+    sfd = init_tcp_server_socket(0, server_port);
+    set_nonblock(sfd);
+
+    /* start UDP servers thread */
+    (void)spawn_thread(&pchild, udp_server_main, 0);
+    if (pthread_join(pchild, 0) < 0) {
+        crit("pthread_join() UDP Server");
+    }
+
+    return 0;
+}
diff --git a/code/jasmine/server.h b/code/jasmine/server.h
new file mode 100755 (executable)
index 0000000..23dbed5
--- /dev/null
@@ -0,0 +1,63 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_SERVER_H
+#define _MCAST_SERVER_H
+
+#include <pthread.h>
+#include <semaphore.h>
+
+struct semaphores {
+    sem_t wait_parent;
+    sem_t wait_child;
+};
+
+struct semlink {
+    struct semaphores *this; /* used by parent to point to the struct semaphores
+                                which it created during spawn child. */
+    struct semaphores *parent; /* used by child to point to the struct
+                                  semaphores which it created by parent */
+};
+
+void sl_wait_child(struct semlink *sl);
+void sl_release_child(struct semlink *sl);
+void sl_wait_parent(struct semlink *sl);
+void sl_release_parent(struct semlink *sl);
+
+/* Server state machine */
+#define S_PREP 0
+#define S_SYNC 1
+#define S_SEND 2
+
+extern int wait_count;
+extern int client_count;
+extern int server_port;
+extern struct in_addr local_addr;
+extern int sfd;
+
+void *udp_server_main(void *args);
+void *tcp_server_main(void *args);
+void init_semaphores(struct semaphores *sp);
+struct semaphores * spawn_thread(pthread_t *pchild,
+                                 void *(*entry)(void *),
+                                 int create_sems);
+
+#define V(x) do { \
+    if (sem_post(&x) < 0) { \
+        crit("sem_post()"); \
+    } \
+} while(0)
+
+#define P(x) do { \
+    if (sem_wait(&x) != 0) { \
+        crit("sem_wait()"); \
+    } \
+} while(0)
+
+#endif
diff --git a/code/jasmine/tcp-common.c b/code/jasmine/tcp-common.c
new file mode 100755 (executable)
index 0000000..69a57ba
--- /dev/null
@@ -0,0 +1,60 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+int init_tcp_server_socket(char *addr, unsigned short port)
+{
+    int s;
+    struct sockaddr_in sin;
+
+    port = port ? port : TCP_DPORT;
+    sin = make_addr(addr, port);
+
+    if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+        crit("socket() failed");
+    }
+
+    if (bind(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+        crit("bind() failed, error binding tcp socket");
+    }
+
+    if (listen(s, 5) < 0) {
+        crit("listen() failed, error listening on socket");
+    }
+
+    log(4, "init_tcp_server_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+    return s;
+}
+
+int init_tcp_client_socket(char *addr, unsigned short port)
+{
+    int s;
+    struct sockaddr_in sin;
+
+    port = port ? port : TCP_DPORT;
+    sin = make_addr(addr, port);
+
+    if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+        crit("socket() failed");
+    }
+
+    if (connect(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+        crit("connect() failed");
+    }
+
+    log(4, "init_tcp_client_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+    return s;
+}
diff --git a/code/jasmine/tcp-common.h b/code/jasmine/tcp-common.h
new file mode 100755 (executable)
index 0000000..6ac48e1
--- /dev/null
@@ -0,0 +1,18 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_COMMON_H
+#define _MCAST_TCP_COMMON_H
+
+#define TCP_DPORT DEF_PORT
+
+int init_tcp_server_socket(char *addr, unsigned short port);
+int init_tcp_client_socket(char *addr, unsigned short port);
+
+#endif
diff --git a/code/jasmine/tcp-queue.c b/code/jasmine/tcp-queue.c
new file mode 100755 (executable)
index 0000000..63a57f2
--- /dev/null
@@ -0,0 +1,178 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+/* TCP poll message single linked queue */
+#include <stdlib.h>
+#include <string.h>
+
+#include "misc.h"
+#include "tcp-queue.h"
+
+void tcpq_queue_tail(struct tcpq *q, void *data, long size)
+{
+    struct qmsg *tmp;
+
+    if (!q) {
+        return;
+    }
+
+    tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+    tmp->next = NULL;
+
+    if (!q->head) {
+        q->head = q->tail = tmp;
+    } else {
+        q->tail->next = tmp;
+        q->tail = tmp;
+    }
+
+    q->tail->data = data;
+    q->tail->size = size;
+    q->size += size;
+    q->count++;
+}
+
+void tcpq_queue_head(struct tcpq *q, void *data, long size)
+{
+    struct qmsg *tmp;
+
+    if (!q) {
+        return;
+    }
+
+    tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+    tmp->next = NULL;
+
+    if (!q->head) {
+        q->head = q->tail = tmp;
+    } else {
+        tmp->next = q->head;
+        q->head = tmp;
+    }
+
+    q->head->data = data;
+    q->head->size = size;
+    q->size += size;
+    q->count++;
+}
+
+void * tcpq_dequeue_head(struct tcpq *q, long *size)
+{
+    void *res = NULL;
+    struct qmsg *tmp;
+
+    if (q && q->head) {
+        res = q->head->data;
+        *size = q->head->size;
+
+        tmp = q->head;
+        q->head = q->head->next;
+        if (!q->head) {
+            q->tail = NULL;
+        }
+
+        free(tmp);
+        q->count--;
+        q->size -= *size;
+    }
+    return res;
+}
+
+void * tcpq_queue_peek(struct tcpq *q, long *size)
+{
+    if (q && q->head) {
+        *size = q->head->size;
+        return q->head->data;
+    }
+
+    return NULL;
+}
+
+long tcpq_queue_dsize(struct tcpq *q)
+{
+    if (q) {
+        return q->size;
+    }
+
+    return 0;
+}
+
+void tcpq_queue_free(struct tcpq *q)
+{
+    struct qmsg *tmp;
+
+    if (!q) {
+        return;
+    }
+
+    while (q->head) {
+        tmp = q->head;
+        q->head = tmp->next;
+        free(tmp->data);
+        free(tmp);
+    }
+
+    q->count = 0;
+    q->size = 0;
+    q->head = q->tail = NULL;
+}
+
+struct tcpq * tcpq_queue_init(void)
+{
+    struct tcpq *q = wrapper_malloc(sizeof(struct tcpq));
+
+    q->count = 0;
+    q->size = 0;
+    q->head = q->tail = NULL;
+    return q;
+}
+
+void * tcpq_dqueue_flat(struct tcpq *q, long *size)
+{
+    void *res;
+    struct qmsg *tmp;
+    long offs = 0;
+
+    if (!q || q->count == 0) {
+        return NULL;
+    }
+
+    res = wrapper_malloc(q->size);
+    *size = q->size;
+
+    while (q->head) {
+        memcpy(res + offs, q->head->data, q->head->size);
+        offs += q->head->size;
+
+        tmp = q->head;
+        q->head = tmp->next;
+        free(tmp->data);
+        free(tmp);
+    }
+    tcpq_queue_free(q);
+    return res;
+}
+
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size)
+{
+    void *cpy;
+
+    if (!q) {
+        return NULL;
+    }
+
+    if (q->count > 1) {
+        cpy = tcpq_dqueue_flat(q, size);
+        tcpq_queue_tail(q, cpy, *size); /* use tcpq_queue_head is also OK */
+    } else {
+        cpy = tcpq_queue_peek(q, size);
+    }
+
+    return cpy;
+}
diff --git a/code/jasmine/tcp-queue.h b/code/jasmine/tcp-queue.h
new file mode 100755 (executable)
index 0000000..4f096f8
--- /dev/null
@@ -0,0 +1,35 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_QUEUE_H
+#define _MCAST_TCP_QUEUE_H
+
+struct tcpq {
+    struct qmsg *head, *tail;
+    long count; /* message count in a queue */
+    long size; /* Total data size of a queue */
+};
+
+struct qmsg {
+    struct qmsg *next;
+    void *data;
+    long size;
+};
+
+struct tcpq * tcpq_queue_init(void);
+void tcpq_queue_free(struct tcpq *q);
+long tcpq_queue_dsize(struct tcpq *q);
+void tcpq_queue_tail(struct tcpq *q, void *data, long size);
+void tcpq_queue_head(struct tcpq *q, void *data, long size);
+void * tcpq_dequeue_head(struct tcpq *q, long *size);
+void * tcpq_queue_peek(struct tcpq *q, long *size);
+void * tcpq_dqueue_flat(struct tcpq *q, long *size);
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size);
+
+#endif
diff --git a/code/jasmine/udp-common.c b/code/jasmine/udp-common.c
new file mode 100755 (executable)
index 0000000..c731422
--- /dev/null
@@ -0,0 +1,97 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <fcntl.h>
+#include <net/if.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+
+struct mc_info mcinfo;
+
+int init_mcast_socket(struct in_addr *local_addr,
+                      struct sockaddr_in *maddr)
+{
+    struct ip_mreqn mreqn;  /* multicast request new */
+    int ms;  /* multicast socket */
+    u_short msock_port;
+    struct sockaddr_in bind_addr;
+
+    int msock_reuse = MCAST_REUSE;
+    u_char msock_loop = MCAST_LOOP;
+    u_char msock_ttl = MCAST_TTL;
+
+    if ((ms = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+        crit("socket() failed, error allocating multicast socket");
+    }
+
+    // In order to let client and server on same host to use same port
+    if (setsockopt(ms, SOL_SOCKET, SO_REUSEADDR,
+        &msock_reuse, sizeof(msock_reuse)) < 0) {
+        crit("setsockopt() failed, can't set reuse flag");
+    }
+
+    if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_TTL,
+        &msock_ttl,sizeof(msock_ttl)) < 0) {
+        crit("setsockopt() failed, can't set ttl value");
+    }
+
+    if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_LOOP,
+        &msock_loop, sizeof(msock_loop)) < 0) {
+        crit("setsockopt() failed, can't set multicast packet looping");
+    }
+
+    /* TODO: Do we neet non-block? */
+
+    log(4, "Using multicast address: %s:%d",
+        inet_ntoa(maddr->sin_addr), ntohs(maddr->sin_port));
+
+    mreqn.imr_multiaddr = maddr->sin_addr;
+    mreqn.imr_address = *local_addr;
+    mreqn.imr_ifindex = 0;
+
+    /* Interface and remote mcast address choosing */
+
+    /* This tell interface which has mreqn.imr_address to join
+       mreqn.imr_multiaddr group, it has nothing to do with socket, port, and
+       mreqn.imr_address. */
+    if (setsockopt(ms, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+        &mreqn, sizeof(mreqn)) < 0) {
+        crit("setsockopt() failed, %s can't join multicast group",
+             inet_ntoa(*local_addr));
+    }
+
+    /* local address choosing */
+
+    /* This tell kernel to use interface which has mreqn.imr_address as device
+       and mreqn.imr_address as source addr when sending any multicast through
+       socket. */
+    if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_IF,
+        &mreqn, sizeof(mreqn)) < 0) {
+        crit("setsockopt() failed to IP_MULTICAST_IF.\n");
+    }
+
+    /* Local port choosing, remote port is choosed when calling sendto()! */
+
+    /* This let us uses specific udp port number as source port when sending
+       any multicast datagrams. So ip in maddr is useless here and should be
+       INADDR_ANY, otherwise, bind will return Invalid argument. */
+    msock_port = ntohs(maddr->sin_port);
+    bind_addr = make_addr(0, msock_port);
+    if (bind(ms, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) {
+        crit("bind() error, can't bind mcast port");
+    }
+
+    return ms;
+}
diff --git a/code/jasmine/udp-common.h b/code/jasmine/udp-common.h
new file mode 100755 (executable)
index 0000000..861545b
--- /dev/null
@@ -0,0 +1,42 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_UDP_COMMON_H
+#define _MCAST_UDP_COMMON_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MCAST_DPORT DEF_PORT
+
+#define MCAST_TTL 8
+/*
+ * Force reuse
+ */
+#define MCAST_REUSE 1
+
+/*
+ * Loop to let server host itself can receive data too.
+ */
+#define MCAST_LOOP 1
+#define MCAST_ADDR_BASE "224.238.0."
+#define MCAST_ADDR_SUFFIX "31"
+
+// Initial data
+struct mc_info {
+    struct sockaddr_in group;
+};
+
+extern struct mc_info mcinfo;
+
+// How to capture udp packets: tcpdump -i ethx udp -vv -n
+
+int init_mcast_socket(struct in_addr *local_addr, struct sockaddr_in *maddr);
+#endif