Let's first look at what the documentation says about the Scheduler:
https://code4craft.gitbooks.io/webmagic-in-action/content/zh/posts/ch1-overview/architecture.html

As introduced before, the Scheduler is responsible for planning what the crawler fetches next, and it also covers concerns such as URL deduplication. We already met the Scheduler in the main flow; now let's analyze it in detail against the source code.

Scheduler is an interface:
```java
public interface Scheduler {

    /**
     * add a url to fetch
     *
     * @param request
     * @param task
     */
    public void push(Request request, Task task);

    /**
     * get an url to crawl
     *
     * @param task the task of spider
     * @return the url to crawl
     */
    public Request poll(Task task);
}
```
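The two methods form a producer/consumer pair: the spider's worker threads poll() for the next request, and every link extracted from a downloaded page is fed back through push(). A rough sketch of that loop (illustrative only, not WebMagic's actual Spider code):

```java
// Simplified crawl loop: poll() hands out the next request,
// push() feeds newly discovered links back in (deduped inside push()).
void crawl(Scheduler scheduler, Downloader downloader, Task task) {
    Request request = scheduler.poll(task);
    while (request != null) {
        Page page = downloader.download(request, task);
        for (Request target : page.getTargetRequests()) {
            scheduler.push(target, task); // duplicates are dropped here
        }
        request = scheduler.poll(task);
    }
}
```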
Its main implementation is DuplicateRemovedScheduler, which uses the template method pattern to define the steps of push():

```java
public abstract class DuplicateRemovedScheduler implements Scheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();

    public DuplicateRemover getDuplicateRemover() {
        return duplicatedRemover;
    }

    public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {
        this.duplicatedRemover = duplicatedRemover;
        return this;
    }

    @Override
    public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

    protected boolean shouldReserved(Request request) {
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }

    protected void pushWhenNoDuplicate(Request request, Task task) {
    }
}
```
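Since push() is the template and pushWhenNoDuplicate() is the hook, writing your own scheduler mostly comes down to filling in two methods, with deduplication inherited for free. A minimal sketch (SimpleDequeScheduler is a made-up class for illustration, not part of WebMagic):

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;

// Hypothetical LIFO scheduler: only the hook methods are implemented;
// dedup is handled by the inherited DuplicateRemovedScheduler.push().
public class SimpleDequeScheduler extends DuplicateRemovedScheduler {

    private final Deque<Request> deque = new ConcurrentLinkedDeque<Request>();

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        deque.addLast(request); // only called for non-duplicate requests
    }

    @Override
    public Request poll(Task task) {
        return deque.pollLast(); // LIFO: depth-first style crawling
    }
}
```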
Now let's look at DuplicateRemover, the interface in charge of deduplication. Its implementations are HashSetDuplicateRemover, which dedups with a HashSet, RedisScheduler, which dedups through Redis, and BloomFilterDuplicateRemover, which dedups with a Bloom filter. The default is HashSetDuplicateRemover:

```java
public class HashSetDuplicateRemover implements DuplicateRemover {

    private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    @Override
    public boolean isDuplicate(Request request, Task task) {
        return !urls.add(getUrl(request));
    }

    protected String getUrl(Request request) {
        return request.getUrl();
    }

    @Override
    public void resetDuplicateCheck(Task task) {
        urls.clear();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return urls.size();
    }
}
```

Note the trick in isDuplicate(): Set.add() returns false when the element is already present, so the check-and-insert happens in a single atomic call on the concurrent set.
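Because setDuplicateRemover() returns this, swapping the dedup strategy is a one-liner. For a crawl with tens of millions of URLs, the HashSet would eventually exhaust memory, so the Bloom filter variant (from the webmagic-extension module) is the usual replacement. A sketch, where 10000000 is assumed to be the expected number of distinct URLs:

```java
Scheduler scheduler = new QueueScheduler()
        .setDuplicateRemover(new BloomFilterDuplicateRemover(10000000));
```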
The abstract DuplicateRemovedScheduler has four concrete subclasses: QueueScheduler, PriorityScheduler, FileCacheQueueScheduler and RedisScheduler. The default is QueueScheduler:

```java
@ThreadSafe
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
    }

    @Override
    public synchronized Request poll(Task task) {
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }
}
```

Internally it stores Requests in a LinkedBlockingQueue, an unbounded queue. You probably noticed the @ThreadSafe annotation, so let me pose a question: does the Scheduler have thread-synchronization issues, and if so, how are they solved?
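One way to poke at that question is to hammer a QueueScheduler from several threads and check the counters afterwards. A throwaway test sketch (Site.me()...toTask() is used here only as a convenient Task stub; the class and URLs are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.QueueScheduler;

public class SchedulerConcurrencyDemo {

    public static void main(String[] args) throws InterruptedException {
        final QueueScheduler scheduler = new QueueScheduler();
        final Task task = Site.me().setDomain("example.com").toTask();

        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int t = 0; t < 8; t++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    // all 8 threads push the same 1000 urls concurrently
                    for (int i = 0; i < 1000; i++) {
                        scheduler.push(new Request("http://example.com/page/" + i), task);
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        // if dedup and queueing are thread-safe, both counts read exactly 1000
        System.out.println(scheduler.getTotalRequestsCount(task));
        System.out.println(scheduler.getLeftRequestsCount(task));
    }
}
```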
Let's look at the next one:

```java
@ThreadSafe
public class PriorityScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    public static final int INITIAL_CAPACITY = 5;

    private BlockingQueue<Request> noPriorityQueue = new LinkedBlockingQueue<Request>();

    private PriorityBlockingQueue<Request> priorityQueuePlus = new PriorityBlockingQueue<Request>(INITIAL_CAPACITY, new Comparator<Request>() {
        @Override
        public int compare(Request o1, Request o2) {
            return -NumberUtils.compareLong(o1.getPriority(), o2.getPriority());
        }
    });

    private PriorityBlockingQueue<Request> priorityQueueMinus = new PriorityBlockingQueue<Request>(INITIAL_CAPACITY, new Comparator<Request>() {
        @Override
        public int compare(Request o1, Request o2) {
            return -NumberUtils.compareLong(o1.getPriority(), o2.getPriority());
        }
    });

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
        if (request.getPriority() == 0) {
            noPriorityQueue.add(request);
        } else if (request.getPriority() > 0) {
            priorityQueuePlus.put(request);
        } else {
            priorityQueueMinus.put(request);
        }
    }

    @Override
    public synchronized Request poll(Task task) {
        Request poll = priorityQueuePlus.poll();
        if (poll != null) {
            return poll;
        }
        poll = noPriorityQueue.poll();
        if (poll != null) {
            return poll;
        }
        return priorityQueueMinus.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return noPriorityQueue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }
}
```

Here we have two PriorityBlockingQueues and one LinkedBlockingQueue, and poll() drains them in a fixed order: positive-priority requests first, then the no-priority queue, and finally negative-priority requests.
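A quick sketch to make that order concrete (again using Site.me()...toTask() as a stand-in Task and made-up URLs):

```java
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.PriorityScheduler;

public class PriorityDemo {
    public static void main(String[] args) {
        PriorityScheduler scheduler = new PriorityScheduler();
        Task task = Site.me().setDomain("example.com").toTask();

        scheduler.push(new Request("http://example.com/low").setPriority(-1), task);
        scheduler.push(new Request("http://example.com/normal"), task); // priority defaults to 0
        scheduler.push(new Request("http://example.com/high").setPriority(100), task);

        System.out.println(scheduler.poll(task).getUrl()); // .../high comes out first
        System.out.println(scheduler.poll(task).getUrl()); // then .../normal
        System.out.println(scheduler.poll(task).getUrl()); // finally .../low
    }
}
```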
Continuing with FileCacheQueueScheduler:

```java
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    private String filePath = System.getProperty("java.io.tmpdir");

    private String fileUrlAllName = ".urls.txt";

    private Task task;

    private String fileCursor = ".cursor.txt";

    private PrintWriter fileUrlWriter;

    private PrintWriter fileCursorWriter;

    private AtomicInteger cursor = new AtomicInteger();

    private AtomicBoolean inited = new AtomicBoolean(false);

    private BlockingQueue<Request> queue;

    private Set<String> urls;

    public FileCacheQueueScheduler(String filePath) {
        if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
            filePath += "/";
        }
        this.filePath = filePath;
    }

    private void flush() {
        fileUrlWriter.flush();
        fileCursorWriter.flush();
    }

    private void init(Task task) {
        this.task = task;
        File file = new File(filePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        readFile();
        initWriter();
        initFlushThread();
        inited.set(true);
        logger.info("init cache scheduler success");
    }

    private void initFlushThread() {
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                flush();
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private void initWriter() {
        try {
            fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
            fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
        } catch (IOException e) {
            throw new RuntimeException("init cache scheduler error", e);
        }
    }

    private void readFile() {
        try {
            queue = new LinkedBlockingQueue<Request>();
            urls = new LinkedHashSet<String>();
            readCursorFile();
            readUrlFile();
        } catch (FileNotFoundException e) {
            //init
            logger.info("init cache file " + getFileName(fileUrlAllName));
        } catch (IOException e) {
            logger.error("init file error", e);
        }
    }

    private void readUrlFile() throws IOException {
        String line;
        BufferedReader fileUrlReader = null;
        try {
            fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
            int lineReaded = 0;
            while ((line = fileUrlReader.readLine()) != null) {
                urls.add(line.trim());
                lineReaded++;
                if (lineReaded > cursor.get()) {
                    queue.add(new Request(line));
                }
            }
        } finally {
            if (fileUrlReader != null) {
                IOUtils.closeQuietly(fileUrlReader);
            }
        }
    }

    private void readCursorFile() throws IOException {
        BufferedReader fileCursorReader = null;
        try {
            fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
            String line;
            //read the last number
            while ((line = fileCursorReader.readLine()) != null) {
                cursor = new AtomicInteger(NumberUtils.toInt(line));
            }
        } finally {
            if (fileCursorReader != null) {
                IOUtils.closeQuietly(fileCursorReader);
            }
        }
    }

    private String getFileName(String filename) {
        return filePath + task.getUUID() + filename;
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        if (!inited.get()) {
            init(task);
        }
        queue.add(request);
        fileUrlWriter.println(request.getUrl());
    }

    @Override
    public synchronized Request poll(Task task) {
        if (!inited.get()) {
            init(task);
        }
        fileCursorWriter.println(cursor.incrementAndGet());
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }
}
```

It persists the URLs and a cursor pointing at the already-executed URLs in two files, and creates a scheduled executor to flush the writers every ten seconds; the in-memory URLs still live in a BlockingQueue.
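Per getFileName(), the file layout is {filePath}{task uuid}.urls.txt (every pushed URL, appended) plus {filePath}{task uuid}.cursor.txt (the index of the last polled URL, rewritten), so a restarted crawl re-reads the URL file and only re-queues the lines past the cursor. A wiring sketch (MyPageProcessor is a placeholder for your own PageProcessor; the path is arbitrary):

```java
Spider.create(new MyPageProcessor())
        .addUrl("http://example.com/")
        // progress is persisted under /data/webmagic, so killing the spider
        // and starting it again resumes from the cursor instead of from scratch
        .setScheduler(new FileCacheQueueScheduler("/data/webmagic"))
        .run();
```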
I don't really understand RedisScheduler yet, since I haven't worked with Redis so far :)

Usage

In practice you should choose a specific Scheduler and DuplicateRemover according to the characteristics of your own crawler; only by understanding how they work can you pick the most suitable components.
The fact that every WebMagic component can be swapped in like this is really great ~