Zrównolegnienie web crawlera

Zrównolegnienie web crawlera
Julian_
  • Rejestracja:około 8 lat
  • Ostatnio:ponad 4 lata
  • Postów:1703
0

Mam taką metodę:

Kopiuj
public List<org.bson.Document> crawlForAllMongoDoc() {
		List<org.bson.Document> allDocuments = new ArrayList<>();

		for (Page page : forumTopic) {
			LOG.log(Level.INFO, "downloading url=" + page.url);
			for (Post post : page) {
				allDocuments.add(post.toMongoDocument());
			}
		}
		return allDocuments;
	}

Pobiera html ze strony internetowej i przerabia. Jak przerobi to pobiera następny html z kolejnej strony i znów przerabia. itd.

Chciałbym to sparaleryzować:
1 wątek pobiera html ze strony i nie czeka, aż html zostanie przerobiony tylko pobiera od razu następny.
2 wątek przerabia pobrane html, jeśli są jakieś pobrane.

Kopiuj
public List<org.bson.Document> crawlForAllMongoDocParraler() {
		List<org.bson.Document> allDocuments = new ArrayList<>();

		Queue<Page> pagesQueue = new LinkedList<>();

		Thread pagerThread = new Thread(() -> {
			for (Page page : forumTopic) {
				LOG.log(Level.INFO, "downloading url=" + page.url);
				pagesQueue.add(page);
				synchronized (this) {
					notify();
				}
			}
		});

		Thread posterThread = new Thread(() -> {
			while (!pagesQueue.isEmpty() || pagerThread.isAlive()) {
				if (pagesQueue.isEmpty()) {
					synchronized (this) {
						try {
							wait();
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				} else {
					for (Post post : pagesQueue.poll()) {
						allDocuments.add(post.toMongoDocument());
					}
				}
			}
		});

		pagerThread.start();
		posterThread.start();

		try {
			pagerThread.join();
			posterThread.join();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return allDocuments;
	}

czy to jest w miarę OK? Zszedłem ze 150 sekund do 130 dzięki temu.
Widzę ryzyko, że jak w 17 linijce wątek 1 będzie żywy, ale już w 21 martwy to wątek 2 nigdy się nie zatrzyma.

edytowany 5x, ostatnio: Julian_
Zobacz pozostałe 9 komentarzy
WeiXiao
biblioteki? fujjjjj, po co to komu? aby tracić całą zabawę pisania samemu parsera html? :) :D
superdurszlak
gr8 b8 m8 i r8 8/8 :D :D :D
WeiXiao
ale z was bibliotekarze, omg :D
superdurszlak
ktoś tu się Metra naczytał? :D
KO
@WeiXiao: Python+Scrapy/puppeteer albo js + puppeteer Zdecydowanie nie Java XD
superdurszlak
  • Rejestracja:prawie 7 lat
  • Ostatnio:około 7 godzin
  • Lokalizacja:Kraków
  • Postów:2000
4

W sensie chcesz wysmażyć sobie coś w rodzaju takiego pipeline'a?

screenshot-20190802204835.png

No to tak w wariancie najbardziej "na pałę" tzn rzeźbiąc (prawie) wszystko sam

  • robisz sobie jakieś tam implementacje Runnable/Taska czy coś takiego dla każdego etapu jaki chcesz wykonać (pobieranie HTMLi, jakieś parsowanie itd)
  • robisz sobie jakiś ThreadPoolExecutor czy inny ProcessPoolExecutor...
  • ...do którego subskrybujesz sobie asynchronicznie zadanie ściągnięcia dokumentu
  • dla każdego zadania dostajesz jakiś tam Future, przepychasz go dalej jako nowego taska...
  • ...który z kolei czeka na tego Future, wyciąga z niego cukierka i go konsumuje, po czym wypluwa orzeszki
  • dostajesz z niego kolejny Future tym razem z orzeszkami itp itd. który przepychasz do taska "wyjmij orzeszki z papierka i zmiel na mączkę"
  • I na końcu robisz jakiś waiter który czeka aż wszystkie Futury z mąką z orzeszków czy co to tam miało być albo coś zwrócą, albo się wysypią
Kopiuj
processPool = ProcessPool()
futures = []

for url in urlSource.getAll():
  htmlFuture = processPool.executeAsync(DownloadHtmlTask(url))
  parsedDataFuture = processPool.executeAsync(ParseHtmlTask(htmlFuture))
  somethingElseFuture = processPool.executeAsync(DoSomethingElseTask(parsedDataFuture))
  futures.append(somethingElseFuture)

results, errors = waitUntilAllCompleteOrFail(futures)

// bla bla bla

I nie bawisz się w jakieś żonglowanie joinami i pilnowanie, który zjoinować przed którym


edytowany 3x, ostatnio: superdurszlak
Julian_
a z jakiej biblioteki jest ProcessPool? nie widzę też takich metod jak executeAsync...
superdurszlak
to tylko pseudokod, miało pokazać ideę a nie API :P ogólnie szukaj pod java.util.concurrent
Charles_Ray
  • Rejestracja:około 17 lat
  • Ostatnio:około 22 godziny
  • Postów:1875
1

Korzystanie bezpośrednio z klasy Thread nie jest optymalne, od Javy 6/7 są na to lepsze sposoby (pakiet java.util.concurrent, ExecutorService).
Poczytaj o Fork-Join Pool: https://www.baeldung.com/java-fork-join, https://howtodoinjava.com/java7/forkjoin-framework-tutorial-forkjoinpool-example/
Przykład użycia (crawler): https://www.javaworld.com/article/2078440/java-tip-when-to-use-forkjoinpool-vs-executorservice.html, https://github.com/niravshah/wd_webcrawler


”Engineering is easy. People are hard.” Bill Coughran
Shalom
  • Rejestracja:około 21 lat
  • Ostatnio:prawie 3 lata
  • Lokalizacja:Space: the final frontier
  • Postów:26433
3
  1. ExecutorService jakis weź jak człowiek
  2. ConcurrentQueue zamiast synchronizy
  3. Puść to na wielu wątkach bo i tak wąskie gardło to pobieranie tych wyników. Możesz mieć N wątków ściagających i dużo mniej przetwrzajacych
  4. Tu sie to nie przyda, ale jakbyś chciał być ultra low latency to mozna by w ogóle wywalić te synchronizacje i nawet concurrentqueue, robiąc tam AtomicReference na zwykłą listę i jak wątek przetwarzający chce sobie pobrać listę zadań to robi atomową wymianę na pustą listą, a tą pełną zaczyna sobie przetwarzać ;]

"Nie brookliński most, ale przemienić w jasny, nowy dzień najsmutniejszą noc - to jest dopiero coś!"
Julian_
a zwiększanie wątków ściągających przyśpieszy? bo przecież to jest ograniczone przepustowością strony internetowej z której pobieram.
Shalom
Wątpie. Zależy co ściągasz, ale pewnie sam narzut na połączenie jest tu duży, a nie sam transfer danych.
hcubyc
  • Rejestracja:ponad 12 lat
  • Ostatnio:prawie 3 lata
2

Podam inne rozwiązanie mam nadzieję, że nie będzie hipsterskie, będziesz potrzebował Reactora i jakiś klient http, może być bajerancki asynchroniczny i reaktywny, ale nie musi, uwaga pisane z pamięci

Kopiuj
val allDocuments = Flux.fromIterable(urls)
        .flatMap(url -> download(url))
        .map(page -> fancyStuff(page))
        .collectList()
        .block();

to wystarczy jak będzie klient reaktywny klient, jak nie to można blokujące wywołanie download opakować przy użyciu np. Mono.fromCallable() i na końcu zasubskrybować się schedulerem z jakąs pulą wątków, polecam Schedulers.fromExecutorService() albo znaleźć coś dla siebie z klasy Schedulers. Można też bardziej idiomatycznie i nie blokować się na końcu tylko dalej zwrócić Fluxa


Limitations are limitless > ##### Ola Nordmann napisał(a)
> Moim językiem ojczystym jest C++ i proszę uszanować to, że piszę po polsku.

Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.