File size: 2,351 Bytes
0a06673 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
from datetime import date, datetime
from io import BytesIO
from gzip import GzipFile
import logging
import json
from dateutil.tz import tzutc
from requests.auth import HTTPBasicAuth
from requests import sessions
from analytics.version import VERSION
from analytics.utils import remove_trailing_slash
_session = sessions.Session()
def post(write_key, host=None, gzip=False, timeout=15, proxies=None, **kwargs):
"""Post the `kwargs` to the API"""
log = logging.getLogger('segment')
body = kwargs
body["sentAt"] = datetime.utcnow().replace(tzinfo=tzutc()).isoformat()
url = remove_trailing_slash(host or 'https://api.segment.io') + '/v1/batch'
auth = HTTPBasicAuth(write_key, '')
data = json.dumps(body, cls=DatetimeSerializer)
log.debug('making request: %s', data)
headers = {
'Content-Type': 'application/json',
'User-Agent': 'analytics-python/' + VERSION
}
if gzip:
headers['Content-Encoding'] = 'gzip'
buf = BytesIO()
with GzipFile(fileobj=buf, mode='w') as gz:
# 'data' was produced by json.dumps(),
# whose default encoding is utf-8.
gz.write(data.encode('utf-8'))
data = buf.getvalue()
kwargs = {
"data": data,
"auth": auth,
"headers": headers,
"timeout": 15,
}
if proxies:
kwargs['proxies'] = proxies
res = _session.post(url, data=data, auth=auth,
headers=headers, timeout=timeout)
if res.status_code == 200:
log.debug('data uploaded successfully')
return res
try:
payload = res.json()
log.debug('received response: %s', payload)
raise APIError(res.status_code, payload['code'], payload['message'])
except ValueError:
raise APIError(res.status_code, 'unknown', res.text)
class APIError(Exception):
def __init__(self, status, code, message):
self.message = message
self.status = status
self.code = code
def __str__(self):
msg = "[Segment] {0}: {1} ({2})"
return msg.format(self.code, self.message, self.status)
class DatetimeSerializer(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return json.JSONEncoder.default(self, obj)
|