#!/usr/bin/env perl
=begin #------------------------------------------------------------------------
* generates messages to registered recipients when new or updated report available
* excludes clinical trial & outreach samples
* select requests >5 mins old to allow for further updates within short time span
* runs every 30 (?) mins
* see __DATA__ section for sql queries
Event Triggers: request status either authorised or complete, and one of:
* new authorised report
* confirmed final diagnosis
* change to diagnosis
* amended comment
To do:
* request status (new/relapsed/default) to allow selective messaging
* manual trigger (i.e. a reporter with the right permissions can trigger
a notification to any recipient)
Any existing request_notification row only updated with a more recent datetime, using:
"ON DUPLICATE KEY UPDATE
datetime = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)"
*** see __DATA__ section for validation ****
#-------------------------------------------------------------------------------
=cut
# Compile-time setup: define the default run interval and parse the command
# line. The script body reads the -t value from the package variable $opt_t.
BEGIN {
    use Getopt::Std;
    use constant DURATION => 1800;    # seconds; used when -t is not supplied

    our $opt_t;                       # -t <seconds>
    getopts('t:');
}
# Testing switch: when true, mail goes only to the configured service address
# and dispatches are logged to lims_test.request_history.
my $JUST_TESTING = 0;

#===============================================================================
# Time window (seconds) used to detect an update: the -t value when one was
# given (and is true), otherwise the DURATION default.
my $delta = $opt_t ? $opt_t : DURATION;

# Delay (seconds) before a notification is sent - leaves time for follow-up
# edits to the report.
my $delay = 300;
#===============================================================================

# The window must be at least twice the delay, to reduce the risk of the task
# starting again before the previous instance has finished.
unless ( $delta >= $delay * 2 ) {
    die "run frequency ($delta seconds) cannot be less than 2x delay time ($delay seconds)";
}

# cron exports 'CENTRE' for non-HMDS tasks; hilis4 is the default database.
my $dbname = $ENV{CENTRE} ? $ENV{CENTRE} : 'hilis4';
use lib (
'/home/raj/perl5/lib/perl5',
'/home/raj/perl-lib',
);
use Template;
use Local::DB;
use Modern::Perl;
use Config::Auto;
use Data::Printer;
use FindBin qw($Bin); # warn $Bin; exit;
use SQL::Abstract::More;
use lib "$Bin/../../../lib";
use LIMS::Local::ScriptHelpers;
# Script environment: helper object, config, db handle, the time points that
# drive detection/dispatch, and the list of request ids due for notification.
# The file-scope lexicals declared here are used by the subs further down.

# queries issued by this script are not wanted in the logs dir:
$Local::QueryLogger::NO_QUERY_LOGS = 1;
# date inflation is left enabled - DateTime objects are needed later on:
# $Local::DBIx::Simple::Result::NO_AUTO_DATE_INFLATION = 1; # need DT object
# get tools from LIMS::Local::ScriptHelpers and pass on the testing flag:
my $tools = LIMS::Local::ScriptHelpers->new();
$tools->test_only($JUST_TESTING);
my $config = $tools->config(); # hashref of settings; p $config;
# notification queue table - the lims_test copy is used when testing:
my $table = $JUST_TESTING
? 'lims_test.request_notification' : 'request_notification';
my $sqla = SQL::Abstract::More->new;
my $dbix = Local::DB->dbix({dbname => $dbname});
# get users.id of server_username from config (recorded as the user_id of each
# logged dispatch):
# NOTE(review): $user_id will presumably be undef if no users row matches -
# confirm the configured username always exists.
my $server_name = $config->{server_username};
my $user_id = $dbix->select('users', 'id', { username => $server_name })->value;
# reference time for this run:
my $NOW = $tools->time_now;
# start of the detection window (now minus $delta seconds):
my $window = $NOW->clone->subtract( seconds => $delta ); # p $window;
# latest timestamp eligible for dispatch (now minus $delay seconds):
my $t_max = $NOW->clone->subtract( seconds => $delay ); # p $t_max;
# a report authorised before this point (now minus $delta + $delay seconds) is
# announced as an update rather than as new:
my $auth_time_cutoff = $NOW->clone->subtract( seconds => $delta + $delay ); # p $auth_time_cutoff;
# run queries to update request_notification table =============================
do_updates();
# get request id's from request_notification with timestamps older than delay (t_max):
my @req_ids = $dbix->select($table, 'request_id',
{ datetime => { '<' => $t_max } })->column; # say $_ for @req_ids; exit;
# @req_ids = (300001 .. 300005); # for testing
# nothing due - stop here, or the next query is nonsense (where 0=1):
exit unless @req_ids;
# get details for all request_id's =============================================
# Details query: one hashref per request id in @req_ids, carrying the patient,
# referral, report and diagnosis fields needed to pick recipients and to
# compose the message. 'authorised' is the latest request_history time with
# action 'authorised'.
my @notifications = do {
    # SQL::Abstract::More "column|alias" notation:
    my @select_cols = (
        'p.last_name',
        'p.first_name',
        'p.nhs_number',
        'p.dob',
        'r.id|request_id',
        'rrd.status',
        'rrd.comment',
        'pc.referral_source_id',
        'rs.parent_organisation_id',
        'ref.national_code|gmc_code',
        'rd.hospital_department_code',
        'max(rh.time)|authorised',
        'd.name|diagnosis',
        'd2.name|secondary_diagnosis',
    );

    # flat list: table, join condition, table, join condition, ... table;
    # a condition starting with "=>" is a left join:
    my @join_spec = (
        'requests|r',
            'r.patient_case_id=pc.id',
        'patient_case|pc',
            'pc.patient_id=p.id',
        'patients|p',
            'pc.referral_source_id=rs.id',
        'referral_sources|rs',
            'rrd.request_id=r.id',
        'request_report_detail|rrd',
            'r.referrer_department_id=rd.id',
        'referrer_department|rd',
            'rd.referrer_id=ref.id',
        'referrers|ref',
            'rh.request_id=r.id',
        'request_history|rh',
            'rrd.diagnosis_id=d.id',
        'diagnoses|d',
            '=>rsd.request_id=r.id',
        'request_secondary_diagnosis|rsd',
            '=>rsd.secondary_diagnosis_id=d2.id',
        'diagnoses|d2',
    );

    my %criteria = (
        'r.id'      => { -in => \@req_ids },
        'rh.action' => 'authorised',
    );

    my ($stmt, @params) = $sqla->select(
        -columns  => \@select_cols,
        -from     => [ -join => @join_spec ],
        -where    => \%criteria,
        -group_by => 'r.id',    # required for MAX(rh.time)
    );

    $dbix->query($stmt, @params)->hashes;
};
=begin
Each selected request is checked for registered recipients; any messages are
sent, and the request is then deleted from request_notification. Any row with
a timestamp more recent than the delay (5 mins) is left for a subsequent cycle.
=cut
# Template Toolkit engine using asp-style tags (<% ... %>); only the uclh
# message body is rendered through it.
my $tt = Template->new({ TAG_STYLE => 'asp' });

# Work through the requests returned by the details query: notify whoever is
# registered for the request, then remove it from the queue table.
# NOTE(review): only requests present in @notifications are deleted - an id in
# @req_ids that the details query did not return would stay in the table;
# confirm whether that can happen.
REQUEST:
foreach my $request (@notifications) {
    my @recipients = _retrieve_addresses($request);    # list of hashrefs

    _send_notification($request, @recipients) if @recipients;

    # row is removed whether or not a message went out:
    $dbix->delete($table, { request_id => $request->{request_id} });
}
#===============================================================================
# Queue every request that had a notifiable request_history event inside the
# detection window (history time at or after $window, i.e. within the past
# $delta seconds).
#
# A request qualifies when:
#   * it has no request_trial row (left join, rt.request_id IS NULL)
#   * its initial screen is not one of the outreach screens listed below
#   * its current status is 'authorised' or 'complete'
#   * the history action is one of the listed actions, or matches
#     'amended diagnosis (%)'
#
# Each (request_id, datetime) row found is handed to
# insert_or_update_if_greater() on $table with 'datetime' as the compared
# column. Takes no arguments; the caller ignores the return value.
sub do_updates {
    # don't notify if screened as:
    my @outreach_screens = (
        'Outreach',
        'Outreach CML',
        'Outreach CLL post-Rx monitoring',
        'Outreach BLPD/PCD pre-Rx monitoring',
    );

    my @notifiable_actions = (
        'confirmed final diagnosis',
        'amended comment',
        'authorised',
    );

    # flat list: table, join condition, table, join condition, ... table;
    # a condition starting with "=>" is a left join:
    my @join_spec = (
        'requests|r',
            'r.status_option_id=so.id',
        'status_options|so',
            'rh.request_id=r.id',
        'request_history|rh',
            '=>rt.request_id=r.id',
        'request_trial|rt',
            'ris.request_id=r.id',
        'request_initial_screen|ris',
            'ris.screen_id=s.id',
        'screens|s',
    );

    my %criteria = (
        'rt.request_id'  => undef,
        's.description'  => { -not_in => \@outreach_screens },
        'so.description' => { -in => [ 'authorised', 'complete' ] },
        'rh.time'        => { '>=' => $window },
        -or => [
            'rh.action' => { -in => \@notifiable_actions },
            # can be error or update:
            'rh.action' => { -like => 'amended diagnosis (%)' },
        ],
    );

    my ($stmt, @params) = $sqla->select(
        -columns => [ 'r.id|request_id', 'rh.time|datetime' ],
        -from    => [ -join => @join_spec ],
        -where   => \%criteria,
    );

    my $events = $dbix->query($stmt, @params);
    while ( my $event = $events->hash ) {
        $dbix->insert_or_update_if_greater($table, $event, 'datetime');
    }
    return;
}
=begin # replaced by a single request_history table query above
sub do_updates {
{ # newly authorised requests within past $delta mins:
my @rels = (
'request_history|rh' => '=>rh.request_id = rt.request_id' ,
'request_trial|rt'
);
my @args = (
-columns => [ 'rh.request_id', 'rh.time|datetime' ],
-from => [ -join => @rels ],
-where => {
'rt.request_id' => undef,
'rh.action' => 'authorised',
'rh.time' => { '>=' => $window },
}
);
my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
# $dbix->dump_query($sql, @bind); # exit;
my $result = $dbix->query($sql, @bind);
while ( my $ref = $result->hash ) {
# $ref->{status} = 'authorised'; # using "rh.time as authorised" instead
$dbix->insert_or_update_if_greater($table, $ref, 'datetime');
}
}
{ # diagnosis update within past $delta mins:
my @rels = (
'requests|r' => 'r.status_option_id=so.id' ,
'status_options|so' => 'rdh.request_id = r.id' ,
'request_diagnosis_history|rdh' => '=>rt.request_id = r.id' ,
'request_trial|rt'
);
my @args = (
-columns => [ 'r.id|request_id', 'rdh.time|datetime' ],
-from => [ -join => @rels ],
-where => {
'rt.request_id' => undef,
'so.description' => { -in => [ 'authorised','complete' ] },
'rdh.time' => { '>=' => $window },
}
);
my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
# $dbix->dump_query($sql, @bind); # exit;
my $result = $dbix->query($sql, @bind);
while ( my $ref = $result->hash ) {
# $ref->{status} = 'updated'; # using "rh.time as authorised" instead
$dbix->insert_or_update_if_greater($table, $ref, 'datetime');
}
}
{ # results_summary update within past $delta mins - slow and replaced with next block
my @rels = (
'requests|r' => 'r.status_option_id=so.id' ,
'status_options|so' => 'rrs.request_id = r.id' ,
'request_result_summaries|rrs' => '=>rt.request_id = r.id' ,
'request_trial|rt'
);
my @args = (
-columns => [ 'r.id|request_id', 'rrs.time|datetime' ],
-from => [ -join => @rels ],
-where => {
'rt.request_id' => undef,
'so.description' => { -in => [ 'authorised','complete' ] },
'rrs.time' => { '>=' => $window },
}
);
my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
# $dbix->dump_query($sql, @bind); # exit;
my $result = $dbix->query($sql, @bind);
while ( my $ref = $result->hash ) {
# $ref->{status} = 'updated'; # using "rh.time as authorised" instead
$dbix->insert_or_update_if_greater($table, $ref, 'datetime');
}
}
{ # requests.updated_at within past $delta mins, or request_report_detail.updated_at
# within past $delta mins and > requests.updated_at
my @rels = (
'requests|r' => 'r.status_option_id=so.id' ,
'status_options|so' => 'rrd.request_id = r.id' ,
'request_report_detail|rrd' => '=>rt.request_id = r.id' ,
'request_trial|rt'
);
my @args = (
-columns => [ 'r.id|request_id', 'rrd.updated_at|datetime' ],
-from => [ -join => @rels ],
-where => {
'rt.request_id' => undef,
'so.description' => { -in => [ 'authorised','complete' ] },
-or => [
'r.updated_at' => { '>=' => $window },
'rrd.updated_at' => { '>=' => $window, '>' => \'r.updated_at' },
],
}
);
my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
# $dbix->dump_query($sql, @bind); # exit;
my $result = $dbix->query($sql, @bind);
while ( my $ref = $result->hash ) {
# $ref->{status} = 'updated'; # using "rh.time as authorised" instead
$dbix->insert_or_update_if_greater($table, $ref, 'datetime');
}
}
}
=cut
#-------------------------------------------------------------------------------
# Look up the notification contacts registered for one request.
#
# Arg:     $data - hashref of request details; the keys read here are
#          hospital_department_code, parent_organisation_id,
#          referral_source_id and gmc_code.
# Returns: list of hashrefs with keys contact_address and status. When
#          $JUST_TESTING is set, a single entry for the configured service
#          e-mail address with status 'all' is returned instead.
#
# An active report_notification row matches when its type/identifier pair is
# the request's organisation, hospital or referrer, or when its department_id
# equals the request's department code and it is the organisation or hospital.
# NOTE(review): the department-specific alternative is OR'ed with the
# unconditional organisation/hospital alternatives, so as written it cannot
# select any row they do not already select - confirm this is intended.
sub _retrieve_addresses {
    my $data = shift;

    if ($JUST_TESTING) {
        return ( { contact_address => $config->{service_email}, status => 'all' } );
    }

    my $department_id = $data->{hospital_department_code};
    my $parent_id     = $data->{parent_organisation_id};
    my $source_id     = $data->{referral_source_id};
    my $gmc_number    = $data->{gmc_code};

    # fresh organisation/hospital alternatives on every call:
    my $by_location = sub {
        return (
            { type => 'organisation', identifier => $parent_id },
            { type => 'hospital',     identifier => $source_id },
        );
    };

    # arrayref elements are OR'ed by SQL::Abstract:
    my @alternatives = (
        $by_location->(),
        { type => 'referrer', identifier => $gmc_number },
        { -and => [ department_id => $department_id, [ $by_location->() ] ] },
    );

    my @where = ( -and => [ is_active => 'yes', \@alternatives ] );

    my ($stmt, @params) = $sqla->select(
        -columns => [ 'contact_address', 'status' ],
        -from    => 'report_notification',
        -where   => \@where,
    );

    my @contacts = $dbix->query($stmt, @params)->hashes;
    return @contacts;
}
#-------------------------------------------------------------------------------
# Send the notification e-mail for one request to each eligible recipient and
# log every successful dispatch in request_history (lims_test.request_history
# when $JUST_TESTING is set).
#
# Args:
#   $data       - hashref of request details (one element of @notifications)
#   @recipients - list of hashrefs with keys contact_address and status
#
# A recipient whose status is 'new' is skipped unless the report status is also
# 'new'. Addresses that do not end in exactly "@nhs.net" are never mailed.
# Failed sends are reported with warn() and not logged to request_history.
sub _send_notification {
    my ($data, @recipients) = @_; # href, array

    # default (leeds) is no message text apart from the url appended below;
    # uclh gets a rendered summary in front of it. _centre may be absent from
    # the config, so guard against comparing undef:
    my $centre  = $config->{_centre} // '';
    my $message = $centre eq 'uclh' ? _process_uclh_tt($data) : '';

    # append url to online report:
    $message .= sprintf
        'The full report is available online: <%s/search/notification/%s>',
        $config->{application_url}, $data->{request_id};

    # must be an update if authorised before (window + delay) seconds ago:
    my $report_type
        = $data->{authorised} < $auth_time_cutoff ? 'an updated' : 'a new';

    # nhs number, or dob when there is none. A patient with neither used to
    # kill the script here (method call on undef), leaving the row queued so
    # that every later run died at the same point:
    my $patient_ref = $data->{nhs_number}
        || ( defined $data->{dob} ? $data->{dob}->ymd : 'unknown' );

    my $subject = sprintf 'Notification of %s HMDS report on %s %s [%s]',
        $report_type,
        ucfirst( $data->{first_name} ),
        uc( $data->{last_name} ),
        $patient_ref;

    my %mail = (
        config  => $config,
        subject => $subject,
        message => $message,
    );

    my $tbl = $JUST_TESTING # already have a global $table (request_notification)
        ? 'lims_test.request_history' : 'request_history';

    ADDR:
    for my $recipient (@recipients) {
        # skip if contact wants only new diagnoses & diagnosis status isn't new:
        next ADDR if $recipient->{status} eq 'new' && $data->{status} ne 'new';

        my $addr = $recipient->{contact_address} // '';

        # safety check: literal ".net" (the dot is escaped) and \z so that
        # nothing - not even a trailing newline - may follow the domain:
        unless ( $addr =~ /\@nhs\.net\z/ ) {
            warn "unsafe recipient address $addr";
            next ADDR;
        }

        $mail{recipient} = $addr;
        my $result = LIMS::Model::Email->send_message(\%mail); # Return::Value

        if ( $result->type eq 'success' ) { # log it:
            my %h = (
                request_id => $data->{request_id},
                user_id    => $user_id,
                action     => 'dispatched report notification to ' . $addr,
            );
            $dbix->insert($tbl, \%h);
        }
        else { # don't let a failed dispatch pass unnoticed:
            warn sprintf 'report notification for request %s to %s not sent (%s)',
                $data->{request_id}, $addr, $result->type;
        }
    }
    return;
}
#-------------------------------------------------------------------------------
# Render the uclh message text for one request.
#
# Arg:     $data - hashref of request details, used as the template variables
#          (first_name, last_name, dob, status, diagnosis,
#          secondary_diagnosis, comment).
# Returns: the rendered text.
# Dies with the template error if processing fails; that is acceptable here
# because the request stays queued in the db.
sub _process_uclh_tt {
    my $data = shift;

    # asp-style tags, to match the TAG_STYLE of the file-level $tt engine:
    my $template = join q{},
        qq!An integrated report is available on your patient !,
        qq!<% first_name.ucfirst %> <% last_name.upper %> d.o.b. <% dob.ymd %> !,
        qq!showing a <% IF status.match('new'); 'NEW '; END %>diagnosis !,
        qq!of "<% diagnosis %>"<% IF secondary_diagnosis %> and a secondary !,
        qq!diagnosis of "<% secondary_diagnosis %>"<% END %>.\n\n!,
        qq!<% IF comment %>Comment: <% comment %>\n\n<% END %>!;

    my $rendered;
    unless ( $tt->process(\$template, $data, \$rendered) ) {
        die $tt->error();
    }
    return $rendered;
}
__DATA__
# test of ON DUPLICATE KEY UPDATE datetime:
# http://stackoverflow.com/questions/10081481/mysql-update-if-value-is-greater-than-that-current-value
$dbix->delete("test.$table"); # don't do this on live server !!
{ # insert row:
my $sql = qq!
INSERT INTO test.$table(request_id, datetime) VALUES(??)
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
!;
my $res = $dbix->query($sql, 10, '2017-01-01 02:00:00'); say $res->rows;
}
say "check test.$table now"; sleep 5;
{ # earlier datetime - should *not* update:
my $sql = qq!
INSERT INTO test.$table(request_id, datetime) VALUES(??)
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
!;
my $res = $dbix->query($sql, 10, '2017-01-01 01:00:00'); say $res->rows;
}
say "check test.$table now"; sleep 5;
{ # later datetime - *should* update:
my $sql = qq!
INSERT INTO test.$table(request_id, datetime) VALUES(??)
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
!;
my $res = $dbix->query($sql, 10, '2017-01-01 03:00:00'); say $res->rows;
}
say "check test.$table now"; exit;
================================================================================
INSERT INTO $table
/* do not use request_status_view in cron - maybe exceeds /tmp capacity for tmp table */
SELECT rsv.request_id, rsv.time
FROM request_status_view rsv
LEFT JOIN request_trial rt on rt.request_id = rsv.request_id
WHERE rsv.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and rsv.action = 'authorised'
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
INSERT INTO $table
SELECT rdh.request_id, rdh.time
FROM request_diagnosis_history rdh
JOIN requests r on rdh.request_id = r.id
LEFT JOIN request_trial rt on rt.request_id = r.id
WHERE rdh.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and r.status_option_id IN (4,5)
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
INSERT INTO $table
SELECT rrs.request_id, rrs.time
FROM request_result_summaries rrs
JOIN requests r on rrs.request_id = r.id
LEFT JOIN request_trial rt on rt.request_id = r.id
WHERE rrs.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and r.status_option_id IN (4,5)
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
= IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
SELECT contact_address
FROM notification_contacts
WHERE
( type = 'referrer' and identifier = ? ) or
( type = 'hospital' and identifier = ? ) or
( type = 'organisation' and identifier = ? ) or
( department = ? and (
( type = 'hospital' and identifier = ? ) or
( type = 'organisation' and identifier = ? )
) )