Listing 1 clusterpunchserver
#!/usr/local/bin/perl
use strict;
use Data::Dumper;
use Fcntl ':flock';
use FindBin;
use Getopt::Std;
use IO::Socket qw(:DEFAULT :crlf);
use POSIX qw(strftime setsid);
use POSIX 'WNOHANG';
use Time::HiRes qw(tv_interval gettimeofday);
use Config::General;
use lib "$FindBin::RealBin/../lib";
use clusterpunch; # accessory functions
use vars qw($opt_f $opt_l $opt_d $opt_p $opt_h $opt_D $opt_v $opt_s);
getopts("f:l:p:vdshD");
my $VERSION = "0.1";
$Data::Dumper::Varname = "STAT";
$Data::Dumper::Purity = 1;
# Limit incoming messages to 1000 bytes.
use constant MAX_MSG_LEN => 1000;
################################################################
# Load parameters from configuration files into %CONFIG.
# Parameters can be redefined if multiple files are read in
################################################################
chomp(my $SERVERNAME = `hostname -s`);
my %CONFIG = LoadConfig(ConfigFiles($opt_f));
# Process command line parameters, overriding any
# configuration file settings
$CONFIG{"verbose"} = (grep(defined $_, ($opt_v,$CONFIG{"verbose"},0)))[0];
$CONFIG{"debug"} = (grep(defined $_, ($opt_D,$CONFIG{"debug"},0)))[0];
$clusterpunch::debug = $CONFIG{"debug"};
$CONFIG{"logging"} = (grep(defined $_, ($opt_s,$CONFIG{"logging"},0)))[0];
$CONFIG{"port"} = (grep(defined $_, ($opt_p,$CONFIG{"port"},8095)))[0];
$CONFIG{"logdir"} = (grep(defined $_, ($opt_l,$CONFIG{"logdir"},"/tmp")))[0];
$CONFIG{"daemon"} = (grep(defined $_, ($opt_d,$CONFIG{"daemon"},0)))[0];
$CONFIG{"daemon"} = 0 if $CONFIG{"verbose"};
# Make sure we can write to the LOG directory, if it is defined
$CONFIG{logdir} = undef
if defined $CONFIG{logdir} && (! -d $CONFIG{logdir} || ! -w _);
$SIG{HUP} = $SIG{INT} = \&Finish;
$SIG{CHLD} = \&Reaper;
foreach (keys %CONFIG) {
Log("$_ ",(ref($CONFIG{$_}) eq "ARRAY")?int(@{$CONFIG{$_}}):$CONFIG{$_});
}
my ($msg_in,$clienthost,$clientport)=(undef,"-",undef);
my $sock = InitiateSocket(port=>$CONFIG{port});
if(! $sock) {
Log("Server already running? Cannot create socket - aborting start request.");
exit 0;
}
# Run in the background, if daemon is requested and if verbose is not requested.
if(! $CONFIG{"verbose"} && $CONFIG{"daemon"}) {
Log("Backgrounding");
BecomeDaemon();
} else {
Log("Running in foreground");
}
Log("Servicing incoming requests - parent PID $$");
# loop:
# parent->listen
# if(connection)
# parent->forkchild child->process_connection
# child->execute_commands
# child->send_results_to_client
# child->exit
while (1) {
# endlessly wait for something from the socket
next unless $sock->recv($msg_in,MAX_MSG_LEN);
# parse incoming commands and reload the configuration
# if required
my @commands = ProcessMessage($msg_in);
# check if we need to load up the punches from file again
if(grep (/reload/, map {$_->{command}} @commands)) {
%CONFIG = LoadConfig(ConfigFiles($opt_f));
}
# fork off a child, which handles the request
# becareful - the child may be receiving a shutdown signal which
# it must communicate back to the parent!
my $parentpid = $$;
my $childpid = fork;
if(! defined $childpid) {
Log("problem forking - exiting");
exit;
}
if($childpid) {
# we're in the parent - wait for next connection
Log("Parent $$ has forked process $childpid");
next;
}
# we're in the child ... process connection and quit
Log("Child $$ from parent $parentpid - processing connection");
$clienthost = IdentifyClient($sock);
$clientport = $sock->peerport;
my $msglength = length($msg_in);
Log("RCV from $clienthost command $msg_in $msglength bytes");
################################################################
# Iterate through all commands and take appropriate action
# Each command is the hash {command=>SCALAR,args=>LISTREF}
my %STAT;
# First check if we need to shutdown!
if(grep (/shutdown|halt|stop/, map {$_->{command}} @commands)) {
# must send the parent a HUP signal
Log("sending HUP to parent $parentpid");
my $cnt = kill 1, $parentpid;
Log("kill returned $cnt");
# finish the child
Finish();
}
# Check if we need debugging
$clusterpunch::debug = 1 if grep (/debug/, map {$_->{command}} @commands);
my @punches = @{$CONFIG{punch}};
PUNCH: foreach my $command (@commands) {
my $commandtext = $command->{command};
my ($punch) = grep($_->{name} eq $commandtext, @punches);
next PUNCH unless $punch;
# found a punch for the current command text
my @args = @{$command->{args}};
my ($punch_name,$statistic,$cumulative,$valuemap,$valuetype,$function,$system) =
@{$punch}{qw(name statistic cumulative valuemap valuetype function system)};
Log("PUNCH($commandtext) found valuetype $valuetype");
my ($timer,$call_value,$punch_value);
$timer = [gettimeofday] if $valuetype eq "timer";
if($function) {
@_ = @args;
$call_value = eval $function;
if($@) {
Log("PUNCH($commandtext) could not parse function");
next PUNCH;
}
} elsif ($system) {
Log("PUNCH($commandtext) system [$system]");
open(PROC,"$system |");
while(<PROC>) {
s/^\s+//;
$call_value .= $_;
}
close(PROC);
chomp $call_value;
}
if($valuetype eq "return") {
$punch_value = $call_value;
} else {
$punch_value = tv_interval($timer);
}
if($valuemap) {
@_ = ($punch_value);
$punch_value = eval $valuemap;
if($@) {
Log("PUNCH($commandtext) could not parse valuemap");
next PUNCH;
}
}
Log("PUNCH($commandtext) returned $punch_value");
$STAT{$statistic} = $punch_value;
$STAT{$cumulative} += $punch_value if defined $cumulative;
next PUNCH;
if($commandtext eq "mem") {
# append the argument to the memory field e.g. memfree, memtotal, memswapused
my $field = "mem";
if(@args) {
$field .= $args[0];
} else {
$field .= "free";
}
$STAT{$field} = MemFree(@args);
Log("return $STAT{memfree}");
}
}
$STAT{live} = 1;
$STAT{host} = $SERVERNAME;
Log(Dumper(\%STAT));
my $dump = Dumper(\%STAT);
$sock->send(pack("a*",$dump)) or die "send(): $!\n";
exit;
}
# should never get here ... but if we do
$sock->close();
exit 0;
################################################################
#
# Parse socket messages
#
# The message $msg_in is of the format
# cmd(arg1,arg2,...,argn);cmd(...);...;cmd(...)
#
# These are parsed into the @commands array which has the format
# $commands[n] = {command=>commandtext,args=>[arg1,arg2,...,argn]};
#
################################################################
sub ProcessMessage {
my $msg_in = shift;
my @commandblocks = split(/;/,$msg_in);
my @commands;
foreach my $commandblock (@commandblocks) {
my $command;
my @args;
if($commandblock =~ /(.*?)\((.*)\)/) {
$command = $1;
@args = split(/,/,$2);
} else {
$command = $commandblock;
}
push(@commands,{command=>$command,args=>\@args});
}
return @commands;
}
################################################################
#
# Daemonize - run the script in the background
#
################################################################
sub BecomeDaemon {
my $child = fork;
if(! defined $child) {
die "cannot fork";
}
if($child) {
Log("$SERVERNAME going to daemon mode");
exit 0;
}
setsid();
open(STDIN,"</dev/null");
open(STDOUT,">/dev/null");
open(STDERR,">&STDOUT");
chdir "/";
umask(0);
$ENV{PATH} = "/usr/local/bin:/usr/local/sbin:/bin/:/sbin:/usr/bin/:/usr/sbin";
return $$;
}
################################################################
# Initiate and return the socket
################################################################
sub InitiateSocket {
my %input = @_;
my $port = $input{port} || return undef;
my $socket = IO::Socket::INET->new(Proto=>"udp",LocalPort=>$port);
return $socket;
}
################################################################
# Identify the client connecting to us
################################################################
sub IdentifyClient {
my $socket = shift;
my $clienthost = gethostbyaddr($sock->peeraddr,AF_INET) || $sock->peerhost;
$clienthost =~ s/(.*?)\..*/$1/ if $clienthost =~ /[A-Za-z]/;
return $clienthost;
}
################################################################
# Reap the children to avoid zombies
################################################################
sub Reaper {
while (waitpid(-1,WNOHANG)>0) {}
}
################################################################
# Cleanly exit - close the socket and log
################################################################
sub Finish {
$sock->close();
Log("pid $$ shutting down - Finish()");
exit 0;
}
################################################################
# Log to file using a semaphore file as a lock
################################################################
sub Log {
my $timestamp = strftime "%Y-%m-%d %H:%M:%S",localtime;
$clienthost ||= "-";
if($CONFIG{verbose}) {
warn "$SERVERNAME | $clienthost [$timestamp] ",@_,"\n";
}
if($CONFIG{logging} && $CONFIG{logdir}) {
if(open(LOGSEM,">$CONFIG{logdir}/$SERVERNAME.sem")) {
flock(LOGSEM,LOCK_EX);
if(open(LOG,">>$CONFIG{logdir}/$SERVERNAME.log")) {
print LOG "$SERVERNAME | $clienthost [$timestamp] ",@_,"\n";
close(LOG);
}
flock(LOGSEM,LOCK_UN);
close(LOGSEM);
}
}
}
|