For all its warts, Perl is a powerful programming language that will continue to be used for some time. If programming languages were people, Perl would be your crazy uncle with a very cluttered but very well stocked workshop. One of its more powerful features is the threads API.
I have noticed that programs which create and join a lot of threads seem to consume more and more memory. I'm not sure why this happens, but I suspect that the memory allocated when a thread is created isn't released when it is joined. (This is speculation.)
The solution is to create a batch of threads that run for the entire life of your program. Rather than returning and being joined after each piece of work, they stay alive and communicate through queues created with Perl's thread-safe Thread::Queue module.
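To make the idea concrete before the full pool, here is a minimal sketch with a single long-lived worker. The names doubler and run_doubler are made up for illustration, and using undef as the "stop" value is my assumption here (the full program in this article uses -1 instead).

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Worker: doubles each number it receives and keeps running
# until it is handed undef (an assumed stop signal for this sketch).
sub doubler {
    my ($in, $out) = @_;
    while (defined(my $n = $in->dequeue())) {
        $out->enqueue($n * 2);
    }
    return;
}

# Hypothetical helper: start one worker, feed it a list, collect the results.
sub run_doubler {
    my @numbers = @_;
    my $in      = Thread::Queue->new();
    my $out     = Thread::Queue->new();
    my $worker  = threads->create(\&doubler, $in, $out);

    # The same thread handles every item; nothing is created per item.
    $in->enqueue($_) foreach @numbers;
    my @results = map { $out->dequeue() } @numbers;

    $in->enqueue(undef);    # tell the worker to finish
    $worker->join();        # joined once, at the very end
    return @results;
}

print join(' ', run_doubler(1, 2, 3)), "\n";    # prints "2 4 6"
```

The thread is created once and joined once, no matter how many numbers pass through it. The pool described in the rest of this article is the same pattern repeated for several workers.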
In the interest of full disclosure, I could not have done this without the pool_reuse.pl example found here. The aim of this article is to supplement that example with a more verbose explanation, as well as an example where data is received back from the threads.
Several queues are created. First we make a queue which holds the ids of the threads that aren't currently busy. Then we run through a loop in which we create an input queue, an output queue, and a thread. In our example, each thread runs a function that adds a random number to its input, and the input and output queues are passed to it as arguments. The queues are also stored in hashes, keyed by thread id, so the main program can find them later.
When we want to do actual work, we dequeue the id of an available thread from the thread queue and enqueue the work on that thread's input queue. We record which thread we used in a data structure of some kind, and dequeue that thread's output queue later. It is important to keep track of which queues we are using, because dequeue is a blocking operation. If you try to dequeue from an empty queue, it will simply sit there until something has been enqueued. Depending upon how you've written things, that could be never.
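If blocking forever is a worry, Thread::Queue also offers pending(), which reports how many items are waiting, and dequeue_nb(), which returns undef immediately when the queue is empty. The sketch below is only an illustration of those two calls. The helper name drain_ready is made up, and it assumes undef is never enqueued as real data.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helper: take whatever is waiting in the queue right now,
# without ever blocking. Assumes undef is never enqueued as real data.
sub drain_ready {
    my ($queue) = @_;
    my @items;
    while (defined(my $item = $queue->dequeue_nb())) {
        push @items, $item;
    }
    return @items;
}

my $queue = Thread::Queue->new();

# Nothing has been enqueued yet, so a plain dequeue() would hang here.
print "waiting before: ", $queue->pending(), "\n";

$queue->enqueue(3, 4);
print "waiting after: ", $queue->pending(), "\n";

my @ready = drain_ready($queue);
print "drained: @ready\n";
```

Newer releases of Thread::Queue also document a dequeue_timed method that gives up after a timeout. Check that your installed version has it before relying on it.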
Please see the example below:
#!/usr/bin/perl
use threads;
use threads::shared;
use Thread::Queue;
use strict;
#sentinel variable for threads
my $finished :shared = 0;
#number of threads
my $numThreads = 10;
my %inputQueues;
my %outputQueues;
my $threadQueue = Thread::Queue->new();
for (my $i = 0; $i < $numThreads; $i++) {
    #create input queue for the thread
    my $inputQueue = Thread::Queue->new();
    #create output queue for the thread
    my $outputQueue = Thread::Queue->new();
    my $thread = threads->new(\&randomAdd, $inputQueue, $outputQueue);
    my $tid = $thread->tid();
    $inputQueues{$tid} = $inputQueue;
    $outputQueues{$tid} = $outputQueue;
}
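#signal handler for clean exit
#(the %SIG keys are the signal names without the "SIG" prefix)
$SIG{'INT'} = $SIG{'TERM'} =
sub {
    $finished = 1;
    #wake every worker with the -1 sentinel, wait for them, then quit
    $_->enqueue(-1) foreach values %inputQueues;
    $_->join() foreach threads->list();
    exit(0);
};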
my @initialValues;
#fill up the array with random numbers
for (my $i = 0; $i < 10; $i++) {
    push @initialValues, int(rand(100));
}
for (my $i = 0; $i < 10; $i++) {
    print "$i\n";
    my @output;
    my @tids;
    foreach my $value (@initialValues) {
        #dequeue a thread
        my $tid = $threadQueue->dequeue();
        push @tids, $tid;
        $inputQueues{$tid}->enqueue($value);
    }
    foreach my $tid (@tids) {
        push @output, $outputQueues{$tid}->dequeue();
    }
    print "old value new value\n";
    for (my $j = 0; $j < 10; $j++) {
        print "$initialValues[$j] $output[$j]\n";
        $initialValues[$j] += $output[$j];
    }
    print "\n";
    sleep 1;
}
$finished = 1;
foreach my $thread (threads->list()) {
    $inputQueues{$thread->tid()}->enqueue(-1);
    $thread->join();
}
exit(0);
sub randomAdd {
    #add a random number to the data input from the queue.
    #put the results in the output queue
    my ($inputQueue, $outputQueue) = @_;
    my $tid = threads->tid();
    while (! $finished) {
        #put ourselves in the queue so we are available
        $threadQueue->enqueue($tid);
        #pull input from queue (blocks until the main thread sends something)
        my $inVal = $inputQueue->dequeue();
        if ($inVal >= 0) {
            my $outVal = $inVal + int(rand(100));
            #enqueue data in output queue
            $outputQueue->enqueue($outVal);
        } else {
            #a negative value is the signal to shut down
            return;
        }
    }
    return;
}