// 天天看点 ("Daily Highlights")
//
// Spring JDBC insert batch processing

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.URL;
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

import com.trafficcast.bean.SpeedRecord;
import com.trafficcast.dao.SpeedRecordDao;
import com.trafficcast.jdbc.JdbcUtils;



/**
 * Polls the Nebraska 511 vehicle-detection-station (VDS) feed and archives the
 * reported speeds into the {@code USA_dot_ArchiveSpeed} table.
 *
 * <p>Each cycle downloads the sensor list XML, rebuilds the id-to-location map,
 * fetches the per-sensor detail page for every sensor, parses the speed line
 * out of it and batch-inserts the resulting records. The cycle repeats every
 * {@code loopSleepTime} milliseconds until the thread is interrupted.
 *
 * <p>Instances are not designed for concurrent use: the record list and the
 * shared {@link SimpleDateFormat} are plain mutable fields.
 */
public class NE_511_ArchiveSpeed implements SpeedRecordDao{

	// The log4j instance
	private static final Logger LOGGER = Logger.getLogger(NE_511_ArchiveSpeed.class);

	// Name of the upstream feed; used in log messages only.
	private static final String SOURCE_NAME = "Nebraska 511";

	// Pause after a failed cycle before the next attempt (milliseconds).
	private static final long ERROR_RETRY_DELAY = 30 * 1000L;

	// Buffer size used when copying the sensor list to disk.
	private static final int BUF_SIZE = 1024;

	// Local xml file name.
	private static final String LOCAL_XML_FILE = "ne_rt_speed.xml";

	// Lines that do not match DATA_PATTERN (or cannot be converted) are appended here.
	private static final String SPECIAL_RECORDS_FILE = "NE_Special.txt";

	// Reader name stored with every record.
	private static final String READER_ID = NE_511_ArchiveSpeed.class.getName();

	// Groups: 1 = highway, 2 = direction, 3 = cross street, 4 = speed, 5 = update time.
	private static final Pattern DATA_PATTERN = Pattern.compile("(.*) ([WESN]) AT (.*) (\\d+) MPH UPDATED: (.*)");

	private static final String INSERT_SQL = "insert into USA_dot_ArchiveSpeed(reader_id, dotsensorid,hwy,hwydir,cross_from,speed,updated_time,start_lat,start_lon,season,weekday,timeidx) values (?,?,?,?,?,?,?,?,?,?,?,?)";

	// Connect/read timeout applied to every HTTP request (milliseconds), default 2 minutes.
	private long retryWaitTime = 2 * 60 * 1000;

	// Spring jdbcTemplate
	private JdbcTemplate jdbcTemplate = new JdbcTemplate(JdbcUtils.getDataSource());

	// Target length of one polling cycle (milliseconds), default 15 minutes.
	private long loopSleepTime = 1000 * 15 * 60;

	// URL of the XML document listing all sensors.
	private String locationUrl = "http://www.511.nebraska.gov/atis/devices/vdsicons.xml";

	// URL prefix of the per-sensor detail page; the sensor id is appended.
	private String event_url = "http://www.511.nebraska.gov/atis/jsp/vdsXmlContents.jsp?vdsId=";

	// Sensor id -> { coordY, coordX, direction } as read from the sensor list.
	private HashMap<String, String[]> idLocationMap = new HashMap<String, String[]>();

	// Records collected during the current cycle.
	private List<SpeedRecord> recordList = new ArrayList<SpeedRecord>();

	// Format of the feed's "UPDATED:" value, e.g. 03/22/2012 7:56 PM.
	// SimpleDateFormat is not thread-safe; this instance is only used from the polling thread.
	private SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy hh:mm a",Locale.US);

	/** Creates an archiver with the default URLs and timings. */
	public NE_511_ArchiveSpeed() {
	}

	/**
	 * Starts the polling loop.
	 *
	 * @param args ignored
	 * @throws Exception declared for compatibility with existing launchers
	 */
	public static void main(String[] args) throws Exception {
		NE_511_ArchiveSpeed archive = new NE_511_ArchiveSpeed();
		archive.run();
	}

	/**
	 * Runs polling cycles until the thread is interrupted. A failed cycle is
	 * logged and retried after {@link #ERROR_RETRY_DELAY}.
	 */
	private void run() throws Exception {
		while (true) {
			try {
				long startTime = System.currentTimeMillis();
				saveXMLFile();
				storeIdLocationMap();

				long sleepTime = loopSleepTime - (System.currentTimeMillis() - startTime);
				if (sleepTime < 0) {
					// The cycle overran its slot; pause briefly and start again.
					sleepTime = 1000;
				}
				LOGGER.info("Sleeping for " + (sleepTime / 1000) + " seconds.");
				Thread.sleep(sleepTime);
			} catch (InterruptedException intExp) {
				// Restore the flag and stop; swallowing it would make the loop unstoppable.
				LOGGER.fatal("Thread was interrupted");
				Thread.currentThread().interrupt();
				return;
			} catch (Exception ex) {
				LOGGER.warn(describeFailure(ex) + ", retrying in "
						+ (ERROR_RETRY_DELAY / 1000) + " seconds", ex);
				if (!pauseBeforeRetry()) {
					return;
				}
			} finally {
				// Never carry records from one cycle into the next.
				recordList.clear();
			}
		}
	}

	/**
	 * Sleeps for {@link #ERROR_RETRY_DELAY}.
	 *
	 * @return {@code false} if the thread was interrupted while waiting
	 *         (the interrupt flag is restored), {@code true} otherwise
	 */
	private boolean pauseBeforeRetry() {
		try {
			Thread.sleep(ERROR_RETRY_DELAY);
			return true;
		} catch (InterruptedException intExp) {
			LOGGER.fatal("Thread was interrupted");
			Thread.currentThread().interrupt();
			return false;
		}
	}

	/**
	 * Maps a failure to a short human-readable reason for the log.
	 * Subclasses are tested before their superclasses.
	 */
	private static String describeFailure(Exception ex) {
		String reason;
		if (ex instanceof NoRouteToHostException) {
			reason = "Internet connection is not available";
		} else if (ex instanceof ConnectException) {
			reason = "Connection to " + SOURCE_NAME + " was refused";
		} else if (ex instanceof FileNotFoundException) {
			reason = "Could not retrieve RTspeed data from " + SOURCE_NAME;
		} else if (ex instanceof UnknownHostException) {
			reason = "Could not establish connection with " + SOURCE_NAME;
		} else if (ex instanceof SocketException) {
			reason = "Socket error while talking to " + SOURCE_NAME + ": " + ex.getMessage();
		} else if (ex instanceof EOFException) {
			reason = "End of file reached";
		} else if (ex instanceof NullPointerException) {
			reason = "Null pointer encountered while processing the feed";
		} else {
			reason = String.valueOf(ex.getMessage());
		}
		return reason;
	}

	/**
	 * Downloads the sensor list from {@code locationUrl} into {@link #LOCAL_XML_FILE}.
	 *
	 * @throws IOException if the download or the local write fails
	 */
	private void saveXMLFile() throws IOException {
		setDNS();

		URLConnection urlCon = new URL(locationUrl).openConnection();
		urlCon.setConnectTimeout((int) retryWaitTime);
		urlCon.setReadTimeout((int) retryWaitTime);
		LOGGER.info("Retrieving xml file.");

		InputStream inStream = null;
		BufferedOutputStream outStream = null;
		try {
			// Open the remote stream first so a failed connection does not
			// truncate the previously saved file.
			inStream = urlCon.getInputStream();
			outStream = new BufferedOutputStream(new FileOutputStream(
					LOCAL_XML_FILE), BUF_SIZE);

			byte[] buffer = new byte[BUF_SIZE];
			int byteLength;
			while ((byteLength = inStream.read(buffer, 0, BUF_SIZE)) != -1) {
				outStream.write(buffer, 0, byteLength);
			}
			outStream.flush();
			LOGGER.info("Save xml successfully.");
		} finally {
			// Nested finally: the input stream is closed even if closing the output fails.
			try {
				if (outStream != null) {
					outStream.close();
				}
			} finally {
				if (inStream != null) {
					inStream.close();
				}
			}
		}
	}

	/**
	 * Parses {@link #LOCAL_XML_FILE}, rebuilds the sensor id-to-location map,
	 * reads every sensor's detail page and inserts the collected records.
	 * Does nothing if the document has no root or no {@code vdsList} element.
	 *
	 * @throws Exception if the XML cannot be parsed or the insert fails
	 */
	private void storeIdLocationMap() throws Exception {
		// NOTE(review): the XML comes from a remote host; consider disabling
		// DTD/external entities on the SAXReader if the feed never uses them.
		Document document = new SAXReader().read(new File(LOCAL_XML_FILE));
		Element root = document.getRootElement();
		if (root == null) {
			return;
		}
		Element vdsList = root.element("vdsList");
		if (vdsList == null) {
			return;
		}

		// Rebuild from scratch so sensors dropped from the feed are no longer polled.
		idLocationMap.clear();
		List<?> sensors = vdsList.elements("vds");
		for (int i = 0; i < sensors.size(); i++) {
			Element sensor = (Element) sensors.get(i);
			if (sensor == null) {
				continue;
			}
			String idValue = sensor.elementText("id");
			if (idValue == null) {
				// Without an id there is no detail page to request.
				continue;
			}
			String[] locationToken = new String[3];
			locationToken[0] = sensor.elementText("coordY");
			locationToken[1] = sensor.elementText("coordX");
			locationToken[2] = sensor.elementText("direction");
			idLocationMap.put(idValue, locationToken);
		}

		for (Entry<String, String[]> entry : idLocationMap.entrySet()) {
			readDataSource(entry.getKey(), entry.getValue());
		}
		insertBatch(recordList);
	}

	/**
	 * Fetches the detail page of one sensor and appends a record to
	 * {@code recordList} for every line that starts with {@code <div id='vds'>}
	 * and matches {@link #DATA_PATTERN}. Lines that do not match, or whose
	 * timestamp/coordinates cannot be converted, are written to the error file.
	 * Network errors for a single sensor are logged and do not abort the cycle.
	 *
	 * @param idValue  sensor id appended to {@code event_url}
	 * @param location { coordY, coordX, direction } of the sensor
	 * @throws Exception declared for compatibility; I/O errors are handled here
	 */
	private void readDataSource(String idValue, String[] location) throws Exception {
		// DNS cache check
		setDNS();

		BufferedReader buffReader = null;
		try {
			URLConnection con = new URL(event_url + idValue).openConnection();
			con.setConnectTimeout((int) retryWaitTime);
			con.setReadTimeout((int) retryWaitTime);
			buffReader = new BufferedReader(new InputStreamReader(
					con.getInputStream()));

			String lineData;
			while ((lineData = buffReader.readLine()) != null) {
				if (!lineData.startsWith("<div id='vds'>")) {
					continue;
				}
				// Strip tags, upper-case (fixed locale), collapse whitespace, drop the label.
				lineData = lineData.replaceAll("<.*?>", " ").toUpperCase(Locale.US);
				lineData = lineData.replaceAll("\\s+", " ");
				lineData = lineData.replaceAll("SPEED", "");

				Matcher fields = DATA_PATTERN.matcher(lineData);
				if (!fields.find()) {
					saveErrors(lineData);
					continue;
				}
				try {
					recordList.add(buildRecord(idValue, location, fields));
				} catch (java.text.ParseException e) {
					// One bad timestamp must not discard the whole batch.
					LOGGER.warn("Unparseable update time for sensor " + idValue, e);
					saveErrors(lineData);
				} catch (NumberFormatException e) {
					LOGGER.warn("Missing or invalid coordinates for sensor " + idValue, e);
					saveErrors(lineData);
				}
			}
		} catch (MalformedURLException e) {
			LOGGER.warn("Invalid detail URL for sensor " + idValue, e);
		} catch (IOException e) {
			LOGGER.warn("Could not read data for sensor " + idValue, e);
		} finally {
			try {
				if (buffReader != null) {
					buffReader.close();
				}
			} catch (IOException e) {
				LOGGER.warn("Could not close reader for sensor " + idValue, e);
			}
		}
	}

	/**
	 * Builds a record from a successful {@link #DATA_PATTERN} match.
	 *
	 * @throws java.text.ParseException if group 5 is not in the feed's date format
	 * @throws NumberFormatException    if a coordinate is missing or not numeric
	 */
	private SpeedRecord buildRecord(String idValue, String[] location, Matcher fields)
			throws java.text.ParseException {
		// Convert everything that can fail before creating the record.
		Date updatedTime = sdf.parse(fields.group(5).trim()); // e.g. 03/22/2012 7:56 PM
		double startLat = parseCoordinate(location[0]);
		double startLon = parseCoordinate(location[1]);

		Calendar c = Calendar.getInstance();
		c.setTime(updatedTime);

		SpeedRecord record = new SpeedRecord();
		record.setReader_id(READER_ID);
		// NOTE(review): insertBatch writes getDotsensorid(), which was never populated;
		// assumes SpeedRecord exposes the matching bean setter.
		record.setDotsensorid(idValue);
		record.setHwy(fields.group(1).trim()); // Main st
		record.setHwyDir(fields.group(2).trim()); // Main dir
		record.setCrossFrom(fields.group(3).trim()); // From st
		record.setSpeed(fields.group(4).trim()); // speed
		record.setUpdatedTime(updatedTime);
		record.setStartLat(startLat);
		record.setStartLon(startLon);
		record.setSeason(seasonOfMonth(c.get(Calendar.MONTH) + 1));
		// Index of the 15-minute slot within the day (0..95).
		record.setTimeIdx((c.get(Calendar.HOUR_OF_DAY) * 60 + c.get(Calendar.MINUTE)) / 15);
		// Calendar.SUNDAY is 1, so Sunday becomes 0.
		record.setWeekday(c.get(Calendar.DAY_OF_WEEK) - 1);
		return record;
	}

	/** Parses a coordinate, reporting a missing value as a NumberFormatException. */
	private static double parseCoordinate(String raw) {
		if (raw == null) {
			throw new NumberFormatException("missing coordinate");
		}
		return Double.parseDouble(raw.trim());
	}

	/**
	 * Sets the JVM DNS cache TTL properties (positive and negative) to 0 so
	 * that host lookups are not cached between requests.
	 */
	private void setDNS() {
		java.security.Security.setProperty("networkaddress.cache.ttl", "0");
		java.security.Security.setProperty("networkaddress.cache.negative.ttl","0");
		System.setProperty("sun.net.inetaddr.ttl", "0");
		System.setProperty("sun.net.inetaddr.negative.ttl", "0");
	}

	/**
	 * Returns the season (calendar quarter) of a date.
	 *
	 * @param date date in {@code yyyy-MM-dd HH:mm:ss} format
	 * @return 1 for Jan-Mar, 2 for Apr-Jun, 3 for Jul-Sep, 4 for Oct-Dec
	 * @throws Exception if {@code date} cannot be parsed
	 */
	public int getSeason(String date) throws Exception {
		Calendar cal = Calendar.getInstance();
		cal.setTime(formatDate3(date));
		return seasonOfMonth(cal.get(Calendar.MONTH) + 1);
	}// end getSeason

	/** Maps a 1-based month to its quarter; December belongs to quarter 4. */
	private static int seasonOfMonth(int month) {
		if (month < 4) {
			return 1;
		}
		if (month < 7) {
			return 2;
		}
		if (month < 10) {
			return 3;
		}
		return 4;
	}

	/**
	 * Appends one line to {@link #SPECIAL_RECORDS_FILE}.
	 *
	 * @param errorMessage text to append; a newline is added
	 * @throws IOException if the file cannot be opened or written
	 */
	private void saveErrors(String errorMessage) throws IOException {
		FileWriter fw = new FileWriter(SPECIAL_RECORDS_FILE, true);
		try {
			fw.write(errorMessage + "\n");
		} finally {
			// close() flushes; a separate flush() here could mask the write failure.
			fw.close();
		}
	}

	/**
	 * Parses a date such as {@code Tue, 6 May 2008 01:48:55 AM}.
	 *
	 * @param s text in {@code EEE, dd MMM yyyy hh:mm:ss a} format
	 * @return the parsed date
	 * @throws Exception if {@code s} cannot be parsed
	 */
	public Date formatDate2(String s) throws Exception {
		SimpleDateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy hh:mm:ss a", Locale.US);
		return format.parse(s);
	}// end of formatDate

	/**
	 * Parses a date in {@code yyyy-MM-dd HH:mm:ss} format.
	 *
	 * @param s text to parse
	 * @return the parsed date
	 * @throws Exception if {@code s} cannot be parsed
	 */
	public Date formatDate3(String s) throws Exception {
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);
		return format.parse(s);
	}

	/**
	 * Converts a feed timestamp ({@code MM/dd/yyyy hh:mm a}, e.g.
	 * {@code 03/22/2012 7:56 PM}) to {@code yyyy-MM-dd HH:mm:ss}.
	 *
	 * @param date feed timestamp
	 * @return the same instant formatted as {@code yyyy-MM-dd HH:mm:ss}
	 * @throws Exception if {@code date} cannot be parsed
	 */
	public String formatDate(String date) throws Exception {
		Date parsed = sdf.parse(date);
		SimpleDateFormat target = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);
		return target.format(parsed);
	}// end formatDate

	/**
	 * Inserts all records in a single JDBC batch. A {@code null} or empty
	 * list is a no-op.
	 *
	 * @param speedRecords records to insert
	 */
	public void insertBatch(final List<SpeedRecord> speedRecords) {
		if (speedRecords == null || speedRecords.isEmpty()) {
			return;
		}
		jdbcTemplate.batchUpdate(INSERT_SQL, new BatchPreparedStatementSetter() {
			public void setValues(PreparedStatement ps, int i) throws SQLException {
				SpeedRecord speedRecord = speedRecords.get(i);
				ps.setString(1, speedRecord.getReader_id());
				ps.setString(2, speedRecord.getDotsensorid());
				ps.setString(3, speedRecord.getHwy());
				ps.setString(4, speedRecord.getHwyDir());
				ps.setString(5, speedRecord.getCrossFrom());
				ps.setString(6, speedRecord.getSpeed());
				// Timestamp keeps the time of day; java.sql.Date would store the date only.
				ps.setTimestamp(7, new java.sql.Timestamp(speedRecord.getUpdatedTime().getTime()));
				ps.setDouble(8, speedRecord.getStartLat());
				ps.setDouble(9, speedRecord.getStartLon());
				ps.setInt(10, speedRecord.getSeason());
				ps.setInt(11, speedRecord.getWeekday());
				ps.setInt(12, speedRecord.getTimeIdx());
			}
			public int getBatchSize() {
				return speedRecords.size();
			}
		});
	}
}