当前位置

网站首页> 程序设计 > 开源项目 > 程序开发 > 浏览文章

Django REST framework的各种技巧——7.导入导出 - D咄咄

作者:小梦 来源: 网络 时间: 2024-04-29 阅读:

其实这个东西跟restframework没有卵关系,顺便写在这里

Django REST framework的各种技巧【目录索引】

导入导出在cms中是一个非常常用的功能,思考做成通用的东西,最终选择django-import-export,虽然这玩意儿最初是想放在admin中用的,虽然用起来很繁琐,但是可以做成通用的东西,而且用起来比较像rest的serializer。

django-import-export==0.4.2
文档

需要看的源码
cd 你的virtualenv/local/lib/python2.7/site-packages/import_export

resources.py
instance_loaders.py

先看用法

通过view可以看到,代码在这里是很干净的,跟一个正常的restframework的api没有什么区别。

class SchoolExportView(ExportMixin, GenericAPIView):    serializer_class = SchoolSerializer    permission_classes = (IsAuthenticated, ModulePermission)    queryset = School.objects.filter(is_active=True).order_by('-id')    resource_class = SchoolResource    filter_backends = (filters.DjangoFilterBackend, filters.SearchFilter)    filter_class = SchoolFilter    search_fields = ('name', 'contact')    module_perms = ['school.school']class SchoolImportView(ImportMixin, GenericAPIView):    serializer_class = SchoolSerializer    permission_classes = (IsAuthenticated, ModulePermission)    queryset = School.objects.filter(is_active=True).order_by('-id')    resource_class = SchoolResource    module_perms = ['school.school']

Mixin

class ExportMixin(object):    @GET('filename', type='string', default='download.xls')    @GET('format', type='string', default='xls', validators='in: xls,xlsx')    @GET('empty', type='bool', default=False)    def get(self, request, format, filename, empty):        queryset = None        if not empty:queryset = self.filter_queryset(self.get_queryset())        resourse = self.resource_class()        export_data = resourse.export(queryset, empty)        return attachment_response(getattr(export_data, format), filename=filename)class ImportMixin(object):    @POST('file', validators='required')    def post(self, request, file):        import_file = request.FILES['file']        resource = self.resource_class()        extra_data = {} if not hasattr(self, 'get_resoucre_extra_data') else self.get_resoucre_extra_data()        resource.set_extra_data(extra_data)        dataset = resource.get_dataset(import_file)        result = resource.import_data(dataset, use_transactions=True, raise_errors=True)        return Response()

重点是实现Resource, 先说export

export非常简单,因此先说export,先看demo(仅仅写export)

# -*- coding: utf-8 -*-from __future__ import absolute_importfrom import_export import resourcesfrom school.models import Schoolclass SchoolResource(resources.ModelResource):    def dehydrate_category(self, school):        if school.category == School.MIDDLE_SCHOOL:return u'中学'        elif school.category == School.COLLEGE:return u'高校'        return ''    def get_export_headers(self):        return [u'分类', u'省份', u'城市', u'学校', u'地址', u'联系人',    u'职务', u'联系电话', u'邮箱']    class Meta:        model = School        fields = ('category', 'city__province__name', 'city__name',    'name', 'address', 'contact', 'position', 'phone',    'email')        export_order = ('category', 'city__province__name', 'city__name',    'name', 'address', 'contact', 'position', 'phone',    'email')

resource的写法如下

  • Meta中的fields指导出那些列,可以用外键的__方法

  • Meta中的export_order指导出列的顺序

  • get_export_headers是指excel的表头

  • dehydrate_%filed%是指你可以对某一列做一些定制,同类似serializer里面的SerializerMethodField,但是只能是model上存在的%filed%才可以

再说import

由于import的复杂性,导致import的resource写起来非常复杂,因为import的时候有各种需求,例如导入了某些列但是只更新某些列,导入了很多列只更新不新建,导入列的各种数据校检...

首先基础的import_export中的InstanceLoader不能满足一个非常重要的查询需求,例如我们的model上面有is_active字段,然而又不能把这个东西导出区,导入的时候is_active又是get_instance的一个查询条件;以及ModelResource上面有些东西支持也非常不够,例如我输入一个文件就可以拿到dataset数据,例如我export的时候想传一个可以迭代的东西而不是queryset,还有给出更人性化的错误提示等等。

class ModelExtraParamInstanceLoader(BaseInstanceLoader):    """ get_instance时支持额外的附加参数, 对说的就是is_active=True"""    def get_queryset(self):        return self.resource._meta.model.objects.all()    def get_instance(self, row):        try:params = self.resource._meta.import_instanceloader_extra_paramsfor key in self.resource.get_import_id_fields():    field = self.resource.fields[key]    params[field.attribute] = field.clean(row)return self.get_queryset().get(**params)        except self.resource._meta.model.DoesNotExist:return Noneclass ModelResource(resources.ModelResource):    def set_extra_data(self, extra_data):        self.extra_data = extra_data    def get_clean_row(self, row):        _row = []        for each in row:if isinstance(each, float):    each = int(each)each = unicode(each).strip()_row.append(each)        return _row    def get_dataset_data(self, file_obj):        '''从前端传来的excel获得原始数据'''        headers = self.get_export_headers()        try:self._dataset_data = get_data_from_excel(file_obj=file_obj, header=headers)        except Exception as ex:logger.warn(ex)raise Error(    errors.ExcelFormatError,    err_message=unicode(ex),    message=unicode(ex))        return self._dataset_data       def get_printable_row(self, row):        _row = [unicode(each) for each in row]        return u'({})'.format(u', '.join(_row))    def get_printable_error_message(self, error_type, index, row):        return u'excel格式错误:[{}]\n错误的行:{}行\n内容:{}'.format(error_type,index, self.get_printable_row(row)        )    def get_error(self, error_type, index, row):        return Error(errors.ExcelFormatError,err_message='excel格式错误',message=self.get_printable_error_message(error_type, index, row)        )        def clean_dataset_data(self, data):        '''洗清原始数据,将data洗成跟diff_header一样的对应格式diff_header即model上面对应的列, 因为import_data是直接在model上做的有逻辑写在这里,比如参数检查,错误直接在这里raise Error,提示用户        '''        headers = self.get_export_headers()        header_length = len(headers)        for index, row in enumerate(data):if len(row) != header_length:    raise self.get_error(u'列数错误', index+2, row)        return datadef get_dataset(self, file_obj=None):        assert hasattr(self, '_dataset_data') or file_obj, 'You need call get_dataset_data first or pass file_obj'        if file_obj:data = self.get_dataset_data(file_obj)        else:data = self._dataset_data        data = self.clean_dataset_data(data)        headers = self.get_diff_headers()        dataset = get_dataset(data, headers)        return dataset          def export(self, queryset=None, empty=False):        """        Exports a resource.        """        if queryset is None:if empty:    if hasattr(self._meta, 'empty_export_data'):        queryset = self._meta.empty_export_data    else:        queryset = []else:    queryset = self.get_queryset()        headers = self.get_export_headers()        data = tablib.Dataset(headers=headers)        if isinstance(queryset, QuerySet):# Iterate without the queryset cache, to avoid wasting memory when# exporting large datasets.iterable = queryset.iterator()        else:iterable = queryset        for obj in iterable:if empty and isinstance(obj, Iterable):        data.append(obj)else:    data.append(self.export_resource(obj))        return data    def init_instance(self, row=None):        if not row:row = {}        instance = self._meta.model()        for attr, value in row.iteritems():setattr(instance, attr, value)        return instance 

先给出一个没有复杂外键的model的导入Resource

class SchoolResource(ModelResource):    def dehydrate_category(self, school):        if school.category == School.MIDDLE_SCHOOL:return u'中学'        elif school.category == School.COLLEGE:return u'高校'        return ''    def get_export_headers(self):        return [u'分类', u'省份', u'城市', u'学校', u'地址', u'联系人',    u'职务', u'联系电话', u'邮箱']    def get_diff_headers(self):        return ['category', 'city', 'name', 'address', 'contact', 'position', 'phone', 'email']    def clean_dataset_data(self, data):        data = super(SchoolResource, self).clean_dataset_data(data)        clean_data = []        for index, row in enumerate(data):_index = index + 2_row = self.get_clean_row(row)category = self.clean_dataset_category(_row[0], _index, row)city = self.clean_dataset_city((_row[1], _row[2]), _index, row)clean_data.append([category, city]+ _row[3:])        return clean_data    def clean_dataset_category(self, category, index, row):        if category not in (u'中学', u'高校'):raise self.get_error(u'分类错误', index, row)        if category == u'中学':return 1        else:return 2    class Meta:        model = School        import_id_fields = ['name',]        import_instanceloader_extra_params = {'is_active': True}        instance_loader_class = ModelExtraParamInstanceLoader        empty_export_data = [...]      fields = ('category', 'city__province__name', 'city__name',    'name', 'address', 'contact', 'position', 'phone',    'email')        export_order = ('category', 'city__province__name', 'city__name',    'name', 'address', 'contact', 'position', 'phone',    'email')          

resource的写法如下

  • get_export_header是指导出时excel的表头

  • get_diff_headers是指import时用的header,可以叫做任何东西(貌似是model上要能找到的,可以通过 外键__属性的方式)

  • init_instance是指如果通过instanceloader没有get到数据时需要新建一条记录,可以根据传过来的row做一些事情,有时候你要做一些奇怪的事情,例如diff_header为city__name,但是我想把city设置为id,可以先通过clean_data拿到city然后在赋值,请见courseresource

  • clean_dataset_data做数据清洗,对每行的数据都要做数据校检,可以做一些厉害的事情,比如city有关外键检查的事情,由于导出的时候city是用的city__name,city__province__name,导入就需要用这两列来确定一个city对象,看下下面clean_dataset_city的写法

  • raise_error直接用self.get_error(u'分类错误', index, row),第一个是说大的错误是毛,index是实际excel中的行,由于我们skip了header,如果你又用的enumerate做计数的话index应该+2

  • class meta的import_id_fields,通过那几列来唯一确定数据,取值从git_diff_header中取,如果excel的信息不够(例如我们很多地方要写is_active=True)则填写import_instanceloader_extra_params

  • skip_unchanged 是指如果excel的数据跟数据库的相同是否跟新数据,如果设为true有的时候会出问题

稍微复杂的demo

class CourseResource(ModelResource):    def dehydrate_is_authentication(self, course):        if course.is_authentication:return u'已认证'        else:return u'未认证'    def get_export_headers(self):        return [u'年份', u'开课科目', u'学校', u'教师姓名',u'身份证号', u'联系方式', u'邮箱', u'教师认证状态',u'班级规模'        ]    def get_diff_headers(self):        return ['term__name', 'name', 'school__name', 'teacher', 'ID_number', 'phone',    'email', 'is_authentication', 'enrollment']    def init_instance(self, row=None):        if not row:row = {}        instance = self._meta.model()        for attr, value in row.iteritems():setattr(instance, attr, value)        instance.term = row['term__name']        instance.school = row['school__name']        return instance    def clean_dataset_data(self, data):        data = super(CourseResource, self).clean_dataset_data(data)        clean_data = []        for index, row in enumerate(data):_index = index + 2_row = self.get_clean_row(row)term = self.clean_dataset_term(_row[0], _index, row)school = self.clean_dataset_school(_row[2], _index, row)is_authentication = self.clean_dataset_is_authentication(_row[7], _index, row)enrollment = self.clean_dataset_enrollment(_row[8], _index, row)clean_data.append([term, _row[1], school, _row[3], _row[4],    _row[5], _row[6], is_authentication, enrollment])        return clean_data    def clean_dataset_term(self, term, index, row):        try:return Term.objects.get(name=term, is_active=True)        except Term.DoesNotExist:raise self.get_error(u'年份错误', index, row)         def clean_dataset_school(self, school, index, row):        try:school = School.objects.get(name=school, is_active=True)user = self.extra_data['user']if not SchoolPermissionFilterBackend().has_school_permission(user,        school):    raise self.get_error(u'没有对应的学校权限', index, row)return school        except School.DoesNotExist:raise self.get_error(u'学校错误', index, row)    def clean_dataset_is_authentication(self, is_authentication, index, row):        if is_authentication == u'已认证':return True        if is_authentication == u'未认证':return False        raise self.get_error(u'教师认证状态错误', index, row)    def clean_dataset_enrollment(self, enrollment, index, row):        try:if not enrollment:    enrollment = 0return int(float(enrollment))        except:raise self.get_error(u'班级规模错误', index, row)    class Meta:        model = Course        import_id_fields = ['term__name', 'name', 'school__name']        import_instanceloader_extra_params = {    'is_active': True, 'term__is_active': True, 'school__is_active': True}        instance_loader_class = ModelExtraParamInstanceLoader        fields = ('term__name', 'name', 'school__name',    'teacher', 'ID_number', 'phone', 'email', 'is_authentication',    'enrollment')        export_order = ('term__name', 'name', 'school__name',    'teacher', 'ID_number', 'phone', 'email', 'is_authentication',    'enrollment')        empty_export_data = [...]   

几个方法

def extract_data(sheet, header, skip_header=True, row_type='list'):    assert header    data = []    for row_index in xrange(1 if skip_header else 0, sheet.nrows):        row = sheet.row_values(row_index)        assert len(header) == len(row), u'excel 第{}行,列数对应数据不对'.format(row_index)        if row_type == 'list':data.append(row)        else:each_data = {}for col_index in xrange(len(header)):    each_data[header[col_index]] = row[col_index]data.append(each_data)    return data    def get_data_from_excel(file_path=None, file_obj=None, header=None,        sheet_index=0, skip_header=True):    assert header    assert file_path or file_obj    if file_path:        with open_workbook(file_path) as wb: data = extract_data(wb.sheet_by_index(sheet_index), header, skip_header)    else:        with tempinput(file_obj) as tempfilename:with open_workbook(tempfilename) as wb:    data = extract_data(wb.sheet_by_index(sheet_index), header, skip_header)    return datadef get_dataset(data, header):    return tablib.Dataset(*data, headers=header)    def attachment_response(export_data, filename='download.xls', content_type='application/vnd.ms-excel'):    # Django 1.7 uses the content_type kwarg instead of mimetype    try:        response = HttpResponse(export_data, content_type=content_type)    except TypeError:        response = HttpResponse(export_data, mimetype=content_type)    response['Content-Disposition'] = 'attachment; filename={}'.format(filename)    return response