#
# PreforkAgent
#
# Allows execution of many jobs in parallel.
# Parent / child communication is implemented with pipes.
#
# Steffen Heinrich - Jun, 2007
#
package PreforkAgent;

use strict;
use warnings;

our $VERSION = '0.04';

#################################################
# libs and class vars

use IO::Handle ();
use IO::Select;
use Scalar::Util qw(reftype);

# Pipe protocol: every message is a header (32-bit big-endian payload
# length followed by one type byte) plus the payload. Thanks to the length
# prefix a payload may contain any byte, and thanks to the type byte any
# text (even "QUIT" or "READY") can be used as a job or a response.
# Payloads must be byte strings (no characters above 0xFF).
use constant {
    MSG_READY     => 'R',      # child -> parent: initialised, waiting for work
    MSG_ANSWER    => 'A',      # child -> parent: response to the last job
    MSG_JOB       => 'J',      # parent -> child: the next job
    MSG_QUIT      => 'Q',      # parent -> child: no more work, exit
    HEADER_FORMAT => 'N a',    # payload length, type byte
    HEADER_LENGTH => 5,
};

############################
# constructor

# Creates an agent with no callbacks registered and no children spawned.
# Set $agent->{debug_out} to a true value to make cleanup() print statistics.
sub new {
    my $class = shift;
    my $self  = bless {
        debug_out     => 0,
        parent        => $$,                 # only this process kills/reaps children
        listener      => IO::Select->new(),  # read handles of all registered children
        kids_to_spawn => 0,
        kids          => 0,                  # children still registered with the agent
        tasks_pending => 0,
        living        => {},                 # child id => write handle to that child
        child_of      => {},                 # fileno of a read handle => child id
        pids          => {},                 # child id => process id
        jobs          => {},                 # child id => jobs handed out (debug_out only)
        waiting       => {},                 # child id => read handle, response already taken by delay()
        child_prepare => sub {1},
    }, $class;
    return $self;
}

############################
# methods

# Registers one or more callbacks given as name => code reference pairs.
# Known names: child_prepare, fetch_next_task, process_job, process_response.
# Valid pairs are stored even if others are rejected; dies with one line per
# rejected pair. Returns the agent.
sub register {
    my $self = shift() or return;
    my %subs = @_ or return;

    my @errors;
    for my $name (sort keys %subs) {
        my $code = $subs{$name};
        if ($name !~ /\A(?:child_prepare|fetch_next_task|process_job|process_response)\z/) {
            push @errors, "'$name' is not a known sub";
        }
        elsif ((reftype($code) || '') eq 'CODE') {
            $self->{$name} = $code;
        }
        else {
            push @errors, "'$name' does not reference a sub";
        }
    }
    die join("\n", @errors) . "\n" if @errors;
    return $self;
}

# Forks $kids_to_spawn children (ids 1 .. $kids_to_spawn), each connected to
# the parent by a pair of pipes, and returns the number of children spawned.
# Dies if process_job() is not registered, a pipe cannot be created or fork()
# fails. On the first call also installs a SIGINT handler (see below).
sub spawn {
    my $self          = shift() or return;
    my $kids_to_spawn = shift() or return;
    $self->{kids_to_spawn} = $kids_to_spawn;

    $self->{process_job}
        or die "process_job() must have been registered with PreforkAgent before a call to spawn()!\n";
    my $sel = $self->{listener};

    # On Ctrl-C the parent kills and reaps its children, then dies of SIGINT
    # as if no handler were installed. Children inherit this handler; in them
    # cleanup() is a no-op, so they simply die of the signal as well.
    # (Children are reaped by cleanup(), hence no $SIG{CHLD} handling.)
    unless ($self->{kids}) {
        $SIG{INT} = sub {
            $self->cleanup();
            $SIG{INT} = 'DEFAULT';
            kill INT => $$;
        };
    }

    for my $child (1 .. $kids_to_spawn) {
        pipe(my $from_child, my $child_writer) or die "pipe1: $!";    # parent <- child
        pipe(my $child_reader, my $to_child)   or die "pipe2: $!";    # child <- parent

        # Buffered output would otherwise be written by both processes.
        _flush_std_handles();

        my $pid = fork();
        defined $pid
            or die "Could only spawn $self->{kids} kids of $self->{kids_to_spawn}: $!";

        if ($pid == 0) {
            # Child: drop every parent-side handle, including the siblings'
            # ones inherited from earlier iterations, so each pipe reports EOF
            # as soon as its real peer goes away.
            close $_ for $from_child, $to_child, $sel->handles, values %{ $self->{living} };

            # A child must never return (or die) into the caller's code.
            my $ok = eval { $self->_child_loop($child, $child_reader, $child_writer); 1 };
            warn $@ unless $ok;
            exit($ok ? 0 : 1);
        }

        # Parent: keep only its own ends and register the child.
        close $child_writer;
        close $child_reader;
        $sel->add($from_child);
        $self->{child_of}{ fileno $from_child } = $child;
        $self->{living}{$child} = $to_child;
        $self->{pids}{$child}   = $pid;
        $self->{kids}++;
    }

    ($self->{kids} == $self->{kids_to_spawn})
        or die "Could only spawn $self->{kids} kids of $self->{kids_to_spawn}: $!";
    return $self->{kids};
}

# Child side: runs child_prepare() (dies if it returns undef), reports
# readiness, then answers each job with process_job() until told to quit or
# until the parent closes its pipe.
sub _child_loop {
    my ($self, $child, $from_parent, $to_parent) = @_;

    defined($self->{child_prepare}->($child))
        or die "child_prepare() failed in child $child\n";
    _write_into_pipe($to_parent, MSG_READY);

    while (my ($type, $job) = _read_from_pipe($from_parent)) {
        last if $type eq MSG_QUIT;
        my $answer = $self->{process_job}->($job, $child);
        _write_into_pipe($to_parent, MSG_ANSWER, $answer);
    }
    close $to_parent;
    close $from_parent;
}

# Parent work loop: hands tasks from fetch_next_task() to whichever child is
# ready and passes every response to process_response(). Once every child has
# been told to quit (or has gone away), reaps them via cleanup().
sub assign {
    my $self = shift() or return;
    ($self->{fetch_next_task} && $self->{process_response})
        or die "fetch_next_task() and process_response() must have been registered with PreforkAgent before a call to assign()!\n";
    $self->{kids} > 0
        or die "You need to call spawn(kids) before assigning jobs!\n";

    my $sel = $self->{listener};
    $self->{tasks_pending} = 1;

    while ($self->{kids}) {
        # block until at least one child has output pending
        while (my @ready = $sel->can_read()) {
            for my $rhdl (@ready) {
                # delay(), called from a callback earlier in this loop, may
                # already have unregistered (and closed) this handle.
                my $fd = fileno $rhdl;
                next unless defined $fd;
                my $child = $self->{child_of}{$fd};

                # delay() may also have read this child's response already;
                # reading again would block.
                unless ($self->{waiting}{$child}) {
                    $self->_collect($child, $rhdl) or next;
                }
                $self->_fill_or_kill($child, $rhdl);
            }
            $self->_serve_waiting();
        }
    }

    # every child has been told to quit: reap them
    $self->cleanup();
}

# Serves every child that delay() has put on hold. Callbacks may call delay()
# again and put more children on hold, so repeat until nobody is waiting.
sub _serve_waiting {
    my $self = shift;
    while (my @on_hold = keys %{ $self->{waiting} }) {
        for my $child (@on_hold) {
            my $rhdl = $self->{waiting}{$child} or next;
            $self->_fill_or_kill($child, $rhdl);
        }
    }
}

# Reads one message from child $child. Passes an answer to
# process_response(). Returns true, or false if the child closed its pipe
# unexpectedly (it is unregistered then).
sub _collect {
    my ($self, $child, $rhdl) = @_;
    my ($type, $payload) = _read_from_pipe($rhdl);
    if (!defined $type) {
        warn "PreforkAgent: child $child exited unexpectedly\n";
        $self->_unregister($child, $rhdl);
        return 0;
    }
    $self->{process_response}->($payload, $child) if $type eq MSG_ANSWER;
    return 1;
}

# Gives child $child the next task or, once fetch_next_task() has returned
# undef, tells it to quit and unregisters it. Either way the child is no
# longer on hold afterwards.
sub _fill_or_kill {
    my ($self, $child, $rhdl) = @_;

    # once fetch_next_task() has returned undef it is not called again
    my $task;
    $self->{tasks_pending} = $self->{tasks_pending}
        && defined($task = $self->{fetch_next_task}->($child));

    if (my $whdl = $self->{living}{$child}) {
        if ($self->{tasks_pending}) {
            _write_into_pipe($whdl, MSG_JOB, $task);
            $self->{jobs}{$child}++ if $self->{debug_out};
        }
        else {
            _write_into_pipe($whdl, MSG_QUIT);
            $self->_unregister($child, $rhdl);
        }
    }
    elsif (defined $task) {
        # a delay() inside fetch_next_task() found this child gone
        warn "PreforkAgent: child $child is gone, its task was dropped\n";
    }
    delete $self->{waiting}{$child};
}

# Forgets child $child: stops listening to it, closes both pipe ends and
# decrements the child count.
sub _unregister {
    my ($self, $child, $rhdl) = @_;
    my $whdl = delete $self->{living}{$child};
    delete $self->{waiting}{$child};
    delete $self->{child_of}{ fileno $rhdl };    # before close() drops the fileno
    $self->{listener}->remove($rhdl);
    close $rhdl;
    close $whdl if $whdl;
    $self->{kids}--;
}

# Waits up to $delay seconds (fractions allowed) and only collects responses.
# Each child that answers meanwhile has its response passed to
# process_response() and is put on hold: it gets no new task until assign()
# serves it after this call has returned.
sub delay {
    my $self  = shift() or return;
    my $delay = shift() or return;
    my $sel   = $self->{listener};
    my $pending = $sel->bits();    # handles not yet served during this call

    while ($delay > 0) {
        my $rin = $pending;
        (my $num, $delay) = select($rin, undef, undef, $delay);
        if ($num < 0) {
            next if $!{EINTR};
            die "select: $!\n";
        }
        last if $num == 0;    # timed out

        for my $rhdl ($sel->handles($rin)) {
            my $child = $self->{child_of}{ fileno $rhdl };
            # Its pipe is drained now, so select() won't report it again;
            # remember it for assign().
            $self->{waiting}{$child} = $rhdl if $self->_collect($child, $rhdl);
        }
        $pending ^= $rin;    # stop watching the channels just served
    }
}

############################
# subroutines

# Flushes STDOUT and STDERR so fork() does not duplicate buffered output.
sub _flush_std_handles {
    STDOUT->flush;
    STDERR->flush;
}

# Reads exactly $wanted bytes from $fh, retrying interrupted reads.
# Returns undef on EOF; dies on read errors.
sub _read_exactly {
    my ($fh, $wanted) = @_;
    my $buf = '';
    while (length($buf) < $wanted) {
        my $got = sysread($fh, $buf, $wanted - length($buf), length $buf);
        if (!defined $got) {
            next if $!{EINTR};
            die "System Read Error: $!\n";
        }
        return if $got == 0;    # EOF
    }
    return $buf;
}

# Reads one framed message. Returns ($type, $payload), or an empty list if
# the peer closed the pipe.
sub _read_from_pipe {
    my ($fh) = @_;
    my $header = _read_exactly($fh, HEADER_LENGTH);
    return unless defined $header;
    my ($length, $type) = unpack HEADER_FORMAT, $header;
    my $payload = _read_exactly($fh, $length);
    return unless defined $payload;
    return ($type, $payload);
}

# Writes one framed message of type $type (undef payload is sent as '').
# Retries interrupted and partial writes; dies on write errors.
# Returns the number of bytes written.
sub _write_into_pipe {
    my ($fh, $type, $payload) = @_;
    $payload = '' unless defined $payload;
    utf8::downgrade($payload);    # dies on wide characters instead of corrupting the framing
    my $msg = pack(HEADER_FORMAT, length $payload, $type) . $payload;

    my $offset = 0;
    while ($offset < length $msg) {
        my $written = syswrite($fh, $msg, length($msg) - $offset, $offset);
        if (!defined $written) {
            next if $!{EINTR};
            die "System Write Error: $!\n";
        }
        $offset += $written;
    }
    return $offset;
}

# Parent only. Optionally prints debug statistics. Then it sends SIGTERM to
# every child not yet told to quit and reaps all children, so no zombies
# remain. Since children are reaped only here, a recorded pid cannot have
# been reused by an unrelated process.
sub cleanup {
    my $self = shift() or return;
    return unless $self->{parent} == $$;

    if ($self->{debug_out}) {
        my $job_cnt = $self->{jobs};
        for my $child (sort { $job_cnt->{$b} <=> $job_cnt->{$a} } keys %$job_cnt) {
            printf STDERR "%4s: %5d\n", $child, $job_cnt->{$child};
        }
        print STDERR "\$kids = $self->{kids}\n";
        print STDERR "\%living count = ", scalar(keys %{ $self->{living} }), "\n";
        my $bits = $self->{listener}->bits();
        print STDERR "select bitmap = '", (defined $bits ? unpack('b*', $bits) : ''), "'\n";
    }

    for my $kid (keys %{ $self->{pids} }) {
        my $pid = delete $self->{pids}{$kid};
        if ($self->{living}{$kid}) {
            print STDERR "killing $pid\n";
            kill TERM => $pid;
        }
        waitpid($pid, 0);
    }
}

1;

__END__

=pod

=head1 NAME

PreforkAgent - A dispatch wrapper for simultaneous tasks.

=head1 PURPOSE

Runs any large number of similar tasks in parallel with utmost throughput.
First, a given number of children is spawned. Then each of them is handed the
next task from a common queue in succession as it returns with a response.


=head1 SYNOPSIS

    use PreforkAgent;

    my $pfa = PreforkAgent->new or die;
    $pfa->register(
        child_prepare    => \&individual_init,    # this sub is optional
        fetch_next_task  => \&next_job,
        process_job      => \&dispatch,
        process_response => \&collect_response
    );

    my $GLOBAL_VAR = "fancy value";

    my $kid_count = $pfa->spawn(5) or die;
    $pfa->assign();
    exit;

    sub individual_init {
        # child will die if undef is returned
        # this sub is optional
        my $child_id = shift() or return;
        # Child context:
        # sees $GLOBAL_VAR as it was at spawn(); changes stay local to this child
        # any initialization to be done by each child goes here
        ...
        return $success;
    }

    sub next_job {
        # must return a string, which is then passed to dispatch(),
        # and MUST return undef once there is nothing left to do.
        # Enables the main program to tie a certain job to a specific
        # child's response returned in collect_response().
        my $child_id = shift;
        # Parent context:
        # can read AND write $GLOBAL_VAR
        ...
        return $str_job;
    }

    sub dispatch {
        # defines the parallel task for the children:
        # processes the job and returns a serialized response
        my $str_job  = shift() or return;
        my $child_id = shift() or return;
        # Child context:
        # sees $GLOBAL_VAR as it was at spawn(); changes stay local to this child
        ...
        return $str_response;
    }

    sub collect_response {
        # allows the main program to evaluate each of the children's responses
        my $response = shift() or return;
        my $child_id = shift() or return;
        # Parent context:
        # can read AND write $GLOBAL_VAR
        ...
    }

=head1 DESCRIPTION

=over 4

=item new()

Takes no parameters.

=item register(%CALLBACKS)

Registers one or more callbacks by name. The known names are
C<child_prepare>, C<fetch_next_task>, C<process_job> and
C<process_response> (see L</SYNOPSIS>). Dies if a name is unknown or its
value is not a code reference.

=item spawn($CHILD_PROCESSES)

Forks the given number of children or dies. On success the same number is
returned.

Calling this at a point in your script where overall memory consumption is
low helps to reduce the footprint of your application (usually as early as
possible). It also helps to C<require> modules that the children do not need
only after this call.

=item assign()

Takes no parameters. Does the actual work: hands out tasks until
C<fetch_next_task> returns undef and passes every response to
C<process_response>.
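
=item delay($SECOND_FRACTIONS)

May be called to postpone further assignments until after the timeout, given
in seconds (fractions allowed). Responses from children that finish
meanwhile are still collected and passed to C<process_response>, but those
children get no new task until C<delay()> has returned. The only sensible
place to call it is the C<fetch_next_task> callback.

=back

=head1 CALLBACKS

Child IDs run from 1 to the number passed to C<spawn()>.

=over 4

=item child_prepare($child_id)

Optional. Called once in each child right after it has been forked.

=item fetch_next_task($child_id)

Called in the parent whenever child C<$child_id> is ready for work. Returns
the next job as a string, or undef when there is nothing left to do.

=item process_job($job, $child_id)

Called in the child for every job it receives. Returns the response as a
string.

=item process_response($response, $child_id)

Called in the parent with each response returned by a child.

=back

=head1 SEE ALSO

L<LWP::Parallel::UserAgent> (distribution ParallelUserAgent) by Marc
Langheinrich

=head1 VERSION

This document describes version 0.04.

=head1 LICENSE

Copyright (C) 2007, Steffen Heinrich. All rights reserved.

This module is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut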