Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions br
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ usage() {
}

showhelp() {
echo "Usage: `basename $1`: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]"
echo "Usage: `basename "$1"`: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]"
echo "bashreduce. Map an input file to many hosts, sort/reduce, merge"
echo " -m: hosts to use, can repeat hosts for multiple cores"
echo " default hosts from /etc/br.hosts"
Expand Down Expand Up @@ -41,17 +41,17 @@ while getopts "m:c:r:i:o:t:S:h" name; do
o) output=$OPTARG;;
t) tmp_dir=$OPTARG;;
S) sort_mem=$OPTARG;;
h) showhelp $0;;
[?]) usage $0;;
h) showhelp "$0";;
[?]) usage "$0";;
esac
done

if [ -z "$hosts" ]; then
if [ -e /etc/br.hosts ]; then
hosts=`cat /etc/br.hosts`
else
echo "`basename $0`: must specify hosts with -m or provide /etc/br.hosts"
usage $0
echo "`basename "$0"`: must specify hosts with -m or provide /etc/br.hosts"
usage "$0"
fi
fi

Expand All @@ -62,7 +62,7 @@ fi
jobid="`uuidgen`"
jobpath="$tmp_dir/br_job_$jobid"
nodepath="$tmp_dir/br_node_$jobid"
mkdir -p $jobpath/{in,out}
mkdir -p "$jobpath"/{in,out}

# now, for each host, set up in and out fifos (and a netcat for each), and ssh to each host to set up workers listening on netcat

Expand All @@ -73,30 +73,30 @@ out_files=

for host in $hosts; do
# our named pipes
mkfifo $jobpath/{in,out}/$host_idx
mkfifo "$jobpath"/{in,out}/$host_idx
# lets get the pid of our listener
ssh -n $host "mkdir -p $nodepath"
pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/in_$host_idx 2>/dev/null </dev/null & jobs -l" | awk {'print $2'})
ssh $host -n "tail -s0.1 -f --pid=$pid $nodepath/in_$host_idx 2>/dev/null </dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp_dir -k$mapcolumn,$mapcolumn 2>/dev/null $reduce | nc -q0 -l -p $port_in >&/dev/null &"
ssh -n $host "mkdir -p \"$nodepath\""
pid=$(ssh -n $host "nc -l -p $port_out >\"$nodepath\"/in_$host_idx 2>/dev/null </dev/null & jobs -l" | awk {'print $2'})
ssh $host -n "tail -s0.1 -f --pid=$pid \"$nodepath\"/in_$host_idx 2>/dev/null </dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T\"$tmp_dir\" -k$mapcolumn,$mapcolumn 2>/dev/null $reduce | nc -q0 -l -p $port_in >&/dev/null &"
# our local forwarders
nc $host $port_in >$jobpath/in/$host_idx &
nc -q0 $host $port_out <$jobpath/out/$host_idx &
nc $host $port_in >"$jobpath"/in/$host_idx &
nc -q0 $host $port_out <"$jobpath"/out/$host_idx &
# our vars
out_files="$out_files $jobpath/out/$host_idx"
out_files="$out_files \"$jobpath\"/out/$host_idx"
port_in=$(($port_in + 2))
port_out=$(($port_in + 1))
host_idx=$(($host_idx + 1))
done

# okay, time to map
if which brp >/dev/null; then
eval "${input:+pv $input |} brp - $(($mapcolumn - 1)) $out_files"
eval "${input:+pv \"$input\" |} brp - $(($mapcolumn - 1)) $out_files"
else
# use awk if we don't have brp
# we're taking advantage of a special property that awk leaves its file handles open until its done
# i think this is universal
# we're also sending a zero length string to all the handles at the end, in case some pipe got no love
eval "${input:+pv $input |} awk '{
eval "${input:+pv \"$input\" |} awk '{
srand(\$$mapcolumn);
print \$0 >>\"$jobpath/out/\"int(rand() * $host_idx);
}
Expand All @@ -108,18 +108,18 @@ fi

# save it somewhere
if which brm >/dev/null; then
eval "brm - $(($mapcolumn - 1)) `find $jobpath/in/ -type p | xargs` ${output:+| pv >$output}"
eval "brm - $(($mapcolumn - 1)) `find "$jobpath"/in/ -type p -print0 | xargs -0` ${output:+| pv >\"$output\"}"
else
# use sort -m if we don't have brm
# sort -m creates tmp files if too many input files are specified
# brm doesn't do this
eval "sort -k$mapcolumn,$mapcolumn -m $jobpath/in/* ${output:+| pv >$output}"
eval "sort -k$mapcolumn,$mapcolumn -m \"$jobpath\"/in/* ${output:+| pv >\"$output\"}"
fi

# finally, clean up after ourselves
rm -rf $jobpath
rm -rf "$jobpath"
for host in $hosts; do
ssh $host "rm -rf $nodepath"
ssh $host "rm -rf \"$nodepath\""
done

# TODO: is there a safe way to kill subprocesses upon fail?
Expand Down