import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
/**
* Object responsible for creating EmailJobs and EmailJobWorkers. Also responsible for starting EmailJobWorkers once their start time has been reached.
* Also responsible for removing files and tables associated with email jobs upon a request from a user to remove them.
*
*
* @version 1.0.0
*/
{
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(EmailJobManager.class.getName());
/**Location on server of HTML files used for emails*/
public static final String HTML_FILE_DIR
= "C:\\test\\";
/**Delay in minutes before the manager checks to see if any jobs need to start*/
private final int JOB_CHECK_DELAY = 1;
/*Spring JDBC objects*/
private DataSource ds;
private transient JdbcTemplate jt;
private transient NamedParameterJdbcTemplate njt;
/**List of all workers still currently actively sending out emails*/
private List<EmailJobWorker> activeWorkers = new ArrayList<EmailJobWorker>();
/**List of workers that completed their jobs*/
private List<EmailJobWorker> finishedWorkers = new ArrayList<EmailJobWorker>();
/**List of paused workers*/
private List<EmailJobWorker> pausedWorkers = new ArrayList<EmailJobWorker>();
/**Timer that fires telling manager to check to see if jobs need to be started, or moved to finished list*/
private transient Timer jobCheckTimer
= new Timer("EmailJobManager");
public EmailJobManager(DataSource ds)
{
this.njt = new NamedParameterJdbcTemplate(ds);
this.ds = ds;
this.jt = new JdbcTemplate(ds);
loadJobs();
@Override
public void run()
{
logger.info("EmailJobManager timer run started.");
for(EmailJobWorker worker: activeWorkers)
{
logger.info("EmailJobWorker " + worker.getEmailJobName() + " is an active worker.");
logger.info("Memory Address in manager timer loop 1 for " + worker.getEmailJobName() + " ({})", worker.toString());
}
logger.info("Size of pausedWorkers: {}", pausedWorkers.size());
for(EmailJobWorker worker : pausedWorkers)
{
logger.info("Memory Address in manager timer loop 2 for " + worker.getEmailJobName() + " ({})", worker.toString());
logger.info("Comparing current time ({}) to EmailJobWorker " + worker.getEmailJobName() + " start time ({})", sdf.format(c.getTime()), sdf.format(worker.getStartDate()));
if(c.compareTo(worker.getEmailJob().getStartTime()) > 0)
{
pausedWorkers.remove(worker);
activeWorkers.add(worker);
worker.start();
logger.info("EmailJob " + worker.getEmailJob().getJobName() + " started.\n");
}
}
for(EmailJobWorker worker : activeWorkers)
{
if(worker.isCompleted())
{
activeWorkers.remove(worker);
finishedWorkers.add(worker);
logger.info("Worker " + worker.getEmailJobName() + " has completed.");
}
}
}
}, 0, /*interval in milliseconds*/JOB_CHECK_DELAY * 60 * 1000);
logger.info("EmailJobManager timer started sucesfully. (sleepTime) ({})", JOB_CHECK_DELAY);
}
/**
* Adds new email job to this manager's duties.
*
* @param jobName Name of the email job
* @param startTime Time for the email job to start
* @param batchSize Number of emails to be sent out per time interval
* @param timeInterval Time between batches in minutes
* @param emailSubject Subject of the email
* @param fromAddress The email address the emails will be sent from
* @param password Password of the email address
* @throws IOException If HTML file to be used for the email is not found
*/
{
String SQL
= "INSERT INTO email_jobs (jobName, emailSubject, email, password, startTime, batchSize, timeInterval) VALUES(:jobName, :emailSubject, :email, :password, :startTime, :batchSize, :timeInterval)"; MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("jobName", jobName);
params.addValue("emailSubject", emailSubject);
params.addValue("email", fromAddress);
params.addValue("password", password);
params.addValue("startTime", dateFormat.format(startTime.getTime()));
params.addValue("batchSize", batchSize);
params.addValue("timeInterval", timeInterval);
KeyHolder keyHolder = new GeneratedKeyHolder();
njt.update(SQL, params, keyHolder);
int jobId = keyHolder.getKey().intValue();
EmailJob job = new EmailJob(jobId, jobName, startTime, HTML_FILE_DIR + "/" + jobName + ".html");
EmailJobWorker newWorker = new EmailJobWorker(job, batchSize, timeInterval, ds, emailSubject, fromAddress, password);
logger.info("Memory Address in addJob for " + newWorker.getEmailJobName() + " ({})", newWorker.toString());
activeWorkers.add(newWorker);
}
/**
* Pauses a job that is currently running.
*
* @param jobName Name of the job to pause.
*/
public void pauseJob
(String jobName
) {
for(EmailJobWorker worker : activeWorkers)
{
if(worker.getEmailJob().getJobName().equals(jobName))
{
activeWorkers.remove(worker);
pausedWorkers.add(worker);
worker.stop();
return;
}
}
}
public void startJob
(String jobName
) {
for(EmailJobWorker worker : pausedWorkers)
{
if(worker.getEmailJobName().equals(jobName))
{
pausedWorkers.remove(worker);
activeWorkers.add(worker);
worker.start();
return;
}
}
for(EmailJobWorker worker : finishedWorkers)
{
if(worker.getEmailJobName().equals(jobName))
{
finishedWorkers.remove(worker);
activeWorkers.add(worker);
worker.restart();
return;
}
}
}
public void destroyJob
(String jobName
) {
for(EmailJobWorker worker : getActiveWorkers())
{
if(worker.getEmailJobName().equals(jobName))
{
worker.stop();
activeWorkers.remove(worker);
}
}
for(EmailJobWorker worker : getPausedWorkers())
{
if(worker.getEmailJobName().equals(jobName))
{
worker.stop();
pausedWorkers.remove(worker);
}
}
for(EmailJobWorker worker : getFinishedWorkers())
{
if(worker.getEmailJobName().equals(jobName))
{
finishedWorkers.remove(worker);
}
}
}
public List<EmailJobWorker> getActiveWorkers()
{
return this.activeWorkers;
}
public List<EmailJobWorker> getPausedWorkers()
{
return this.pausedWorkers;
}
public List<EmailJobWorker> getFinishedWorkers()
{
return this.finishedWorkers;
}
/**
* Checks to see if any jobs were not completed as expected due to hardware error. Recreates the EmailJob and EmailJobWorker responsible.
*/
private void loadJobs()
{
String SQL
= "SELECT * FROM email_jobs";
List
<Map
<String, Object
>> rows
= jt.
queryForList(SQL
);
for(Map
<String, Object
> row
: rows
) {
int jobId
= (Integer)row.
get("id"); startTime.
setTime((java.
util.
Date)row.
get("startTime")); int batchSize
= (Integer)row.
get("batchSize"); int timeInterval
= (Integer)row.
get("timeInterval"); boolean completed
= (Boolean)row.
get("completed"); boolean paused
= (Boolean)row.
get("paused");
//Get id of next email to be sent out
SQL = "SELECT MIN(id) FROM " + jobName + "_email_list" + " WHERE sentOut = false";
int startId = njt.queryForInt(SQL, new MapSqlParameterSource());
EmailJob job;
try
{
job = new EmailJob(jobId, jobName, startTime, HTML_FILE_DIR + "/" + jobName + ".html");
}
{
String stackTrace
= sw.
toString();
logger.error("EmailJob " + jobName + " failed to load. HTML file at " + HTML_FILE_DIR + "/" + jobName + ".html" + " was not found.\n" + stackTrace + "\n");
continue;
}
EmailJobWorker worker = new EmailJobWorker(job, batchSize, timeInterval, ds, emailSubject, email, password, startId);
if(completed)
{
finishedWorkers.add(worker);
logger.info("EmailJobWorker " + jobName + " has been added to the finished worker pool.");
}
else
{
logger.info("Worker " + worker.getEmailJobName() + " has been added to pausedWorkers list with memory address ({})", worker.toString());
pausedWorkers.add(worker);
}
logger.info("EmailJob " + jobName + " loaded sucessfully.");
}
}
}