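"""Scraper package entry point: defines the Celery scrape task, which pulls
people data from the FaceBook, Directory, NameCoach, Departmental, and
YaleConnect sources, merges the results, replaces the stored Person records,
and rebuilds the Elasticsearch index."""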
import json
import os
import traceback
from threading import Thread

from celery.app.log import TaskFormatter
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger

from app import app, db, celery, elasticsearch
from app.mail import send_scraper_report
from app.models import leaderships, Group, Person, PersonPersistent
from app.scraper import sources
from app.search import add_to_index
from .cache import Cache
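
# Because this module is the scraper package's __init__, importing
# app.scraper is enough to register the scrape task with the Celery app.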

logger = get_task_logger(__name__)


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    for handler in logger.handlers:
        handler.setFormatter(TaskFormatter('%(name)s | %(message)s'))


def has_s3_credentials():
    # S3 credentials are optional; without them the image cleanup at the end
    # of the scrape is skipped.
    return os.environ.get('S3_ACCESS_KEY') and os.environ.get('S3_SECRET_ACCESS_KEY')


def scrape_face_book_directory_name_coach(face_book, directory, name_coach):
    with app.app_context():
        # FaceBook and Directory can be pulled concurrently; NameCoach depends
        # on the merged results, so it runs after both integrations.
        people = []
        thread_fb = Thread(target=face_book.pull, args=(people,))
        thread_dir = Thread(target=directory.pull, args=(people,))
        thread_fb.start()
        thread_dir.start()
        thread_fb.join()
        thread_dir.join()
        people = face_book.integrate(people)
        people = directory.integrate(people)
        name_coach.pull(people)
        people = name_coach.integrate(people)
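

# The scrape task rebuilds the whole dataset: it checks the cache, pulls each
# source, replaces the Person rows, recreates the Elasticsearch index, and
# finally merges in YaleConnect groups.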
@celery.task
def scrape(caches_active, face_book_cookie, people_search_session_cookie, csrf_token, yaleconnect_cookie):
    logger.info('Scraper kicking off.')
    # Disabled one-off backfill for a missing Elasticsearch index, kept for
    # reference. Pagination in Flask-SQLAlchemy is 1-indexed, so the original
    # page = 0 would have fetched the first page twice.
    """
    logger.info('Loading people.')
    page = 1
    page_size = 1000
    while True:
        people = Person.query.paginate(page, page_size, False).items
        logger.info('Loaded people.')
        for person in people:
            logger.info(person.netid)
            add_to_index('person', person)
        if len(people) < page_size:
            break
        page += 1
    return
    """
    with app.app_context():
        try:
            caches_active = {
                'scraped_data.' + key if key else 'scraped_data': value
                for key, value in caches_active.items()
            }
            logger.info('Launching scraper.')
            cache = Cache(caches_active)
            cache_key = 'scraped_data'
            logger.info('Checking cache...')
            people = cache.get(cache_key)
            face_book = None
            if people:
                logger.info('Found people in cache.')
            else:
                logger.info('Initializing sources.')
                directory = sources.Directory(cache, people_search_session_cookie, csrf_token)
                face_book = sources.FaceBook(cache, face_book_cookie, directory)
                name_coach = sources.NameCoach(cache, people_search_session_cookie, csrf_token)
                departmental = sources.Departmental(cache)
                logger.info('Beginning scrape.')
                people = []
                thread_fb_dir_nc = Thread(target=scrape_face_book_directory_name_coach, args=(face_book, directory, name_coach))
                thread_departmental = Thread(target=departmental.pull, args=(people,))
                thread_fb_dir_nc.start()
                thread_departmental.start()
                thread_fb_dir_nc.join()
                thread_departmental.join()
                # TODO: find a cleaner way to exchange this data
                people = name_coach.people
                people = departmental.integrate(people)
                cache.set(cache_key, people)
            # Store people into the database.
            logger.info('Inserting new data.')
            # TODO: YaleConnect.merge also deletes these at its start; it is
            # duplicated here so that the Person models can be deleted. Maybe
            # the deletes should cascade instead?
            db.session.query(leaderships).delete()
            Group.query.delete()
            Person.query.delete()
            if elasticsearch is not None:
                # Drop and recreate the index so stale documents disappear.
                elasticsearch.indices.delete(index=Person.__tablename__)
                elasticsearch.indices.create(index=Person.__tablename__)
            num_inserted = 0
            for person_dict in people:
                # Skip records that never resolved to a netid.
                if not person_dict.get('netid'):
                    continue
                db.session.add(Person(**person_dict))
                num_inserted += 1
                # Commit in batches of 64 to avoid memory overflows.
                if num_inserted % 64 == 0:
                    db.session.commit()
            db.session.commit()
            # TODO: merge this into main scraper section;
            # currently we do it after the rest of the scraper because
            yaleconnect = sources.YaleConnect(cache, yaleconnect_cookie)
            yaleconnect.pull(people)
            yaleconnect.merge(people)
            if face_book is not None and has_s3_credentials():
                logger.info('Deleting unused images from S3.')
                face_book.delete_unused_images(people)
            logger.info('Done.')
        except Exception as e:
            # Fatal errors are reported by email rather than re-raised.
            logger.error('Encountered fatal error, terminating scraper:')
            logger.error(e)
            send_scraper_report(error=traceback.format_exc())
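

# A minimal usage sketch (assumptions: a Celery worker is running and the
# cookie/token values are valid; the caches_active keys shown here are
# illustrative, not confirmed by this module):
#
#     scrape.delay(
#         caches_active={'': False, 'face_book': True},
#         face_book_cookie='...',
#         people_search_session_cookie='...',
#         csrf_token='...',
#         yaleconnect_cookie='...',
#     )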