/**
 * Picks the sessions with the highest click counts for one category,
 * stores each of them through {@code ITop10SessionDAO.insert}, and returns
 * their session ids.
 *
 * <p>Each element of {@code tuple._2} is expected to be a string of the
 * form {@code "sessionid,count"}: the text before the first comma is used
 * as the session id and the text between the first and second comma is
 * parsed as the click count. At most ten sessions are kept, ordered by
 * count descending; when two counts are equal the session that was seen
 * first stays ahead (the comparison is strict).
 *
 * <p>{@code taskid} is not declared in this method; it is read from the
 * enclosing scope.
 *
 * @param tuple pair of (category id, "sessionid,count" strings for that category)
 * @return one {@code (sessionid, sessionid)} tuple per retained session,
 *         in descending order of click count
 * @throws NumberFormatException if a count field is not a valid {@code long}
 * @throws ArrayIndexOutOfBoundsException if an element contains no comma
 * @throws Exception declared by the overridden method; anything thrown by
 *         the DAO insert propagates unchanged
 */
@Override
public Iterable<Tuple2<String, String>> call(
        Tuple2<Long, Iterable<String>> tuple)
        throws Exception {
    long categoryid = tuple._1;
    Iterator<String> iterator = tuple._2.iterator();

    // Fixed-size ranking buffer, kept sorted by count descending.
    // top10Counts[i] caches the parsed count of top10Sessions[i] so a stored
    // entry never has to be split and parsed again during comparisons.
    String[] top10Sessions = new String[10];
    long[] top10Counts = new long[top10Sessions.length];
    int capacity = top10Sessions.length;
    // Number of occupied slots; occupied slots always form the prefix [0, filled).
    int filled = 0;

    while (iterator.hasNext()) {
        String sessionCount = iterator.next();
        long count = Long.parseLong(sessionCount.split(",")[1]);

        // Walk past every stored entry whose count is >= the new one.
        // Using <= here keeps earlier entries ahead of later ones on ties.
        int insertAt = 0;
        while (insertAt < filled && count <= top10Counts[insertAt]) {
            insertAt++;
        }

        // Buffer is full and the new entry does not beat any stored one.
        if (insertAt >= capacity) {
            continue;
        }

        // Move entries [insertAt, ...) one slot to the right. When the buffer
        // is already full the last entry falls off the end.
        int toShift = Math.min(filled, capacity - 1) - insertAt;
        if (toShift > 0) {
            System.arraycopy(top10Sessions, insertAt, top10Sessions, insertAt + 1, toShift);
            System.arraycopy(top10Counts, insertAt, top10Counts, insertAt + 1, toShift);
        }

        top10Sessions[insertAt] = sessionCount;
        top10Counts[insertAt] = count;
        if (filled < capacity) {
            filled++;
        }
    }

    // Persist every retained session and collect its id for the caller.
    List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
    for (int i = 0; i < filled; i++) {
        String sessionid = top10Sessions[i].split(",")[0];
        long count = top10Counts[i];

        // Build the row for this (task, category, session) and insert it.
        Top10Session top10Session = new Top10Session();
        top10Session.setTaskid(taskid);
        top10Session.setCategoryid(categoryid);
        top10Session.setSessionid(sessionid);
        top10Session.setClickCount(count);

        ITop10SessionDAO top10SessionDAO = DAOFactory.getTop10SessionDAO();
        top10SessionDAO.insert(top10Session);

        // The session id is emitted as both key and value.
        list.add(new Tuple2<String, String>(sessionid, sessionid));
    }

    return list;
}
});