Passion/Python

BPMN example

sunshout 2016. 4. 22. 12:44

from io import BytesIO


from SpiffWorkflow.bpmn.BpmnWorkflow import BpmnWorkflow

from SpiffWorkflow.bpmn.storage.BpmnSerializer import BpmnSerializer

from SpiffWorkflow.bpmn.storage.CompactWorkflowSerializer import CompactWorkflowSerializer

from SpiffWorkflow import Task

from SpiffWorkflow.specs import WorkflowSpec

from SpiffWorkflow.bpmn.storage.Packager import Packager

from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnParser


from SpiffWorkflow.bpmn.specs.BpmnSpecMixin import BpmnSpecMixin

from SpiffWorkflow.specs.Simple import Simple

from SpiffWorkflow.bpmn.parser.TaskParser import TaskParser


from SpiffWorkflow.bpmn.parser.util import *


from SpiffWorkflow.bpmn.specs.UserTask import UserTask


class ServiceTask(Simple, BpmnSpecMixin):
    """Task spec for a bpmn:serviceTask node.

    The spec reports itself as a non-engine task and prints the task's
    description when the task enters its complete state.
    """

    def is_engine_task(self):
        """Return False: this spec is not an engine task.

        NOTE(review): presumably this keeps ``do_engine_steps()`` from
        completing the task on its own, so the caller sees it as a READY
        task -- confirm against the SpiffWorkflow version in use.
        """
        return False

    def entering_complete_state(self, task):
        """Print the command (the task description) for ``task``.

        Args:
            task: The task instance; only ``task.get_description()`` is
                read here.
        """
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3, unlike the bare print statement.
        print("Do command : %s" % task.get_description())



class ServiceTaskParser(TaskParser):
    """Parser for bpmn:serviceTask nodes.

    Adds nothing to ``TaskParser``; it exists so service tasks have their
    own parser class to register.
    """



class CloudBpmnParser(BpmnParser):
    """BPMN parser that maps serviceTask elements to the local classes."""

    # Qualified tag name -> (parser class, task spec class).
    OVERRIDE_PARSER_CLASSES = {
        full_tag('serviceTask'): (ServiceTaskParser, ServiceTask),
    }



class InMemoryPackager(Packager):
    """Build SpiffWorkflow packages in memory instead of on disk."""

    PARSER_CLASS = CloudBpmnParser

    @classmethod
    def package_in_memory(cls, workflow_name, workflow_files, editor='signavio'):
        """Create a workflow package from BPMN diagram files.

        Args:
            workflow_name: Name of the workflow to package.
            workflow_files: Diagram file(s); handed unchanged to
                ``add_bpmn_files_by_glob``.
            editor: Name of the editor that produced the diagrams,
                forwarded to the packager constructor.

        Returns:
            The contents of the in-memory buffer the package was written
            to (the result of ``BytesIO.getvalue()``).
        """
        buffer = BytesIO()
        packager = cls(buffer, workflow_name, meta_data=[], editor=editor)
        packager.add_bpmn_files_by_glob(workflow_files)
        packager.create_package()
        return buffer.getvalue()


class Node(object):
    """Snapshot of the information read from a workflow task.

    Attributes are filled in from the task by ``init_task``:
    ``task`` (the task itself), ``task_type`` (class name of its spec),
    ``task_name``, ``description`` and ``activity`` (the spec's
    ``service_class`` attribute, or ``''`` when the spec has none).
    ``input`` and ``output`` start as empty dicts and are not touched by
    the code in this class.
    """

    def __init__(self, task):
        """Create a node for ``task`` and read its details immediately."""
        self.input = {}
        self.output = {}
        self.init_task(task)

    def init_task(self, task):
        """Copy the type, name, description and activity from ``task``.

        Args:
            task: Object providing ``task_spec``, ``get_name()`` and
                ``get_description()``.
        """
        spec = task.task_spec
        self.task = task
        self.task_type = spec.__class__.__name__
        self.task_name = task.get_name()
        self.description = task.get_description()
        # Specs without a service_class attribute yield an empty string.
        self.activity = getattr(spec, 'service_class', '')

    def show(self):
        """Print the node's fields, one per line, then a blank separator."""
        # Parenthesised single-argument print is valid on both Python 2
        # and Python 3; the bare print statement is Python 2 only.
        print("task type:%s" % self.task_type)
        print("task name:%s" % self.task_name)
        print("description:%s" % self.description)
        print("activity :%s" % self.activity)
        print("state name:%s" % self.task.get_state_name())
        print("\n")


class BpmnEngine(object):
    """Load a BPMN workflow spec and run the workflow until no task is ready.

    Constructing an instance loads the spec, creates the workflow and
    runs it straight away.
    """

    def __init__(self, path, name):
        """Load the workflow ``name`` from ``path`` and run it.

        Args:
            path: BPMN diagram file(s), forwarded to the packager.
            name: Name of the workflow to load.
        """
        # Kept so create_workflow() can reload the spec later.
        self.path = path
        self.name = name
        self.spec = self.load_spec(path, name)
        self.workflow = BpmnWorkflow(self.spec)
        self.run_engine()

    def create_workflow(self):
        """Reload the spec for the stored path/name into ``workflow_spec``."""
        # load_workflow_spec() requires both arguments; calling it with
        # none raised TypeError.
        self.workflow_spec = self.load_workflow_spec(self.path, self.name)

    def load_spec(self, content_path, workflow_name):
        """Return the workflow spec; thin alias of ``load_workflow_spec``."""
        return self.load_workflow_spec(content_path, workflow_name)

    def load_workflow_spec(self, content_path, workflow_name):
        """Package the diagram(s) in memory and deserialize the spec.

        Args:
            content_path: BPMN diagram file(s) to package.
            workflow_name: Name of the workflow inside the diagram(s).

        Returns:
            The workflow spec produced by ``BpmnSerializer``.
        """
        package = InMemoryPackager.package_in_memory(workflow_name, content_path)
        return BpmnSerializer().deserialize_workflow_spec(package)

    def start_engine(self, **kwargs):
        """Run the workflow.

        Extra keyword arguments are accepted for compatibility and
        ignored. NOTE(review): this used to call a non-existent
        ``self.setUp()``; running the engine is the assumed intent.
        """
        self.run_engine()

    def print_task_names(self):
        """Print the name of every task in the workflow.

        This was the first of two methods named ``run_engine``; the second
        definition replaced it, so it could never be called. It is kept
        here under a distinct name.
        """
        for task in self.workflow.get_tasks():
            print(task.get_name())

    def run_engine(self):
        """Step the workflow until no READY task remains.

        Each pass runs the engine steps, then shows and completes every
        task that is currently READY.
        """
        while True:
            self.workflow.do_engine_steps()
            ready_tasks = self.workflow.get_tasks(Task.READY)
            if not ready_tasks:
                break
            for task in ready_tasks:
                Node(task).show()
                self.workflow.complete_task_from_id(task.id)



# Build the engine for the sample diagram; the constructor loads the
# "Approvals" workflow and runs it immediately.
BpmnEngine(path='./test.bpmn', name='Approvals')


SpiffWorkflow (BPMN workflow engine) on GitHub: https://github.com/knipknap/SpiffWorkflow

Zengine on GitHub: https://github.com/zetaops/zengine