package datacollect;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
/**
* 数据转储线程 将采集到的数据每隔一分钟一次性批量存储到数据库
*
*
@author Jack
*
*/
public class DataSaveThread implements Runnable {
// 成员变量
private ArrayList<String> list;// 共享资源
private ArrayList<String> list2DB;// 数据转储缓冲区
private int countSum;// 记录本次运行写入数据库的数据总条数
public DataSaveThread(ArrayList<String> list) {
this.list = list;
}
public void run() {
// 初始化模块
list2DB = new ArrayList<String>();// 将数据转储缓冲区初始化
try {
// 写入数据库模块
while (PluginListener.runFlag) {// 正常运行下,写入数据库
Thread.sleep(60 * 1000);// 该线程睡眠1分钟
if (!listToList2DB(list, list2DB).isEmpty()) {// 当数据转储缓冲区list2DB不为空的时候,再执行
dataBatch2DB(list2DB);// 写入数据库
}
}
} catch (InterruptedException e) {// 按下0,中断睡眠,把已经读到的数据及时写到数据库,保证数据的可靠性
if (!listToList2DB(list, list2DB).isEmpty()) {// 当数据转储缓冲区list2DB不为空的时候,再执行
dataBatch2DB(list2DB);// 写入数据库
}
}
finally {// 给出服务器关闭提示
System.out.println("The server has been closed!");
System.exit(0);
}
}
/**
* 将arrayList1集合中的元素copy到arrayList2集合中
*
*
@param arrayList1
* @param arrayList2
*
@return arrayList2
*/
private ArrayList<String> listToList2DB(ArrayList<String> arrayList1, ArrayList<String> arrayList2) {
try {
PluginListener.semp.acquire();// 申请信号量
if (!arrayList1.isEmpty()) {// 如果arrayList1不为空
System.out.println("\ndata2DB begin .....");
System.out.println("listSizeBegin=" + arrayList1.size());// arrayList1的大小
arrayList2.addAll(arrayList1);// 将arrayList1的内容全部复制到arrayList2中
arrayList1.clear();// 清空arrayList1
}
PluginListener.semp.release();// 释放信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
return arrayList2;
}
/**
* 将数据批量插入数据库
*
* @param arrayList
*/
private void dataBatch2DB(ArrayList<String> arrayList) { //从这里开始不懂!!!!!!!!
// 定义变量并初始化
Connection conn = null;// 与特定数据库的连接
PreparedStatement ps = null;// SQL语句被预编译并存储在PreparedStatement对象中
int count = 0;// 用于记录合法记录的条数
long startTime;// 记录存入数据库的开始时间
long endTime;// 记录存入数据库的结束时间
String[] lineArray;// 用于存放分割arrayList元素之后的数据
String lineData;// 采集到的数据
String collectTime;// 采集到某条数据时所对应的时间
String sql = "insert into "+ParamSet.dbTable+"(sdata,collectTime,collectIP) values(?,?,?)";// 将SQL语句参数化
try {
conn = DBUtil.getConn();// 连接数据库
conn.setAutoCommit(false);// 禁用自动提交模式
ps = conn.prepareStatement(sql);// 将参数化的 SQL语句发送到数据库
// 批量存入数据库的核心代码
startTime = System.currentTimeMillis();// 存入数据库的开始时间
for (String line : arrayList) {
lineArray = line.split(",");
lineData = lineArray[0];
collectTime = lineArray[1];
ps.setString(1, lineData);
ps.setString(2, collectTime);
ps.setString(3, ParamSet.collectIp);
ps.addBatch();// 将一组参数添加到此 PreparedStatement 对象的批处理命令中
count++;
}
ps.executeBatch();// 将一批命令提交给数据库来执行
conn.commit();// 使所有上一次提交更改成为持久更改,并释放此 Connection对象当前持有的所有数据库锁
endTime = System.currentTimeMillis();// 存入数据库的结束时间
arrayList.clear();// 清空arrayList
countSum += count;// 本次运行写入数据库的数据总条数
// 信息提示模块
System.out.println("共有合法的记录" + count + "条");
System.out.println("数据存入DB花费的时间以毫秒为单位:" + (endTime - startTime) + "毫秒");
System.out.println("目前数据总条数是:" + countSum);
System.out.println("当前时间是:"
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
} catch (Exception e) {
e.printStackTrace();
} finally {
DBUtil.closeConn(conn);// 关闭数据库连接
try {
if (null != ps) {// 如果预编译的 SQL语句的对象不为空,则关闭
ps.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
/**
* 数据库的连接
*
* @author Jack
*
*/
class DBUtil {
// 定义数据库连接参数
public static final String DRIVER_CLASS_NAME = ParamSet.driver;// 驱动名称
public static final String URL = ParamSet.url;// JDBC连接sqlserver数据库的格式
public static final String USERNAME = ParamSet.userName;// 该数据库的用户名
public static final String PASSWORD = ParamSet.passWord;// 与用户名匹配的密码
// 注册数据库驱动
static {
try {
Class.forName(DRIVER_CLASS_NAME);
} catch (ClassNotFoundException e) {
System.out.println("注册失败!");
e.printStackTrace();
}
}
// 获取连接
public static Connection getConn() throws SQLException {
return DriverManager.getConnection(URL, USERNAME, PASSWORD);
}
// 关闭连接
public static void closeConn(Connection conn) {
if (null != conn) {
try {
conn.close();
} catch (SQLException e) {
System.out.println("关闭连接失败!");
e.printStackTrace();
}
}
}
}