RSS Git Download  Clone
Raw Blame History
#!/usr/bin/env perl

=begin #------------------------------------------------------------------------
* generates messages to registered recipients when new or updated report available
* excludes clinical trial & outreach samples
* select requests >5 mins old to allow for further updates within short time span
* runs every 30 (?) mins
* see __DATA__ section for sql queries

Event Triggers: request status either authorised or complete, and one of:
    * new authorised report
    * confirmed final diagnosis
    * change to diagnosis
    * amended comment
To do:
    * request status (new/relapsed/default) to allow selective messaging
    * manual trigger (as in with the right permissions, the reporter can trigger
        a notification to any recipient)

Any existing request_notification row only updated with a more recent datetime, using:
    "ON DUPLICATE KEY UPDATE
        datetime = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)"
    *** see __DATA__ section for validation ****
#-------------------------------------------------------------------------------
=cut

BEGIN {
	use constant DURATION => 1800; # default if no value passed as -t
	use Getopt::Std;
	getopts('t:'); # time (seconds)
	our($opt_t);
} # warn $opt_t; exit;

my $JUST_TESTING = 0; # emails admin only, logs to lims_test.request_history

#===============================================================================
my $delta = $opt_t || DURATION; # warn $delta; # time window (seconds) to detect update
my $delay = 300; # seconds delay - to allow for reflection before sending notification
#===============================================================================

# delta value cannot be less than $delay (and needs to be much greater to avoid
# risk of task repeating before previous instance finished, so set at 2x):
die "run frequency ($delta seconds) cannot be less than 2x delay time ($delay seconds)"
    unless $delta >= $delay * 2;

my $dbname = $ENV{CENTRE} || 'hilis4'; # cron exports 'CENTRE' for non-HMDS tasks

use lib (
    '/home/raj/perl5/lib/perl5',
    '/home/raj/perl-lib',
);

use Template;
use Local::DB;
use Modern::Perl;
use Config::Auto;
use Data::Printer;
use FindBin qw($Bin); # warn $Bin; exit;
use SQL::Abstract::More;

use lib "$Bin/../../../lib";
use LIMS::Local::ScriptHelpers;
$Local::QueryLogger::NO_QUERY_LOGS = 1; # don't need queries in logs dir
# $Local::DBIx::Simple::Result::NO_AUTO_DATE_INFLATION = 1; # need DT object

# get tools from LIMS::Local::ScriptHelpers:
my $tools = LIMS::Local::ScriptHelpers->new();
$tools->test_only($JUST_TESTING);

my $config = $tools->config(); # p $config;
my $table  = $JUST_TESTING
    ? 'lims_test.request_notification' : 'request_notification';

my $sqla = SQL::Abstract::More->new;
my $dbix = Local::DB->dbix({dbname => $dbname});

# get user.id of server_username from config:
my $server_name = $config->{server_username};
my $user_id = $dbix->select('users', 'id', { username => $server_name })->value;

my $NOW = $tools->time_now;
# duration to detect a change of request status:
my $window = $NOW->clone->subtract( seconds => $delta ); # p $window;
# timepoint at time now minus delay:
my $t_max = $NOW->clone->subtract( seconds => $delay ); # p t_max;
# timepoint for deciding if notification is new or update = authorised < (window + t_max):
my $auth_time_cutoff = $NOW->clone->subtract( seconds => $delta + $delay ); # p $auth_time_cutoff;

# run queries to update request_notification table =============================
do_updates();

# get request id's from request_notification with timestamps older than delay (t_max):
my @req_ids = $dbix->select($table, 'request_id',
    { datetime => { '<' => $t_max } })->column; # say $_ for @req_ids; exit;
# @req_ids = (300001 .. 300005); # for testing
exit unless @req_ids; # or next query is nonsense (where 0=1)

# get details for all request_id's =============================================
my @notifications = do {
	my @cols = qw(
		p.last_name
		p.first_name
		p.nhs_number
        p.dob
        r.id|request_id
        rrd.status
        rrd.comment
        pc.referral_source_id
        rs.parent_organisation_id
        ref.national_code|gmc_code
        rd.hospital_department_code
        max(rh.time)|authorised
        d.name|diagnosis
	    d2.name|secondary_diagnosis
	); # r.request_number r.year ref.name|referrer rs.display_name|location
    my @rels = (
        'requests|r'                      => 'r.patient_case_id=pc.id'          ,
        'patient_case|pc'                 => 'pc.patient_id=p.id'               ,
        'patients|p'                      => 'pc.referral_source_id=rs.id'      ,
        'referral_sources|rs'             => 'rrd.request_id=r.id'              ,
        'request_report_detail|rrd'       => 'r.referrer_department_id=rd.id'   ,
        'referrer_department|rd'          => 'rd.referrer_id=ref.id'            ,
        'referrers|ref'                   => 'rh.request_id=r.id'               ,
        'request_history|rh'              => 'rrd.diagnosis_id=d.id'            ,
        'diagnoses|d'                     => '=>rsd.request_id=r.id'            ,
        'request_secondary_diagnosis|rsd' => '=>rsd.secondary_diagnosis_id=d2.id',
        'diagnoses|d2'
	);
    my @args = (
		-columns  => \@cols,
		-from     => [ -join => @rels ],
		-where    => {
            'rh.action' => 'authorised',
            'r.id'      => { -in => \@req_ids },
        },
        -group_by => 'r.id', # required for MAX(rh.time)
	); # p @args;
	my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
	   # $dbix->dump_query($sql, @bind); # exit;
    $dbix->query($sql, @bind)->hashes;
}; # p \@notifications;

=begin
    Each selected request is checked for any registered recipient(s), message(s)
    sent, then deleted from request_notification. Any row with timestamp more
    recent than the delay (5 mins) will be processed by subsequent cycle.
=cut

my $tt = Template->new({ TAG_STYLE => 'asp' }); # for tt rendering (only uclh)

for my $ref (@notifications) { # p $ref;
    # send notification(s) to any addresses configured for this request:
    if ( my @addr = _retrieve_addresses($ref) ) { # p \@addr; AoH
        _send_notification($ref, @addr);
    }
    # delete row whether message sent or not:
    $dbix->delete($table, { request_id => $ref->{request_id} });
}

#===============================================================================
sub do_updates { # request_history.action within past $delta mins:
    my @skip_screens = ( # don't notify if screened as:
        'Outreach',
        'Outreach CML',
        'Outreach CLL post-Rx monitoring',
        'Outreach BLPD/PCD pre-Rx monitoring',
    );
    my @rels = (
        'requests|r'                    => 'r.status_option_id=so.id'  ,
        'status_options|so'             => 'rh.request_id=r.id'        ,
        'request_history|rh'            => '=>rt.request_id=r.id'      ,
        'request_trial|rt'              => 'ris.request_id=r.id'       ,
        'request_initial_screen|ris'    => 'ris.screen_id=s.id'        ,     
        'screens|s'
    );
    my @args = (
        -columns => [ 'r.id|request_id', 'rh.time|datetime' ],
        -from    => [ -join => @rels ],
        -where   => {
            'rt.request_id'  => undef,
            's.description'  => { -not_in => \@skip_screens },
            'so.description' => { -in => [ 'authorised','complete' ] },
            -or => [
                'rh.action' => { -in =>
                    [
                        'confirmed final diagnosis',
                        'amended comment',
                        'authorised',
                    ],
                },
                'rh.action' => { -like => 'amended diagnosis (%)' }, # can be error or update
            ],
            'rh.time' => { '>=' => $window },
        }
    );
    my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
        # $dbix->dump_query($sql, @bind);  exit;
    my $result = $dbix->query($sql, @bind);
    while ( my $ref = $result->hash ) { # p $ref; next;
        $dbix->insert_or_update_if_greater($table, $ref, 'datetime');
    }
}

=begin # replaced by a single request_history table query above
sub do_updates {
    { # newly authorised requests within past $delta mins:
        my @rels = (
            'request_history|rh' => '=>rh.request_id = rt.request_id' ,
            'request_trial|rt'
        );
        my @args = (
            -columns => [ 'rh.request_id', 'rh.time|datetime' ],
            -from    => [ -join => @rels ],
            -where   => {
                'rt.request_id' => undef,
                'rh.action'     => 'authorised',
                'rh.time'       => { '>=' => $window },
            }
        );
        my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
            # $dbix->dump_query($sql, @bind); # exit;
        my $result = $dbix->query($sql, @bind);
        while ( my $ref = $result->hash ) {
    #        $ref->{status} = 'authorised'; # using "rh.time as authorised" instead
            $dbix->insert_or_update_if_greater($table, $ref, 'datetime');
        }
    }
    { # diagnosis update within past $delta mins:
        my @rels = (
            'requests|r'                      => 'r.status_option_id=so.id' ,
            'status_options|so'               => 'rdh.request_id = r.id'    ,
            'request_diagnosis_history|rdh'   => '=>rt.request_id = r.id'   ,
            'request_trial|rt'
        );
        my @args = (
            -columns => [ 'r.id|request_id', 'rdh.time|datetime' ],
            -from    => [ -join => @rels ],
            -where   => {
                'rt.request_id'  => undef,
                'so.description' => { -in => [ 'authorised','complete' ] },
                'rdh.time'       => { '>=' => $window },
            }
        );
        my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
            # $dbix->dump_query($sql, @bind); # exit;
        my $result = $dbix->query($sql, @bind);
        while ( my $ref = $result->hash ) {
    #        $ref->{status} = 'updated'; # using "rh.time as authorised" instead
            $dbix->insert_or_update_if_greater($table, $ref, 'datetime');
        }
    }
    { # results_summary update within past $delta mins - slow and replaced with next block
        my @rels = (
            'requests|r'                     => 'r.status_option_id=so.id'  ,
            'status_options|so'              => 'rrs.request_id = r.id'     ,
            'request_result_summaries|rrs'   => '=>rt.request_id = r.id'    ,
            'request_trial|rt'
        );
        my @args = (
            -columns => [ 'r.id|request_id', 'rrs.time|datetime' ],
            -from    => [ -join => @rels ],
            -where   => {
                'rt.request_id'  => undef,
                'so.description' => { -in => [ 'authorised','complete' ] },
                'rrs.time'       => { '>=' => $window },
            }
        );
        my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
            # $dbix->dump_query($sql, @bind); # exit;
        my $result = $dbix->query($sql, @bind);
        while ( my $ref = $result->hash ) {
    #        $ref->{status} = 'updated'; # using "rh.time as authorised" instead
            $dbix->insert_or_update_if_greater($table, $ref, 'datetime');
        }
    }
    { # requests.updated_at within past $delta mins, or request_report_detail.updated_at
        # within past $delta mins and > requests.updated_at
        my @rels = (
            'requests|r'                     => 'r.status_option_id=so.id'  ,
            'status_options|so'              => 'rrd.request_id = r.id'     ,
            'request_report_detail|rrd'      => '=>rt.request_id = r.id'    ,
            'request_trial|rt'
        );
        my @args = (
            -columns => [ 'r.id|request_id', 'rrd.updated_at|datetime' ],
            -from    => [ -join => @rels ],
            -where   => {
                'rt.request_id'  => undef,
                'so.description' => { -in => [ 'authorised','complete' ] },
                -or => [
                    'r.updated_at'   => { '>=' => $window },
                    'rrd.updated_at' => { '>=' => $window, '>' => \'r.updated_at' },
                ],
            }
        );
        my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
            # $dbix->dump_query($sql, @bind); # exit;
        my $result = $dbix->query($sql, @bind);
        while ( my $ref = $result->hash ) {
    #        $ref->{status} = 'updated'; # using "rh.time as authorised" instead
            $dbix->insert_or_update_if_greater($table, $ref, 'datetime');
        }
    }
}
=cut

#-------------------------------------------------------------------------------
sub _retrieve_addresses {
    my $data = shift;
    return ( { contact_address => $config->{service_email}, status => 'all' } )
        if $JUST_TESTING;

    my @cols = qw( hospital_department_code parent_organisation_id
        referral_source_id gmc_code );
    my ($department_id, $parent_id, $source_id, $gmc_number) = @{$data}{@cols};

    my @where = (
        -and => [
            is_active => 'yes',
            [ # <= square bracket pair not strictly necessary here
                { type => 'organisation', identifier => $parent_id  },
                { type => 'hospital',     identifier => $source_id  },
                { type => 'referrer',     identifier => $gmc_number },
                { -and => [
                        department_id => $department_id,
                        [
                            { type => 'organisation', identifier => $parent_id },
                            { type => 'hospital',     identifier => $source_id },
                        ]
                    ],
                },
            ],
        ],
    );
    my @args = (
		-columns  => [ 'contact_address', 'status' ],
		-from     => 'report_notification',
		-where    => \@where,
	); # p @args;
	my ($sql, @bind) = $sqla->select(@args); # p $sql; # p \@bind;
	    # $dbix->dump_query($sql, @bind); exit;
    my @addr = $dbix->query( $sql, @bind )->hashes;
    return @addr;
}

#-------------------------------------------------------------------------------
sub _send_notification {
    my ($data, @recipients) = @_; # href, array

    my $message; # default (leeds) is for no message apart from url, appended below
    if ( $config->{_centre} eq 'uclh' ) { # parse template:
        $message = _process_uclh_tt($data);
    }
    # append url to online report:
    $message .= sprintf
        'The full report is available online: <%s/search/notification/%s>',
            $config->{application_url}, $data->{request_id}; # p $message; exit;

    my $subject = sprintf 'Notification of %s HMDS report on %s %s [%s]',
        # must be an update if authorised before (window + delay) mins ago:
        ( $data->{authorised} < $auth_time_cutoff ? 'an updated' : 'a new' ),
        # names:
        ucfirst $data->{first_name}, uc $data->{last_name},
        # nhs number or dob:
        ( $data->{nhs_number} || $data->{dob}->ymd ); # p $subject;

    my %mail = (
        config  => $config,
        subject => $subject,
        message => $message,
    ); # p \%mail;

    my $tbl = $JUST_TESTING # already have a global $table (request_notification)
        ? 'lims_test.request_history' : 'request_history';

    ADDR:
    for my $recipient (@recipients) { # p $recipient;
        # skip if contact wants only new diagnoses & diagnosis status isn't new:
        next ADDR if $recipient->{status} eq 'new' && $data->{status} ne 'new';

        my $addr = $recipient->{contact_address};
        warn "unsafe recipient address $addr" and next ADDR
            unless $addr =~ /\@nhs.net\Z/;

        $mail{recipient} = $addr; # p \%mail; next ADDR;
        my $result = LIMS::Model::Email->send_message(\%mail); # Return::Value
        if ( $result->type eq 'success' ) { # log it:
            my %h = (
                request_id => $data->{request_id},
                user_id    => $user_id,
                action     => 'dispatched report notification to ' . $addr,
            );
            $dbix->insert($tbl, \%h);
        }
    }
}

#-------------------------------------------------------------------------------
sub _process_uclh_tt {
    my $data = shift;

    my $tmpl = qq!An integrated report is available on your patient !
    . qq!<% first_name.ucfirst %> <% last_name.upper %> d.o.b. <% dob.ymd %> !
    . qq!showing a <% IF status.match('new'); 'NEW '; END %>diagnosis !
    . qq!of "<% diagnosis %>"<% IF secondary_diagnosis %> and a secondary !
    . qq!diagnosis of "<% secondary_diagnosis %>"<% END %>.\n\n!
    . qq!<% IF comment %>Comment: <% comment %>\n\n<% END %>!;

    my $msg;
    $tt->process(\$tmpl, $data, \$msg) or die $tt->error(); # p $msg; # die OK as data stays in db
    return $msg;
}

__DATA__
# test of ON DUPLICATE KEY UPDATE datetime:
# http://stackoverflow.com/questions/10081481/mysql-update-if-value-is-greater-than-that-current-value

$dbix->delete('test.'$table); # don't do this on live server !!

{ # insert row:
    my $sql = qq!
        INSERT INTO 'test.$table(request_id, datetime) VALUES(??)
        ON DUPLICATE KEY UPDATE datetime
            = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
    !;
    my $res = $dbix->query($sql, 10, '2017-01-01 02:00:00'); say $res->rows;
}
say "check test.$table now"; sleep 5;
{ # earlier datetime - should *not* update:
    my $sql = qq!
        INSERT INTO 'test.$table(request_id, datetime) VALUES(??)
        ON DUPLICATE KEY UPDATE datetime
            = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
    !;
    my $res = $dbix->query($sql, 10, '2017-01-01 01:00:00'); say $res->rows;
}
say "check test.$table now"; sleep 5;
{ # later datetime - *should* update:
    my $sql = qq!
        INSERT INTO 'test.$table(request_id, datetime) VALUES(??)
        ON DUPLICATE KEY UPDATE datetime
            = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)
    !;
    my $res = $dbix->query($sql, 10, '2017-01-01 03:00:00'); say $res->rows;
}
say "check test.$table now"; exit;

================================================================================
INSERT INTO $table
 /* do not use request_status_view in cron - maybe exceeds /tmp capacity for tmp table */
    SELECT rsv.request_id, rsv.time
    FROM request_status_view rsv
    LEFT JOIN request_trial rt on rt.request_id = rsv.request_id
    WHERE rsv.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and rsv.action = 'authorised'
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
    = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)

INSERT INTO $table
    SELECT rdh.request_id, rdh.time
    FROM request_diagnosis_history rdh
    JOIN requests r on rdh.request_id = r.id
    LEFT JOIN request_trial rt on rt.request_id = r.id
    WHERE rdh.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and r.status_option_id IN (4,5)
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
    = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)

INSERT INTO $table
    SELECT rrs.request_id, rrs.time
    FROM request_result_summaries rrs
    JOIN requests r on rrs.request_id = r.id
    LEFT JOIN request_trial rt on rt.request_id = r.id
    WHERE rrs.time > DATE_SUB(NOW(), INTERVAL ? MINUTE)
and r.status_option_id IN (4,5)
and rt.request_id IS NULL
ON DUPLICATE KEY UPDATE datetime
    = IF(VALUES(datetime) > datetime, VALUES(datetime), datetime)

SELECT contact_address
FROM notification_contacts
WHERE
    ( type = 'referrer'     and identifier = ? )  or
    ( type = 'hospital'     and identifier = ? )  or
    ( type = 'organisation' and identifier = ? )  or
    ( department = ? and (
      ( type = 'hospital' and identifier = ? ) or
      ( type = 'organisation' and identifier = ? )
    ) )