- not fast enough
- difficult to deal with string based data formats (JSON, XML)
- you name it..so there might be a need to build a custom transport to overcome this limitation.
Use case
<properties>
<version.org.kie>6.4.0-SNAPSHOT</version.org.kie>
</properties>
<dependencies>
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-api</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-internal</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.kie.server</groupId>
<artifactId>kie-server-api</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.kie.server</groupId>
<artifactId>kie-server-services-common</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.kie.server</groupId>
<artifactId>kie-server-services-drools</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-core</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
Implement KieServerExtension
- init method
- destroy method
public interface KieServerExtension {
boolean isActive();
void init(KieServerImpl kieServer, KieServerRegistry registry);
void destroy(KieServerImpl kieServer, KieServerRegistry registry);
void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);
void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);
List<Object> getAppComponents(SupportedTransports type);
<T> T getAppComponents(Class<T> serviceType);
String getImplementedCapability();
List<Object> getServices();
String getExtensionName();
Integer getStartOrder();
}
- getImplementedCapability – should instruct what kind of capability is covered by this extension, note that capability should be unique within KIE Server
- getExtensionName – human readable name of this extension
- getStartOrder – defined when given extension should be started, important for extensions that have dependencies to other extensions like in this case where it depends on Drools (startup order is set to 0) so our extension should start after drools one – thus set to 20
public class MinaDroolsKieServerExtension implements KieServerExtension {
private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class);
public static final String EXTENSION_NAME = "Drools-Mina";
private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false"));
private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.port", "localhost");
private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123"));
// taken from dependency - Drools extension
private KieContainerCommandService batchCommandService;
// mina specific
private IoAcceptor acceptor;
public boolean isActive() {
return disabled == false;
}
public void init(KieServerImpl kieServer, KieServerRegistry registry) {
KieServerExtension droolsExtension = registry.getServerExtension("Drools");
if (droolsExtension == null) {
logger.warn("No Drools extension available, quiting...");
return;
}
List<Object> droolsServices = droolsExtension.getServices();
for( Object object : droolsServices ) {
// in case given service is null (meaning was not configured) continue with next one
if (object == null) {
continue;
}
if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) {
batchCommandService = (KieContainerCommandService) object;
continue;
}
}
if (batchCommandService != null) {
acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) );
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
try {
acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) );
logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT);
} catch (IOException e) {
logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e);
}
}
}
public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
if (acceptor != null) {
acceptor.dispose();
acceptor = null;
}
logger.info("{} -- Mina server stopped", toString());
}
public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
// no op - it's already handled by Drools extension
}
public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
// no op - it's already handled by Drools extension
}
public List<Object> getAppComponents(SupportedTransports type) {
// nothing for supported transports (REST or JMS)
return Collections.emptyList();
}
public <T> T getAppComponents(Class<T> serviceType) {
return null;
}
public String getImplementedCapability() {
return "BRM-Mina";
}
public List<Object> getServices() {
return Collections.emptyList();
}
public String getExtensionName() {
return EXTENSION_NAME;
}
public Integer getStartOrder() {
return 20;
}
@Override
public String toString() {
return EXTENSION_NAME + " KIE Server extension";
}
}
Implement Apache Mina handler
public class TextBasedIoHandlerAdapter extends IoHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class);
private KieContainerCommandService batchCommandService;
public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) {
this.batchCommandService = batchCommandService;
}
@Override
public void messageReceived( IoSession session, Object message ) throws Exception {
String completeMessage = message.toString();
logger.debug("Received message '{}'", completeMessage);
if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) {
session.close(false);
return;
}
String[] elements = completeMessage.split("\|");
logger.debug("Container id {}", elements[0]);
try {
ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null);
if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) {
session.write(result.getResult());
logger.debug("Successful message written with content '{}'", result.getResult());
} else {
session.write(result.getMsg());
logger.debug("Failure message written with content '{}'", result.getMsg());
}
} catch (Exception e) {
}
}
}
- each incoming request is single line, so make sure before submitting anything to it make sure it’s single line
- there is a need to pass container id in this single line so this handler expects following format:
- containerID|payload
- response is set the way it is produced by marshaller and that can be multiple lines
- handlers allows “stream mode” that allows to send commands without disconnecting from KIE Server session. to be able to quit the stream mode – send either exit or quit
Make it discoverable
Since this extension depends on Apache Mina we need to copy mina-core-2.0.9.jar into kie-server.war/WEB-INF/lib as well.
Usage example
Once deployed and KIE Server started you should find in logs that new KIE Server extension started:
Drools-Mina KIE Server extension — Mina server started at localhost and port 9123
Drools-Mina KIE Server extension has been successfully registered as server extension
That means we are now interact with our Apache Mina based transport in KIE Server. So let’s give it a go… we could write a code to interact with Mina server but to avoid another coding exercise let’s use… wait for it …. telnet 🙂
Start telnet and connect to KIE Server on port 9123:
telnet 127.0.0.1 9123
once connected you can easily interact with alive and kicking KIE Server:
Trying 127.0.0.1…
Connected to localhost.
Escape character is ‘^]’.
demo|{“lookup”:”defaultKieSession”,”commands”:[{“insert”:{“object”:{“org.jbpm.test.Person”:{“name”:”john”,”age”:25}}}},{“fire-all-rules”:””}]}
{
“results” : [ {
“key” : “”,
“value” : 1
} ],
“facts” : [ ]
}
demo|{“lookup”:”defaultKieSession”,”commands”:[{“insert”:{“object”:{“org.jbpm.test.Person”:{“name”:”john”,”age”:25}}}},{“fire-all-rules”:””}]}
{
“results” : [ {
“key” : “”,
“value” : 1
} ],
“facts” : [ ]
}
demo|{“lookup”:”defaultKieSession”,”commands”:[{“insert”:{“object”:{“org.jbpm.test.Person”:{“name”:”maciek”,”age”:25}}}},{“fire-all-rules”:””}]}
{
“results” : [ {
“key” : “”,
“value” : 1
} ],
“facts” : [ ]
}
exit
Connection closed by foreign host.
where:
- green is request message
- blue is response
- orange is exit message
in the server side logs you will see something like this:
16:33:40,206 INFO [stdout] (NioProcessor-2) Hello john
16:34:03,877 INFO [stdout] (NioProcessor-2) Hello john
16:34:19,800 INFO [stdout] (NioProcessor-2) Hello maciek
This illustrated the stream mode where we simply type in commands after command without disconnecting from the KIE Server.
This concludes this exercise and complete code for this can be found here.