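Normally each SnapshotRepository object corresponds to one DocumentSnapshotRepositoryMonitor monitor object and also to one snapshot store object (SnapshotStore). These associations are set up by the monitor manager object, DocumentSnapshotRepositoryMonitorManagerImpl.
To see which behaviors DocumentSnapshotRepositoryMonitorManagerImpl has to implement, first look at the method contracts defined by the interface it implements, DocumentSnapshotRepositoryMonitorManager: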
/**
* Management interface to {@link DocumentSnapshotRepositoryMonitor} threads.
*
* @since 2.8
*/
public interface DocumentSnapshotRepositoryMonitorManager {
/**
* Ensures all monitor threads are running.
*
* @param checkpoint for the last completed document or null if none have
* been completed.
* @throws RepositoryException
*/
void start(String checkpoint) throws RepositoryException;
/**
* Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads.
*/
void stop();
/**
* Removes persisted state for {@link DocumentSnapshotRepositoryMonitor}
* threads. After calling this {@link DocumentSnapshotRepositoryMonitor}
* threads will no longer be able to resume from where they left off last
* time.
*/
void clean();
/**
* Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads
* that are alive. This method is for testing purposes.
*/
int getThreadCount();
/**
* Returns the {@link CheckpointAndChangeQueue} for this
* {@link DocumentSnapshotRepositoryMonitorManager}
*/
CheckpointAndChangeQueue getCheckpointAndChangeQueue();
/** Returns whether we are after a start() call and before a stop(). */
boolean isRunning();
/**
* Receives information specifying what is guaranteed to be delivered to GSA.
* Every entry in passed in Map is a monitor name and MonitorCheckpoint.
* The monitor of that name can expect that all documents before and including
* document related with MonitorCheckpoint will be delivered to GSA.
* This information is for the convenience and efficiency of the Monitor so
* that it knows how many changes it has to resend. It's valid for a monitor
* to ignore these updates if it feels like it for some good reason.
* FileConnectorSystemMonitor instances use this information to trim their
* file system snapshots.
*/
void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees);
/**
* Receives {@link TraversalSchedule} from TraversalManager which is
* {@link TraversalScheduleAware}.
*/
void setTraversalSchedule(TraversalSchedule traversalSchedule);
}
Next, let's see how DocumentSnapshotRepositoryMonitorManagerImpl implements the behaviors defined in the interface above.
First, its fields and how they are initialized:
private volatile TraversalSchedule traversalSchedule;
// Monitor threads
private final List<Thread> threads =
Collections.synchronizedList(new ArrayList<Thread>());
// Map from monitor name to monitor
private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName =
Collections.synchronizedMap(new HashMap<String, DocumentSnapshotRepositoryMonitor>());
private boolean isRunning = false; // Monitor threads start in off state.
private final List<? extends SnapshotRepository<? extends DocumentSnapshot>>
repositories;
private final File snapshotDir;
private final ChecksumGenerator checksumGenerator;
// Container of CheckpointAndChange objects (a List)
private final CheckpointAndChangeQueue checkpointAndChangeQueue;
// Container of Change objects (a blocking queue)
private final ChangeQueue changeQueue;
private final DocumentSnapshotFactory documentSnapshotFactory;
/**
* Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl}
* for the {@link DiffingConnector}.
*
* @param repositories a {@code List} of {@link SnapshotRepository
* SnapshotRepositorys}
* @param documentSnapshotFactory a {@link DocumentSnapshotFactory}
* @param snapshotDir directory to store {@link SnapshotRepository}
* @param checksumGenerator a {@link ChecksumGenerator} used to
* detect changes in a document's content
* @param changeQueue a {@link ChangeQueue}
* @param checkpointAndChangeQueue a
* {@link CheckpointAndChangeQueue}
*/
public DocumentSnapshotRepositoryMonitorManagerImpl(
List<? extends SnapshotRepository<
? extends DocumentSnapshot>> repositories,
DocumentSnapshotFactory documentSnapshotFactory,
File snapshotDir, ChecksumGenerator checksumGenerator,
ChangeQueue changeQueue,
CheckpointAndChangeQueue checkpointAndChangeQueue) {
this.repositories = repositories;
this.documentSnapshotFactory = documentSnapshotFactory;
this.snapshotDir = snapshotDir;
this.checksumGenerator = checksumGenerator;
this.changeQueue = changeQueue;
this.checkpointAndChangeQueue = checkpointAndChangeQueue;
}
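Now for its start method. It does three main things: it calls the start method of the checkpointAndChangeQueue object, initializes the SnapshotStore associated with each repository object, and finally starts the monitor instance for each repository object.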
/**
* Start method.
*/
/** Go from "cold" to "warm" including CheckpointAndChangeQueue. */
public void start(String connectorManagerCheckpoint)
throws RepositoryException {
try {
// Start the queue (main action: load monitorPoints and the checkpointAndChangeList queue from the JSON-format queue file)
checkpointAndChangeQueue.start(connectorManagerCheckpoint);
} catch (IOException e) {
throw new RepositoryException("Failed starting CheckpointAndChangeQueue.",
e);
}
// Map of MonitorCheckpoint objects, keyed by monitor name
Map<String, MonitorCheckpoint> monitorPoints
= checkpointAndChangeQueue.getMonitorRestartPoints();
Map<String, SnapshotStore> snapshotStores = null;
// Load the monitorName-to-SnapshotStore map
try {
snapshotStores =
recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints);
} catch (SnapshotStoreException e) {
throw new RepositoryException("Snapshot recovery failed.", e);
} catch (IOException e) {
throw new RepositoryException("Snapshot recovery failed.", e);
} catch (InterruptedException e) {
throw new RepositoryException("Snapshot recovery interrupted.", e);
}
// Start the monitor threads
startMonitorThreads(snapshotStores, monitorPoints);
isRunning = true;
}
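The order of the phases in start() is what matters here: load the queue, recover the snapshot stores, and only then start the threads, wrapping lower-level failures in a repository-level exception along the way. The following is a minimal sketch of that shape, not the connector's actual code. `StartSequenceSketch`, `Step` and `SketchRepositoryException` are hypothetical names made up for illustration, and the stand-in exception is assumed to be a checked exception.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the three-phase start; not the connector's code. */
final class StartSequenceSketch {
  /** Stand-in for RepositoryException (assumption: a checked exception). */
  static final class SketchRepositoryException extends Exception {
    SketchRepositoryException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** A phase that may fail with an IOException. */
  interface Step {
    void run() throws IOException;
  }

  private final List<String> completed = new ArrayList<>();
  private boolean running = false;

  void start(Step loadQueue, Step recoverStores, Runnable startThreads)
      throws SketchRepositoryException {
    try {
      loadQueue.run();                 // phase 1: load persisted queue state
      completed.add("queue");
    } catch (IOException e) {
      throw new SketchRepositoryException("Failed starting queue.", e);
    }
    try {
      recoverStores.run();             // phase 2: recover snapshot stores
      completed.add("stores");
    } catch (IOException e) {
      throw new SketchRepositoryException("Snapshot recovery failed.", e);
    }
    startThreads.run();                // phase 3: start monitor threads
    completed.add("threads");
    running = true;                    // set only after every phase succeeded
  }

  boolean isRunning() {
    return running;
  }

  List<String> completedPhases() {
    return completed;
  }
}
```

If an early phase throws, the later phases never run and the running flag stays false, which matches the listing above, where isRunning is assigned on the last line of start().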
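When the SnapshotStore for each repository object is initialized, the associated MonitorCheckpoint instance is passed in as well, and the snapshot files are repaired if necessary.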
/* For each start path gets its monitor recovery files in state were monitor
* can be started. */
/**
* Loads the monitorName-to-SnapshotStore map.
* @param connectorManagerCheckpoint
* @param monitorPoints
* @return
* @throws IOException
* @throws SnapshotStoreException
* @throws InterruptedException
*/
private Map<String, SnapshotStore> recoverSnapshotStores(
String connectorManagerCheckpoint, Map<String,
MonitorCheckpoint> monitorPoints)
throws IOException, SnapshotStoreException, InterruptedException {
Map<String, SnapshotStore> snapshotStores =
new HashMap<String, SnapshotStore>();
for (SnapshotRepository<? extends DocumentSnapshot> repository
: repositories) {
String monitorName = makeMonitorNameFromStartPath(repository.getName());
File dir = new File(snapshotDir, monitorName);
boolean startEmpty = (connectorManagerCheckpoint == null)
|| (!monitorPoints.containsKey(monitorName));
if (startEmpty) {
LOG.info("Deleting " + repository.getName()
+ " global checkpoint=" + connectorManagerCheckpoint
+ " monitor checkpoint=" + monitorPoints.get(monitorName));
// Delete this snapshot directory
delete(dir);
} else {
// Repair (stitch) this snapshot directory
SnapshotStore.stitch(dir, monitorPoints.get(monitorName),
documentSnapshotFactory);
}
SnapshotStore snapshotStore = new SnapshotStore(dir,
documentSnapshotFactory);
snapshotStores.put(monitorName, snapshotStore);
}
return snapshotStores;
}
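The delete-or-stitch branch in recoverSnapshotStores comes down to one boolean test. Here is a small sketch that isolates that decision so it can be read on its own. `RecoveryDecisionSketch` and its `Action` enum are hypothetical names, and the checkpoint values are plain strings here rather than real MonitorCheckpoint objects.

```java
import java.util.Map;

/** Hypothetical stand-in that isolates the delete-or-stitch decision. */
final class RecoveryDecisionSketch {
  enum Action { DELETE, STITCH }

  /**
   * Mirrors the startEmpty test from the listing: start from an empty
   * snapshot directory when there is no global checkpoint, or when no
   * restart point was recorded for this monitor.
   */
  static Action decide(String globalCheckpoint,
      Map<String, String> monitorPoints, String monitorName) {
    boolean startEmpty = (globalCheckpoint == null)
        || !monitorPoints.containsKey(monitorName);
    return startEmpty ? Action.DELETE : Action.STITCH;
  }
}
```

In other words, the snapshot directory is only repaired when both the global checkpoint and the per-monitor restart point exist. In every other case the monitor starts over from an empty directory.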
Next, let's follow the method that starts the monitor threads:
/**
* Starts the monitor threads (MonitorCheckpoint, SnapshotStore and the monitor appear to be mapped to one another through monitorName).
* Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each
* startPath.
*
* @throws RepositoryDocumentException if any of the threads cannot be
* started.
*/
private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores,
Map<String, MonitorCheckpoint> monitorPoints)
throws RepositoryDocumentException {
for (SnapshotRepository<? extends DocumentSnapshot> repository
: repositories) {
String monitorName = makeMonitorNameFromStartPath(repository.getName());
// Look up the SnapshotStore by monitorName
// Snapshot store (reader/writer)
SnapshotStore snapshotStore = snapshotStores.get(monitorName);
// Create the monitor thread
Thread monitorThread = newMonitorThread(repository, snapshotStore,
monitorPoints.get(monitorName));
threads.add(monitorThread);
LOG.info("starting monitor for <" + repository.getName() + ">");
monitorThread.setName(repository.getName());
monitorThread.setDaemon(true);
monitorThread.start();
}
}
The monitor object itself is created in the following method:
/**
* Creates the monitor thread.
* Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided
* folder.
*
* @throws RepositoryDocumentException if {@code startPath} is not readable,
* or if there is any problem reading or writing snapshots.
*/
private Thread newMonitorThread(
SnapshotRepository<? extends DocumentSnapshot> repository,
SnapshotStore snapshotStore, MonitorCheckpoint startCp)
throws RepositoryDocumentException {
// Note the monitorName
String monitorName = makeMonitorNameFromStartPath(repository.getName());
// Documents are processed inside the monitor thread
DocumentSnapshotRepositoryMonitor monitor =
new DocumentSnapshotRepositoryMonitor(monitorName, repository,
snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp,
documentSnapshotFactory);
monitor.setTraversalSchedule(traversalSchedule);
LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor);
fileSystemMonitorsByName.put(monitorName, monitor);
return new Thread(monitor);
}
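The two methods above boil down to a familiar pattern: wrap each Runnable in a Thread, record it, name it after the repository, mark it as a daemon, and start it. Here is a rough sketch of just that pattern. `MonitorThreadStartSketch` is a hypothetical class, and a single shared Runnable stands in for the per-repository monitor objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of starting one named daemon thread per repository. */
final class MonitorThreadStartSketch {
  /**
   * Starts one daemon thread per name and returns the started threads.
   * The same work Runnable is reused for every thread to keep the sketch
   * short; the real code builds a separate monitor for each repository.
   */
  static List<Thread> startAll(List<String> repositoryNames, Runnable work) {
    List<Thread> threads = new ArrayList<>();
    for (String name : repositoryNames) {
      Thread thread = new Thread(work);
      threads.add(thread);     // keep a handle so the thread can be stopped later
      thread.setName(name);    // the thread name doubles as a log label
      thread.setDaemon(true);  // daemon threads do not keep the JVM alive
      thread.start();
    }
    return threads;
  }
}
```

Keeping the returned list matters: it is the only handle a later stop() has for interrupting and joining the threads.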
The stop method stops the monitor threads. The helper it calls, flagAllMonitorsToStop(), is listed first:
/**
* Flags all monitors to stop.
*/
private void flagAllMonitorsToStop() {
for (SnapshotRepository<? extends DocumentSnapshot> repository
: repositories) {
String monitorName = makeMonitorNameFromStartPath(repository.getName());
DocumentSnapshotRepositoryMonitor
monitor = fileSystemMonitorsByName.get(monitorName);
if (null != monitor) {
monitor.shutdown();
}
else {
LOG.fine("Unable to stop non existent monitor thread for "
+ monitorName);
}
}
}
/**
* Stops the monitor threads.
*/
/* @Override */
public synchronized void stop() {
for (Thread thread : threads) {
thread.interrupt();
}
for (Thread thread : threads) {
try {
thread.join(MAX_SHUTDOWN_MS);
if (thread.isAlive()) {
LOG.warning("failed to stop background thread: " + thread.getName());
}
} catch (InterruptedException e) {
// Mark this thread as interrupted so it can be dealt with later.
Thread.currentThread().interrupt();
}
}
threads.clear();
/* in case thread.interrupt doesn't stop monitors */
flagAllMonitorsToStop();
fileSystemMonitorsByName.clear();
changeQueue.clear();
this.isRunning = false;
}
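The first half of stop() is a two-pass shutdown: interrupt every thread first, then join each one with a timeout, so the threads wind down in parallel instead of one after another. Here is a minimal sketch of that idea. `StopThreadsSketch` and the `timeoutMs` parameter are hypothetical, and unlike the listing, the sketch returns the number of threads that are still alive instead of logging a warning.

```java
import java.util.List;

/** Hypothetical sketch of the interrupt-then-join shutdown pattern. */
final class StopThreadsSketch {
  /**
   * Interrupts every thread, then waits up to timeoutMs for each to exit.
   * Returns how many threads are still alive afterwards.
   */
  static int stopAll(List<Thread> threads, long timeoutMs) {
    // Pass 1: signal all threads before waiting on any of them.
    for (Thread thread : threads) {
      thread.interrupt();
    }
    // Pass 2: wait for each thread, bounded by the timeout.
    for (Thread thread : threads) {
      try {
        thread.join(timeoutMs);
      } catch (InterruptedException e) {
        // Preserve the caller's interrupt status and stop waiting.
        Thread.currentThread().interrupt();
        break;
      }
    }
    int alive = 0;
    for (Thread thread : threads) {
      if (thread.isAlive()) {
        alive++;
      }
    }
    return alive;
  }
}
```

A thread that swallows interrupts can outlive the join timeout, which is why the listing follows up with flagAllMonitorsToStop() as a second line of defense.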
flagAllMonitorsToStop() calls shutdown() on each monitor object, which sets the monitor object's flag field:
/* The monitor should exit voluntarily if set to false */
private volatile boolean isRunning = true;
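A volatile flag like this is the usual way to let a worker loop exit voluntarily: the loop re-reads the flag on every pass, and volatile guarantees that a write from another thread becomes visible to it. Below is a rough sketch under the assumption that the monitor's run loop polls the flag. `FlaggedMonitorSketch` and its loop body are made up for illustration and do not reproduce the real DocumentSnapshotRepositoryMonitor.

```java
/** Hypothetical sketch of a worker that exits when its flag is cleared. */
final class FlaggedMonitorSketch implements Runnable {
  /* The worker should exit voluntarily once this is set to false. */
  private volatile boolean isRunning = true;

  @Override
  public void run() {
    while (isRunning) {
      try {
        // Placeholder for one pass of real work.
        Thread.sleep(10);
      } catch (InterruptedException e) {
        // Treat an interrupt as a stop request as well.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Asks the worker to stop at its next flag check. */
  void shutdown() {
    isRunning = false;
  }
}
```

Without volatile, the loop would not be guaranteed to see the write made by shutdown() from another thread.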
---------------------------------------------------------------------------
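This series, Enterprise Search Engine Development: Connectors, is my own original work.
Please credit the source when reposting: cnblogs (部落格園), 刺猬的溫馴
My email: [email protected]#com (replace # with .)
Article link: http://www.cnblogs.com/chenying99/p/3789613.html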