[Carbon-commits] [Carbon] svn commit r109572 - in trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core: internal/config internal/registry internal/util listener
shammi at wso2.com
shammi at wso2.com
Fri Jul 8 07:48:46 EDT 2011
Author: shammi
Date: Fri Jul 8 04:48:46 2011
New Revision: 109572
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=109572
Log:
Synchronizing trunk with branch
Modified:
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/BucketHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/ElementMappingHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/InputHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/MappingHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/OutputHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/QueryHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/XMLMappingHelper.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/registry/CEPRegistryInvoker.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/util/CEPConstants.java
trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/listener/BrokerEventListener.java
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/BucketHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/BucketHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/BucketHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/BucketHelper.java Fri Jul 8 04:48:46 2011
@@ -17,18 +17,26 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
-import org.wso2.carbon.cep.core.Bucket;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.cep.core.*;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
import javax.xml.namespace.QName;
-import java.util.Iterator;
+import java.util.*;
/**
* this class is used to parse the top level bucket attributes
*/
public class BucketHelper {
+ private static final Log log = LogFactory.getLog(BucketHelper.class);
+
public static Bucket fromOM(OMElement bucketElement) throws CEPConfigurationException {
@@ -51,18 +59,18 @@
bucket.setEngineProvider(engineProvider);
String owner = bucketElement.getAttributeValue(new QName(CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER));
- if(owner != null){
+ if (owner != null) {
bucket.setOwner(owner);
}
String overWriteRegistry =
bucketElement.getAttributeValue(new QName(CEPConstants.CEP_CONF_ATTR_OVER_WRITE_REGISTRY));
- if(overWriteRegistry != null && overWriteRegistry.length()>3){
+ if (overWriteRegistry != null && overWriteRegistry.length() > 3) {
boolean overWriteRegistryB;
try {
overWriteRegistryB = Boolean.parseBoolean(overWriteRegistry);
} catch (Exception e) {
- overWriteRegistryB = false;
+ overWriteRegistryB = false;
}
bucket.setOverWriteRegistry(overWriteRegistryB);
}
@@ -87,5 +95,101 @@
return bucket;
}
+ /**
+ * This method adds bucket information to registry
+ * */
+ public static void addBucketToRegistry(Bucket bucket,
+ Registry registry,
+ String parentCollectionPath) throws CEPConfigurationException {
+ try {
+ Collection bucketCollection = registry.newCollection();
+ bucketCollection.addProperty(CEPConstants.CEP_CONF_ELE_NAME, bucket.getName());
+ bucketCollection.addProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION, bucket.getDescription());
+ bucketCollection.addProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER, bucket.getEngineProvider());
+ bucketCollection.addProperty(CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER, bucket.getOwner());
+
+ registry.put(parentCollectionPath, bucketCollection);
+
+ InputHelper.addInputsToRegistry(bucket.getInputs(), registry, parentCollectionPath);
+
+ QueryHelper.addQueriesToRegistry(bucket.getQueries(), registry, parentCollectionPath);
+
+
+ } catch (RegistryException e) {
+ String errorMessage = "Can not add bucket to the registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+
+
+ }
+ public static Bucket[] loadBucketsFromRegistry(Registry registry) throws CEPConfigurationException {
+ Bucket[] buckets = null;
+ try {
+ String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS;
+ if (registry.resourceExists(parentCollectionPath)) {
+ if (registry.get(parentCollectionPath) instanceof Collection) {
+ Collection cepBucketsCollection = (Collection) registry.get(parentCollectionPath);
+ buckets = new Bucket[cepBucketsCollection.getChildCount()];
+ int bucketCount = 0;
+ for (String bucketName : cepBucketsCollection.getChildren()) {
+ Bucket bucket = new Bucket();
+ if (registry.get(bucketName) instanceof Collection) {
+ Collection bucketDetailsCollection = (Collection) registry.get(bucketName);
+
+ bucket.setName(bucketDetailsCollection.getProperty(CEPConstants.CEP_CONF_ELE_NAME));
+ bucket.setOwner(bucketDetailsCollection.getProperty(CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER));
+ bucket.setDescription(bucketDetailsCollection.getProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION));
+ bucket.setEngineProvider(bucketDetailsCollection.getProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER));
+
+ for (String attirbute : bucketDetailsCollection.getChildren()) {
+ if (registry.get(attirbute) instanceof Collection) {
+ Input input;
+ Query query;
+ Collection attributeCollection = (Collection) registry.get(attirbute);
+ for (String names : attributeCollection.getChildren()) {
+ if (registry.get(names) instanceof Collection) {
+ if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS)
+ .equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
+ InputHelper.loadInputsFromRegistry(registry, bucket, names);
+ } else if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES)
+ .equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
+ QueryHelper.loadQueriesFromRegistry(registry, bucket, names);
+ }
+ }
+ }
+ }
+ }
+ }
+ buckets[bucketCount] = bucket;
+ bucketCount++;
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Unable to load buckets from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ return buckets;
+ }
+
+ public static void modifyBucketsInRegistry(Registry registry,
+ Bucket bucket) throws CEPConfigurationException {
+ try {
+ String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucket.getName();
+ if (registry.resourceExists(parentCollectionPath)) {
+ InputHelper.modifyInputsInRegistry(registry, bucket, parentCollectionPath);
+
+ QueryHelper.modifyQueriesInRegistry(registry, bucket, parentCollectionPath);
+
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Error in modifying buckets in registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/ElementMappingHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/ElementMappingHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/ElementMappingHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/ElementMappingHelper.java Fri Jul 8 04:48:46 2011
@@ -1,17 +1,31 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cep.core.ElementMapping;
import org.wso2.carbon.cep.core.Property;
+import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import javax.swing.text.Element;
import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Hashtable;
import java.util.Iterator;
/**
* This class will help to build Element Mapping Object from given OM Element
*/
public class ElementMappingHelper {
+ private static final Log log = LogFactory.getLog(ElementMappingHelper.class);
+
+
public static ElementMapping fromOM(OMElement elementMappingOMElement) {
String documentElement =
elementMappingOMElement.getAttributeValue(new QName(CEPConstants.CEP_CONT_ATTR_DOC_ELEMENT));
@@ -31,4 +45,117 @@
return elementMapping;
}
+
+ public static void addElementMappingToRegistry(Registry registry,
+ ElementMapping elementMapping,
+ String queryPath) throws CEPConfigurationException {
+ try {
+ String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
+ CEPConstants.CEP_REGISTRY_OUTPUT +
+ CEPConstants.CEP_REGISTRY_BS +
+ CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
+ registry.put(queryPath + elementMappingPathString, registry.newCollection());
+ Resource elementMappingResource = registry.newResource();
+ elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
+ elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
+
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
+ for (Property property : elementMapping.getProperties()) {
+ Resource elementMappingProperties = registry.newResource();
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
+ CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
+ }
+ } catch (Exception e) {
+ String errorMessage = "Can not add element mapping to the registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ }
+
+ public static void modifyElementMappingInRegistry(Registry registry,
+ ElementMapping elementMapping,
+ String queryPath) throws CEPConfigurationException {
+ try {
+ String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
+ CEPConstants.CEP_REGISTRY_OUTPUT +
+ CEPConstants.CEP_REGISTRY_BS +
+ CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
+ registry.put(queryPath + elementMappingPathString, registry.newCollection());
+ Resource elementMappingResource = registry.newResource();
+ elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
+ elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
+
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
+ for (Property property : elementMapping.getProperties()) {
+ Resource elementMappingProperties = registry.newResource();
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
+ elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
+ registry.put(queryPath + elementMappingPathString +
+ CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
+ CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not modify element mapping in registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+
+ }
+
+ public static ElementMapping loadElementMappingFromRegistry(Registry registry,
+ String mappingName) throws CEPConfigurationException {
+ ElementMapping elementMapping = new ElementMapping();
+ try {
+ if (registry.get(mappingName) instanceof Collection) {
+ Collection propertyCollection = (Collection) registry.get(mappingName);
+ for (String propertyName : propertyCollection.getChildren()) {
+ Resource propertyResource = registry.get(propertyName);
+ Hashtable propertiesHashtable = propertyResource.getProperties();
+ Enumeration e = propertiesHashtable.keys();
+ Property property = new Property();
+ while (e.hasMoreElements()) {
+ String key = (String) e.nextElement();
+ ArrayList values = (ArrayList) propertiesHashtable.get(key);
+ if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
+ property.setName(values.get(0).toString());
+ } else if (CEPConstants.CEP_REGISTRY_XML_FIELD_NAME.equals(key)) {
+ property.setXmlFieldName(values.get(0).toString());
+ } else if (CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE.equals(key)) {
+ property.setXmlFieldType(values.get(0).toString());
+ }
+ }
+ elementMapping.addProperty(property);
+ }
+ } else {
+ Resource outputdetailsResource = registry.get(mappingName);
+ Hashtable propertiesHashtable = outputdetailsResource.getProperties();
+ Enumeration e = propertiesHashtable.keys();
+ while (e.hasMoreElements()) {
+ String key = (String) e.nextElement();
+ ArrayList values = (ArrayList) propertiesHashtable.get(key);
+ if (CEPConstants.CEP_REGISTRY_DOC_ELEMENT.equals(key)) {
+ elementMapping.setDocumentElement(values.get(0).toString());
+ } else if (CEPConstants.CEP_REGISTRY_NS.equals(key)) {
+ elementMapping.setNamespace(values.get(0).toString());
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not load element mapping from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ return elementMapping;
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/InputHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/InputHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/InputHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/InputHelper.java Fri Jul 8 04:48:46 2011
@@ -1,16 +1,28 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
-import org.wso2.carbon.cep.core.Input;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.cep.core.*;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.List;
/**
* This class helps to build an Input Object from a given OMElement
*/
public class InputHelper {
+ private static final Log log = LogFactory.getLog(InputHelper.class);
+
public static Input fromOM(OMElement inputElement)
throws CEPConfigurationException {
@@ -35,4 +47,75 @@
return input;
}
+
+ public static void addInputsToRegistry(List<Input> inputList,
+ Registry registry,
+ String parentCollectionPath) throws CEPConfigurationException {
+ try {
+ String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
+ registry.put(inputsCollectionPath, registry.newCollection());
+ for (Input input : inputList) {
+ String uniqueTopic = input.getTopic().replaceAll("/", "_") + "_" + input.getBrokerName();
+ String inputResourcePath = inputsCollectionPath + "/" + uniqueTopic;
+
+ Collection inputCollection = registry.newCollection();
+ inputCollection.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
+ inputCollection.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
+ registry.put(inputResourcePath, inputCollection);
+ MappingHelper.addMappingToRegistry(registry, input, inputResourcePath);
+
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not add inputs to the registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ }
+
+ public static void loadInputsFromRegistry(Registry registry,
+ Bucket bucket,
+ String names) throws CEPConfigurationException {
+ try {
+ Input input = new Input();
+ Collection collection3 = (Collection) registry.get(names);
+ input.setTopic(collection3.getProperty(CEPConstants.CEP_CONF_ELE_TOPIC));
+ input.setBrokerName(collection3.getProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME));
+ for (String names2 : collection3.getChildren()) {
+ if (registry.get(names2) instanceof Collection) {
+ input.setMapping(MappingHelper.loadMappingsFromRegistry(registry, names2));
+ }
+ }
+ bucket.addInput(input);
+ } catch (RegistryException e) {
+ String errorMessage = "Can not load inputs from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ }
+
+    /**
+     * Re-writes the inputs of a bucket below {@code <parentCollectionPath>/<inputs>},
+     * creating the inputs collection first when it does not exist yet.
+     * <p>
+     * The collection name of each input is built exactly as in
+     * {@code addInputsToRegistry}: the topic with every "/" replaced by "_", followed by
+     * "_" and the broker name. Using the raw topic instead would write to a different
+     * node than the one created on add, and a topic containing "/" would create
+     * nested collections.
+     *
+     * @param registry             registry to write to
+     * @param bucket               bucket whose inputs are written
+     * @param parentCollectionPath registry path of the bucket
+     * @throws CEPConfigurationException if the registry raises a RegistryException
+     */
+    public static void modifyInputsInRegistry(Registry registry,
+                                              Bucket bucket,
+                                              String parentCollectionPath) throws CEPConfigurationException {
+        try {
+            String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
+            if (!registry.resourceExists(inputsCollectionPath)) {
+                registry.put(inputsCollectionPath, registry.newCollection());
+            }
+            for (Input input : bucket.getInputs()) {
+                // Same key as addInputsToRegistry so that a modify overwrites the stored input.
+                String uniqueTopic = input.getTopic().replaceAll("/", "_") + "_" + input.getBrokerName();
+                String inputResourcePath = inputsCollectionPath + "/" + uniqueTopic;
+
+                Collection inputCollection = registry.newCollection();
+                inputCollection.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
+                inputCollection.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
+                registry.put(inputResourcePath, inputCollection);
+                MappingHelper.modifyMappingsInRegistry(registry, input, inputResourcePath);
+            }
+        } catch (RegistryException e) {
+            String errorMessage = "Error in modifying inputs in registry ";
+            log.error(errorMessage, e);
+            throw new CEPConfigurationException(errorMessage, e);
+        }
+    }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/MappingHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/MappingHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/MappingHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/MappingHelper.java Fri Jul 8 04:48:46 2011
@@ -1,19 +1,29 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.cep.core.Input;
import org.wso2.carbon.cep.core.Mapping;
import org.wso2.carbon.cep.core.Property;
import org.wso2.carbon.cep.core.XpathDefinition;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
import javax.xml.namespace.QName;
-import java.util.Iterator;
+import java.util.*;
/**
* This class will help to build a Mapping object from a given OMElement
*/
public class MappingHelper {
+ private static final Log log = LogFactory.getLog(MappingHelper.class);
+
+
public static Mapping fromOM(OMElement mappingElement)
throws CEPConfigurationException {
@@ -47,4 +57,118 @@
return mapping;
}
+
+ public static void addMappingToRegistry(Registry registry, Input input, String inputResourcePath) throws CEPConfigurationException {
+ try {
+ Collection mappingCollection = registry.newCollection();
+ mappingCollection.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
+ registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, mappingCollection);
+
+ String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
+
+
+
+ List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
+ if (xpathDefinitionList != null && xpathDefinitionList.size() > 0) {
+ for (XpathDefinition xpathDefinition : xpathDefinitionList) {
+ String key = xpathDefinition.getPrefix();
+ String value = xpathDefinition.getNamespace();
+ Resource xpathDef = registry.newResource();
+ xpathDef.setProperty(CEPConstants.CEP_REGISTRY_KEY,key);
+ xpathDef.setProperty(CEPConstants.CEP_REGISTRY_VALUE,value);
+ registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
+ }
+ }
+
+
+ if (input.getMapping().getProperties() != null) {
+ for (Property property : input.getMapping().getProperties()) {
+ Resource propertyResource = registry.newResource();
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
+ registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not add mapping to the registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ }
+
+ public static void modifyMappingsInRegistry(Registry registry, Input input, String inputResourcePath) throws CEPConfigurationException {
+ try {
+ Collection mappingCollection = registry.newCollection();
+ mappingCollection.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
+ registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, mappingCollection);
+
+ String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
+ List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
+ if (xpathDefinitionList != null && xpathDefinitionList.size() > 0) {
+ for (XpathDefinition xpathDefinition : xpathDefinitionList) {
+ String key = xpathDefinition.getPrefix();
+ String value = xpathDefinition.getNamespace();
+ Resource xpathDef = registry.newResource();
+ xpathDef.setProperty(CEPConstants.CEP_REGISTRY_KEY,key);
+ xpathDef.setProperty(CEPConstants.CEP_REGISTRY_VALUE,value);
+ registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
+ }
+ }
+ if (input.getMapping().getProperties() != null) {
+ for (Property property : input.getMapping().getProperties()) {
+ Resource propertyResource = registry.newResource();
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
+ propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
+ registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not modify mappings in registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ }
+
+
+ public static Mapping loadMappingsFromRegistry(Registry registry, String mappingPath) throws CEPConfigurationException {
+ Mapping mapping = null;
+ try {
+ mapping = new Mapping();
+ Collection mappingCollection = (Collection) registry.get(mappingPath);
+ mapping.setStream(mappingCollection.getProperty(CEPConstants.CEP_REGISTRY_STREAM));
+ for (String mappingChild : mappingCollection.getChildren()) {
+ if (registry.get(mappingChild) instanceof Collection) {
+ Collection mapCollection = (Collection) registry.get(mappingChild);
+ if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_XPATH_DEFS)
+ .equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
+ for (String defs : mapCollection.getChildren()) {
+ Resource xpathDefResource = registry.get(defs);
+ XpathDefinition xpathDefinition = new XpathDefinition();
+ xpathDefinition.setPrefix(xpathDefResource.getProperty(CEPConstants.CEP_REGISTRY_KEY));
+ xpathDefinition.setNamespace(xpathDefResource.getProperty(CEPConstants.CEP_REGISTRY_VALUE));
+ mapping.addXpathDefinition(xpathDefinition);
+ }
+ } else if (
+ (CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES)
+ .equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
+ for (String defs : mapCollection.getChildren()) {
+ Resource xpathDefResource = registry.get(defs);
+ Property property = new Property();
+ property.setName(xpathDefResource.getProperty(CEPConstants.CEP_REGISTRY_NAME));
+ property.setType(xpathDefResource.getProperty(CEPConstants.CEP_REGISTRY_TYPE));
+ property.setXpath(xpathDefResource.getProperty(CEPConstants.CEP_REGISTRY_XPATH));
+ mapping.addProperty(property);
+ }
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not load mappings from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+ return mapping;
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/OutputHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/OutputHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/OutputHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/OutputHelper.java Fri Jul 8 04:48:46 2011
@@ -1,15 +1,30 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.cep.core.ElementMapping;
import org.wso2.carbon.cep.core.Output;
+import org.wso2.carbon.cep.core.Property;
+import org.wso2.carbon.cep.core.XMLMapping;
+import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Hashtable;
/**
* This class will help to build an Output object from a given OMElement
*/
public class OutputHelper {
+ private static final Log log = LogFactory.getLog(OutputHelper.class);
+
public static Output fromOM(OMElement outputElement) {
Output output = new Output();
@@ -33,4 +48,79 @@
}
return output;
}
+
+ /**
+  * Stores the given output under the query's collection in the registry.
+  * <p/>
+  * A collection holding the topic, broker name and mapping type is written to
+  * {@code queryPath + CEP_REGISTRY_BS + CEP_REGISTRY_OUTPUT}. The mapping itself is then
+  * delegated to {@link ElementMappingHelper} when an element mapping is set, otherwise
+  * to {@link XMLMappingHelper}.
+  *
+  * @param registry  registry to write to
+  * @param output    output whose details are stored
+  * @param queryPath registry path of the owning query
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void addOutputToRegistry(Registry registry, Output output, String queryPath) throws CEPConfigurationException {
+     try {
+         Collection outputDetails = registry.newCollection();
+         ElementMapping mapping = output.getElementMapping();
+
+         // The stored type records which of the two mapping flavours follows.
+         String mappingType;
+         if (mapping == null) {
+             mappingType = CEPConstants.CEP_REGISTRY_XML_MAPPING;
+         } else {
+             mappingType = CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
+         }
+
+         outputDetails.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
+         outputDetails.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
+         outputDetails.addProperty(CEPConstants.CEP_REGISTRY_TYPE, mappingType);
+
+         String outputPath = queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT;
+         registry.put(outputPath, outputDetails);
+
+         if (mapping == null) {
+             XMLMappingHelper.addXMLMappingToRegistry(registry, queryPath, output.getXmlMapping());
+         } else {
+             ElementMappingHelper.addElementMappingToRegistry(registry, mapping, queryPath);
+         }
+     } catch (RegistryException e) {
+         String errorMessage = "Can not add output to the registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ /**
+  * Rewrites the stored details of the given output under the query's collection.
+  * <p/>
+  * A fresh collection with the topic, broker name and mapping type is put at
+  * {@code queryPath + CEP_REGISTRY_BS + CEP_REGISTRY_OUTPUT}, after which the mapping is
+  * updated through {@link ElementMappingHelper} or {@link XMLMappingHelper}, depending on
+  * whether an element mapping is set.
+  *
+  * @param registry  registry to write to
+  * @param output    output whose details are stored
+  * @param queryPath registry path of the owning query
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void modifyOutputsInRegistry(Registry registry, Output output, String queryPath) throws CEPConfigurationException {
+     try {
+         Collection outputDetails = registry.newCollection();
+         ElementMapping mapping = output.getElementMapping();
+
+         // The stored type records which of the two mapping flavours follows.
+         String mappingType;
+         if (mapping == null) {
+             mappingType = CEPConstants.CEP_REGISTRY_XML_MAPPING;
+         } else {
+             mappingType = CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
+         }
+
+         outputDetails.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
+         outputDetails.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
+         outputDetails.addProperty(CEPConstants.CEP_REGISTRY_TYPE, mappingType);
+
+         String outputPath = queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT;
+         registry.put(outputPath, outputDetails);
+
+         if (mapping == null) {
+             XMLMappingHelper.modifyXMLMappingInRegistry(registry, queryPath, output.getXmlMapping());
+         } else {
+             ElementMappingHelper.modifyElementMappingInRegistry(registry, mapping, queryPath);
+         }
+     } catch (RegistryException e) {
+         String errorMessage = "Can not modify output in registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ public static Output loadOutputsFromRegistry(Registry registry,
+ String outputCollectionPath) throws CEPConfigurationException {
+ Output output = new Output();
+ try {
+ Collection outputCollection = (Collection) registry.get(outputCollectionPath);
+ output.setTopic(outputCollection.getProperty(CEPConstants.CEP_REGISTRY_TOPIC));
+ output.setBrokerName(outputCollection.getProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME));
+ for (String outputS : outputCollection.getChildren()) {
+ if (registry.get(outputS) instanceof Resource) {
+ if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING)
+ .equals(outputS.substring(outputS.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
+ ElementMapping elementMapping = ElementMappingHelper.loadElementMappingFromRegistry(registry, outputS);
+ output.setElementMapping(elementMapping);
+ } else {
+ XMLMapping xmlMapping = XMLMappingHelper.loadXMLMappingFromRegistry(registry, outputS);
+ output.setXmlMapping(xmlMapping);
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ String errorMessage = "Can not load output from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+
+ return output;
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/QueryHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/QueryHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/QueryHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/QueryHelper.java Fri Jul 8 04:48:46 2011
@@ -16,16 +16,29 @@
package org.wso2.carbon.cep.core.internal.config;
-import org.wso2.carbon.cep.core.Query;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.cep.core.*;
+import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
import org.apache.axiom.om.OMElement;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.List;
/**
* This class will help to build Query object from a given OMElement
*/
public class QueryHelper {
+ private static final Log log = LogFactory.getLog(QueryHelper.class);
+
public static Query fromOM(OMElement queryElement) {
Query query = new Query();
@@ -44,4 +57,92 @@
}
return query;
}
+
+ /**
+  * Writes every query in the list beneath the bucket's queries collection.
+  * <p/>
+  * For each query a collection named after the query is created (carrying the query name
+  * as a property), the expression is stored as a resource whose content is the expression
+  * text and whose type property is the expression type, and the output is stored through
+  * {@link OutputHelper#addOutputToRegistry}.
+  *
+  * @param queryList            queries to store
+  * @param registry             registry to write to
+  * @param parentCollectionPath registry path of the owning bucket
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void addQueriesToRegistry(List<Query> queryList,
+                                         Registry registry,
+                                         String parentCollectionPath) throws CEPConfigurationException {
+     try {
+         String queriesRoot = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
+         registry.put(queriesRoot, registry.newCollection());
+
+         for (Query current : queryList) {
+             String currentPath = queriesRoot + "/" + current.getName();
+
+             // Collection that groups everything belonging to this query.
+             Collection queryNode = registry.newCollection();
+             queryNode.addProperty(CEPConstants.CEP_REGISTRY_NAME, current.getName());
+             registry.put(currentPath, queryNode);
+
+             // Expression: type as a property, text as the resource content.
+             Resource expressionNode = registry.newResource();
+             expressionNode.setProperty(CEPConstants.CEP_REGISTRY_TYPE, current.getExpression().getType());
+             expressionNode.setContent(current.getExpression().getText());
+             String expressionPath = currentPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_EXPRESSION;
+             registry.put(expressionPath, expressionNode);
+
+             OutputHelper.addOutputToRegistry(registry, current.getOutput(), currentPath);
+         }
+     } catch (RegistryException e) {
+         String errorMessage = "Can not add query to the registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ /**
+  * Loads a single query from the registry and adds it to the bucket.
+  * <p/>
+  * The collection at {@code names} supplies the query name. Among its children, a
+  * collection is loaded as the query's output via
+  * {@link OutputHelper#loadOutputsFromRegistry}; any other resource is treated as the
+  * expression (type from its property, text from its content).
+  *
+  * @param registry registry to read from
+  * @param bucket   bucket that receives the loaded query
+  * @param names    registry path of the query's collection
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void loadQueriesFromRegistry(Registry registry,
+                                            Bucket bucket,
+                                            String names) throws CEPConfigurationException {
+     try {
+         Query query = new Query();
+         Expression expression = new Expression();
+         Output output = null;
+         Collection queryCollection = (Collection) registry.get(names);
+         query.setName(queryCollection.getProperty(CEPConstants.CEP_REGISTRY_NAME));
+         for (String childPath : queryCollection.getChildren()) {
+             // Fetch each child once and reuse it, instead of hitting the registry a
+             // second time after the type check.
+             Resource child = registry.get(childPath);
+             if (child instanceof Collection) {
+                 output = OutputHelper.loadOutputsFromRegistry(registry, childPath);
+             } else {
+                 // NOTE(review): decodes with the platform default charset - confirm this
+                 // matches how the expression text was written.
+                 String content = new String((byte[]) child.getContent());
+                 expression.setType(child.getProperty(CEPConstants.CEP_REGISTRY_TYPE));
+                 expression.setText(content);
+             }
+         }
+         query.setExpression(expression);
+         // output stays null when the query collection has no child collection.
+         query.setOutput(output);
+         bucket.addQuery(query);
+     } catch (RegistryException e) {
+         String errorMessage = "Can not load queries from the registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ /**
+  * Rewrites every query of the bucket beneath the bucket's queries collection.
+  * <p/>
+  * For each query a collection named after the query is put (carrying the query name as a
+  * property), the expression is stored as a resource whose content is the expression text
+  * and whose type property is the expression type, and the output is updated through
+  * {@link OutputHelper#modifyOutputsInRegistry}.
+  *
+  * @param registry             registry to write to
+  * @param bucket               bucket whose queries are stored
+  * @param parentCollectionPath registry path of the owning bucket
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void modifyQueriesInRegistry(Registry registry,
+                                            Bucket bucket,
+                                            String parentCollectionPath) throws CEPConfigurationException {
+     try {
+         String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
+         // NOTE(review): a new collection is put only when the path ALREADY exists; the
+         // condition may have been meant to be negated (create when missing) - confirm
+         // against the intended registry semantics before changing.
+         if (registry.resourceExists(queriesCollectionPath)) {
+             registry.put(queriesCollectionPath, registry.newCollection());
+         }
+         for (Query query : bucket.getQueries()) {
+             String queryPath = queriesCollectionPath + "/" + query.getName();
+
+             // Collection that groups everything belonging to this query.
+             Collection queryCollection = registry.newCollection();
+             queryCollection.addProperty(CEPConstants.CEP_REGISTRY_NAME, query.getName());
+             registry.put(queryPath, queryCollection);
+
+             // Expression: type as a property, text as the resource content.
+             Resource queryResource = registry.newResource();
+             queryResource.setProperty(CEPConstants.CEP_REGISTRY_TYPE, query.getExpression().getType());
+             queryResource.setContent(query.getExpression().getText());
+             registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_EXPRESSION, queryResource);
+
+             OutputHelper.modifyOutputsInRegistry(registry, query.getOutput(), queryPath);
+         }
+     } catch (RegistryException e) {
+         // This is a write path: report it as a modify failure, not a load failure.
+         String errorMessage = "Can not modify queries in the registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/XMLMappingHelper.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/XMLMappingHelper.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/XMLMappingHelper.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/config/XMLMappingHelper.java Fri Jul 8 04:48:46 2011
@@ -1,12 +1,27 @@
package org.wso2.carbon.cep.core.internal.config;
import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cep.core.XMLMapping;
+import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
+import org.wso2.carbon.cep.core.internal.util.CEPConstants;
+import org.wso2.carbon.registry.core.Collection;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Hashtable;
/**
* This Class will help to build XMLMapping object from a given OMElement
*/
public class XMLMappingHelper {
+ private static final Log log = LogFactory.getLog(XMLMappingHelper.class);
+
+
public static XMLMapping fromOM(OMElement xmlMappingOmElement) {
XMLMapping xmlMapping = new XMLMapping();
String xmlMappingText = xmlMappingOmElement.toString();
@@ -16,4 +31,55 @@
xmlMapping.setMappingXMLText(xmlMappingText);
return xmlMapping;
}
+
+ /**
+  * Stores the XML mapping text as a resource under the query's output collection, at
+  * {@code queryPath/<output>/<xml-mapping>} (segments taken from {@link CEPConstants}).
+  *
+  * @param registry   registry to write to
+  * @param queryPath  registry path of the owning query
+  * @param xmlMapping mapping whose XML text becomes the resource content
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void addXMLMappingToRegistry(Registry registry, String queryPath, XMLMapping xmlMapping) throws CEPConfigurationException {
+     String mappingPath = queryPath
+             + CEPConstants.CEP_REGISTRY_BS
+             + CEPConstants.CEP_REGISTRY_OUTPUT
+             + CEPConstants.CEP_REGISTRY_BS
+             + CEPConstants.CEP_REGISTRY_XML_MAPPING;
+     try {
+         Resource mappingResource = registry.newResource();
+         mappingResource.setContent(xmlMapping.getMappingXMLText());
+         registry.put(mappingPath, mappingResource);
+     } catch (RegistryException e) {
+         String errorMessage = "Can not add xml mapping to the registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ /**
+  * Overwrites the XML mapping resource under the query's output collection, at
+  * {@code queryPath/<output>/<xml-mapping>} (segments taken from {@link CEPConstants}),
+  * with the mapping's current XML text.
+  *
+  * @param registry   registry to write to
+  * @param queryPath  registry path of the owning query
+  * @param xmlMapping mapping whose XML text becomes the resource content
+  * @throws CEPConfigurationException if the registry reports an error
+  */
+ public static void modifyXMLMappingInRegistry(Registry registry, String queryPath, XMLMapping xmlMapping) throws CEPConfigurationException {
+     String mappingPath = queryPath
+             + CEPConstants.CEP_REGISTRY_BS
+             + CEPConstants.CEP_REGISTRY_OUTPUT
+             + CEPConstants.CEP_REGISTRY_BS
+             + CEPConstants.CEP_REGISTRY_XML_MAPPING;
+     try {
+         Resource mappingResource = registry.newResource();
+         mappingResource.setContent(xmlMapping.getMappingXMLText());
+         registry.put(mappingPath, mappingResource);
+     } catch (RegistryException e) {
+         String errorMessage = "Can not modify xml mapping in registry ";
+         log.error(errorMessage, e);
+         throw new CEPConfigurationException(errorMessage, e);
+     }
+ }
+
+ public static XMLMapping loadXMLMappingFromRegistry(Registry registry, String mappingName) throws CEPConfigurationException {
+ XMLMapping xmlMapping = null;
+ try {
+ xmlMapping = new XMLMapping();
+
+ Resource outputdetailsResource = registry.get(mappingName);
+ String content = new String((byte[]) outputdetailsResource.getContent());
+ xmlMapping.setMappingXMLText(content);
+ } catch (RegistryException e) {
+ String errorMessage = "Can not load xml mapping from registry ";
+ log.error(errorMessage, e);
+ throw new CEPConfigurationException(errorMessage, e);
+ }
+
+ return xmlMapping;
+ }
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/registry/CEPRegistryInvoker.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/registry/CEPRegistryInvoker.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/registry/CEPRegistryInvoker.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/registry/CEPRegistryInvoker.java Fri Jul 8 04:48:46 2011
@@ -4,6 +4,9 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cep.core.*;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
+import org.wso2.carbon.cep.core.internal.config.BucketHelper;
+import org.wso2.carbon.cep.core.internal.config.InputHelper;
+import org.wso2.carbon.cep.core.internal.config.QueryHelper;
import org.wso2.carbon.cep.core.internal.ds.CEPServiceValueHolder;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
import org.wso2.carbon.registry.core.Collection;
@@ -37,255 +40,10 @@
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
-
- String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS;
- try {
- if (registry.resourceExists(parentCollectionPath)) {
- if (registry.get(parentCollectionPath) instanceof Collection) {
- Collection cepBucketsCollection = (Collection) registry.get(parentCollectionPath);
- buckets = new Bucket[cepBucketsCollection.getChildCount()];
- int bucketCount = 0;
- for (String bucketName : cepBucketsCollection.getChildren()) {
- Bucket bucket = new Bucket();
- if (registry.get(bucketName) instanceof Collection) {
- Collection bucketDetailsCollection = (Collection) registry.get(bucketName);
- for (String attirbute : bucketDetailsCollection.getChildren()) {
- if (registry.get(attirbute) instanceof Collection) {
- Input input;
- Query query;
- Collection attributeCollection = (Collection) registry.get(attirbute);
- for (String names : attributeCollection.getChildren()) {
- if (registry.get(names) instanceof Collection) {
- if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS)
- .equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- input = new Input();
- Collection collection3 = (Collection) registry.get(names);
- for (String names2 : collection3.getChildren()) {
- if (registry.get(names2) instanceof Collection) {
- Collection mappingCollection = (Collection) registry.get(names2);
- Mapping mapping = new Mapping();
- for (String mappingChild : mappingCollection.getChildren()) {
- if (registry.get(mappingChild) instanceof Collection) {
- Collection mapCollection = (Collection) registry.get(mappingChild);
- if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_XPATH_DEFS)
- .equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- for (String defs : mapCollection.getChildren()) {
- Resource xpathDefResource = registry.get(defs);
- Hashtable propertiesHashtable = xpathDefResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- XpathDefinition xpathDefinition = new XpathDefinition();
- xpathDefinition.setPrefix(key);
- xpathDefinition.setNamespace(values.get(0).toString());
- mapping.addXpathDefinition(xpathDefinition);
- }
- }
- } else if (
- (CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES)
- .equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- for (String defs : mapCollection.getChildren()) {
- Resource xpathDefResource = registry.get(defs);
- Hashtable propertiesHashtable = xpathDefResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- Property property = new Property();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
- property.setName(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_TYPE.equals(key)) {
- property.setType(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_XPATH.equals(key)) {
- property.setXpath(values.get(0).toString());
- }
- }
- mapping.addProperty(property);
- }
- }
- } else {
- Resource resource = registry.get(mappingChild);
- Hashtable propertiesHashtable = resource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_STREAM.equals(key)) {
- mapping.setStream(values.get(0).toString());
- }
- }
- }
- }
- input.setMapping(mapping);
- } else {
- if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS)
- .equals(names2.substring(names2.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- Resource resource = registry.get(names2);
- Hashtable propertiesHashtable = resource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_TOPIC.equals(key)) {
- input.setTopic(values.get(0).toString());
- } else if (CEPConstants.CEP_CONF_ELE_BROKER_NAME.equals(key)) {
- input.setBrokerName(values.get(0).toString());
- }
- }
- }
- }
- }
- bucket.addInput(input);
- } else if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES)
- .equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- query = new Query();
- Expression expression = new Expression();
- Output output = new Output();
- Collection collection3 = (Collection) registry.get(names);
- for (String names2 : collection3.getChildren()) {
- if (registry.get(names2) instanceof Collection) {
- Collection outputCollection = (Collection) registry.get(names2);
- for (String outputS : outputCollection.getChildren()) {
- if (registry.get(outputS) instanceof Collection) {
- Collection outputMapping = (Collection) registry.get(outputS);
- if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING)
- .equals(outputS.substring(outputS.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
- ElementMapping elementMapping = new ElementMapping();
- for (String mappingName : outputMapping.getChildren()) {
- if (registry.get(mappingName) instanceof Collection) {
- Collection propertyCollection = (Collection) registry.get(mappingName);
- for (String propertyName : propertyCollection.getChildren()) {
- Resource propertyResource = registry.get(propertyName);
- Hashtable propertiesHashtable = propertyResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- Property property = new Property();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
- property.setName(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_XML_FIELD_NAME.equals(key)) {
- property.setXmlFieldName(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE.equals(key)) {
- property.setXmlFieldType(values.get(0).toString());
- }
- }
- elementMapping.addProperty(property);
- }
- } else {
- Resource outputdetailsResource = registry.get(mappingName);
- Hashtable propertiesHashtable = outputdetailsResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_DOC_ELEMENT.equals(key)) {
- elementMapping.setDocumentElement(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_NS.equals(key)) {
- elementMapping.setNamespace(values.get(0).toString());
- }
- }
- }
- output.setElementMapping(elementMapping);
- }
- } else {
- XMLMapping xmlMapping = null;
- for (String mappingName : outputMapping.getChildren()) {
- xmlMapping = new XMLMapping();
- if (registry.get(mappingName) instanceof Collection) {
-
- } else {
- Resource outputdetailsResource = registry.get(mappingName);
- Hashtable propertiesHashtable = outputdetailsResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_TEXT.equals(key)) {
- xmlMapping.setMappingXMLText(values.get(0).toString());
- }
- }
- }
- }
- output.setXmlMapping(xmlMapping);
- }
- } else {
- Resource outputdetailsResource = registry.get(outputS);
- Hashtable propertiesHashtable = outputdetailsResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_TOPIC.equals(key)) {
- output.setTopic(values.get(0).toString());
- } else if (CEPConstants.CEP_CONF_ELE_BROKER_NAME.equals(key)) {
- output.setBrokerName(values.get(0).toString());
- }
- }
- }
- }
- } else {
- Resource detailsResource = registry.get(names2);
- Hashtable propertiesHashtable = detailsResource.getProperties();
- Enumeration e = propertiesHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propertiesHashtable.get(key);
- if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
- query.setName(values.get(0).toString());
- } else if (CEPConstants.CEP_REGISTRY_TYPE.equals(key)) {
- expression.setType(values.get(0).toString());
- } /*else if (CEPConstants.CEP_REGISTRY_EXPRESSION.equals(key)) {
- expression.setText(values.get(0).toString());
- }*/
- }
- String content = new String((byte[]) detailsResource.getContent());
- expression.setText(content);
- }
- }
- query.setExpression(expression);
- query.setOutput(output);
- bucket.addQuery(query);
- }
- }
- }
- } else {
- Resource propertyResource = registry.get(attirbute);
- Hashtable propeHashtable = propertyResource.getProperties();
- Enumeration e = propeHashtable.keys();
- while (e.hasMoreElements()) {
- String key = (String) e.nextElement();
- ArrayList values = (ArrayList) propeHashtable.get(key);
- if (CEPConstants.CEP_CONF_ELE_NAME.equals(key)) {
- bucket.setName(values.get(0).toString().trim());
- } else if (CEPConstants.CEP_CONF_ELE_DESCRIPTION.equals(key)) {
- if(values.get(0) != null){
- bucket.setDescription(values.get(0).toString().trim());
- }else {
- bucket.setDescription("");
- }
- } else if (CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER.equals(key)) {
- bucket.setEngineProvider(values.get(0).toString().trim());
- } else if (CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER.equals(key)){
- bucket.setOwner(values.get(0).toString());
- }
- }
-
- }
- }
- }
- buckets[bucketCount] = bucket;
- bucketCount++;
- }
- }
- }
- } catch (RegistryException e) {
- String errorMessage = "Unable to load buckets from registry ";
- log.error(errorMessage, e);
- throw new CEPConfigurationException(errorMessage, e);
+ buckets = BucketHelper.loadBucketsFromRegistry(registry);
+ if (log.isDebugEnabled()) {
+ log.debug("Loaded buckets from the registry successfully");
}
- log.debug("Loaded buckets from the registry successfully");
return buckets;
}
@@ -302,148 +60,23 @@
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucket.getName();
if (registry.resourceExists(parentCollectionPath) == true) {
- if(!bucket.isOverWriteRegistry()){
+ if (!bucket.isOverWriteRegistry()) {
return;
- }else{
+ } else {
registry.delete(parentCollectionPath);
}
}
- registry.put(parentCollectionPath, registry.newCollection());
- Resource bucketProperties = registry.newResource();
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_NAME, bucket.getName());
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION, bucket.getDescription());
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER, bucket.getEngineProvider());
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER, bucket.getOwner());
- registry.put(parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, bucketProperties);
-
- String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
- registry.put(inputsCollectionPath, registry.newCollection());
- for (Input input : bucket.getInputs()) {
- String uniqueTopic = input.getTopic().replaceAll("/","_") + "_" + input.getBrokerName();
- String inputResourcePath = inputsCollectionPath + "/" + uniqueTopic;
-
- registry.put(inputResourcePath, registry.newCollection());
-
- Resource inputTopic = registry.newResource();
- inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
- inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
- registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, inputTopic);
- registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, registry.newCollection());
- String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
-
- Resource streamResource = registry.newResource();
- streamResource.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_STREAM, streamResource);
-
- /* Map<String, String> xpathDefinitionsTable = input.getMapping().getXpathNamespacePrefixes();
- if (xpathDefinitionsTable != null && xpathDefinitionsTable.size() > 0) {
- Set set = xpathDefinitionsTable.keySet();
- Iterator it = set.iterator();
- while (it.hasNext()) {
- String key = (String) it.next();
- String value = xpathDefinitionsTable.get(key);
- Resource xpathDef = registry.newResource();
- xpathDef.addProperty(key, value);
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
- }
- }*/
- List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
- if(xpathDefinitionList != null && xpathDefinitionList.size() >0){
- for(XpathDefinition xpathDefinition : xpathDefinitionList){
- String key = xpathDefinition.getPrefix();
- String value = xpathDefinition.getNamespace();
- Resource xpathDef = registry.newResource();
- xpathDef.addProperty(key, value);
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
- }
- }
-
-
- if (input.getMapping().getProperties() != null) {
- for (Property property : input.getMapping().getProperties()) {
- Resource propertyResource = registry.newResource();
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
- }
- }
- }
- String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
- registry.put(queriesCollectionPath, registry.newCollection());
- for (Query query : bucket.getQueries()) {
- String queryPath = queriesCollectionPath + "/" + query.getName();
- registry.put(queryPath, registry.newCollection());
- Resource queryResource = registry.newResource();
- queryResource.setProperty(CEPConstants.CEP_REGISTRY_NAME, query.getName());
- queryResource.setProperty(CEPConstants.CEP_REGISTRY_TYPE, query.getExpression().getType());
-// queryResource.setProperty(CEPConstants.CEP_REGISTRY_EXPRESSION,query.getExpression().getText());
- queryResource.setContent(query.getExpression().getText());
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, queryResource);
-
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT, registry.newCollection());
-
- Output output = query.getOutput();
- Resource outputResource = registry.newResource();
- outputResource.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
- outputResource.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
- outputResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE,
- output.getElementMapping() == null ? CEPConstants.CEP_REGISTRY_XML_MAPPING : CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING);
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS + output.getTopic(), outputResource);
-
- ElementMapping elementMapping = output.getElementMapping();
-
- if (elementMapping != null) {
- String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
- registry.put(queryPath + elementMappingPathString, registry.newCollection());
- Resource elementMappingResource = registry.newResource();
- elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
- elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
-
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
- for (Property property : elementMapping.getProperties()) {
- Resource elementMappingProperties = registry.newResource();
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
- CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
- }
- } else {
- registry.put(queryPath +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_XML_MAPPING, registry.newCollection());
- Resource xmlMappingResource = registry.newResource();
- xmlMappingResource.addProperty(CEPConstants.CEP_REGISTRY_TEXT, output.getXmlMapping().getMappingXMLText());
- registry.put(queryPath +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_XML_MAPPING +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_DETAILS
- , xmlMappingResource);
- }
-
- }
+ BucketHelper.addBucketToRegistry(bucket, registry, parentCollectionPath);
} catch (RegistryException e) {
String errorMessage = "Can not add bucket to the registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
- log.debug("Added the bucket to the registry successfully");
+ if (log.isDebugEnabled()) {
+ log.debug("Added the bucket to the registry successfully");
+ }
}
/**
@@ -456,139 +89,7 @@
public static void modifyBucketInRegistry(Bucket bucket, int tenantId) throws CEPConfigurationException {
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
-
- String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucket.getName();
- if (registry.resourceExists(parentCollectionPath)) {
- registry.put(parentCollectionPath, registry.newCollection());
-
- Resource bucketProperties = registry.newResource();
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_NAME, bucket.getName());
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION, bucket.getDescription());
- bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER, bucket.getEngineProvider());
-// registry.put(parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, bucketProperties);
-
- String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
- if (!registry.resourceExists(inputsCollectionPath)) {
- registry.put(inputsCollectionPath, registry.newCollection());
- }
- for (Input input : bucket.getInputs()) {
- String inputResourcePath = inputsCollectionPath + "/" + input.getTopic();
-
- registry.put(inputResourcePath, registry.newCollection());
-
- Resource inputTopic = registry.newResource();
- inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
- inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
- registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, inputTopic);
- registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, registry.newCollection());
- String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
-
- Resource streamResource = registry.newResource();
- streamResource.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_STREAM, streamResource);
-
- /* Map<String, String> xpathDefinitionsTable = input.getMapping().getXpathNamespacePrefixes();
- if (xpathDefinitionsTable != null && xpathDefinitionsTable.size() > 0) {
- Set set = xpathDefinitionsTable.keySet();
- Iterator it = set.iterator();
- while (it.hasNext()) {
- String key = (String) it.next();
- String value = xpathDefinitionsTable.get(key);
- Resource xpathDef = registry.newResource();
- xpathDef.addProperty(key, value);
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
- }
- }*/
- List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
- if(xpathDefinitionList != null && xpathDefinitionList.size()>0){
- for(XpathDefinition xpathDefinition : xpathDefinitionList){
- String key = xpathDefinition.getPrefix();
- String value = xpathDefinition.getNamespace();
- Resource xpathDef = registry.newResource();
- xpathDef.addProperty(key, value);
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
- }
- }
- if (input.getMapping().getProperties() != null) {
- for (Property property : input.getMapping().getProperties()) {
- Resource propertyResource = registry.newResource();
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
- propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
- registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
- }
- }
- }
- String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
- if (registry.resourceExists(queriesCollectionPath)) {
- registry.put(queriesCollectionPath, registry.newCollection());
- }
- for (Query query : bucket.getQueries()) {
- String queryPath = queriesCollectionPath + "/" + query.getName();
- registry.put(queryPath, registry.newCollection());
- Resource queryResource = registry.newResource();
- queryResource.setProperty(CEPConstants.CEP_REGISTRY_NAME, query.getName());
- queryResource.setProperty(CEPConstants.CEP_REGISTRY_TYPE, query.getExpression().getType());
-// queryResource.setProperty(CEPConstants.CEP_REGISTRY_EXPRESSION, query.getExpression().getText());
- queryResource.setContent(query.getExpression().getText());
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, queryResource);
-
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT, registry.newCollection());
-
- Output output = query.getOutput();
- Resource outputResource = registry.newResource();
- outputResource.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
- outputResource.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
- outputResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE,
- output.getElementMapping() == null ? CEPConstants.CEP_REGISTRY_XML_MAPPING : CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING);
- registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS + output.getTopic(), outputResource);
-
- ElementMapping elementMapping = output.getElementMapping();
-
- if (elementMapping != null) {
- String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
- registry.put(queryPath + elementMappingPathString, registry.newCollection());
- Resource elementMappingResource = registry.newResource();
- elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
- elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
-
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
- for (Property property : elementMapping.getProperties()) {
- Resource elementMappingProperties = registry.newResource();
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
- elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
- registry.put(queryPath + elementMappingPathString +
- CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
- CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
- }
- } else {
- registry.put(queryPath +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_XML_MAPPING, registry.newCollection());
- Resource xmlMappingResource = registry.newResource();
- xmlMappingResource.addProperty(CEPConstants.CEP_REGISTRY_TEXT, output.getXmlMapping().getMappingXMLText());
- registry.put(queryPath +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_OUTPUT +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_XML_MAPPING +
- CEPConstants.CEP_REGISTRY_BS +
- CEPConstants.CEP_REGISTRY_DETAILS
- , xmlMappingResource);
- }
- }
-
- }
+ BucketHelper.modifyBucketsInRegistry(registry, bucket);
} catch (RegistryException e) {
String errorMessage = "Can not modify the bucket in registry ";
log.error(errorMessage, e);
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/util/CEPConstants.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/util/CEPConstants.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/util/CEPConstants.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/internal/util/CEPConstants.java Fri Jul 8 04:48:46 2011
@@ -99,4 +99,6 @@
String CEP_REGISTRY_XML_FIELD_TYPE = "xmlFieldType";
String CEP_REGISTRY_TEXT = "text";
String CEP_REGISTRY_BS = "/";
+ String CEP_REGISTRY_KEY = "key";
+ String CEP_REGISTRY_VALUE = "value";
}
Modified: trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/listener/BrokerEventListener.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/listener/BrokerEventListener.java?rev=109572&r1=109571&r2=109572&view=diff
==============================================================================
--- trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/listener/BrokerEventListener.java (original)
+++ trunk/carbon/components/cep/org.wso2.carbon.cep.core/src/main/java/org/wso2/carbon/cep/core/listener/BrokerEventListener.java Fri Jul 8 04:48:46 2011
@@ -37,6 +37,9 @@
public void onEvent(OMElement omElement) throws BrokerEventProcessingException {
try {
+ if(log.isDebugEnabled()){
+ log.debug("Received OMElement : "+ omElement.toString());
+ }
this.topicEventListener.onEvent(omElement);
} catch (CEPEventProcessingException e) {
String errorMessage = "can not process the message at cep level";
More information about the Carbon-commits
mailing list