Commit 173ff2fc authored by Zachary Seguin's avatar Zachary Seguin

Use environment configuration for secrets; use Redis so we only load each URL once

parent 91747050
......@@ -47,23 +47,38 @@ from kombu import Queue
from celery.schedules import crontab
# Celery queue / routing configuration.
# Queue and routing-key names are defined once here and referenced
# everywhere else (CELERY_QUEUES, CELERY_ROUTES, the manage commands)
# so the names cannot drift apart.
CELERY_DEFAULT_QUEUE = 'alerts_canada'
CELERY_FETCH_QUEUE = 'alerts_canada_fetch'
CELERY_DEFAULT_ROUTING_KEY = 'alerts_canada'
CELERY_FETCH_ROUTING_KEY = 'alerts_canada_fetch'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, routing_key=CELERY_DEFAULT_ROUTING_KEY),
    Queue(CELERY_FETCH_QUEUE, routing_key=CELERY_FETCH_ROUTING_KEY)
)

# Route fetch-related tasks onto the dedicated fetch queue so that slow
# HTTP downloads do not block normal alert processing.
CELERY_ROUTES = {
    'pyalertscanada.tasks.process_alert.fetch_alert_from_url': {
        'queue': CELERY_FETCH_QUEUE,
        'routing_key': CELERY_FETCH_ROUTING_KEY
    },
    'pyalertscanada.tasks.process_alert.cleanup_fetch_list': {
        'queue': CELERY_FETCH_QUEUE,
        'routing_key': CELERY_FETCH_ROUTING_KEY
    },
}

CELERYBEAT_SCHEDULE = {
    # Expire alerts every minute.
    'expire': {
        'task': 'pyalertscanada.tasks.process_alert.expire_alerts',
        'schedule': crontab(minute='*')
    },
    # Once a day (at midnight), remove URLs from the fetch list
    'fetch_cleanup': {
        'task': 'pyalertscanada.tasks.process_alert.cleanup_fetch_list',
        'schedule': crontab(minute='0', hour='0')
    }
}

CELERY_LOG_LEVEL = 'info'
# NOTE(review): accepting 'pickle' content allows arbitrary code execution
# by anyone who can write to the broker -- confirm the broker is not
# reachable by untrusted parties, or drop to json-only.
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 120, 'fanout_prefix': True, 'fanout_patterns': True }

# Redis
REDIS_TIMEOUT=20
......@@ -27,12 +27,15 @@ from flask_assets import Environment
from webassets.loaders import PythonLoader as PythonAssetsLoader
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
from redis import StrictRedis
# Application singletons: Flask app, SQLAlchemy session, shared Redis client.
app = Flask(__name__)
# Configuration is loaded from the top-level config module.
app.config.from_object('config')
db = SQLAlchemy(app)
# Shared Redis connection, used by the tasks package to de-duplicate
# alert fetch URLs. REDIS_URL must be set in the config (environment);
# REDIS_TIMEOUT falls back to 20 seconds when unset.
redis = StrictRedis.from_url(app.config['REDIS_URL'], socket_timeout=app.config.get('REDIS_TIMEOUT', 20))
def make_celery(app):
celery = Celery(app.import_name)
celery.config_from_object(app.config)
......
......@@ -34,17 +34,15 @@ def worker():
'worker',
'-A', 'pyalertscanada.celery',
'-Q', app.config['CELERY_DEFAULT_QUEUE'],
'--concurrency', str(app.config.get('CELERY_MAX_CONCURRENCY', None) or '4'),
'--loglevel', app.config.get('CELERY_LOG_LEVEL', None) or 'info'])
@manager.command
def fetcher():
    """Start a Celery worker consuming the dedicated fetch queue.

    Queue and log-level names come from the app configuration so they
    stay consistent with CELERY_QUEUES/CELERY_ROUTES in config.py.
    """
    celery.start(['./ac.py celery fetcher',
                  'worker',
                  '-A', 'pyalertscanada.celery',
                  '-Q', app.config['CELERY_FETCH_QUEUE'],
                  '--loglevel', app.config.get('CELERY_LOG_LEVEL', None) or 'info'])
@manager.command
......@@ -53,6 +51,8 @@ def beat():
celery.start(['./ac.py celery worker',
'beat',
'-A', 'pyalertscanada.celery',
'--pidfile', '/tmp/celerybeat.pid',
'--schedule', '/tmp',
'--loglevel', app.config.get('CELERY_LOG_LEVEL', None) or 'info'])
......@@ -22,5 +22,5 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from .. import app, celery, db
from . import process_alert
\ No newline at end of file
from .. import app, celery, db, redis
from . import process_alert
......@@ -30,7 +30,7 @@ import dateutil.parser
from celery.utils.log import get_task_logger
from lxml.etree import XMLSyntaxError
from . import app, celery, db
from . import app, celery, db, redis
from ..xml import alert
from ..models import Alert, AlertCode, AlertReference
from ..models import Info, InfoCategory, InfoResponseType, InfoEventCode, InfoParameter
......@@ -56,47 +56,58 @@ def to_utc(value):
return value.astimezone(pytz.utc)
@celery.task(default_retry_delay=120, max_retries=2)
def fetch_alert_from_url(url):
"Fetch an alert from a URL"
try:
logger.info('Requesting "{url}"'.format(url=url))
# Execute the HTTP request
xml = requests.get(url)
xml.encoding = 'UTF-8'
logger.info('Received status code {}'.format(xml.status_code))
# Bug fix: the decorator said "ax_retries=1" (typo), so the intended
# one-retry limit was never applied -- Celery's default max_retries
# (3) was silently used instead.
@celery.task(default_retry_delay=3600, max_retries=1)
def cleanup_fetch_list():
    """Clear the Redis set of already-fetched alert URLs.

    Scheduled daily via CELERYBEAT_SCHEDULE ('fetch_cleanup') so the
    de-duplication set does not grow without bound. On failure the task
    retries once, an hour later.
    """
    try:
        redis.delete('fetch_urls')
    except Exception as exc:
        # Best-effort: re-enqueue the cleanup rather than crash the worker.
        cleanup_fetch_list.retry(exc=exc)
if xml.status_code == 200:
process_alert.delay(xml.text)
elif xml.status_code == 404:
logger.warn('Alert not found upstream: {url}'.format(url = url))
else:
logger.warn('Received status code {status_code} for {url}'.format(status_code = xml.status_code, url = url))
fetch_alert_from_url.retry(exc=Exception('Status Code: {status_code}'.format(status_code=xml.status_code)))
@celery.task(bind=True, default_retry_delay=120, max_retries=20)
def fetch_alert_from_url(self, url):
    """Fetch an alert document from *url* and queue it for processing.

    A Redis set ('fetch_urls') guarantees each URL is downloaded only
    once across all workers: sadd() returns 1 only for the first caller.
    Retries of this task (self.request.retries > 0) bypass the guard,
    because the URL was already added on the first attempt.
    """
    try:
        # sadd() is the authoritative once-only gate; '> 0' replaces the
        # original's awkward 'not ... == 0' (same truth value, retries is
        # a non-negative count). The duplicated trailing except clause
        # from the original (dead diff residue) is dropped.
        if redis.sadd('fetch_urls', url) == 1 or self.request.retries > 0:
            logger.info('Requesting "{url}"'.format(url=url))

            # Execute the HTTP request
            xml = requests.get(url)
            xml.encoding = 'UTF-8'

            logger.info('Received status code {}'.format(xml.status_code))

            if xml.status_code == 200:
                process_alert.delay(xml.text)
            elif xml.status_code == 404:
                # Gone upstream: nothing to retry.
                logger.warn('Alert not found upstream: {url}'.format(url = url))
            else:
                # Unexpected status: raise Retry via .retry(); the outer
                # except re-issues it, preserving the original behaviour.
                logger.warn('Received status code {status_code} for {url}'.format(status_code = xml.status_code, url = url))
                fetch_alert_from_url.retry(exc=Exception('Status Code: {status_code}'.format(status_code=xml.status_code)))
        else:
            logger.warn('Alert has already been queued: "{url}"'.format(url=url))
    except Exception as exc:
        fetch_alert_from_url.retry(exc=exc)
@celery.task(default_retry_delay=30, max_retries=None)
def fetch_alert_from_reference(reference):
    """Queue fetches for an alert reference against every NAAD CAP-CP host.

    *reference* is indexable: reference[1] is the alert identifier and
    reference[2] the sent timestamp (its first 10 characters form the
    date path segment) -- assumed ISO-8601; TODO confirm against caller.
    Retries indefinitely every 30 seconds on any error.
    """
    try:
        for capcp in app.config['NAAD_CAPCPS']:
            # Create the URL. Archive filenames replace characters that
            # are not filesystem-safe: '-'/':' become '_', '+' becomes 'P'.
            url = 'http://{hostname}:{port}/{sent_date}/{sent}I{identifier}.xml'.format(
                hostname = capcp[0],
                port = capcp[1],
                sent_date = reference[2][0:10],
                sent = reference[2].replace('-', '_').replace('+', 'P').replace(':', '_'),
                identifier = reference[1].replace('-', '_').replace('+', 'P').replace(':', '_')
            )

            # Cheap pre-check only: fetch_alert_from_url performs the
            # authoritative sadd() guard, so a race here merely costs one
            # extra (deduplicated) task.
            if not redis.sismember('fetch_urls', url):
                fetch_alert_from_url.delay(url)
    except Exception as exc:
        fetch_alert_from_reference.retry(exc=exc)
@celery.task(default_retry_delay=10, max_retries=5)
def load_references(references):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment