flagged as approved (but not put on the schedule since it has no start
and end time yet).
+Upload file
+: This benefit class allow uploading of a generic file (unlike the image one
+ which is specific to images). It can only validate the MIME type of the
+ file being uploaded and the size of the file, all further vaildation past
+ that must be done manually.
+
## Shipments
A shipment tracking system is built into the sponsorship system. It
5: {"class": "attendeelist.AttendeeList", "description": "List of attendee email addresses"},
6: {"class": "badgescanning.BadgeScanning", "description": "Scanning of attendee badges"},
7: {"class": "sponsorsession.SponsorSession", "description": "Submit session"},
+ 8: {"class": "fileupload.FileUpload", "description": 'Upload file'},
}
--- /dev/null
+from django import forms
+from django.core.exceptions import ValidationError
+from django.core.validators import MaxValueValidator, MinValueValidator
+from django.http import HttpResponse
+from django.conf import settings
+
+import base64
+import io
+import zipfile
+
+from postgresqleu.util.storage import InlineEncodedStorage
+from postgresqleu.util.forms import IntegerBooleanField
+from postgresqleu.util.widgets import StaticTextWidget
+from postgresqleu.util.validators import color_validator
+from postgresqleu.util.magic import magicdb
+from postgresqleu.confsponsor.backendforms import BackendSponsorshipLevelBenefitForm
+
+from .base import BaseBenefit, BaseBenefitForm
+
+
+class FileUploadForm(BaseBenefitForm):
+ decline = forms.BooleanField(label='Decline this benefit', required=False)
+ file = forms.FileField(label='File', required=False)
+ uploadedfile = forms.CharField(label='Uploaed', widget=forms.HiddenInput, required=False)
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.fields['file'].help_text = "Upload a file of maximum size {}KB".format(self.params['maxsize'])
+ if self.params['mimetypes']:
+ acceptzipstr = ',application/zip' if self.params['acceptzip'] else ''
+ self.fields['file'].widget.attrs['accept'] = self.params['mimetypes'] + acceptzipstr
+
+ def clean(self):
+ if self.cleaned_data.get('decline', False) and self.cleaned_data.get('file', None):
+ raise ValidationError('You cannot both decline and upload a file at the same time.')
+
+ if not self.cleaned_data.get('file', None) and 'file' not in self._errors:
+ self.add_error('file', 'A file must be uploaded')
+
+ return self.cleaned_data
+
+ def clean_file(self):
+ if not self.cleaned_data.get('file', None):
+ # This check is done in the global clean as well, so we accept it here since
+ # we might have decliend it.
+ return None
+
+ filedata = self.cleaned_data['file']
+
+ if filedata.size > self.params.get('maxsize') * 1024:
+ raise ValidationError("Uploaded file is too large, maximum size is {}Kb.".format(self.params.get('maxsize')))
+
+ def _mimetype_ok(mimetype):
+ for t in self.params.get('mimetypes').split(','):
+ if mimetype.startswith(t):
+ return True
+ return False
+
+ if self.params.get('mimetypes', None):
+ mimetype = magicdb.buffer(filedata.read(2048))
+ if self.params.get('acceptzip', False) and mimetype.startswith('application/zip'):
+ filedata.seek(0)
+ try:
+ with zipfile.ZipFile(filedata) as zf:
+ for fn in zf.namelist():
+ with zf.open(fn) as ff:
+ mimetype = magicdb.buffer(ff.read(2048))
+ if not _mimetype_ok(mimetype):
+ raise ValidationError("ZIP file contains file {} which is of invalid type: {}".format(fn, mimetype))
+ except zipfile.BadZipFile:
+ raise ValidationError("Could not parse uploaded ZIP file")
+ elif not _mimetype_ok(mimetype):
+ raise ValidationError("Invalid type of file uploaded: {}".format(mimetype))
+
+ filedata.seek(0)
+
+ return self.cleaned_data['file']
+
+
+class FileUploadBackendForm(BackendSponsorshipLevelBenefitForm):
+ maxsize = forms.IntegerField(label='Maximum size in Kb', initial=1024, validators=[MinValueValidator(10), MaxValueValidator(int(settings.DATA_UPLOAD_MAX_MEMORY_SIZE / 1024))])
+ mimetypes = forms.CharField(label='MIME types', help_text='Allow only the specified MIME types, leave empty to allow all', required=False)
+ acceptzip = forms.BooleanField(label='Accept zip', initial=True, help_text='Accept a ZIP version containing the above list of MIME types', required=False)
+
+ class_param_fields = ['maxsize', 'mimetypes', 'acceptzip', ]
+
+ def clean_mimetypes(self):
+ m = self.cleaned_data['mimetypes']
+ if m == '':
+ return m
+ parts = m.split(',')
+ for p in parts:
+ if ' ' in p:
+ raise ValidationError('Whitespace not allowed in MIME types')
+ mimeparts = p.split('/')
+ if len(mimeparts) != 2:
+ raise ValidationError('Each MIME type must be of format x/y')
+ return m
+
+ def clean(self):
+ if self.cleaned_data.get('mimetypes', '') == '' and self.cleaned_data.get('acceptzip', False):
+ self.add_error('acceptzip', 'Accept zip only makes sense when MIME type is specified')
+
+ return self.cleaned_data
+
+
+class FileUpload(BaseBenefit):
+ @classmethod
+ def get_backend_form(self):
+ return FileUploadBackendForm
+
+ def generate_form(self):
+ return FileUploadForm
+
+ def save_form(self, form, claim, request):
+ if form.cleaned_data['decline']:
+ return False
+ storage = InlineEncodedStorage('benefit_file')
+ storage.save(str(claim.id), form.cleaned_data['file'], {
+ 'filename': form.cleaned_data['file'].name,
+ })
+ return True
+
+ def render_claimdata(self, claimedbenefit, isadmin):
+ if claimedbenefit.declined:
+ return 'Benefit declined.'
+
+ storage = InlineEncodedStorage('benefit_file')
+ fn = storage.get_metadata(claimedbenefit.id)['filename']
+
+ if isadmin:
+ return 'Uploaded file: {}. <a href="/events/sponsor/admin/downloadfile/{}/">Download</a>.'.format(
+ fn,
+ claimedbenefit.id,
+ )
+ else:
+ return 'Uploaded file: {}'.format(fn)
+
+ def get_claimdata(self, claimedbenefit):
+ return {
+ 'filename': InlineEncodedStorage('benefit_file').get_metadata(claimedbenefit.id)['filename'],
+ }
+
+ def get_claimfile(self, claimedbenefit):
+ hashval, data, metadata = InlineEncodedStorage('benefit_file').read(claimedbenefit.id)
+ if hashval is None and data is None:
+ raise Http404()
+ resp = HttpResponse(content_type='application/octet_stream')
+ resp['Content-Disposition'] = 'attachment; filename={}'.format(metadata['filename'])
+ resp['ETag'] = '"{}"'.format(hashval)
+ resp.write(data)
+ return resp
}
def get_claimfile(self, claimedbenefit):
- hashval, data = InlineEncodedStorage('benefit_image').read(claimedbenefit.id)
+ hashval, data, metadata = InlineEncodedStorage('benefit_image').read(claimedbenefit.id)
if hashval is None and data is None:
raise Http404()
resp = HttpResponse(content_type='image/png')
migrations.AlterField(
model_name='sponsorshipbenefit',
name='benefit_class',
- field=models.IntegerField(blank=True, choices=[(0, 'Automatically claimed'), (1, 'Require uploaded image'), (2, 'Requires explicit claiming'), (3, 'Claim entry vouchers'), (4, 'Provide text string'), (5, 'List of attendee email addresses'), (6, 'Scanning of attendee badges'), (7, 'Submit session')], default=None, null=True),
+ field=models.IntegerField(blank=True, choices=[(0, 'Automatically claimed'), (1, 'Require uploaded image'), (2, 'Requires explicit claiming'), (3, 'Claim entry vouchers'), (4, 'Provide text string'), (5, 'List of attendee email addresses'), (6, 'Scanning of attendee badges'), (7, 'Submit session'), (8, 'Upload file')], default=None, null=True),
),
migrations.AlterUniqueTogether(
name='sponsorscanner',
re_path(r'^shipments/([a-z0-9]+)/$', views.sponsor_shipment_receiver),
re_path(r'^shipments/([a-z0-9]+)/(\d+)/$', views.sponsor_shipment_receiver_shipment),
re_path(r'^admin/imageview/(\d+)/$', views.sponsor_admin_imageview),
+ re_path(r'^admin/downloadfile/(\d+)/$', views.sponsor_admin_downloadfile),
re_path(r'^admin/(\w+)/$', views.sponsor_admin_dashboard),
re_path(r'^admin/(\w+)/(\d+)/$', views.sponsor_admin_sponsor),
re_path(r'^admin/(\w+)/(\d+)/edit/$', backendviews.edit_sponsor),
@login_required
-def sponsor_admin_imageview(request, benefitid):
+def _sponsor_admin_imageorfile(request, benefitid, what, finalizer):
# Image is fetched as part of a benefit, so find the benefit
benefit = get_object_or_404(SponsorClaimedBenefit, id=benefitid)
# If the benefit existed, we have verified the permissions, so we can now show
# the image itself.
- storage = InlineEncodedStorage('benefit_image')
+ storage = InlineEncodedStorage(what)
# If there is an if-none-match header, it's almost certain this will be a 304 since
# these files never change. So if it is, query just for the hash - and in the unlikely
return HttpResponseNotModified()
# XXX: do we need to support non-png at some point? store info in claimdata!
- hashval, data = storage.read(benefit.id)
+ hashval, data, metadata = storage.read(benefit.id)
if hashval is None and data is None:
raise Http404()
- resp = HttpResponse(content_type='image/png')
+ resp = HttpResponse()
resp['ETag'] = '"{}"'.format(hashval)
+ finalizer(resp, metadata)
resp.write(data)
return resp
+@login_required
+def sponsor_admin_imageview(request, benefitid):
+ def _finalize(response, metadata):
+ response['Content-Type'] = 'image/png'
+ return _sponsor_admin_imageorfile(request, benefitid, 'benefit_image', _finalize)
+
+
+@login_required
+def sponsor_admin_downloadfile(request, benefitid):
+ def _finalize(response, metadata):
+ response['Content-Type'] = 'application/octet-stream'
+ response['Content-Disposition'] = 'attachment; filename={}'.format(metadata['filename'])
+ return _sponsor_admin_imageorfile(request, benefitid, 'benefit_file', _finalize)
+
+
def _claimstatus(claim):
if claim.claimedat is None:
return 'Unclaimed'
--- /dev/null
+# Generated by Django 4.2.11 on 2025-09-03 14:01
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('util', '0007_storage_trigger_fix'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='storage',
+ name='metadata',
+ field=models.JSONField(default=dict),
+ ),
+ ]
storageid = models.IntegerField(null=False, blank=False)
data = models.BinaryField(null=False, blank=False)
hashval = models.BinaryField(null=False, blank=False)
+ metadata = models.JSONField(null=False, blank=False, default=dict)
class Meta:
unique_together = (
from postgresqleu.util.db import exec_to_scalar
+import json
+
class InlineEncodedStorage(object):
def __init__(self, key):
def read(self, name):
curs = connection.cursor()
- curs.execute("SELECT encode(hashval, 'hex'), data FROM util_storage WHERE key=%(key)s AND storageid=%(id)s", {
+ curs.execute("SELECT encode(hashval, 'hex'), data, metadata FROM util_storage WHERE key=%(key)s AND storageid=%(id)s", {
+ 'key': self.key, 'id': name})
+ rows = curs.fetchall()
+ if len(rows) != 1:
+ return None, None, None
+ return rows[0][0], bytes(rows[0][1]), json.loads(rows[0][2])
+
+ def get_metadata(self, name):
+ curs = connection.cursor()
+ curs.execute("SELECT metadata FROM util_storage WHERE key=%(key)s AND storageid=%(id)s", {
'key': self.key, 'id': name})
rows = curs.fetchall()
if len(rows) != 1:
return None, None
- return rows[0][0], bytes(rows[0][1])
+ return json.loads(rows[0][0])
- def save(self, name, content):
+ def save(self, name, content, metadata=None):
content.seek(0)
curs = connection.cursor()
params = {
'key': self.key,
'id': name,
'data': content.read(),
+ 'metadata': json.dumps(metadata) if metadata else None,
}
- curs.execute("UPDATE util_storage SET data=%(data)s WHERE key=%(key)s AND storageid=%(id)s", params)
+ curs.execute("UPDATE util_storage SET data=%(data)s, metadata=%(metadata)s WHERE key=%(key)s AND storageid=%(id)s", params)
if curs.rowcount == 0:
- curs.execute("INSERT INTO util_storage (key, storageid, data) VALUES (%(key)s, %(id)s, %(data)s)", params)
+ curs.execute("INSERT INTO util_storage (key, storageid, data, metadata) VALUES (%(key)s, %(id)s, %(data)s, %(metadata)s)", params)
return name
def get_tag(self, name):